基于Netty的私有协议栈的开发

基于Netty的私有协议栈的开发

书是人类进步的阶梯,每读一本书都使自己得以提升,以前看书都是看了就看了,当时感觉受益匪浅,时间一长就又还回到书本了!所以说,好记性不如烂笔头,以后每次看完一本书都写一些读后感,对于技术书则把对让自己醍醐灌顶的篇章记录下来,以便以后翻阅查看,也是记录自己学习的过程- _ -。

OK!言归正传,最近由于公司需要做一个网关项目,需要用到基于TCP/IP私有协议接收数据,看完了《Netty权威指南》这本书,感觉作者写的很好,有些地方让我获益良多,虽然书上有些例子跑不通(可能是因为环境问题吧),但是不妨碍理解作者想表达的意思,以前看Hadoop的时候总是对它的底层实现挺感兴趣的,但最后不了了之了,看完这本书后,让我明白了很多,感觉有必要拿出来分享一下书中第十四章讲解的关于Netty的私有协议的设计和开发。

一、私有协议介绍

首先需要了解一些什么是私有协议,私有协议本质上是厂商内部发展和采用的标准,或者两系统之间约定的数据交互格式,如书中提到的例子中提到的协议:

名称

字段

类型

长度

描述

Header

crcCode

Int

32

Netty消息校验码

Length

Int

32

整个消息长度

sessionID

Long

64

会话ID

Type

Byte

8

0:业务请求消息

1:业务响应消息

2:业务one way消息

3握手请求消息

4握手应答消息

5:心跳请求消息

6:心跳应答消息

Priority

Byte

8

消息优先级:0~255

Attachment

Map<String,Object>

变长

可选字段,由于推展消息头

Body

Object

变长

对于请求消息,它是方法的参数

对于响应消息,它是返回值

二、Netty协议栈功能设计

Netty协议栈用于内部各模块之间的通信,它基于TCP/IP协议栈,是一个类HTTP协议的应用层协议栈,相比于传统的标准协议栈,它更加轻巧、灵活和实用。

1.协议栈功能描述

Netty协议栈承载了业务内部各模块之间的消息交互和服务调用,它的主要功能如下:

(1) 基于Netty的NIO通信框架,提供高性能的异步通信能力;

(2) 提供消息的编解码框架,可以实现POJO的序列化和反序列化;

(3) 提供基于IP地址的白名单接入认证机制;

(4) 链路的有效性校验机制;

(5) 链路的断连重连机制;

2.通信模型

Netty协议栈通信模型如图所示:

(1)Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证;

(2)Netty协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息;

(3)链路建立成功之后,客户端发送业务消息;

(4)链路成功之后,服务端发送心跳消息;

(5)链路建立成功之后,客户端发送心跳消息;

(6)链路建立成功之后,服务端发送业务消息;

(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接

备注:需要指出的是,Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务器端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之间的心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方出入异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,知道重连成功。

三、消息定义

Netty协议栈消息定义包含消息头和消息体两部分

Netty消息定义表

名称

类型

长度

描述

Header

Header

变长

消息头定义

Body

Object

变长

对于请求消息,它只是方法的参数

对于响应消息,它是返回值

Netty协议消息头定义(Header)

名称

类型

长度

描述

crcCode

Int

32

Netty消息校验码

Length

Int

32

整个消息长度

sessionID

Long

64

会话ID

Type

Byte

8

0:业务请求消息

1:业务响应消息

2:业务one way消息

3握手请求消息

4握手应答消息

5:心跳请求消息

6:心跳应答消息

Priority

Byte

8

消息优先级:0~255

Attachment

Map<String,Object>

变长

可选字段,由于推展消息头

四、私有协议的编解码规范

4.1 协议的编码

Netty协议NettyMessage的编码规范如下:

(1)crcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价;

(2)Length:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价;

(3)sessionID:java.nio.ByteBuffer.putLong(int value),如果采用其他缓冲区实现,必须与其等价;

(4)Type:java.nio.ByteBuffer.put(byte value),如果采用其他缓冲区实现,必须与其等价;

(5)Priority:java.nio.ByteBuffer.put(byte value),如果采用其他缓冲区实现,必须与其等价;

(6)attachMent:它的编码规则为----如果attachment长度为0,标示没有可选附件,则将长度编码设为0,java.nio.ByteBuffer.putInt(0);如果大于0,说明有附件需要编码,具体的编码规则入校----

  1. 首先对附件的个数进行编码,Java.nio.ByteBuffer.putInt(attachment.size());

然后对Key进行编码,先编码长度,再将它转换成byte数组之后编码内容。

(7)Body的编码:通过JBoss Marshalling将其序列化为byte数组,然后调用java.nio.ByteBuffer.put(byte[] src)将其写入ByteBuffer缓冲区中。

由于整个消息的长度必须等全部字段都编码完成之后才能确认,所以最后需要更新消息头中的length字段,将其重新写入ByteBuffer中。

4.2 协议的解码

相对于Netty的编码,仍旧以java.nio.ByteBuffer为例,给出Netty协议的解码规范。

(1)crcCode:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要与其等价;

(2)Length:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要与其等价;

(3)sessionID:通过java.nio.ByteBuffer.getLong()获取校验码字段,其他缓冲区需要与其等价;

(4)Type:通过java.nio.byteBuffer.get()获取消息类型,其他缓冲区需要与其等价;

(5)priority:通过java.nio.byteBuffer.get()获取消息优先级,其他缓冲区需要与其等价;

(6)Attachment:它的剑麻规则为--首先创建一个新的attachment对象,调用java.nio.ByteBuffer.getInt()获取附件的长度,如果为0,说明附件为空,解码结束,继续解码消息头;如果非空,则根据长度通过for循环进行解码。

(7)Body:通过Jboss的marshaller对其进行解码。

五、链路的建立

Netty协议栈支持服务端和客服端,对于使用Netty协议栈的应用程序而言,不需要刻意区分到底是客户端还是服务器端,在分布式组网环境中,一个节点可能既是客户端也是服务器端,这个依据具体的用户场景而定。

Netty协议栈对客户端的说明如下:如果A节点需要调用B节点的服务,但是A和B之间还没有建立物理链路,则有调用方主动发起连接,此时,调用方为客户端,被调用方为服务端。

考虑到安全,链路建立需要通过基于Ip地址或者号段的黑白名单安全认证机制,作为样例,本协议使用基于IP地址的安全认知,如果有多个Ip,通过逗号进行分割。在实际的商用项目中,安全认证机制会更加严格,例如通过密钥对用户名和密码进行安全认证。

客户端与服务端链路建立成功之后,由客户端发送握手请求消息,握手请求消息的定义如下

(1) 消息头的type字段值为3;

(2) 可选附件数为0;

(3) 消息头为空

(4) 握手消息的长度为22个字节

服务端接收到客户端的握手请求消息之后,如果IP校验通过,返回握手成功应答消息给客户端,应用层链路建立成功。握手应答消息定义如下:

(1)消息头的type字段值为4

(2)可选附件个数为0;

(3)消息体为byte类型的结果,0:认证成功;-1认证失败;

链路建立成功之后,客户端和服务端就可以互相发送业务消息了。

六、链路的关闭

由于采用长连接通信,在正常的业务运行期间,双方通过心跳和业务消息维持链路,任何一方都不需要主动关闭连接。

但是,在以下情况下,客户端和服务端需要关闭连接:

(1)当对方宕机或者重启时,会主动关闭链路,另一方读取到操作系统的通知信号

得知对方REST链路,需要关闭连接,释放自身的句柄等资源。由于采用TCP全双工通信,通信双方都需要关闭连接,释放资源;

(2)消息读写过程中,发生了I/O异常,需要主动关闭连接;

(3)心跳消息读写过程发生了I/O异常,需要主动关闭连接;

(4)心跳超时,需要主动关闭连接;

(5)发生编码异常等不可恢复错误时,需要主动关闭连接。

七、可靠性设计

Netty协议栈可能会运行在非常恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况都有可能发生。为了保证在这些极端异常场景下Netty协议栈仍能够正常工作或者自动恢复,需要对他的可靠性进行统一规划和设计。

7.1心跳机制

在凌晨等业务低谷时段,如果发生网络闪断、连接被Hang住等问题时,由于没有业务消息,应用程序很难发现。到了白天业务高峰期时,会发生大量的网络通信失败,严重的会导致一段时间进程内无法处理业务消息。为了解决这个问题,在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路,主动重连。

具体的设计思路如下:

(1)当网络处于空闲状态持续时间达到T(连续周期T没有读写消息)时,客户端主动发送Ping心跳消息给服务端;

(2)如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息,则心跳失败计数器加1;

(3)每当客户端接收到服务的业务消息或者Pong应答消息,将心跳失败计数器清零;当练习N次没有接收到服务端的Pong消息或者业务消息,则关闭链路,间隔INTERVAL时间后发起重连操作;

(4)服务端网络空闲状态持续时间达到T后,服务端将心跳失败计数器加1;只要接收到客户端发送的Ping消息或者其他业务消息,计数器清零;

(5)服务端连续N次没有接收到客户端的ping消息或者其他业务消息,则关闭链路,释放资源,等到客户端重连。

通过Ping-Pong双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被及时的检查出来,为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检查都失败才认定链路已经损害,需要关闭链路并重建链路。

当读或者写心跳消息发生I/O异常的时候,说明已经中断,此时需要立即关闭连接,如果是客户端,需要重新发起连接。如果是服务端,需要清空缓存的半包信息,等到客户端重连。

7.2重连机制

如果链路中断,等到INTEVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功。

为了保持服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后立即重连。

为了保证句柄资源能够及时释放,无论什么场景下重连失败,客户端必须保证自身的资源被及时释放,包括但不现居SocketChannel、Socket等。

重连失败后,需要打印异常堆栈信息,方便后续的问题定位。

7.3重复登录保护

当客户端握手成功之后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致句柄资源被耗尽。

服务端接收到客户端的握手请求消息之后,首先对IP地址进行合法性校验,如果校验成功,在缓存的地址表中查看客户端是否已经登录,如果登录,则拒绝重复登录,返回错误码-1,同时关闭TCP链路,并在服务端的日志中打印握手失败的原因。

客户端接收到握手失败的应答消息之后,关闭客户端的TCP连接,等待INTERVAL时间之后,再次发起TCP连接,知道认证成功。

为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功问题,当服务端连续N次心跳超时之后需要主动关闭链路,清空改客户端的地址缓存信息,以保证后续改客户端可以重连成功,防止被重复登录保护机制拒绝掉。

7.4消息缓存重发

无论客户端还是服务端,当发生链路中断之后,在链路恢复之前,缓存的消息队列中待发送的消息不能丢失,等链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失。

考虑到内存溢出的风险,建议消息缓存队列设置上限,当达到上限之后,应该拒绝继续想该队列添加新的消息。

八、代码设计

8.1 数据结构定义

首先对数据结构进行定义,netty的消息定义如下

package cn.yesway.demo.privateprotocol.model;

public class NettyMessage {

private Header header;//消息头

private Object body;//消息体

public Header getHeader() {

return header;

}

public void setHeader(Header header) {

this.header = header;

}

public Object getBody() {

return body;

}

public void setBody(Object body) {

this.body = body;

}

@Override

public String toString() {

return "NettyMessage [header="+header+"]";

}

}

消息头(Header)的定义如下:

package cn.yesway.demo.privateprotocol.model;

import java.util.HashMap;

import java.util.Map;

public class Header {

private int crcCode=0xabef0101;

private int length;//消息长度

private long sessionID;//回话ID

private byte type;//消息类型

private byte priority;//消息优先级

private Map<String,Object> attachment=new HashMap<String, Object>();//附件

public int getCrcCode() {

return crcCode;

}

public void setCrcCode(int crcCode) {

this.crcCode = crcCode;

}

public int getLength() {

return length;

}

public void setLength(int length) {

this.length = length;

}

public long getSessionID() {

return sessionID;

}

public void setSessionID(long sessionID) {

this.sessionID = sessionID;

}

public byte getType() {

return type;

}

public void setType(byte type) {

this.type = type;

}

public byte getPriority() {

return priority;

}

public void setPriority(byte priority) {

this.priority = priority;

}

public Map<String, Object> getAttachment() {

return attachment;

}

public void setAttachment(Map<String, Object> attachment) {

this.attachment = attachment;

}

@Override

public String toString() {

return "Header [crcCode=" + crcCode + ", length=" + length

+ ", sessionID=" + sessionID + ", type=" + type + ", priority="

+ priority + ", attachment=" + attachment + "]";

}

}

由于心跳消息、握手请求和握手应答消息都可以统一由NettyMessage承载,所以不需要为这几个内控制消息做单独的数据结构定义。

8.2 消息编解码

分别定义NettyMessageDecoder和NettyMessageEncoder用于NettyMessage消息的编解码,他们的具体实现如下:

Netty消息编码类:NettyMessageEncoder

package cn.yesway.demo.privateprotocol.codec;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

import java.io.IOException;

import java.util.Map;

import cn.yesway.demo.privateprotocol.model.NettyMessage;

public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage>{

private MarshallingEncoder marshallingEncoder;

public NettyMessageEncoder() throws IOException{

marshallingEncoder = new MarshallingEncoder();

}

@Override

protected void encode(ChannelHandlerContext ctx, NettyMessage msg,

ByteBuf out) throws Exception {

if(msg==null||msg.getHeader()==null){

throw new Exception("The encode message is null");

}

out.writeInt(msg.getHeader().getCrcCode());

out.writeInt(msg.getHeader().getLength());

out.writeLong(msg.getHeader().getSessionID());

out.writeByte(msg.getHeader().getType());

out.writeByte(msg.getHeader().getPriority());

out.writeInt((msg.getHeader().getAttachment().size()));

String key = null;

byte[] keyArray = null;

Object value = null;

for(Map.Entry<String, Object> param:msg.getHeader().getAttachment().entrySet()){

key = param.getKey();

keyArray = key.getBytes("UTF-8");

out.writeInt(keyArray.length);

out.writeBytes(keyArray);

value = param.getValue();

marshallingEncoder.encode(value, out);

}

key = null;

keyArray = null;

value = null;

if (msg.getBody() != null) {

marshallingEncoder.encode(msg.getBody(), out);

} else{

out.writeInt(0);

}

out.setInt(4, out.readableBytes() - 8);

}

}

其中消息体的编码采用Jboss的Marshalling来编码的,这里不再说明,具体的实现请参见Github源码,地址:https://github.com/wz12406/netty-demo里面有书中的源码,也有我比葫芦画瓢写的代码,不过抄一遍代码也让自己对netty有了进一步的认识。

Netty消息编码工具类:MarshallingEncoder

package cn.yesway.demo.privateprotocol.codec;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandler.Sharable;

import java.io.IOException;

import org.jboss.marshalling.Marshaller;

@Sharable

public class MarshallingEncoder {

private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

Marshaller marshaller;

public MarshallingEncoder() throws IOException {

marshaller = MarshallingCodecFactory.buildMarshalling();

}

protected void encode(Object msg, ByteBuf out) throws Exception {

try {

int lengthPos = out.writerIndex();

out.writeBytes(LENGTH_PLACEHOLDER);

ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);

marshaller.start(output);

marshaller.writeObject(msg);

marshaller.finish();

out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);

} finally {

marshaller.close();

}

}

}

Netty消息解码工具类:MarshallingDecoder

package cn.yesway.demo.privateprotocol.codec;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

import java.io.StreamCorruptedException;

import org.jboss.marshalling.ByteInput;

import org.jboss.marshalling.Unmarshaller;

public class MarshallingDecoder {

private final Unmarshaller unmarshaller;

public MarshallingDecoder() throws IOException {

unmarshaller = MarshallingCodecFactory.buildUnMarshalling();

}

protected Object decode(ByteBuf in) throws Exception {

int objectSize = in.readInt();

ByteBuf buf = in.slice(in.readerIndex(), objectSize);

ByteInput input = new ChannelBufferByteInput(buf);

try {

unmarshaller.start(input);

Object obj = unmarshaller.readObject();

unmarshaller.finish();

in.readerIndex(in.readerIndex() + objectSize);

return obj;

} finally {

unmarshaller.close();

}

}

}

Netty消息解码类:NettyMessageDecoder:

package cn.yesway.demo.privateprotocol.codec;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import cn.yesway.demo.privateprotocol.model.Header;

import cn.yesway.demo.privateprotocol.model.NettyMessage;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

MarshallingDecoder marshallingDecoder ;

public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,

int lengthFieldLength) throws IOException {

super(maxFrameLength, lengthFieldOffset, lengthFieldLength);

this.marshallingDecoder = new MarshallingDecoder();

}

@Override

protected Object decode(ChannelHandlerContext ctx, ByteBuf in)

throws Exception {

ByteBuf frame = (ByteBuf) super.decode(ctx, in);

if(frame == null)return null;

NettyMessage message = new NettyMessage();

Header header = new Header();

header.setCrcCode(frame.readInt());

header.setLength(frame.readInt());

header.setSessionID(frame.readLong());

header.setType(frame.readByte());

header.setPriority(frame.readByte());

int size = frame.readInt();

if (size > 0) {

Map<String, Object> attch = new HashMap<String, Object>(size);

int keySize = 0;

byte[] keyArray = null;

String key = null;

for (int i = 0; i < size; i++) {

keySize = frame.readInt();

keyArray = new byte[keySize];

frame.readBytes(keyArray);

key = new String(keyArray, "UTF-8");

attch.put(key, marshallingDecoder.decode(frame));

}

keyArray = null;

key = null;

header.setAttachment(attch);

}

if (frame.readableBytes() > 4) {

message.setBody(marshallingDecoder.decode(frame));

}

message.setHeader(header);

return message;

}

}

在这里我们用到了Netty的LengthFieldBasedFrameDecoder解码器,它支持自动的Tcp粘包和半包处理,只需要给出标识消息长度的字段偏移量和消息长度自身所占的字节数,Netty就能自动实现对半包的处理。对于业务解码器来说,调用父类LengthFieldBasedFrameDecoder的解码方法后,返回的就是整包消息或者为空,如果为空说明是个半包,之间返回继续由I/O线程读取后续的码流。

8.3 握手和安全认证

握手的发起是在客户端和服务器端TCP链路建立成功通道激活时,握手消息的接入和安全认证在服务端的处理。下面看下具体实现。

首先开发一个握手认证的客户端ChannelHandle,用于在通道激活时发起握手请求,具体代码实现如下。

package cn.yesway.demo.privateprotocol.client;

import cn.yesway.demo.privateprotocol.MessageType;

import cn.yesway.demo.privateprotocol.model.Header;

import cn.yesway.demo.privateprotocol.model.NettyMessage;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

public class LoginAuthReqHandler extends ChannelHandlerAdapter{

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

ctx.writeAndFlush(buildLoginReq());

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

NettyMessage message = (NettyMessage) msg;

System.out.println("--------------------------------");

//如果是握手应答消息,需要判断是否认证成功

if(message.getHeader()!=null&&message.getHeader().getType()==MessageType.LOGIN_RESP.value()){

byte loginResult = (byte) message.getBody();

if(loginResult!=(byte)0){

ctx.close();

}else{

System.out.println("Login is OK:"+message);

ctx.fireChannelRead(message);

}

}else{

ctx.fireChannelRead(message);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {

System.out.println("--------------------------");

ctx.fireExceptionCaught(cause);

}

private NettyMessage buildLoginReq() {

NettyMessage message = new NettyMessage();

Header header = new Header();

header.setType(MessageType.LOGIN_REQ.value());

message.setHeader(header);

return message;

}

}

接着看服务端的握手接入和安全认证代码。

package cn.yesway.demo.privateprotocol.server;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

import java.net.InetSocketAddress;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import cn.yesway.demo.privateprotocol.MessageType;

import cn.yesway.demo.privateprotocol.model.Header;

import cn.yesway.demo.privateprotocol.model.NettyMessage;

public class LoginAuthRespHandler extends ChannelHandlerAdapter{

private Map<String,Boolean> nodeCheck = new ConcurrentHashMap<String,Boolean>();

private String[] whiteList = {"127.0.0.1","10.1.2.95"};

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

NettyMessage message = (NettyMessage) msg;

if(message.getHeader()!=null&&message.getHeader().getType()==MessageType.LOGIN_REQ.value()){

String nodeIndex = ctx.channel().remoteAddress().toString();

NettyMessage loginResp = null;

//重复登录,拒绝

if(nodeCheck.containsKey(nodeIndex)){

loginResp = buildResponse((byte)-1);

}else{

InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();

String ip = address.getAddress().getHostAddress();

boolean isOK = false;

for(String WIP:whiteList){

if(WIP.equals(ip)){

isOK = true;

}

}

loginResp = isOK?buildResponse((byte)0):buildResponse((byte)-1);

if(isOK)nodeCheck.put(ip, true);

System.out.println("The login response is :"+loginResp +" body["+loginResp.getBody()+"]");

ctx.writeAndFlush(loginResp);

}

}else{

ctx.fireChannelRead(msg);

}

}

private NettyMessage buildResponse(byte b) {

NettyMessage message = new NettyMessage();

Header header = new Header();

header.setType(MessageType.LOGIN_RESP.value());

message.setHeader(header);

message.setBody(b);

return message;

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

nodeCheck.remove(ctx.channel().remoteAddress().toString());

ctx.close();

ctx.fireExceptionCaught(cause);

}

}

8.4 心跳检测机制

握手成功之后,由客户端主动发送心跳消息,服务端接收到心跳消息之后,返回心跳应答消息。由于心跳消息的目的是为了检测链路的可用性,因此不需要携带消息体。

客户端发送心跳请求的代码如下:

package cn.yesway.demo.book.protocol.netty.client;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

import cn.yesway.demo.book.protocol.netty.MessageType;

import cn.yesway.demo.book.protocol.netty.struct.Header;

import cn.yesway.demo.book.protocol.netty.struct.NettyMessage;

/**

* @author Lilinfeng

* @date 2014年3月15日

* @version 1.0

*/

public class HeartBeatReqHandler extends ChannelHandlerAdapter {

private volatile ScheduledFuture<?> heartBeat;

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

NettyMessage message = (NettyMessage) msg;

// 握手成功,主动发送心跳消息

if (message.getHeader() != null

&& message.getHeader().getType() == MessageType.LOGIN_RESP

.value()) {

heartBeat = ctx.executor().scheduleAtFixedRate(

new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,

TimeUnit.MILLISECONDS);

} else if (message.getHeader() != null

&& message.getHeader().getType() == MessageType.HEARTBEAT_RESP

.value()) {

System.out

.println("Client receive server heart beat message : ---> "

+ message);

} else

ctx.fireChannelRead(msg);

}

private class HeartBeatTask implements Runnable {

private final ChannelHandlerContext ctx;

public HeartBeatTask(final ChannelHandlerContext ctx) {

this.ctx = ctx;

}

@Override

public void run() {

NettyMessage heatBeat = buildHeatBeat();

System.out

.println("Client send heart beat messsage to server : ---> "

+ heatBeat);

ctx.writeAndFlush(heatBeat);

}

private NettyMessage buildHeatBeat() {

NettyMessage message = new NettyMessage();

Header header = new Header();

header.setType(MessageType.HEARTBEAT_REQ.value());

message.setHeader(header);

return message;

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

if (heartBeat != null) {

heartBeat.cancel(true);

heartBeat = null;

}

ctx.fireExceptionCaught(cause);

}

}

服务端的心跳应答Handler代码如下:

package cn.yesway.demo.book.protocol.netty.server;

import io.netty.channel.ChannelHandlerAdapter;

import io.netty.channel.ChannelHandlerContext;

import cn.yesway.demo.book.protocol.netty.MessageType;

import cn.yesway.demo.book.protocol.netty.struct.Header;

import cn.yesway.demo.book.protocol.netty.struct.NettyMessage;

public class HeartBeatRespHandler extends ChannelHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg)

throws Exception {

NettyMessage message = (NettyMessage) msg;

// 返回心跳应答消息

if (message.getHeader() != null

&& message.getHeader().getType() == MessageType.HEARTBEAT_REQ

.value()) {

System.out.println("Receive client heart beat message : ---> "

+ message);

NettyMessage heartBeat = buildHeatBeat();

System.out

.println("Send heart beat response message to client : ---> "

+ heartBeat);

ctx.writeAndFlush(heartBeat);

} else

ctx.fireChannelRead(msg);

}

private NettyMessage buildHeatBeat() {

NettyMessage message = new NettyMessage();

Header header = new Header();

header.setType(MessageType.HEARTBEAT_RESP.value());

message.setHeader(header);

return message;

}

}

服务端的心跳Handler非常简单,接收到心跳请求消息后,构造心跳应答消息返回,并打印接收和发送的心跳消息。

心跳超时的实现非常简单,直接利用Netty的ReadTimeoutHandler机制,当一定周期内(默认值50s)没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接:如果是服务端,释放资源,清除客户端登录缓存信息,等到服务端重连。

8.5 断连重连

当客户端感知断连事件之后,释放资源,重新发起连接,具体实现代码如下:

首先监听网络断连事件,如果channel关闭,则执行后续的重连任务,通过Bootstarp重新发起连接,客户端挂在closeFuture上监听链路关闭信号,一旦关闭,则创建重连定时器,5s之后重新发起连接,直到重连成功。

服务器端感知到断连事件之后,需要情况缓存的登录认证注册消息,以保证后续客户端能够正常连接。

8.6 客户端代码

客户端主要用于初始化系统资源,根据配置信息发起连接,代码如下:

package cn.yesway.demo.privateprotocol.client;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

import cn.yesway.demo.privateprotocol.NettyConstant;

import cn.yesway.demo.privateprotocol.codec.NettyMessageDecoder;

import cn.yesway.demo.privateprotocol.codec.NettyMessageEncoder;

/**

* @author wangzhen

* @version 1.0

* @createDate:2015年12月16日 下午4:14:47

*

*/

public class NettyClient {

private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

EventLoopGroup group = new NioEventLoopGroup();

public static void main(String[] args) {

new NettyClient().connect(NettyConstant.PORT,NettyConstant.REMOTEIP);

}

private void connect(int port, String host)  {

try {

Bootstrap b =new Bootstrap();

b.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(new NettyMessageDecoder(1024*1024, 4, 4));

ch.pipeline().addLast("MessageEncoder",new NettyMessageEncoder());

ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50));

ch.pipeline().addLast("LoginAuthHandler",new LoginAuthReqHandler());

ch.pipeline().addLast("HeartBeatHandler",new HeartBeatReqHandler());

}

});

//发起异步连接操作

ChannelFuture future = b.connect(new InetSocketAddress(host, port),

new InetSocketAddress(NettyConstant.LOCALIP,NettyConstant.LOCAL_PORT)).sync();

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

}finally{

executorService.execute(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(5);

connect(NettyConstant.PORT, NettyConstant.REMOTEIP);

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

}

利用Netty的ChannelPipeline和ChannelHandler机制,可以非常方便的实现功能的解耦和业务产品的定制。例如本例中的心跳定时器、握手请求和后端的业务处理可以通过不同的Handler来实现,类似于Aop。通过Handler chain的机制可以方便的实现切面拦截和定制,相比于Aop它的性能更高。

8.7 服务器端代码

相对于客户端,服务器端的代码更简单一些,主要的工作就是握手的接入认证等,不用关心断连重连等事件。

服务端的代码如下:

package cn.yesway.demo.privateprotocol.server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

import io.netty.handler.timeout.ReadTimeoutHandler;

import cn.yesway.demo.privateprotocol.NettyConstant;

import cn.yesway.demo.privateprotocol.codec.NettyMessageDecoder;

import cn.yesway.demo.privateprotocol.codec.NettyMessageEncoder;

/**

* @author wangzhen

* @version 1.0

* @createDate:2015年12月16日 下午4:37:42

*

*/

public class NettyServer {

public static void main(String[] args) throws Exception{

new NettyServer().bind();

}

private void bind() throws Exception{

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer<Channel>() {

@Override

protected void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(new NettyMessageDecoder(1024*1024,4,4));

ch.pipeline().addLast("MessageEncoder",new NettyMessageEncoder());

ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50));

ch.pipeline().addLast(new LoginAuthRespHandler());

ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler());

}

});

//绑定端口,同步等待成功

b.bind(NettyConstant.REMOTEIP,NettyConstant.PORT).sync();

System.out.println("netty server start ok:"+(NettyConstant.REMOTEIP+":"+NettyConstant.PORT));

}

}

代码托管地址:https://github.com/wz12406/netty-demo

上一篇:CentOS 5.4 制作 Python 2.6 RPM 包的方法


下一篇:php中的字符串常用函数(一) strpos() 子字符首次出现的位置