一、背景
在编程中通信可以说是使用的最频繁的了,一个好的通信框架可以很大的提高系统的稳定性和编程的简洁性。可能之前我们使用现有的框架可以不考虑消息同步的问题,但是我觉得了解一点这方面的内容还是很有帮助的。今天给大家讲讲一个比较好的消息的同步的策略。
二、为什么使用消息同步
在通信的时候使用消息同步可以降低代码的复杂度,避免使用回调来处理业务,可以使代码更加的优雅。使用封装好的消息同步模块,即使是在在编写会使用通信的代码时,也可以像编写普通的同步代码一样,在代码中完全感受不到通信的存在。(只需要调用一个方法,返回值就是服务器发送过来的消息)
三、消息同步的实现
我们先来网络通信的时序图吧。
下面是消息同步的实现过程
然后我们看看使用了消息同步策略的发送消息的时序图
封装好了的话我们编写需要通信的消息就可以像下面这样了
BaseMsg reponseMsg = syncSendMsg(msg); // 发送一条请求并接收响应。
怎么样,业务代码是不是很简单。下面我们看下具体的实现流程吧!
首先我们先定义一个消息对象BaseMsg,所有与外部应用通信的消息都必须使用同样的消息格式。然后定义一个处理消息的单实例对象SendMsgObject,用来发送和接收所有的消息。然后在定义一个消息等待对象RspMsgWaiter,发送一个消息后,就new一个消息等待对象,并将该对象放进一个表中,通过执行wait()方法来等待接收消息。当接收到响应后,首先先去表中查找有没有对应的消息等待对象,如果有的话,则调用该消息等待对象的notifyAll()方法,消息等待对象执行了notifyAll()方法后,等待接收对象的wait()方法就会跳过,将消息响应返回给业务层。
四:注意事项
1.每个消息必须有个唯一的消息码。系统会根据消息码寻找对应的发送消息的对象。
五、代码实例(基于Java语言)
客户端
public class BaseMsg {
/**消息长度 */
private int length;
/**消息序号 */
private int sequence;
/**消息体 */
private byte[] msg;
/**
* 解码消息
* @throws IOException
* */
public void decodeMsg(DataInputStream inputStream) throws IOException {
length = inputStream.readInt();
sequence = inputStream.readInt();
msg = new byte[length];
inputStream.read(msg);
}
/**
* 编码消息
* @throws IOException
* */
public void encodingMsg(OutputStream outPutStream ) throws IOException {
DataOutputStream dataOutputStream = new DataOutputStream(outPutStream);
dataOutputStream.writeInt(msg.length);
dataOutputStream.writeInt(sequence);
dataOutputStream.write(msg);
}
/**
* 编码消息
* @throws IOException
* */
public byte[] encodingMsg() throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(stream);
dataOutputStream.writeInt(msg.length);
dataOutputStream.writeInt(sequence);
dataOutputStream.write(msg);
return stream.toByteArray();
}
public Integer getLength() {
return length;
}
public void setLength(Integer length) {
this.length = length;
}
public byte[] getMsg() {
return msg;
}
public void setMsg(byte[] msg) {
this.msg = msg;
}
public Integer getSequence() {
return sequence;
}
public void setSequence(Integer sequence) {
this.sequence = (int) (Math.random() *Integer.MAX_VALUE);
}
@Override
public String toString() {
try {
return "BaseMsg [length=" + length + ", sequence=" + sequence + ", msg=" + new String(msg,"utf-8") + "]";
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
public class ClientMain {
static int count = 0;
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
MsgSendObject sendObject = MsgSendObject.getInstance(); // 得到一个消息发送对象
BaseMsg msg = new BaseMsg(); // 定义一个需要发送的消息
String msgBody = System.currentTimeMillis() + "clientSend"+ count;
msg.setMsg(msgBody.getBytes("utf-8"));
BaseMsg receiveMsg = sendObject.syncSendMsg(msg); // 使用消息发送对象将消息发送出去,并且收到响应
System.out.println("receiveMsg:" + receiveMsg);
}
}
public class MsgSendObject {
private static MsgSendObject INSTANEC;
private Socket socket = null;
private int DEFAULT_PORT = 9389;
private OutputStream outStream ;
private InputStream inputStream;
private ExecutorService executors; // 定义一个线程池
private Map<Integer, RspMsgWaiter> msgPool = new HashMap<>();
private MsgSendObject() throws IOException {
executors = Executors.newSingleThreadExecutor();
socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", DEFAULT_PORT)); // 连接到服务器
outStream = socket.getOutputStream();
inputStream = socket.getInputStream();
receiveMsg(); // 接收消息
}
public static synchronized MsgSendObject getInstance() throws IOException {
if(INSTANEC==null){
INSTANEC = new MsgSendObject();
}
return INSTANEC;
}
/**
* 发送同步消息
* @throws ExecutionException
* @throws InterruptedException
* */
public BaseMsg syncSendMsg(final BaseMsg msg) throws InterruptedException, ExecutionException {
return sendMsg(msg, null);
}
/**
* 发送同步消息
* @param waitTime 超时时间
* @param repertTime 重试
* @throws ExecutionException
* @throws InterruptedException
* */
public BaseMsg syncSendMsg(final BaseMsg msg, Long waitTime, int repertTime) throws InterruptedException, ExecutionException {
BaseMsg retBaseMsg = null;
for(int i = 0; i < repertTime; i ++) {
retBaseMsg = sendMsg(msg, waitTime);
if(retBaseMsg != null) {
return retBaseMsg;
}
}
return retBaseMsg;
}
/**
* 发送消息
* */
private BaseMsg sendMsg(final BaseMsg msg, Long waitTime) throws InterruptedException, ExecutionException {
Future<BaseMsg> baseMsgFuture = executors.submit(new Callable<BaseMsg>() {
@Override
public BaseMsg call() throws Exception {
RspMsgWaiter waiter = null;
if(waitTime == null){
waiter = new RspMsgWaiter();
} else {
waiter = new RspMsgWaiter(waitTime);
}
msgPool.put(msg.getSequence(), waiter);
outStream.write(msg.encodingMsg());
return waiter.waitRsp();
}
});
return baseMsgFuture.get();
}
/**
* 接收消息
*
* @throws IOException
*/
public void receiveMsg() throws IOException {
new Thread(){
@Override
public void run() {
while (true) {
try {
BaseMsg baseMsg = new BaseMsg();
baseMsg.decodeMsg(new DataInputStream(inputStream));
if(msgPool.containsKey(baseMsg.getSequence())) {
RspMsgWaiter waiter = msgPool.get(baseMsg.getSequence());
waiter.onRsp(baseMsg);
msgPool.remove(baseMsg.getSequence());
}
} catch (Exception e) {
}
}
}
}.start();
}
}
interface RspCallback {
/**
* 超时
* */
public void onTimeout(long time);
/**
* 接收到响应
* */
public void onRsp(BaseMsg rsp);
public BaseMsg waitRsp() throws InterruptedException;
}
class RspMsgWaiter implements RspCallback {
public static final int DEFAULT_WAIT_TIMEOUT = 500;
private BaseMsg rspMsg;
private long timeout;
public RspMsgWaiter() {
timeout = DEFAULT_WAIT_TIMEOUT;
}
public RspMsgWaiter(long timeout) {
this.timeout = timeout;
}
@Override
public void onRsp(BaseMsg rsp) {
synchronized(this) {
rspMsg = rsp;
notifyAll();
}
}
@Override
public void onTimeout(long time) {
synchronized(this) {
notifyAll();
}
}
@Override
public BaseMsg waitRsp() throws InterruptedException {
//有可能waitRsp还没来得及调用,应答就来了(onRsp被调用)
//所以这里先判断rspMsg是否为null再wait,rspMsg不为null说明应答早就来了,直接返回rspMsg
synchronized(this) {
if(rspMsg != null) {
return rspMsg;
}
wait(timeout);
return rspMsg;
}
}
}
服务端
public class ServiceMain {
static ServerSocket server = null;
static int default_port = 9389;
public static List<Client> clientList = new LinkedList<>();
public static void main(String[] args) throws IOException {
server = new ServerSocket(default_port);
clientConnect();
}
public static void clientConnect() throws IOException {
new Thread(){
public void run(){
while(true){
Socket clientSocket = null;
try {
clientSocket = server.accept();
} catch (IOException e) {
e.printStackTrace();
}
Client client = null;
try {
client = new Client(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
clientList.add(client);
}
}
}.start();
}
}
class Client implements Runnable {
private boolean isConnect = true;
private InputStream inputStream;
private OutputStream outputStream;
public Client(Socket socket) throws IOException {
this.inputStream = socket.getInputStream();
this.outputStream = socket.getOutputStream();
new Thread(this).start();
}
@Override
public void run() {
while(isConnect) {
try {
BaseMsg baseMsg = new BaseMsg();
baseMsg.decodeMsg(new DataInputStream(inputStream));
System.out.println("Server Receive msg"+ baseMsg.toString());
baseMsg.setMsg(baseMsg.toString().getBytes("UTF-8"));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
sendMsg(baseMsg);
} catch (IOException e) {
isConnect = false;
ServiceMain.clientList.remove(this);
}
}
}
/**
*
* */
private void sendMsg(BaseMsg baseMsg) throws IOException{
baseMsg.encodingMsg(outputStream);
}
}