基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

在阅读本文前需要对socket以及自定义协议有一个基本的了解,可以先查看上一篇文章《基于Java Socket的自定义协议,实现Android与服务器的长连接(一)》学习相关的基础知识点。

基于Java Socket的自定义协议,实现Android与服务器的长连接(二)

一、协议定义

上一篇文章中,我们对socket编程和自定义协议做了一个简单的了解,本文将在此基础上加以深入,来实现Android和服务器之间的长连接,现定义协议如下:

  • 数据类协议(Data)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,0表示数据)
    • 业务类型(pattion,8bit,0表示push,其他暂未定)
    • 数据格式(dtype,8bit,0表示json,其他暂未定)
    • 消息id(msgId,32bit)
    • 正文数据(data)
  • 数据ack类协议(DataAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,1表示数据ack)
    • ack消息id(ackMsgId,32bit)
    • 预留信息(unused)
  • 心跳类协议(ping)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,2表示心跳)
    • 心跳id(pingId,32bit,client上报取奇数,即1,3,5...,server下发取偶数,即0,2,4...)
    • 预留信息(unused)
  • 心跳ack类协议(pingAck)
    • 长度(length,32bit)
    • 版本号(version,8bit,前3位预留,后5位用于表示真正的版本号)
    • 数据类型(type,8bit,3表示心跳ack)
    • ack心跳id(pingId,32bit,client上报取奇数,即1,3,5...,server下发取偶数,即0,2,4...)
    • 预留信息(unused)

二、协议实现

从上述的协议定义中,我们可以看出,四种协议有共同的3个要素,分别是:长度、版本号、数据类型,那么我们可以先抽象出一个基本的协议,如下:

1. BasicProtocol


  1. import android.util.Log; 
  2.  
  3. import com.shandiangou.sdgprotocol.lib.Config; 
  4. import com.shandiangou.sdgprotocol.lib.ProtocolException; 
  5. import com.shandiangou.sdgprotocol.lib.SocketUtil; 
  6.  
  7. import java.io.ByteArrayOutputStream; 
  8.  
  9. /** 
  10.  * Created by meishan on 16/12/1. 
  11.  * <p> 
  12.  * 协议类型: 0表示数据,1表示数据Ack,2表示ping,3表示pingAck 
  13.  */ 
  14. public abstract class BasicProtocol { 
  15.  
  16.     // 长度均以字节(byte)为单位 
  17.     public static final int LENGTH_LEN = 4;       //记录整条数据长度数值的长度 
  18.     protected static final int VER_LEN = 1;       //协议的版本长度(其中前3位作为预留位,后5位作为版本号) 
  19.     protected static final int TYPE_LEN = 1;      //协议的数据类型长度 
  20.  
  21.     private int reserved = 0;                     //预留信息 
  22.     private int version = Config.VERSION;         //版本号 
  23.  
  24.     /** 
  25.      * 获取整条数据长度 
  26.      * 单位:字节(byte) 
  27.      * 
  28.      * @return 
  29.      */ 
  30.     protected int getLength() { 
  31.         return LENGTH_LEN + VER_LEN + TYPE_LEN; 
  32.     } 
  33.  
  34.     public int getReserved() { 
  35.         return reserved; 
  36.     } 
  37.  
  38.     public void setReserved(int reserved) { 
  39.         this.reserved = reserved; 
  40.     } 
  41.  
  42.     public int getVersion() { 
  43.         return version; 
  44.     } 
  45.  
  46.     public void setVersion(int version) { 
  47.         this.version = version; 
  48.     } 
  49.  
  50.     /** 
  51.      * 获取协议类型,由子类实现 
  52.      * 
  53.      * @return 
  54.      */ 
  55.     public abstract int getProtocolType(); 
  56.      
  57.     /** 
  58.      * 由预留值和版本号计算完整版本号的byte[]值 
  59.      * 
  60.      * @return 
  61.      */ 
  62.     private int getVer(byte r, byte v, int vLen) { 
  63.         int num = 0; 
  64.         int rLen = 8 - vLen; 
  65.         for (int i = 0; i < rLen; i++) { 
  66.             num += (((r >> (rLen - 1 - i)) & 0x1) << (7 - i)); 
  67.         } 
  68.         return num + v; 
  69.     } 
  70.  
  71.     /** 
  72.      * 拼接发送数据,此处拼接了协议版本、协议类型和数据长度,具体内容子类中再拼接 
  73.      * 按顺序拼接 
  74.      * 
  75.      * @return 
  76.      */ 
  77.     public byte[] genContentData() { 
  78.         byte[] length = SocketUtil.int2ByteArrays(getLength()); 
  79.         byte reserved = (byte) getReserved(); 
  80.         byte version = (byte) getVersion(); 
  81.         byte[] ver = {(byte) getVer(reserved, version, 5)}; 
  82.         byte[] type = {(byte) getProtocolType()}; 
  83.  
  84.         ByteArrayOutputStream baos = new ByteArrayOutputStream(LENGTH_LEN + VER_LEN + TYPE_LEN); 
  85.         baos.write(length, 0, LENGTH_LEN); 
  86.         baos.write(ver, 0, VER_LEN); 
  87.         baos.write(type, 0, TYPE_LEN); 
  88.         return baos.toByteArray(); 
  89.     } 
  90.  
  91.     /** 
  92.      * 解析出整条数据长度 
  93.      * 
  94.      * @param data 
  95.      * @return 
  96.      */ 
  97.     protected int parseLength(byte[] data) { 
  98.         return SocketUtil.byteArrayToInt(data, 0, LENGTH_LEN); 
  99.     } 
  100.  
  101.     /** 
  102.      * 解析出预留位 
  103.      * 
  104.      * @param data 
  105.      * @return 
  106.      */ 
  107.     protected int parseReserved(byte[] data) { 
  108.         byte r = data[LENGTH_LEN];//前4个字节(0,1,2,3)为数据长度的int值,与版本号组成一个字节 
  109.         return (r >> 5) & 0xFF; 
  110.     } 
  111.  
  112.     /** 
  113.      * 解析出版本号 
  114.      * 
  115.      * @param data 
  116.      * @return 
  117.      */ 
  118.     protected int parseVersion(byte[] data) { 
  119.         byte v = data[LENGTH_LEN]; //与预留位组成一个字节 
  120.         return ((v << 3) & 0xFF) >> 3; 
  121.     } 
  122.  
  123.     /** 
  124.      * 解析出协议类型 
  125.      * 
  126.      * @param data 
  127.      * @return 
  128.      */ 
  129.     public static int parseType(byte[] data) { 
  130.         byte t = data[LENGTH_LEN + VER_LEN];//前4个字节(0,1,2,3)为数据长度的int值,以及ver占一个字节 
  131.         return t & 0xFF; 
  132.     } 
  133.  
  134.     /** 
  135.      * 解析接收数据,此处解析了协议版本、协议类型和数据长度,具体内容子类中再解析 
  136.      * 
  137.      * @param data 
  138.      * @return 
  139.      * @throws ProtocolException 协议版本不一致,抛出异常 
  140.      */ 
  141.     public int parseContentData(byte[] data) throws ProtocolException { 
  142.         int reserved = parseReserved(data); 
  143.         int version = parseVersion(data); 
  144.         int protocolType = parseType(data); 
  145.         if (version != getVersion()) { 
  146.             throw new ProtocolException("input version is error: " + version); 
  147.         } 
  148.         return LENGTH_LEN + VER_LEN + TYPE_LEN; 
  149.     } 
  150.  
  151.     @Override 
  152.     public String toString() { 
  153.         return "Version: " + getVersion() + ", Type: " + getProtocolType(); 
  154.     } 
  155.  

上述涉及到的Config类和SocketUtil类如下:


  1. /** 
  2.  * Created by meishan on 16/12/2. 
  3.  */ 
  4. public class Config { 
  5.  
  6.     public static final int VERSION = 1;                 //协议版本号 
  7.     public static final String ADDRESS = "10.17.64.237"; //服务器地址 
  8.     public static final int PORT = 9013;                 //服务器端口号 
  9.      
  10.  

  1. import java.io.BufferedInputStream; 
  2. import java.io.BufferedOutputStream; 
  3. import java.io.IOException; 
  4. import java.io.InputStream; 
  5. import java.io.OutputStream; 
  6. import java.nio.ByteBuffer; 
  7. import java.util.HashMap; 
  8. import java.util.Map; 
  9.  
  10. /** 
  11.  * Created by meishan on 16/12/1. 
  12.  */ 
  13. public class SocketUtil { 
  14.  
  15.     private static Map<Integer, String> msgImp = new HashMap<>(); 
  16.  
  17.     static { 
  18.         msgImp.put(DataProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataProtocol");       //0 
  19.         msgImp.put(DataAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataAckProtocol"); //1 
  20.         msgImp.put(PingProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingProtocol");       //2 
  21.         msgImp.put(PingAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingAckProtocol"); //3 
  22.     } 
  23.  
  24.     /** 
  25.      * 解析数据内容 
  26.      * 
  27.      * @param data 
  28.      * @return 
  29.      */ 
  30.     public static BasicProtocol parseContentMsg(byte[] data) { 
  31.         int protocolType = BasicProtocol.parseType(data); 
  32.         String className = msgImp.get(protocolType); 
  33.         BasicProtocol basicProtocol; 
  34.         try { 
  35.             basicProtocol = (BasicProtocol) Class.forName(className).newInstance(); 
  36.             basicProtocol.parseContentData(data); 
  37.         } catch (Exception e) { 
  38.             basicProtocol = null
  39.             e.printStackTrace(); 
  40.         } 
  41.         return basicProtocol; 
  42.     } 
  43.  
  44.     /** 
  45.      * 读数据 
  46.      * 
  47.      * @param inputStream 
  48.      * @return 
  49.      * @throws SocketExceptions 
  50.      */ 
  51.     public static BasicProtocol readFromStream(InputStream inputStream) { 
  52.         BasicProtocol protocol; 
  53.         BufferedInputStream bis; 
  54.          
  55.         //header中保存的是整个数据的长度值,4个字节表示。在下述write2Stream方法中,会先写入header 
  56.         byte[] header = new byte[BasicProtocol.LENGTH_LEN]; 
  57.  
  58.         try { 
  59.             bis = new BufferedInputStream(inputStream); 
  60.  
  61.             int temp
  62.             int len = 0; 
  63.             while (len < header.length) { 
  64.                 temp = bis.read(header, len, header.length - len); 
  65.                 if (temp > 0) { 
  66.                     len += temp
  67.                 } else if (temp == -1) { 
  68.                     bis.close(); 
  69.                     return null
  70.                 } 
  71.             } 
  72.  
  73.             len = 0; 
  74.             int length = byteArrayToInt(header);//数据的长度值 
  75.             byte[] content = new byte[length]; 
  76.             while (len < length) { 
  77.                 temp = bis.read(content, len, length - len); 
  78.  
  79.                 if (temp > 0) { 
  80.                     len += temp
  81.                 } 
  82.             } 
  83.  
  84.             protocol = parseContentMsg(content); 
  85.         } catch (IOException e) { 
  86.             e.printStackTrace(); 
  87.             return null
  88.         } 
  89.  
  90.         return protocol; 
  91.     } 
  92.  
  93.     /** 
  94.      * 写数据 
  95.      * 
  96.      * @param protocol 
  97.      * @param outputStream 
  98.      */ 
  99.     public static void write2Stream(BasicProtocol protocol, OutputStream outputStream) { 
  100.         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream); 
  101.         byte[] buffData = protocol.genContentData(); 
  102.         byte[] header = int2ByteArrays(buffData.length); 
  103.         try { 
  104.             bufferedOutputStream.write(header); 
  105.             bufferedOutputStream.write(buffData); 
  106.             bufferedOutputStream.flush(); 
  107.         } catch (IOException e) { 
  108.             e.printStackTrace(); 
  109.         } 
  110.     } 
  111.  
  112.     /** 
  113.      * 关闭输入流 
  114.      * 
  115.      * @param is 
  116.      */ 
  117.     public static void closeInputStream(InputStream is) { 
  118.         try { 
  119.             if (is != null) { 
  120.                 is.close(); 
  121.             } 
  122.         } catch (IOException e) { 
  123.             e.printStackTrace(); 
  124.         } 
  125.     } 
  126.  
  127.     /** 
  128.      * 关闭输出流 
  129.      * 
  130.      * @param os 
  131.      */ 
  132.     public static void closeOutputStream(OutputStream os) { 
  133.         try { 
  134.             if (os != null) { 
  135.                 os.close(); 
  136.             } 
  137.         } catch (IOException e) { 
  138.             e.printStackTrace(); 
  139.         } 
  140.     } 
  141.  
  142.     public static byte[] int2ByteArrays(int i) { 
  143.         byte[] result = new byte[4]; 
  144.         result[0] = (byte) ((i >> 24) & 0xFF); 
  145.         result[1] = (byte) ((i >> 16) & 0xFF); 
  146.         result[2] = (byte) ((i >> 8) & 0xFF); 
  147.         result[3] = (byte) (i & 0xFF); 
  148.         return result; 
  149.     } 
  150.  
  151.     public static int byteArrayToInt(byte[] b) { 
  152.         int intValue = 0; 
  153.         for (int i = 0; i < b.length; i++) { 
  154.             intValue += (b[i] & 0xFF) << (8 * (3 - i)); //int占4个字节(0,1,2,3) 
  155.         } 
  156.         return intValue; 
  157.     } 
  158.  
  159.     public static int byteArrayToInt(byte[] b, int byteOffset, int byteCount) { 
  160.         int intValue = 0; 
  161.         for (int i = byteOffset; i < (byteOffset + byteCount); i++) { 
  162.             intValue += (b[i] & 0xFF) << (8 * (3 - (i - byteOffset))); 
  163.         } 
  164.         return intValue; 
  165.     } 
  166.  
  167.     public static int bytes2Int(byte[] b, int byteOffset) { 
  168.         ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE); 
  169.         byteBuffer.put(b, byteOffset, 4); //占4个字节 
  170.         byteBuffer.flip(); 
  171.         return byteBuffer.getInt(); 
  172.     } 
  173.  

接下来我们实现具体的协议。

2. DataProtocol


  1. import android.util.Log; 
  2.  
  3. import com.shandiangou.sdgprotocol.lib.ProtocolException; 
  4. import com.shandiangou.sdgprotocol.lib.SocketUtil; 
  5.  
  6. import java.io.ByteArrayOutputStream; 
  7. import java.io.Serializable
  8. import java.io.UnsupportedEncodingException; 
  9.  
  10. /** 
  11.  * Created by meishan on 16/12/1. 
  12.  */ 
  13. public class DataProtocol extends BasicProtocol implements Serializable { 
  14.  
  15.     public static final int PROTOCOL_TYPE = 0; 
  16.  
  17.     private static final int PATTION_LEN = 1; 
  18.     private static final int DTYPE_LEN = 1; 
  19.     private static final int MSGID_LEN = 4; 
  20.  
  21.     private int pattion; 
  22.     private int dtype; 
  23.     private int msgId; 
  24.  
  25.     private String data; 
  26.  
  27.     @Override 
  28.     public int getLength() { 
  29.         return super.getLength() + PATTION_LEN + DTYPE_LEN + MSGID_LEN + data.getBytes().length; 
  30.     } 
  31.  
  32.     @Override 
  33.     public int getProtocolType() { 
  34.         return PROTOCOL_TYPE; 
  35.     } 
  36.  
  37.     public int getPattion() { 
  38.         return pattion; 
  39.     } 
  40.  
  41.     public void setPattion(int pattion) { 
  42.         this.pattion = pattion; 
  43.     } 
  44.  
  45.     public int getDtype() { 
  46.         return dtype; 
  47.     } 
  48.  
  49.     public void setDtype(int dtype) { 
  50.         this.dtype = dtype; 
  51.     } 
  52.  
  53.     public void setMsgId(int msgId) { 
  54.         this.msgId = msgId; 
  55.     } 
  56.  
  57.     public int getMsgId() { 
  58.         return msgId; 
  59.     } 
  60.  
  61.     public String getData() { 
  62.         return data; 
  63.     } 
  64.  
  65.     public void setData(String data) { 
  66.         this.data = data; 
  67.     } 
  68.  
  69.     /** 
  70.      * 拼接发送数据 
  71.      * 
  72.      * @return 
  73.      */ 
  74.     @Override 
  75.     public byte[] genContentData() { 
  76.         byte[] base = super.genContentData(); 
  77.         byte[] pattion = {(byte) this.pattion}; 
  78.         byte[] dtype = {(byte) this.dtype}; 
  79.         byte[] msgid = SocketUtil.int2ByteArrays(this.msgId); 
  80.         byte[] data = this.data.getBytes(); 
  81.  
  82.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength()); 
  83.         baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id 
  84.         baos.write(pattion, 0, PATTION_LEN);       //业务类型 
  85.         baos.write(dtype, 0, DTYPE_LEN);           //业务数据格式 
  86.         baos.write(msgid, 0, MSGID_LEN);           //消息id 
  87.         baos.write(data, 0, data.length);          //业务数据 
  88.         return baos.toByteArray(); 
  89.     } 
  90.  
  91.     /** 
  92.      * 解析接收数据,按顺序解析 
  93.      * 
  94.      * @param data 
  95.      * @return 
  96.      * @throws ProtocolException 
  97.      */ 
  98.     @Override 
  99.     public int parseContentData(byte[] data) throws ProtocolException { 
  100.         int pos = super.parseContentData(data); 
  101.  
  102.         //解析pattion 
  103.         pattion = data[pos] & 0xFF; 
  104.         pos += PATTION_LEN; 
  105.  
  106.         //解析dtype 
  107.         dtype = data[pos] & 0xFF; 
  108.         pos += DTYPE_LEN; 
  109.  
  110.         //解析msgId 
  111.         msgId = SocketUtil.byteArrayToInt(data, pos, MSGID_LEN); 
  112.         pos += MSGID_LEN; 
  113.  
  114.         //解析data 
  115.         try { 
  116.             this.data = new String(data, pos, data.length - pos, "utf-8"); 
  117.         } catch (UnsupportedEncodingException e) { 
  118.             e.printStackTrace(); 
  119.         } 
  120.  
  121.         return pos; 
  122.     } 
  123.  
  124.     @Override 
  125.     public String toString() { 
  126.         return "data: " + data; 
  127.     } 
  128.  

3. DataAckProtocol


  1. import com.shandiangou.sdgprotocol.lib.ProtocolException; 
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil; 
  3.  
  4. import java.io.ByteArrayOutputStream; 
  5. import java.io.UnsupportedEncodingException; 
  6.  
  7. /** 
  8.  * Created by meishan on 16/12/1. 
  9.  */ 
  10. public class DataAckProtocol extends BasicProtocol { 
  11.  
  12.     public static final int PROTOCOL_TYPE = 1; 
  13.  
  14.     private static final int ACKMSGID_LEN = 4; 
  15.  
  16.     private int ackMsgId; 
  17.  
  18.     private String unused; 
  19.  
  20.     @Override 
  21.     public int getLength() { 
  22.         return super.getLength() + ACKMSGID_LEN + unused.getBytes().length; 
  23.     } 
  24.  
  25.     @Override 
  26.     public int getProtocolType() { 
  27.         return PROTOCOL_TYPE; 
  28.     } 
  29.  
  30.     public int getAckMsgId() { 
  31.         return ackMsgId; 
  32.     } 
  33.  
  34.     public void setAckMsgId(int ackMsgId) { 
  35.         this.ackMsgId = ackMsgId; 
  36.     } 
  37.  
  38.     public String getUnused() { 
  39.         return unused; 
  40.     } 
  41.  
  42.     public void setUnused(String unused) { 
  43.         this.unused = unused; 
  44.     } 
  45.  
  46.     /** 
  47.      * 拼接发送数据 
  48.      * 
  49.      * @return 
  50.      */ 
  51.     @Override 
  52.     public byte[] genContentData() { 
  53.         byte[] base = super.genContentData(); 
  54.         byte[] ackMsgId = SocketUtil.int2ByteArrays(this.ackMsgId); 
  55.         byte[] unused = this.unused.getBytes(); 
  56.  
  57.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength()); 
  58.         baos.write(base, 0, base.length);              //协议版本+数据类型+数据长度+消息id 
  59.         baos.write(ackMsgId, 0, ACKMSGID_LEN);         //消息id 
  60.         baos.write(unused, 0, unused.length);          //unused 
  61.         return baos.toByteArray(); 
  62.     } 
  63.  
  64.     @Override 
  65.     public int parseContentData(byte[] data) throws ProtocolException { 
  66.         int pos = super.parseContentData(data); 
  67.  
  68.         //解析ackMsgId 
  69.         ackMsgId = SocketUtil.byteArrayToInt(data, pos, ACKMSGID_LEN); 
  70.         pos += ACKMSGID_LEN; 
  71.  
  72.         //解析unused 
  73.         try { 
  74.             unused = new String(data, pos, data.length - pos, "utf-8"); 
  75.         } catch (UnsupportedEncodingException e) { 
  76.             e.printStackTrace(); 
  77.         } 
  78.  
  79.         return pos; 
  80.     } 
  81.  
  82.  

4. PingProtocol


  1. import com.shandiangou.sdgprotocol.lib.ProtocolException; 
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil; 
  3.  
  4. import java.io.ByteArrayOutputStream; 
  5. import java.io.UnsupportedEncodingException; 
  6.  
  7. /** 
  8.  * Created by meishan on 16/12/1. 
  9.  */ 
  10. public class PingProtocol extends BasicProtocol { 
  11.  
  12.     public static final int PROTOCOL_TYPE = 2; 
  13.  
  14.     private static final int PINGID_LEN = 4; 
  15.  
  16.     private int pingId; 
  17.  
  18.     private String unused; 
  19.  
  20.     @Override 
  21.     public int getLength() { 
  22.         return super.getLength() + PINGID_LEN + unused.getBytes().length; 
  23.     } 
  24.  
  25.     @Override 
  26.     public int getProtocolType() { 
  27.         return PROTOCOL_TYPE; 
  28.     } 
  29.  
  30.     public int getPingId() { 
  31.         return pingId; 
  32.     } 
  33.  
  34.     public void setPingId(int pingId) { 
  35.         this.pingId = pingId; 
  36.     } 
  37.  
  38.     public String getUnused() { 
  39.         return unused; 
  40.     } 
  41.  
  42.     public void setUnused(String unused) { 
  43.         this.unused = unused; 
  44.     } 
  45.  
  46.     /** 
  47.      * 拼接发送数据 
  48.      * 
  49.      * @return 
  50.      */ 
  51.     @Override 
  52.     public byte[] genContentData() { 
  53.         byte[] base = super.genContentData(); 
  54.         byte[] pingId = SocketUtil.int2ByteArrays(this.pingId); 
  55.         byte[] unused = this.unused.getBytes(); 
  56.  
  57.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength()); 
  58.         baos.write(base, 0, base.length);          //协议版本+数据类型+数据长度+消息id 
  59.         baos.write(pingId, 0, PINGID_LEN);         //消息id 
  60.         baos.write(unused, 0, unused.length);            //unused 
  61.         return baos.toByteArray(); 
  62.     } 
  63.  
  64.     @Override 
  65.     public int parseContentData(byte[] data) throws ProtocolException { 
  66.         int pos = super.parseContentData(data); 
  67.  
  68.         //解析pingId 
  69.         pingId = SocketUtil.byteArrayToInt(data, pos, PINGID_LEN); 
  70.         pos += PINGID_LEN; 
  71.  
  72.         try { 
  73.             unused = new String(data, pos, data.length - pos, "utf-8"); 
  74.         } catch (UnsupportedEncodingException e) { 
  75.             e.printStackTrace(); 
  76.         } 
  77.  
  78.         return pos; 
  79.     } 
  80.  
  81.  

5. PingAckProtocol


  1. import com.shandiangou.sdgprotocol.lib.ProtocolException; 
  2. import com.shandiangou.sdgprotocol.lib.SocketUtil; 
  3.  
  4. import java.io.ByteArrayOutputStream; 
  5. import java.io.UnsupportedEncodingException; 
  6.  
  7. /** 
  8.  * Created by meishan on 16/12/1. 
  9.  */ 
  10. public class PingAckProtocol extends BasicProtocol { 
  11.  
  12.     public static final int PROTOCOL_TYPE = 3; 
  13.  
  14.     private static final int ACKPINGID_LEN = 4; 
  15.  
  16.     private int ackPingId; 
  17.  
  18.     private String unused; 
  19.  
  20.     @Override 
  21.     public int getLength() { 
  22.         return super.getLength() + ACKPINGID_LEN + unused.getBytes().length; 
  23.     } 
  24.  
  25.     @Override 
  26.     public int getProtocolType() { 
  27.         return PROTOCOL_TYPE; 
  28.     } 
  29.  
  30.     public int getAckPingId() { 
  31.         return ackPingId; 
  32.     } 
  33.  
  34.     public void setAckPingId(int ackPingId) { 
  35.         this.ackPingId = ackPingId; 
  36.     } 
  37.  
  38.     public String getUnused() { 
  39.         return unused; 
  40.     } 
  41.  
  42.     public void setUnused(String unused) { 
  43.         this.unused = unused; 
  44.     } 
  45.  
  46.     /** 
  47.      * 拼接发送数据 
  48.      * 
  49.      * @return 
  50.      */ 
  51.     @Override 
  52.     public byte[] genContentData() { 
  53.         byte[] base = super.genContentData(); 
  54.         byte[] ackPingId = SocketUtil.int2ByteArrays(this.ackPingId); 
  55.         byte[] unused = this.unused.getBytes(); 
  56.  
  57.         ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength()); 
  58.         baos.write(base, 0, base.length);                //协议版本+数据类型+数据长度+消息id 
  59.         baos.write(ackPingId, 0, ACKPINGID_LEN);         //消息id 
  60.         baos.write(unused, 0, unused.length);            //unused 
  61.         return baos.toByteArray(); 
  62.     } 
  63.  
  64.     @Override 
  65.     public int parseContentData(byte[] data) throws ProtocolException { 
  66.         int pos = super.parseContentData(data); 
  67.  
  68.         //解析ackPingId 
  69.         ackPingId = SocketUtil.byteArrayToInt(data, pos, ACKPINGID_LEN); 
  70.         pos += ACKPINGID_LEN; 
  71.  
  72.         //解析unused 
  73.         try { 
  74.             unused = new String(data, pos, data.length - pos, "utf-8"); 
  75.         } catch (UnsupportedEncodingException e) { 
  76.             e.printStackTrace(); 
  77.         } 
  78.  
  79.         return pos; 
  80.     } 
  81.  
  82.  

三、任务调度

上述已经给出了四种协议的实现,接下来我们将使用它们来实现app和服务端之间的通信,这里我们把数据的发送、接收和心跳分别用一个线程去实现,具体如下:

1. 客户端


  1. import android.os.Handler; 
  2. import android.os.Looper; 
  3. import android.os.Message; 
  4. import android.util.Log; 
  5.  
  6. import com.shandiangou.sdgprotocol.lib.protocol.BasicProtocol; 
  7. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol; 
  8. import com.shandiangou.sdgprotocol.lib.protocol.PingProtocol; 
  9.  
  10. import java.io.IOException; 
  11. import java.io.InputStream; 
  12. import java.io.OutputStream; 
  13. import java.net.ConnectException; 
  14. import java.net.Socket; 
  15. import java.util.concurrent.ConcurrentLinkedQueue; 
  16.  
  17. import javax.net.SocketFactory; 
  18.  
  19. /** 
  20.  * 写数据采用死循环,没有数据时wait,有新消息时notify 
  21.  * <p> 
  22.  * Created by meishan on 16/12/1. 
  23.  */ 
  24. public class ClientRequestTask implements Runnable { 
  25.  
  26.     private static final int SUCCESS = 100; 
  27.     private static final int FAILED = -1; 
  28.  
  29.     private boolean isLongConnection = true
  30.     private Handler mHandler; 
  31.     private SendTask mSendTask; 
  32.     private ReciveTask mReciveTask; 
  33.     private HeartBeatTask mHeartBeatTask; 
  34.     private Socket mSocket; 
  35.  
  36.     private boolean isSocketAvailable; 
  37.     private boolean closeSendTask; 
  38.  
  39.     protected volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>(); 
  40.  
  41.     public ClientRequestTask(RequestCallBack requestCallBacks) { 
  42.         mHandler = new MyHandler(requestCallBacks); 
  43.     } 
  44.  
  45.     @Override 
  46.     public void run() { 
  47.         try { 
  48.             try { 
  49.                 mSocket = SocketFactory.getDefault().createSocket(Config.ADDRESS, Config.PORT); 
  50. //                mSocket.setSoTimeout(10); 
  51.             } catch (ConnectException e) { 
  52.                 failedMessage(-1, "服务器连接异常,请检查网络"); 
  53.                 return
  54.             } 
  55.  
  56.             isSocketAvailable = true
  57.  
  58.             //开启接收线程 
  59.             mReciveTask = new ReciveTask(); 
  60.             mReciveTask.inputStream = mSocket.getInputStream(); 
  61.             mReciveTask.start(); 
  62.  
  63.             //开启发送线程 
  64.             mSendTask = new SendTask(); 
  65.             mSendTask.outputStream = mSocket.getOutputStream(); 
  66.             mSendTask.start(); 
  67.  
  68.             //开启心跳线程 
  69.             if (isLongConnection) { 
  70.                 mHeartBeatTask = new HeartBeatTask(); 
  71.                 mHeartBeatTask.outputStream = mSocket.getOutputStream(); 
  72.                 mHeartBeatTask.start(); 
  73.             } 
  74.         } catch (IOException e) { 
  75.             failedMessage(-1, "网络发生异常,请稍后重试"); 
  76.             e.printStackTrace(); 
  77.         } 
  78.     } 
  79.  
  80.     public void addRequest(DataProtocol data) { 
  81.         dataQueue.add(data); 
  82.         toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程 
  83.     } 
  84.  
  85.     public synchronized void stop() { 
  86.  
  87.         //关闭接收线程 
  88.         closeReciveTask(); 
  89.  
  90.         //关闭发送线程 
  91.         closeSendTask = true
  92.         toNotifyAll(dataQueue); 
  93.  
  94.         //关闭心跳线程 
  95.         closeHeartBeatTask(); 
  96.  
  97.         //关闭socket 
  98.         closeSocket(); 
  99.  
  100.         //清除数据 
  101.         clearData(); 
  102.  
  103.         failedMessage(-1, "断开连接"); 
  104.     } 
  105.  
  106.     /** 
  107.      * 关闭接收线程 
  108.      */ 
  109.     private void closeReciveTask() { 
  110.         if (mReciveTask != null) { 
  111.             mReciveTask.interrupt(); 
  112.             mReciveTask.isCancle = true
  113.             if (mReciveTask.inputStream != null) { 
  114.                 try { 
  115.                     if (isSocketAvailable && !mSocket.isClosed() && mSocket.isConnected()) { 
  116.                         mSocket.shutdownInput();//解决java.net.SocketException问题,需要先shutdownInput 
  117.                     } 
  118.                 } catch (IOException e) { 
  119.                     e.printStackTrace(); 
  120.                 } 
  121.                 SocketUtil.closeInputStream(mReciveTask.inputStream); 
  122.                 mReciveTask.inputStream = null
  123.             } 
  124.             mReciveTask = null
  125.         } 
  126.     } 
  127.  
  128.     /** 
  129.      * 关闭发送线程 
  130.      */ 
  131.     private void closeSendTask() { 
  132.         if (mSendTask != null) { 
  133.             mSendTask.isCancle = true
  134.             mSendTask.interrupt(); 
  135.             if (mSendTask.outputStream != null) { 
  136.                 synchronized (mSendTask.outputStream) {//防止写数据时停止,写完再停 
  137.                     SocketUtil.closeOutputStream(mSendTask.outputStream); 
  138.                     mSendTask.outputStream = null
  139.                 } 
  140.             } 
  141.             mSendTask = null
  142.         } 
  143.     } 
  144.  
  145.     /** 
  146.      * 关闭心跳线程 
  147.      */ 
  148.     private void closeHeartBeatTask() { 
  149.         if (mHeartBeatTask != null) { 
  150.             mHeartBeatTask.isCancle = true
  151.             if (mHeartBeatTask.outputStream != null) { 
  152.                 SocketUtil.closeOutputStream(mHeartBeatTask.outputStream); 
  153.                 mHeartBeatTask.outputStream = null
  154.             } 
  155.             mHeartBeatTask = null
  156.         } 
  157.     } 
  158.  
  159.     /** 
  160.      * 关闭socket 
  161.      */ 
  162.     private void closeSocket() { 
  163.         if (mSocket != null) { 
  164.             try { 
  165.                 mSocket.close(); 
  166.                 isSocketAvailable = false
  167.             } catch (IOException e) { 
  168.                 e.printStackTrace(); 
  169.             } 
  170.         } 
  171.     } 
  172.  
  173.     /** 
  174.      * 清除数据 
  175.      */ 
  176.     private void clearData() { 
  177.         dataQueue.clear(); 
  178.         isLongConnection = false
  179.     } 
  180.  
  181.     private void toWait(Object o) { 
  182.         synchronized (o) { 
  183.             try { 
  184.                 o.wait(); 
  185.             } catch (InterruptedException e) { 
  186.                 e.printStackTrace(); 
  187.             } 
  188.         } 
  189.     } 
  190.  
  191.     /** 
  192.      * notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后 
  193.      * 
  194.      * @param o 
  195.      */ 
  196.     protected void toNotifyAll(Object o) { 
  197.         synchronized (o) { 
  198.             o.notifyAll(); 
  199.         } 
  200.     } 
  201.  
  202.     private void failedMessage(int code, String msg) { 
  203.         Message message = mHandler.obtainMessage(FAILED); 
  204.         message.what = FAILED; 
  205.         message.arg1 = code; 
  206.         message.obj = msg; 
  207.         mHandler.sendMessage(message); 
  208.     } 
  209.  
  210.     private void successMessage(BasicProtocol protocol) { 
  211.         Message message = mHandler.obtainMessage(SUCCESS); 
  212.         message.what = SUCCESS; 
  213.         message.obj = protocol; 
  214.         mHandler.sendMessage(message); 
  215.     } 
  216.  
  217.     private boolean isConnected() { 
  218.         if (mSocket.isClosed() || !mSocket.isConnected()) { 
  219.             ClientRequestTask.this.stop(); 
  220.             return false
  221.         } 
  222.         return true
  223.     } 
  224.  
  225.     /** 
  226.      * 服务器返回处理,主线程运行 
  227.      */ 
  228.     public class MyHandler extends Handler { 
  229.  
  230.         private RequestCallBack mRequestCallBack; 
  231.  
  232.         public MyHandler(RequestCallBack callBack) { 
  233.             super(Looper.getMainLooper()); 
  234.             this.mRequestCallBack = callBack; 
  235.         } 
  236.  
  237.         @Override 
  238.         public void handleMessage(Message msg) { 
  239.             super.handleMessage(msg); 
  240.             switch (msg.what) { 
  241.                 case SUCCESS: 
  242.                     mRequestCallBack.onSuccess((BasicProtocol) msg.obj); 
  243.                     break; 
  244.                 case FAILED: 
  245.                     mRequestCallBack.onFailed(msg.arg1, (String) msg.obj); 
  246.                     break; 
  247.                 default
  248.                     break; 
  249.             } 
  250.         } 
  251.     } 
  252.  
  253.     /** 
  254.      * 数据接收线程 
  255.      */ 
  256.     public class ReciveTask extends Thread { 
  257.  
  258.         private boolean isCancle = false
  259.         private InputStream inputStream; 
  260.  
  261.         @Override 
  262.         public void run() { 
  263.             while (!isCancle) { 
  264.                 if (!isConnected()) { 
  265.                     break; 
  266.                 } 
  267.  
  268.                 if (inputStream != null) { 
  269.                     BasicProtocol reciverData = SocketUtil.readFromStream(inputStream); 
  270.                     if (reciverData != null) { 
  271.                         if (reciverData.getProtocolType() == 1 || reciverData.getProtocolType() == 3) { 
  272.                             successMessage(reciverData); 
  273.                         } 
  274.                     } else { 
  275.                         break; 
  276.                     } 
  277.                 } 
  278.             } 
  279.  
  280.             SocketUtil.closeInputStream(inputStream);//循环结束则退出输入流 
  281.         } 
  282.     } 
  283.  
  284.     /** 
  285.      * 数据发送线程 
  286.      * 当没有发送数据时让线程等待 
  287.      */ 
  288.     public class SendTask extends Thread { 
  289.  
  290.         private boolean isCancle = false
  291.         private OutputStream outputStream; 
  292.  
  293.         @Override 
  294.         public void run() { 
  295.             while (!isCancle) { 
  296.                 if (!isConnected()) { 
  297.                     break; 
  298.                 } 
  299.  
  300.                 BasicProtocol dataContent = dataQueue.poll(); 
  301.                 if (dataContent == null) { 
  302.                     toWait(dataQueue);//没有发送数据则等待 
  303.                     if (closeSendTask) { 
  304.                         closeSendTask();//notify()调用后,并不是马上就释放对象锁的,所以在此处中断发送线程 
  305.                     } 
  306.                 } else if (outputStream != null) { 
  307.                     synchronized (outputStream) { 
  308.                         SocketUtil.write2Stream(dataContent, outputStream); 
  309.                     } 
  310.                 } 
  311.             } 
  312.  
  313.             SocketUtil.closeOutputStream(outputStream);//循环结束则退出输出流 
  314.         } 
  315.     } 
  316.  
  317.     /** 
  318.      * 心跳实现,频率5秒 
  319.      * Created by meishan on 16/12/1. 
  320.      */ 
  321.     public class HeartBeatTask extends Thread { 
  322.  
  323.         private static final int REPEATTIME = 5000; 
  324.         private boolean isCancle = false
  325.         private OutputStream outputStream; 
  326.         private int pingId; 
  327.  
  328.         @Override 
  329.         public void run() { 
  330.             pingId = 1; 
  331.             while (!isCancle) { 
  332.                 if (!isConnected()) { 
  333.                     break; 
  334.                 } 
  335.  
  336.                 try { 
  337.                     mSocket.sendUrgentData(0xFF); 
  338.                 } catch (IOException e) { 
  339.                     isSocketAvailable = false
  340.                     ClientRequestTask.this.stop(); 
  341.                     break; 
  342.                 } 
  343.  
  344.                 if (outputStream != null) { 
  345.                     PingProtocol pingProtocol = new PingProtocol(); 
  346.                     pingProtocol.setPingId(pingId); 
  347.                     pingProtocol.setUnused("ping..."); 
  348.                     SocketUtil.write2Stream(pingProtocol, outputStream); 
  349.                     pingId = pingId + 2; 
  350.                 } 
  351.  
  352.                 try { 
  353.                     Thread.sleep(REPEATTIME); 
  354.                 } catch (InterruptedException e) { 
  355.                     e.printStackTrace(); 
  356.                 } 
  357.             } 
  358.  
  359.             SocketUtil.closeOutputStream(outputStream); 
  360.         } 
  361.     } 
  362.  

其中涉及到的RequestCallBack接口如下:


  1. /** 
  2.  * Created by meishan on 16/12/1. 
  3.  */ 
  4. public interface RequestCallBack { 
  5.  
  6.     void onSuccess(BasicProtocol msg); 
  7.  
  8.     void onFailed(int errorCode, String msg); 
  9.  

2. 服务端


  1. import java.io.DataInputStream; 
  2. import java.io.DataOutputStream; 
  3. import java.net.Socket; 
  4. import java.util.Iterator; 
  5. import java.util.concurrent.ConcurrentHashMap; 
  6. import java.util.concurrent.ConcurrentLinkedQueue; 
  7.  
  8. /** 
  9.  * Created by meishan on 16/12/1. 
  10.  */ 
  11. public class ServerResponseTask implements Runnable { 
  12.  
  13.     private ReciveTask reciveTask; 
  14.     private SendTask sendTask; 
  15.     private Socket socket; 
  16.     private ResponseCallback tBack; 
  17.  
  18.     private volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>(); 
  19.     private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>(); 
  20.  
  21.     private String userIP; 
  22.  
  23.     public String getUserIP() { 
  24.         return userIP; 
  25.     } 
  26.  
  27.     public ServerResponseTask(Socket socket, ResponseCallback tBack) { 
  28.         this.socket = socket; 
  29.         this.tBack = tBack; 
  30.         this.userIP = socket.getInetAddress().getHostAddress(); 
  31.         System.out.println("用户IP地址:" + userIP); 
  32.     } 
  33.  
  34.     @Override 
  35.     public void run() { 
  36.         try { 
  37.             //开启接收线程 
  38.             reciveTask = new ReciveTask(); 
  39.             reciveTask.inputStream = new DataInputStream(socket.getInputStream()); 
  40.             reciveTask.start(); 
  41.  
  42.             //开启发送线程 
  43.             sendTask = new SendTask(); 
  44.             sendTask.outputStream = new DataOutputStream(socket.getOutputStream()); 
  45.             sendTask.start(); 
  46.         } catch (Exception e) { 
  47.             e.printStackTrace(); 
  48.         } 
  49.     } 
  50.  
  51.     public void stop() { 
  52.         if (reciveTask != null) { 
  53.             reciveTask.isCancle = true
  54.             reciveTask.interrupt(); 
  55.             if (reciveTask.inputStream != null) { 
  56.                 SocketUtil.closeInputStream(reciveTask.inputStream); 
  57.                 reciveTask.inputStream = null
  58.             } 
  59.             reciveTask = null
  60.         } 
  61.  
  62.         if (sendTask != null) { 
  63.             sendTask.isCancle = true
  64.             sendTask.interrupt(); 
  65.             if (sendTask.outputStream != null) { 
  66.                 synchronized (sendTask.outputStream) {//防止写数据时停止,写完再停 
  67.                     sendTask.outputStream = null
  68.                 } 
  69.             } 
  70.             sendTask = null
  71.         } 
  72.     } 
  73.  
  74.     public void addMessage(BasicProtocol data) { 
  75.         if (!isConnected()) { 
  76.             return
  77.         } 
  78.  
  79.         dataQueue.offer(data); 
  80.         toNotifyAll(dataQueue);//有新增待发送数据,则唤醒发送线程 
  81.     } 
  82.  
  83.     public Socket getConnectdClient(String clientID) { 
  84.         return onLineClient.get(clientID); 
  85.     } 
  86.  
  87.     /** 
  88.      * 打印已经链接的客户端 
  89.      */ 
  90.     public static void printAllClient() { 
  91.         if (onLineClient == null) { 
  92.             return
  93.         } 
  94.         Iterator<String> inter = onLineClient.keySet().iterator(); 
  95.         while (inter.hasNext()) { 
  96.             System.out.println("client:" + inter.next()); 
  97.         } 
  98.     } 
  99.  
  100.     public void toWaitAll(Object o) { 
  101.         synchronized (o) { 
  102.             try { 
  103.                 o.wait(); 
  104.             } catch (InterruptedException e) { 
  105.                 e.printStackTrace(); 
  106.             } 
  107.         } 
  108.     } 
  109.  
  110.     public void toNotifyAll(Object obj) { 
  111.         synchronized (obj) { 
  112.             obj.notifyAll(); 
  113.         } 
  114.     } 
  115.  
  116.     private boolean isConnected() { 
  117.         if (socket.isClosed() || !socket.isConnected()) { 
  118.             onLineClient.remove(userIP); 
  119.             ServerResponseTask.this.stop(); 
  120.             System.out.println("socket closed..."); 
  121.             return false
  122.         } 
  123.         return true
  124.     } 
  125.  
  126.     public class ReciveTask extends Thread { 
  127.  
  128.         private DataInputStream inputStream; 
  129.         private boolean isCancle; 
  130.  
  131.         @Override 
  132.         public void run() { 
  133.             while (!isCancle) { 
  134.                 if (!isConnected()) { 
  135.                     isCancle = true
  136.                     break; 
  137.                 } 
  138.  
  139.                 BasicProtocol clientData = SocketUtil.readFromStream(inputStream); 
  140.  
  141.                 if (clientData != null) { 
  142.                     if (clientData.getProtocolType() == 0) { 
  143.                         System.out.println("dtype: " + ((DataProtocol) clientData).getDtype() + ", pattion: " + ((DataProtocol) clientData).getPattion() + ", msgId: " + ((DataProtocol) clientData).getMsgId() + ", data: " + ((DataProtocol) clientData).getData()); 
  144.  
  145.                         DataAckProtocol dataAck = new DataAckProtocol(); 
  146.                         dataAck.setUnused("收到消息:" + ((DataProtocol) clientData).getData()); 
  147.                         dataQueue.offer(dataAck); 
  148.                         toNotifyAll(dataQueue); //唤醒发送线程 
  149.  
  150.                         tBack.targetIsOnline(userIP); 
  151.                     } else if (clientData.getProtocolType() == 2) { 
  152.                         System.out.println("pingId: " + ((PingProtocol) clientData).getPingId()); 
  153.  
  154.                         PingAckProtocol pingAck = new PingAckProtocol(); 
  155.                         pingAck.setUnused("收到心跳"); 
  156.                         dataQueue.offer(pingAck); 
  157.                         toNotifyAll(dataQueue); //唤醒发送线程 
  158.  
  159.                         tBack.targetIsOnline(userIP); 
  160.                     } 
  161.                 } else { 
  162.                     System.out.println("client is offline..."); 
  163.                     break; 
  164.                 } 
  165.             } 
  166.  
  167.             SocketUtil.closeInputStream(inputStream); 
  168.         } 
  169.     } 
  170.  
  171.     public class SendTask extends Thread { 
  172.  
  173.         private DataOutputStream outputStream; 
  174.         private boolean isCancle; 
  175.  
  176.         @Override 
  177.         public void run() { 
  178.             while (!isCancle) { 
  179.                 if (!isConnected()) { 
  180.                     isCancle = true
  181.                     break; 
  182.                 } 
  183.  
  184.                 BasicProtocol procotol = dataQueue.poll(); 
  185.                 if (procotol == null) { 
  186.                     toWaitAll(dataQueue); 
  187.                 } else if (outputStream != null) { 
  188.                     synchronized (outputStream) { 
  189.                         SocketUtil.write2Stream(procotol, outputStream); 
  190.                     } 
  191.                 } 
  192.             } 
  193.  
  194.             SocketUtil.closeOutputStream(outputStream); 
  195.         } 
  196.     } 

其中涉及到的ResponseCallback接口如下:


  1. /** 
  2.  * Created by meishan on 16/12/1. 
  3.  */ 
  4. public interface ResponseCallback { 
  5.  
  6.     void targetIsOffline(DataProtocol reciveMsg); 
  7.  
  8.     void targetIsOnline(String clientIp); 
  9.  

上述代码中处理了几种情况下的异常,比如,建立连接后,服务端停止运行,此时客户端的输入流还在阻塞状态,怎么保证客户端不抛出异常,这些处理可以结合SocketUtil类来看。

四、调用封装

1. 客户端

import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;


  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol; 
  2.  
  3. /** 
  4.  * Created by meishan on 16/12/1. 
  5.  */ 
  6. public class ConnectionClient { 
  7.  
  8.     private boolean isClosed; 
  9.  
  10.     private ClientRequestTask mClientRequestTask; 
  11.  
  12.     public ConnectionClient(RequestCallBack requestCallBack) { 
  13.         mClientRequestTask = new ClientRequestTask(requestCallBack); 
  14.         new Thread(mClientRequestTask).start(); 
  15.     } 
  16.  
  17.     public void addNewRequest(DataProtocol data) { 
  18.         if (mClientRequestTask != null && !isClosed) 
  19.             mClientRequestTask.addRequest(data); 
  20.     } 
  21.  
  22.     public void closeConnect() { 
  23.         isClosed = true
  24.         mClientRequestTask.stop(); 
  25.     } 
  26.  

2. 服务端


  1. import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol; 
  2.  
  3. import java.io.IOException; 
  4. import java.net.ServerSocket; 
  5. import java.net.Socket; 
  6. import java.util.concurrent.ExecutorService; 
  7. import java.util.concurrent.Executors; 
  8.  
  9. /** 
  10.  * Created by meishan on 16/12/1. 
  11.  */ 
  12. public class ConnectionServer { 
  13.  
  14.     private static boolean isStart = true
  15.     private static ServerResponseTask serverResponseTask; 
  16.  
  17.     public ConnectionServer() { 
  18.  
  19.     } 
  20.  
  21.     public static void main(String[] args) { 
  22.  
  23.         ServerSocket serverSocket = null
  24.         ExecutorService executorService = Executors.newCachedThreadPool(); 
  25.         try { 
  26.             serverSocket = new ServerSocket(Config.PORT); 
  27.             while (isStart) { 
  28.                 Socket socket = serverSocket.accept(); 
  29.                 serverResponseTask = new ServerResponseTask(socket, 
  30.                         new ResponseCallback() { 
  31.  
  32.                             @Override 
  33.                             public void targetIsOffline(DataProtocol reciveMsg) {// 对方不在线 
  34.                                 if (reciveMsg != null) { 
  35.                                     System.out.println(reciveMsg.getData()); 
  36.                                 } 
  37.                             } 
  38.  
  39.                             @Override 
  40.                             public void targetIsOnline(String clientIp) { 
  41.                                 System.out.println(clientIp + " is onLine"); 
  42.                                 System.out.println("-----------------------------------------"); 
  43.                             } 
  44.                         }); 
  45.  
  46.                 if (socket.isConnected()) { 
  47.                     executorService.execute(serverResponseTask); 
  48.                 } 
  49.             } 
  50.  
  51.             serverSocket.close(); 
  52.  
  53.         } catch (IOException e) { 
  54.             e.printStackTrace(); 
  55.         } finally { 
  56.             if (serverSocket != null) { 
  57.                 try { 
  58.                     isStart = false
  59.                     serverSocket.close(); 
  60.                     if (serverSocket != null
  61.                         serverResponseTask.stop(); 
  62.                 } catch (IOException e) { 
  63.                     e.printStackTrace(); 
  64.                 } 
  65.             } 
  66.         } 
  67.     } 
  68.  

总结

实现自定义协议的关键在于协议的拼装和解析,上述已给出了关键的代码,如果需要查看完整的代码以及demo,可以下载源码

注意:先运行服务端demo的main函数,再查看本机的ip地址,然后修改客户端(android)代码中Config.java里面的ip地址,当然,要确保android手机和服务端在同一个局域里面,最后再打开客户端。




作者:枚杉
来源:51CTO
上一篇:PostgreSQL 11 preview - bloom filter 误报率评估测试及如何降低误报 - 暨bloom filter应用于HEAP与INDEX的一致性检测


下一篇:mybatis Example条件查询