Android 通过Socket 和服务器通讯,是一种比较常用的通讯方式,时间比较紧,说下大致的思路,希望能帮到使用socket 进行通信的人
(1)开启一个线程发送消息 SocketOutputThread
消息是放在队列里的,当有消息后,进入队列,线程唤醒,发送消息,并反馈发送是否成功的回调
(2)开启一个线程接受服务器消息 SocketInputThread
为了防止一直收数据,浪费电池的电,采用NIO的方式读socket的数据,这个是本文的关键
(3)开启一个线程,做心跳,防止socket连接终断 , SocketHeartThread
(4)构建 SocketThreadManager对以上三个thread进行管理
(5)构建 TCPClient 发送socket消息
在NIO的方式实现TCP,特别是在接收服务器的数据,不用写个线程定时去读了。
DEMO 截图
主要代码如下,详细代码在附件里。
SocketOutPutThread 类
package com.example.socketblockdemo; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.Socket; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import android.os.Bundle; import android.os.Handler; import android.os.Message; /** * 客户端写消息线程 * * @author way * */ public class SocketOutputThread extends Thread { private boolean isStart = true; private static String tag = "socketOutputThread"; private List<MsgEntity> sendMsgList; public SocketOutputThread( ) { sendMsgList = new CopyOnWriteArrayList<MsgEntity>(); } public void setStart(boolean isStart) { this.isStart = isStart; synchronized (this) { notify(); } } // 使用socket发送消息 public boolean sendMsg(byte[] msg) throws Exception { if (msg == null) { CLog.e(tag, "sendMsg is null"); return false; } try { TCPClient.instance().sendMsg(msg); } catch (Exception e) { throw (e); } return true; } // 使用socket发送消息 public void addMsgToSendList(MsgEntity msg) { synchronized (this) { this.sendMsgList.add(msg); notify(); } } @Override public void run() { while (isStart) { // 锁发送list synchronized (sendMsgList) { // 发送消息 for (MsgEntity msg : sendMsgList) { Handler handler = msg.getHandler(); try { sendMsg(msg.getBytes()); sendMsgList.remove(msg); // 成功消息,通过hander回传 if (handler != null) { Message message = new Message(); message.obj = msg.getBytes(); message.what =1; handler.sendMessage(message); // handler.sendEmptyMessage(1); } } catch (Exception e) { e.printStackTrace(); CLog.e(tag, e.toString()); // 错误消息,通过hander回传 if (handler != null) { Message message = new Message(); message.obj = msg.getBytes(); message.what = 0;; handler.sendMessage(message); } } } } synchronized (this) { try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }// 发送完消息后,线程进入等待状态 } } } }
SocketInputThread
package com.example.socketblockdemo; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import android.content.Intent; import android.text.TextUtils; /** * 客户端读消息线程 * * @author way * */ public class SocketInputThread extends Thread { private boolean isStart = true; private static String tag = "socket"; // private MessageListener messageListener;// 消息监听接口对象 public SocketInputThread() { } public void setStart(boolean isStart) { this.isStart = isStart; } @Override public void run() { while (isStart) { // 手机能联网,读socket数据 if (NetManager.instance().isNetworkConnected()) { if (!TCPClient.instance().isConnect()) { CLog.e(tag, "TCPClient connet server is fail read thread sleep second" +Const.SOCKET_SLEEP_SECOND ); try { sleep(Const.SOCKET_SLEEP_SECOND * 1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } readSocket(); // 如果连接服务器失败,服务器连接失败,sleep固定的时间,能联网,就不需要sleep CLog.e("socket","TCPClient.instance().isConnect() " + TCPClient.instance().isConnect() ); } } } public void readSocket() { Selector selector = TCPClient.instance().getSelector(); if (selector == null) { return; } try { // 如果没有数据过来,一直柱塞 while (selector.select() > 0) { for (SelectionKey sk : selector.selectedKeys()) { // 如果该SelectionKey对应的Channel中有可读的数据 if (sk.isReadable()) { // 使用NIO读取Channel中的数据 SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { sc.read(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); // continue; } buffer.flip(); String receivedString = ""; // 打印收到的数据 try { receivedString = Charset.forName("UTF-8") .newDecoder().decode(buffer).toString(); CLog.e(tag, receivedString); Intent i = new Intent(Const.BC); i.putExtra("response", receivedString); MainActivity.s_context.sendBroadcast(i ); } catch (CharacterCodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } buffer.clear(); buffer = null; try { // 为下一次读取作准备 sk.interestOps(SelectionKey.OP_READ); // 删除正在处理的SelectionKey selector.selectedKeys().remove(sk); } catch (CancelledKeyException e) { e.printStackTrace(); } } } } // selector.close(); // TCPClient.instance().repareRead(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (ClosedSelectorException e2) { } } }
SocketHeartHread 心跳类
package com.example.socketblockdemo; import java.io.IOException; import android.text.TextUtils; class SocketHeartThread extends Thread { boolean isStop = false; boolean mIsConnectSocketSuccess = false; static SocketHeartThread s_instance; private TCPClient mTcpClient = null; static final String tag = "SocketHeartThread"; public static synchronized SocketHeartThread instance() { if (s_instance == null) { s_instance = new SocketHeartThread(); } return s_instance; } public SocketHeartThread() { TCPClient.instance(); // 连接服务器 // mIsConnectSocketSuccess = connect(); } public void stopThread() { isStop = true; } /** * 连接socket到服务器, 并发送初始化的Socket信息 * * @return */ private boolean reConnect() { return TCPClient.instance().reConnect(); } public void run() { isStop = false; while (!isStop) { // 发送一个心跳包看服务器是否正常 boolean canConnectToServer = TCPClient.instance().canConnectToServer(); if(canConnectToServer == false){ reConnect(); } try { Thread.sleep(Const.SOCKET_HEART_SECOND * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }线程管理类
package com.example.socketblockdemo; import android.os.Handler; import android.text.TextUtils; public class SocketThreadManager { private static SocketThreadManager s_SocketManager = null; private SocketInputThread mInputThread = null; private SocketOutputThread mOutThread = null; private SocketHeartThread mHeartThread = null; // 获取单例 public static SocketThreadManager sharedInstance() { if (s_SocketManager == null) { s_SocketManager = new SocketThreadManager(); s_SocketManager.startThreads(); } return s_SocketManager; } // 单例,不允许在外部构建对象 private SocketThreadManager() { mHeartThread = new SocketHeartThread(); mInputThread = new SocketInputThread(); mOutThread = new SocketOutputThread(); } /** * 启动线程 */ private void startThreads() { mHeartThread.start(); mInputThread.start(); mInputThread.setStart(true); mOutThread.start(); mInputThread.setStart(true); // mDnsthread.start(); } /** * stop线程 */ public void stopThreads() { mHeartThread.stopThread(); mInputThread.setStart(false); mOutThread.setStart(false); } public static void releaseInstance() { if (s_SocketManager != null) { s_SocketManager.stopThreads(); s_SocketManager = null; } } public void sendMsg(byte [] buffer, Handler handler) { MsgEntity entity = new MsgEntity(buffer, handler); mOutThread.addMsgToSendList(entity); } }
TCPClient ,采用NIO的方式构建
package com.example.socketblockdemo; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * NIO TCP 客户端 * */ public class TCPClient { // 信道选择器 private Selector selector; // 与服务器通信的信道 SocketChannel socketChannel; // 要连接的服务器Ip地址 private String hostIp; // 要连接的远程服务器在监听的端口 private int hostListenningPort; private static TCPClient s_Tcp = null; public boolean isInitialized = false; public static synchronized TCPClient instance() { if (s_Tcp == null) { s_Tcp = new TCPClient(Const.SOCKET_SERVER, Const.SOCKET_PORT); } return s_Tcp; } /** * 构造函数 * * @param HostIp * @param HostListenningPort * @throws IOException */ public TCPClient(String HostIp, int HostListenningPort) { this.hostIp = HostIp; this.hostListenningPort = HostListenningPort; try { initialize(); this.isInitialized = true; } catch (IOException e) { this.isInitialized = false; // TODO Auto-generated catch block e.printStackTrace(); } catch (Exception e) { this.isInitialized = false; e.printStackTrace(); } } /** * 初始化 * * @throws IOException */ public void initialize() throws IOException { boolean done = false; try { // 打开监听信道并设置为非阻塞模式 socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort)); if (socketChannel != null) { socketChannel.socket().setTcpNoDelay(false); socketChannel.socket().setKeepAlive(true); // 设置 读socket的timeout时间 socketChannel.socket().setSoTimeout( Const.SOCKET_READ_TIMOUT); socketChannel.configureBlocking(false); // 打开并注册选择器到信道 selector = Selector.open(); if (selector != null) { socketChannel.register(selector, SelectionKey.OP_READ); done = true; } } } finally { if (!done && selector != null) { selector.close(); } if (!done) { socketChannel.close(); } } } static void blockUntil(SelectionKey key, long timeout) throws IOException { int nkeys = 0; if (timeout > 0) { nkeys = key.selector().select(timeout); } else if (timeout == 0) { nkeys = key.selector().selectNow(); } if (nkeys == 0) { throw new SocketTimeoutException(); } } /** * 发送字符串到服务器 * * @param message * @throws IOException */ public void sendMsg(String message) throws IOException { ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes("utf-8")); if (socketChannel == null) { throw new IOException(); } socketChannel.write(writeBuffer); } /** * 发送数据 * * @param bytes * @throws IOException */ public void sendMsg(byte[] bytes) throws IOException { ByteBuffer writeBuffer = ByteBuffer.wrap(bytes); if (socketChannel == null) { throw new IOException(); } socketChannel.write(writeBuffer); } /** * * @return */ public synchronized Selector getSelector() { return this.selector; } /** * Socket连接是否是正常的 * * @return */ public boolean isConnect() { boolean isConnect = false; if (this.isInitialized) { isConnect = this.socketChannel.isConnected(); } return isConnect; } /** * 关闭socket 重新连接 * * @return */ public boolean reConnect() { closeTCPSocket(); try { initialize(); isInitialized = true; } catch (IOException e) { isInitialized = false; e.printStackTrace(); } catch (Exception e) { isInitialized = false; e.printStackTrace(); } return isInitialized; } /** * 服务器是否关闭,通过发送一个socket信息 * * @return */ public boolean canConnectToServer() { try { if (socketChannel != null) { socketChannel.socket().sendUrgentData(0xff); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); return false; } catch (Exception e){ e.printStackTrace(); return false; } return true; } /** * 关闭socket */ public void closeTCPSocket() { try { if (socketChannel != null) { socketChannel.close(); } } catch (IOException e) { } try { if (selector != null) { selector.close(); } } catch (IOException e) { } } /** * 每次读完数据后,需要重新注册selector,读取数据 */ public synchronized void repareRead() { if (socketChannel != null) { try { selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
如何使用
// 发送消息,失败或者成功的handler SocketThreadManager.sharedInstance().sendMsg(str.getBytes(), handler);
代码下载 http://files.cnblogs.com/likwo/SocketBlockDemo.zip