Android 通过Socket 和服务器通讯

Android 通过Socket 和服务器通讯,是一种比较常用的通讯方式,时间比较紧,说下大致的思路,希望能帮到使用socket 进行通信的人

(1)开启一个线程发送消息    SocketOutputThread

      消息是放在队列里的,当有消息后,进入队列,线程唤醒,发送消息,并反馈发送是否成功的回调

(2)开启一个线程接收服务器消息 SocketInputThread

       为了防止一直收数据,浪费电池的电,采用NIO的方式读socket的数据,这个是本文的关键 

(3)开启一个线程,做心跳,防止socket连接中断 , SocketHeartThread

(4)构建 SocketThreadManager对以上三个thread进行管理

(5)构建 TCPClient 发送socket消息

     采用NIO的方式实现TCP,特别是在接收服务器数据时,不必再写一个线程定时去轮询读取了。

 

Android 通过Socket 和服务器通讯

DEMO 截图

 

主要代码如下,详细代码在附件里。

SocketOutputThread 类

package com.example.socketblockdemo;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import android.os.Bundle;
import android.os.Handler;
import android.os.Message;


/**
 * Client-side writer thread.
 * <p>
 * Messages are queued with {@link #addMsgToSendList(MsgEntity)}; the thread
 * wakes up, sends every queued message through {@code TCPClient}, and reports
 * the outcome to the message's {@code Handler} (when one is attached):
 * {@code what == 1} for success, {@code what == 0} for failure, with the
 * payload bytes in {@code obj}.
 * <p>
 * A message whose send throws stays in the queue and is attempted again on
 * the next wake-up.
 *
 * @author way
 */
public class SocketOutputThread extends Thread
{
    /** Handler {@code what} value reported when a message could not be sent. */
    private static final int WHAT_SEND_FAILED = 0;

    /** Handler {@code what} value reported when a message was sent. */
    private static final int WHAT_SEND_OK = 1;

    // Written by other threads through setStart(); volatile so the run loop
    // observes the change.
    private volatile boolean isStart = true;
    private static String tag = "socketOutputThread";
    private List<MsgEntity> sendMsgList;

    // Set whenever somebody asks the thread to do another pass. Guarded by
    // "this". Without this flag a notify() issued while the thread is still
    // sending (i.e. not yet inside wait()) would be lost and the new message
    // would sit in the queue until some later, unrelated wake-up.
    private boolean wakeRequested = false;

    public SocketOutputThread( )
    {
        sendMsgList = new CopyOnWriteArrayList<MsgEntity>();
    }

    /**
     * Sets the running flag and wakes the thread so it can re-check it.
     *
     * @param isStart {@code false} asks the run loop to terminate
     */
    public void setStart(boolean isStart)
    {
        synchronized (this)
        {
            this.isStart = isStart;
            wakeRequested = true;
            notify();
        }
    }

    /**
     * Sends one payload over the socket.
     *
     * @param msg bytes to send
     * @return {@code false} if {@code msg} is {@code null} (nothing is sent),
     *         {@code true} once the payload was handed to {@code TCPClient}
     * @throws Exception whatever {@code TCPClient.sendMsg} throws
     */
    public boolean sendMsg(byte[] msg) throws Exception
    {
        if (msg == null)
        {
            CLog.e(tag, "sendMsg is null");
            return false;
        }

        TCPClient.instance().sendMsg(msg);
        return true;
    }

    /**
     * Queues a message and wakes the writer thread.
     *
     * @param msg message to send
     */
    public void addMsgToSendList(MsgEntity msg)
    {
        synchronized (this)
        {
            this.sendMsgList.add(msg);
            wakeRequested = true;
            notify();
        }
    }

    @Override
    public void run()
    {
        while (isStart)
        {
            // CopyOnWriteArrayList iterates over a snapshot, so removing the
            // current element and concurrent add() calls are both safe here.
            for (MsgEntity msg : sendMsgList)
            {
                deliver(msg);
            }

            synchronized (this)
            {
                try
                {
                    // Sleep until new work (or a stop request) arrives. The
                    // loop also covers spurious wake-ups.
                    while (isStart && !wakeRequested)
                    {
                        wait();
                    }
                    wakeRequested = false;
                } catch (InterruptedException e)
                {
                    // Treat interruption as a request to stop; keep the flag
                    // set for whoever inspects this thread afterwards.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Sends one queued message and reports the result to its handler. */
    private void deliver(MsgEntity msg)
    {
        Handler handler = msg.getHandler();
        try
        {
            boolean sent = sendMsg(msg.getBytes());
            sendMsgList.remove(msg);
            // A null payload is dropped from the queue but reported as a
            // failure, since nothing was actually written.
            report(handler, msg, sent ? WHAT_SEND_OK : WHAT_SEND_FAILED);
        } catch (Exception e)
        {
            e.printStackTrace();
            CLog.e(tag, e.toString());
            report(handler, msg, WHAT_SEND_FAILED);
        }
    }

    /** Posts the send result to the handler, if the message has one. */
    private static void report(Handler handler, MsgEntity msg, int what)
    {
        if (handler == null)
        {
            return;
        }
        Message message = new Message();
        message.obj = msg.getBytes();
        message.what = what;
        handler.sendMessage(message);
    }
}

SocketInputThread

package com.example.socketblockdemo;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;


import android.content.Intent;
import android.text.TextUtils;

/**
 * Client-side reader thread.
 * <p>
 * While the device has network connectivity it blocks on the selector owned
 * by {@code TCPClient}, decodes every chunk received as UTF-8 and broadcasts
 * it as the {@code "response"} extra of an {@code Intent} with action
 * {@code Const.BC}.
 *
 * @author way
 */
public class SocketInputThread extends Thread
{
    // Written by other threads through setStart(); volatile so the run loop
    // observes the change.
    private volatile boolean isStart = true;

    private static String tag = "socket";

    public SocketInputThread()
    {
    }

    /**
     * Sets the running flag. When clearing it, the selector is woken so a
     * thread blocked in {@code select()} can notice the stop request.
     *
     * @param isStart {@code false} asks the run loop to terminate
     */
    public void setStart(boolean isStart)
    {
        this.isStart = isStart;
        if (!isStart)
        {
            Selector selector = TCPClient.instance().getSelector();
            if (selector != null && selector.isOpen())
            {
                selector.wakeup();
            }
        }
    }

    @Override
    public void run()
    {
        while (isStart)
        {
            if (!NetManager.instance().isNetworkConnected())
            {
                // No network: back off instead of spinning on the check.
                if (!pause())
                {
                    return;
                }
                continue;
            }

            if (!TCPClient.instance().isConnect())
            {
                CLog.e(tag, "TCPClient connet server is fail read thread sleep second" +Const.SOCKET_SLEEP_SECOND );

                if (!pause())
                {
                    return;
                }
            }

            readSocket();

            CLog.e("socket","TCPClient.instance().isConnect() " + TCPClient.instance().isConnect() );
        }
    }

    /**
     * Sleeps for {@code Const.SOCKET_SLEEP_SECOND} seconds.
     *
     * @return {@code false} if the thread was interrupted and should stop
     */
    private boolean pause()
    {
        try
        {
            sleep(Const.SOCKET_SLEEP_SECOND * 1000);
            return true;
        } catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Blocks on the current selector and processes readable keys until the
     * selector is woken without ready keys, is closed, or the peer closes the
     * connection.
     */
    public void readSocket()
    {
        Selector selector = TCPClient.instance().getSelector();
        if (selector == null)
        {
            return;
        }
        try
        {
            // select() blocks until at least one channel is ready.
            while (selector.select() > 0)
            {
                // Use an explicit iterator: removing from the selected-key
                // set inside a for-each loop is not safe.
                java.util.Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext())
                {
                    SelectionKey sk = keys.next();
                    // Every processed key must leave the selected set,
                    // whether or not it turned out to be readable.
                    keys.remove();

                    try
                    {
                        if (sk.isValid() && sk.isReadable() && !readAndBroadcast(sk))
                        {
                            return;
                        }
                    } catch (CancelledKeyException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e1)
        {
            e1.printStackTrace();
        } catch (ClosedSelectorException e2)
        {
            // The selector was closed underneath us; the run loop fetches
            // the current one on its next pass.
        }
    }

    /**
     * Reads one chunk (up to 1024 bytes) from the key's channel and
     * broadcasts it.
     *
     * @return {@code false} when the connection is finished (end of stream or
     *         read error) and the channel has been closed
     */
    private boolean readAndBroadcast(SelectionKey sk)
    {
        SocketChannel sc = (SocketChannel) sk.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int count;
        try
        {
            count = sc.read(buffer);
        } catch (IOException e)
        {
            e.printStackTrace();
            count = -1;
        }

        if (count < 0)
        {
            // End of stream or broken connection. Leaving the key registered
            // would make select() return immediately forever.
            sk.cancel();
            try
            {
                sc.close();
            } catch (IOException ignored)
            {
                // Best effort: the connection is being abandoned anyway.
            }
            return false;
        }
        if (count == 0)
        {
            return true;
        }

        buffer.flip();
        try
        {
            // NOTE(review): a multi-byte character split across two reads
            // presumably fails strict decoding here — confirm whether the
            // protocol needs message framing.
            String receivedString = Charset.forName("UTF-8")
                    .newDecoder().decode(buffer).toString();

            CLog.e(tag, receivedString);

            Intent i = new Intent(Const.BC);
            i.putExtra("response", receivedString);
            MainActivity.s_context.sendBroadcast(i );
        } catch (CharacterCodingException e)
        {
            e.printStackTrace();
        }
        return true;
    }

}

SocketHeartThread 心跳类

package com.example.socketblockdemo;

import java.io.IOException;

import android.text.TextUtils;


/**
 * Heartbeat thread: every {@code Const.SOCKET_HEART_SECOND} seconds it asks
 * {@code TCPClient} whether the server is still reachable and triggers a
 * reconnect when it is not.
 */
class SocketHeartThread extends Thread
{
    // Written by stopThread() from another thread; volatile so the run loop
    // observes the change.
    volatile boolean isStop = false;
    boolean mIsConnectSocketSuccess = false;
    static SocketHeartThread s_instance;

    static final String tag = "SocketHeartThread";

    /** Returns the lazily created shared instance. */
    public static synchronized SocketHeartThread instance()
    {
        if (s_instance == null)
        {
            s_instance = new SocketHeartThread();
        }
        return s_instance;
    }

    public SocketHeartThread()
    {
        // Make sure the shared TCP client exists before the first heartbeat.
        TCPClient.instance();
    }

    /** Asks the heartbeat loop to finish; takes effect at its next check. */
    public void stopThread()
    {
        isStop = true;
    }

    /**
     * Closes the current socket and connects again.
     *
     * @return the result of {@code TCPClient.reConnect()}
     */
    private boolean reConnect()
    {
        return TCPClient.instance().reConnect();
    }

    @Override
    public void run()
    {
        // isStop is deliberately NOT reset here: a stopThread() call that
        // arrives before this thread is scheduled must not be lost.
        while (!isStop)
        {
            // Probe the server and reconnect if the probe fails.
            boolean canConnectToServer = TCPClient.instance().canConnectToServer();

            if (!canConnectToServer)
            {
                reConnect();
            }
            try
            {
                Thread.sleep(Const.SOCKET_HEART_SECOND * 1000);
            } catch (InterruptedException e)
            {
                // Treat interruption as a request to stop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
线程管理类

package com.example.socketblockdemo;

import android.os.Handler;
import android.text.TextUtils;


/**
 * Owns the heartbeat, reader and writer threads and exposes the single entry
 * point used to queue outgoing messages.
 */
public class SocketThreadManager
{

    private static SocketThreadManager s_SocketManager = null;

    private SocketInputThread mInputThread = null;

    private SocketOutputThread mOutThread = null;

    private SocketHeartThread mHeartThread = null;

    /**
     * Returns the shared manager, creating it and starting its threads on
     * first use. Synchronized so that concurrent first calls cannot create
     * (and start) two sets of threads.
     */
    public static synchronized SocketThreadManager sharedInstance()
    {
        if (s_SocketManager == null)
        {
            SocketThreadManager manager = new SocketThreadManager();
            manager.startThreads();
            s_SocketManager = manager;
        }
        return s_SocketManager;
    }

    // Singleton: instances are only created through sharedInstance().
    private SocketThreadManager()
    {
        mHeartThread = new SocketHeartThread();
        mInputThread = new SocketInputThread();
        mOutThread = new SocketOutputThread();
    }

    /**
     * Starts the three worker threads.
     */
    private void startThreads()
    {
        // Set the running flags before start() so no thread can observe a
        // stale value; the second call targets the writer thread (the
        // original set the reader's flag twice).
        mInputThread.setStart(true);
        mOutThread.setStart(true);

        mHeartThread.start();
        mInputThread.start();
        mOutThread.start();
    }

    /**
     * Asks all three worker threads to stop.
     */
    public void stopThreads()
    {
        mHeartThread.stopThread();
        mInputThread.setStart(false);
        mOutThread.setStart(false);
    }

    /**
     * Stops the threads of the shared manager, if any, and discards it so the
     * next {@link #sharedInstance()} call builds a fresh one.
     */
    public static synchronized void releaseInstance()
    {
        if (s_SocketManager != null)
        {
            s_SocketManager.stopThreads();
            s_SocketManager = null;
        }
    }

    /**
     * Queues a payload for sending.
     *
     * @param buffer  bytes to send
     * @param handler passed along with the payload inside the
     *                {@code MsgEntity}; may be {@code null}
     */
    public void sendMsg(byte [] buffer, Handler handler)
    {
        MsgEntity entity = new MsgEntity(buffer, handler);
        mOutThread.addMsgToSendList(entity);
    }

}

TCPClient ,采用NIO的方式构建

package com.example.socketblockdemo;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;


/**
 * NIO TCP client.
 * <p>
 * Holds one non-blocking {@link SocketChannel} to the configured server and
 * a {@link Selector} on which that channel is registered for
 * {@code OP_READ}. The connection fields are volatile because they are
 * replaced by {@link #reConnect()} on one thread while other threads read
 * them.
 */
public class TCPClient
{
    // Upper bound on how long a send waits for the socket's send buffer to
    // drain before giving up.
    private static final long WRITE_STALL_TIMEOUT_MS = 10000L;

    // Selector the channel is registered on for reads.
    private volatile Selector selector;

    // Channel used to talk to the server.
    volatile SocketChannel socketChannel;

    // Server address.
    private String hostIp;

    // Server port.
    private int hostListenningPort;

    private static TCPClient s_Tcp = null;

    public volatile boolean isInitialized = false;

    /**
     * Returns the shared client, creating it (and attempting to connect to
     * {@code Const.SOCKET_SERVER:Const.SOCKET_PORT}) on first use.
     */
    public static synchronized TCPClient instance()
    {
        if (s_Tcp == null)
        {
            s_Tcp = new TCPClient(Const.SOCKET_SERVER,
                    Const.SOCKET_PORT);
        }
        return s_Tcp;
    }

    /**
     * Creates a client and tries to connect immediately. A failed connection
     * attempt is logged and leaves {@link #isInitialized} {@code false}; it
     * does not throw.
     *
     * @param HostIp             server address
     * @param HostListenningPort server port
     */
    public TCPClient(String HostIp, int HostListenningPort)
    {
        this.hostIp = HostIp;
        this.hostListenningPort = HostListenningPort;

        try
        {
            initialize();
            this.isInitialized = true;
        } catch (Exception e)
        {
            this.isInitialized = false;
            e.printStackTrace();
        }
    }

    /**
     * Connects to the server, switches the channel to non-blocking mode and
     * registers it for reads on a new selector. The fields are only updated
     * once everything succeeded; on failure the partially built objects are
     * closed and the original exception propagates.
     *
     * @throws IOException if connecting or registering fails
     */
    public void initialize() throws IOException
    {
        SocketChannel channel = null;
        Selector readSelector = null;
        boolean done = false;

        try
        {
            // open(address) connects in blocking mode before returning.
            channel = SocketChannel.open(new InetSocketAddress(hostIp,
                    hostListenningPort));
            channel.socket().setTcpNoDelay(false);
            channel.socket().setKeepAlive(true);
            // NOTE(review): SO_TIMEOUT presumably does not affect reads made
            // through a non-blocking channel — confirm it is still needed.
            channel.socket().setSoTimeout(
                    Const.SOCKET_READ_TIMOUT);
            channel.configureBlocking(false);

            readSelector = Selector.open();
            channel.register(readSelector, SelectionKey.OP_READ);

            selector = readSelector;
            socketChannel = channel;
            done = true;
        } finally
        {
            if (!done)
            {
                // Null-safe: open() may have failed before anything existed.
                closeQuietly(readSelector);
                closeQuietly(channel);
            }
        }
    }

    /**
     * Waits on the key's selector: up to {@code timeout} milliseconds when
     * positive, without blocking when zero.
     *
     * @throws SocketTimeoutException if no key became ready (always the case
     *         for a negative {@code timeout}, which performs no select)
     * @throws IOException            if the select fails
     */
    static void blockUntil(SelectionKey key, long timeout) throws IOException
    {
        int nkeys = 0;
        if (timeout > 0)
        {
            nkeys = key.selector().select(timeout);
        } else if (timeout == 0)
        {
            nkeys = key.selector().selectNow();
        }

        if (nkeys == 0)
        {
            throw new SocketTimeoutException();
        }
    }

    /**
     * Sends a string to the server, encoded as UTF-8.
     *
     * @param message text to send
     * @throws IOException if there is no channel or the write fails
     */
    public void sendMsg(String message) throws IOException
    {
        writeFully(ByteBuffer.wrap(message.getBytes("utf-8")));
    }

    /**
     * Sends raw bytes to the server.
     *
     * @param bytes payload to send
     * @throws IOException if there is no channel or the write fails
     */
    public void sendMsg(byte[] bytes) throws IOException
    {
        writeFully(ByteBuffer.wrap(bytes));
    }

    /**
     * Writes the whole buffer. A single write() on a non-blocking channel may
     * accept only part of the data, so keep writing until nothing remains,
     * waiting for writability whenever the send buffer is full.
     */
    private void writeFully(ByteBuffer buffer) throws IOException
    {
        SocketChannel channel = socketChannel;
        if (channel == null)
        {
            throw new IOException("socket channel is not initialized");
        }

        Selector writeSelector = null;
        try
        {
            while (buffer.hasRemaining())
            {
                if (channel.write(buffer) > 0)
                {
                    continue;
                }

                // Nothing was accepted: wait on a private selector so the
                // reader thread's selector is left alone.
                if (writeSelector == null)
                {
                    writeSelector = Selector.open();
                    channel.register(writeSelector, SelectionKey.OP_WRITE);
                }
                if (writeSelector.select(WRITE_STALL_TIMEOUT_MS) == 0)
                {
                    throw new SocketTimeoutException("socket write stalled");
                }
                writeSelector.selectedKeys().clear();
            }
        } finally
        {
            closeQuietly(writeSelector);
        }
    }

    /**
     * @return the selector the channel is currently registered on, or
     *         {@code null} if no connection was ever established
     */
    public synchronized Selector getSelector()
    {
        return this.selector;
    }

    /**
     * @return {@code true} if the client is initialized and its channel
     *         reports being connected
     */
    public boolean isConnect()
    {
        SocketChannel channel = socketChannel;
        return isInitialized && channel != null && channel.isConnected();
    }

    /**
     * Closes the current socket and connects again.
     *
     * @return {@code true} if the new connection was established
     */
    public boolean reConnect()
    {
        closeTCPSocket();

        try
        {
            initialize();
            isInitialized = true;
        } catch (Exception e)
        {
            isInitialized = false;
            e.printStackTrace();
        }
        return isInitialized;
    }

    /**
     * Probes the connection by sending one byte of urgent data.
     *
     * @return {@code false} if there is no channel (for example because the
     *         initial connect failed) or the probe throws, {@code true}
     *         otherwise
     */
    public boolean canConnectToServer()
    {
        SocketChannel channel = socketChannel;
        if (channel == null)
        {
            // Never connected: report failure so the caller reconnects.
            return false;
        }
        try
        {
            // NOTE(review): some runtimes may reject sendUrgentData on the
            // socket of a non-blocking channel — confirm on the target
            // platform.
            channel.socket().sendUrgentData(0xff);
        } catch (Exception e)
        {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * Closes the channel and the selector (best effort) and marks the client
     * as not initialized.
     */
    public void closeTCPSocket()
    {
        isInitialized = false;
        closeQuietly(socketChannel);
        closeQuietly(selector);
    }

    /**
     * Registers the channel for reads on a fresh selector and closes the one
     * it replaces. On failure the previous selector stays in place.
     */
    public synchronized void repareRead()
    {
        SocketChannel channel = socketChannel;
        if (channel == null)
        {
            return;
        }

        Selector fresh = null;
        boolean registered = false;
        try
        {
            fresh = Selector.open();
            channel.register(fresh, SelectionKey.OP_READ);
            registered = true;

            Selector previous = selector;
            selector = fresh;
            // Closing the old selector releases its resources; it used to be
            // dropped without being closed.
            closeQuietly(previous);
        } catch (ClosedChannelException e)
        {
            e.printStackTrace();
        } catch (IOException e)
        {
            e.printStackTrace();
        } finally
        {
            if (!registered)
            {
                closeQuietly(fresh);
            }
        }
    }

    /** Closes a channel, ignoring {@code null} and close failures. */
    private static void closeQuietly(SocketChannel channel)
    {
        if (channel == null)
        {
            return;
        }
        try
        {
            channel.close();
        } catch (IOException ignored)
        {
            // Best effort: the channel is being discarded.
        }
    }

    /** Closes a selector, ignoring {@code null} and close failures. */
    private static void closeQuietly(Selector sel)
    {
        if (sel == null)
        {
            return;
        }
        try
        {
            sel.close();
        } catch (IOException ignored)
        {
            // Best effort: the selector is being discarded.
        }
    }
}

如何使用

// Queue a message for sending; the handler is passed along to receive the success/failure callback
            SocketThreadManager.sharedInstance().sendMsg(str.getBytes(), handler);

代码下载 http://files.cnblogs.com/likwo/SocketBlockDemo.zip

上一篇:圈复杂度评价及工具


下一篇:通过 Consul-Template 实现动态配置Nginx负载服务