

支持java serial 与 AMF3的混合协议,目前没有基于xml 与 json的实现。


* 9个字节协议头+协议体.
* 协议头1-4字节表示协议长度 =协议体长度+9-4(去掉长度占的4字节)
* 协议头第5字节为标志字节:
* 该字节的最低位为压缩位:0=协议体未压缩 1=协议体已经压缩,
* 该字节的低2-4位为协议位:
* 000=基于AMF3的协议,
* 001=基于java serial协议
* 010=基于protobuf协议
* 5-8位未用,作为以后扩展
* ------------------------
* 6-9字节表示命令号
* 采用网络字节序的整数(高位在前,低位在后)

1-4协议长度                                6-9cmd











数据(AMF3或者java serial)








package tutorial;

option java_package = "com.youxigu.dynasty2.chat.proto";
option java_outer_classname = "ChatMsg"; //异常请求
message BadRequest{
required int32 cmd = 1;
required int32 errCode = 2;
required string err = 3;
} //同步请求标志
message SyncStat{
required int64 id = 1;
required int32 stat = 2[default=1];
} //10009 聊天请求
message ChatActionReceive{
required int32 cmd = 1;
required int32 errCode = 2;
required string err = 3;
optional int64 toUserId = 4;
optional string toUserName = 5;
required string channel = 6;
required string context = 7;
optional string channelId = 8;
} //10010 聊天响应
message ChatActionReceiveReturn{
required int32 cmd = 1;
required int32 errCode = 2;
required string err = 3;
required int32 requestCmd = 4;


cd E:\heart\workspace\dynasty2_mobile_new_v0.1\proto
protoc.exe --java_out=./ ChatMsg.proto


package com.youxigu.wolf.net.codec;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry; import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder; import com.google.protobuf.Message; /**
* 9个字节协议头+协议体.
* 协议头1-4字节表示协议长度 =协议体长度+9-4(去掉长度占的4字节)
* 协议头第5字节为标志字节:
* 该字节的最低位为压缩位:0=协议体未压缩 1=协议体已经压缩,
* 该字节的低2-4位为协议位:
* 000=基于AMF3的协议,
* 001=基于java serial协议
* 010=基于protobuf协议
* 5-8位未用,作为以后扩展
* ------------------------
* 6-9字节表示命令号
* 采用网络字节序的整数(高位在前,低位在后)
* @author Administrator
public class NewMutilCodecFactory implements ProtocolCodecFactory { private NewMutliEncoder encoder;
private NewMutilDecoder decoder; //双向cache
private Map<Integer, Class<? extends Message>> messages;//这个要在配置文件中初始化protobuf pojo和命令号的对应关系
private Map<Class<? extends Message>, Integer> commands = new HashMap<Class<? extends Message>, Integer>(); private Map<Integer, Method> methods = new HashMap<Integer, Method>(); //是否将protobuf转换成Map
private boolean toMap = true; public void setMessages(Map<Integer, Class<? extends Message>> messages) throws NoSuchMethodException,
SecurityException {
this.messages = messages;
if (messages != null) {
Iterator<Entry<Integer, Class<? extends Message>>> lit = messages.entrySet().iterator();
while (lit.hasNext()) {
Entry<Integer, Class<? extends Message>> entry = lit.next();
Class<Message> msgClass = (Class<Message>) entry.getValue();
commands.put(msgClass, entry.getKey());
Class[] paramsTypes = { byte[].class };
Method method = msgClass.getMethod("parseFrom", paramsTypes);
this.methods.put(entry.getKey(), method); }
} public NewMutilCodecFactory() {
encoder = new NewMutliEncoder(this);
decoder = new NewMutilDecoder(this);
} public void setCompressLen(int compressLen) {
} public Method getMessageDecodeMethod(int cmd) { return methods.get(cmd);
} public Class<? extends Message> getMessageClass(int cmd) { return messages.get(cmd);
} public int getMessageCommand(Class msgClass) {
if (commands.containsKey(msgClass)) {
return commands.get(msgClass);
} else {
return -1;
} @Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
// TODO Auto-generated method stub
return decoder;
} @Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
// TODO Auto-generated method stub
return encoder;
} }


<property name="codecFactory">
<bean class="com.youxigu.wolf.net.codec.NewMutilCodecFactory">
<property name="compressLen" value="${mina.encoder.compressLen:5120}"/> <property name="messages">
<entry key="10009">


package com.youxigu.wolf.net.codec;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream; import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.google.protobuf.Message;
import com.youxigu.wolf.net.NodeSessionMgr; import flex.messaging.io.SerializationContext;
import flex.messaging.io.amf.Amf3Input; public class NewMutilDecoder extends CumulativeProtocolDecoder {
private static Logger log = LoggerFactory.getLogger(NewMutilDecoder.class); // FLASH安全沙箱标志
private final AttributeKey POLICY = new AttributeKey(this.getClass(), "policy");
// 腾讯TGW协议头
private final AttributeKey TGWHEAD = new AttributeKey(this.getClass(), "TGW");
private final String securityReq = "<policy-file-request/>";
private final String security = "<cross-domain-policy><site-control permitted-cross-domain-policies=\"all\"/><allow-access-from domain=\"*\" to-ports=\"*\" /></cross-domain-policy>\0"; private static SerializationContext context = new SerializationContext(); protected static Field bsBufField = null;
static {
context.createASObjectForMissingType = true;
try {
bsBufField = ByteArrayOutputStream.class.getDeclaredField("buf");
} catch (Exception e) {
} private NewMutilCodecFactory factory; public NewMutilDecoder(NewMutilCodecFactory factory) {
this.factory = factory;
} /**
* decoder最大长度
private int maxDecodeLen = 5 * 1024 * 1024; public void setMaxDecodeLen(int maxDecodeLen) {
this.maxDecodeLen = maxDecodeLen;
} // static{
// context.legacyMap=true;
// }
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
// long time = System.currentTimeMillis();
if (this.isSecurityRequest(session, in)) {
// 安全策略请求
// ASObject asObject = new ASObject();
// asObject.put("policy", securityReq);
// out.write(asObject);
} else {
// 去掉TGW包头
skipTGWRequest(session, in);
try {
while (in.remaining() > 0) {// /这层循环实际上不需要,CumulativeProtocolDecoder已经处理了
boolean dataAvai = false;
if (session.containsAttribute(NodeSessionMgr.SERVER_TYPE_KEY)) {
dataAvai = in.prefixedDataAvailable(4, Integer.MAX_VALUE);
} else {
dataAvai = in.prefixedDataAvailable(4, maxDecodeLen);
if (dataAvai) {
// 正常Encoder中写入的包头制定长度数据
int len = in.getInt(); byte flag = in.get();// 标志位 // 是否压缩
boolean compressed = ((flag & 0x1) == MutliEncoderNew.BIT_COMPRESSED); // //先把需要的字节数读到数组中,防止decode出错后有剩余的字节保留在IoBuffer,使下一个请求解析不了
byte bytes[] = null;
if ((flag & 0xE) == NewMutliEncoder.BIT_JAVA) {
bytes = new byte[len - 1];
in.get(bytes, 0, len - 1);
javaDecode(out, bytes, compressed);
} else if ((flag & 0xE) == NewMutliEncoder.BIT_PROTOBUF) {
int command = in.getInt();// 包的命令字
bytes = new byte[len - 5];
in.get(bytes, 0, len - 5);
protobufDecode(out, bytes, command);
} else {
bytes = new byte[len - 1];
in.get(bytes, 0, len - 1);
amf3Decode(out, bytes, compressed);
} else {
// 包长度不正确,等待后续包
if (log.isDebugEnabled()) {
// System.out.println("length is error");
return false;
} }
} catch (BufferDataException e) {
log.error("解码数据长度不在限制范围内,丢弃并关闭session.{}", session);
throw e;
} catch (Exception e) {
throw e;
return true; } public void amf3Decode(ProtocolDecoderOutput out, byte[] bytes, boolean compressed) throws Exception { int len = bytes.length;
ByteArrayInputStream bios = new ByteArrayInputStream(bytes, 0, len); Amf3Input amf3in = null;
try {
amf3in = new Amf3Input(context);
if (compressed) {
amf3in.setInputStream(new GZIPInputStream(bios));
} else {
Object message = amf3in.readObject();
if (message != null) {
} } finally {
if (amf3in != null) {
} } public void javaDecode(ProtocolDecoderOutput out, byte[] bytes, boolean compressed) throws Exception {
int len = bytes.length; ByteArrayInputStream bios = new ByteArrayInputStream(bytes, 0, len); ObjectInputStream ois = null; try {
if (compressed) {
ois = new ObjectInputStream(new InflaterInputStream(bios));
} else {
ois = new ObjectInputStream(bios);
Object message = ois.readObject();
if (message != null) {
} } finally {
if (ois != null) {
} // in.close(); } public void protobufDecode(ProtocolDecoderOutput out, byte[] bytes, int command) throws Exception {
if (command < 0 || command > Integer.MAX_VALUE) {
throw new RuntimeException("command:" + command + "没有对应的实现类");
} Class<? extends Message> cls = factory.getMessageClass(command);
Method m = factory.getMessageDecodeMethod(command);
if (cls == null || m == null) {
throw new RuntimeException("command:" + command + "没有对应的实现类");
} Message obj = (Message) m.invoke(null, bytes);// static
// method
if (obj != null) {
} private boolean isSecurityRequest(IoSession session, IoBuffer in) {
Boolean policy = (Boolean) session.getAttribute(POLICY);
if (policy != null) {
return false;
int len = in.limit();
if (len < 23) {
// 不够安全沙箱的长度,不设置
return false;
int start = in.position();
byte[] bytes = new byte[len];
in.get(bytes);// 从IoBuffer中获取数据并放入bytes中
String request = null;
try {
request = new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
request = null;
} boolean result = false;
if (request != null) {
result = request.startsWith(securityReq);
session.setAttribute(POLICY, new Boolean(result));
if (!result) {
return result;
} /**
* 腾讯TGW协议 :TGW包头格式为如下:tgw_l7_forward\r\nHost:
* app12345-54321.qzoneapp.com:8002\r\n\r\n
* @param session
* @param in
* @return
private void skipTGWRequest(IoSession session, IoBuffer in) {
Boolean tgw = (Boolean) session.getAttribute(TGWHEAD);
if (tgw != null) {
int start = in.position();
int len = in.limit(); boolean result = false;
// byte[] bytes = new byte[len]; int pos = 0;
if (len > 25) {
// tgw开始
if (in.get() == 't' && in.get() == 'g' && in.get() == 'w') {
while (pos < len - 4) {
if (in.get() == '\r') {
if (in.get() == '\n') {
if (in.get() == '\r') {
if (in.get() == '\n') {
result = true;
session.setAttribute(TGWHEAD, new Boolean(result));
if (!result) {
} }


package com.youxigu.wolf.net.codec;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream; import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import flex.messaging.io.SerializationContext;
import flex.messaging.io.amf.Amf3Output; /**
* 9个字节协议头+协议体.
* 协议头1-4字节表示协议长度 =协议体长度+9-4(去掉长度占的4字节)
* 协议头第5字节为标志字节:
* 该字节的最低位为压缩位:0=协议体未压缩 1=协议体已经压缩,
* 该字节的低2-4位为协议位:
* 000=基于AMF3的协议,
* 001=基于java serial协议
* 010=基于protobuf协议
* 5-8位未用,作为以后扩展
* ------------------------
* 6-9字节表示命令号
* 采用网络字节序的整数(高位在前,低位在后)
* @author Administrator
public class NewMutliEncoder extends ProtocolEncoderAdapter {
private static Logger log = LoggerFactory.getLogger(MutliEncoderNew.class);
public static final byte BIT_COMPRESSED = 0x01;
public static final byte BIT_UNCOMPRESSED = 0x00; public static final int HEAD_LEN = 5;
public static final int PROTOBUF_HEAD_LEN = 9; public static final byte BIT_AMF3 = 0x00;
public static final byte BIT_JAVA = 0x02;
public static final byte BIT_PROTOBUF = 0x04; private static SerializationContext context = new SerializationContext();
protected static Field bsBufField = null;
static {
context.legacyCollection = true;
context.legacyMap = false; try {
bsBufField = ByteArrayOutputStream.class.getDeclaredField("buf");
} catch (Exception e) {
} private NewMutilCodecFactory factory; public NewMutliEncoder(NewMutilCodecFactory factory) {
this.factory = factory;
} private static int cachesize = 1024;
* 字节数》compressLen压缩,否则不压缩
private int compressLen = 5120;// 5120;// 5K Integer.MAX_VALUE;// ; private int maxSendLen = 0;// 统计的最大消息长度 // private final AttributeKey DEFLATER = new AttributeKey(getClass(),
// "deflater"); public void setCompressLen(int compressLen) {
this.compressLen = compressLen;
} @Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
// long time = System.currentTimeMillis();
if (message instanceof String) {// flash 安全沙箱或者TGW头不encode,
String str = (String) message; IoBuffer buffer = null;
if (str.startsWith("tgw_")) {
byte[] bytes = str.getBytes("GBK");
buffer = IoBuffer.allocate(bytes.length);
} else {
byte[] bytes = str.getBytes("UTF-8");
buffer = IoBuffer.allocate(bytes.length + 1);
buffer.put((byte) 0x0);
} else {
byte flag = BIT_AMF3;// 标志位
boolean isProtoBuf = message instanceof com.google.protobuf.Message; // 这里使用的buffer都是非direct的,不用free;
// TODO:这里默认应该分配多大呢?
IoBuffer buffer = null;
int head = 0;
if (isProtoBuf) {
} else {
head = HEAD_LEN;
buffer = IoBuffer.allocate(256, false);
// 留出协议头位置
} int command = 0;
if (message instanceof Map || message instanceof IAMF3Message) {
getAmf3Bytes(buffer, message);
} else if (isProtoBuf) {
command = factory.getMessageCommand(message.getClass());
byte[] bytes = ((com.google.protobuf.Message) message).toByteArray();
buffer = IoBuffer.allocate(bytes.length + head, false);
// 留出协议头位置
} else {
flag = BIT_JAVA;
getJavaBytes(buffer, message);
} // TODO:目前没有处理加密
// 协议体字节数
int position = buffer.position();
int byteLen = position - head;
// System.out.println("=====len"+byteLen);
if (log.isInfoEnabled()) {// 由于多线程的原因,这里显示的不完全正确
if (byteLen > maxSendLen) {
log.info("当前发出的最大消息长度:{}byte", byteLen);
maxSendLen = byteLen;
// 四字节长度位,+一字节标志位(第一位为压缩标志位,第二位为加密标志位)
// 压缩
if (!isProtoBuf && byteLen > compressLen) {
// long time = System.currentTimeMillis();
int oldLen = byteLen;
flag = (byte) (flag | BIT_COMPRESSED);
byte[] inputs = buffer.buf().array();
// // TODO:这里分配多大合适呢?
// buffer = IoBuffer.allocate(3072, false);
// buffer.setAutoExpand(true);
if ((flag & 0xE) == MutliEncoderNew.BIT_JAVA) {
compressJava(session, inputs, HEAD_LEN, byteLen, buffer);
} else {
compress(session, inputs, HEAD_LEN, byteLen, buffer);
position = buffer.position();
byteLen = position - HEAD_LEN; if (log.isDebugEnabled()) {
log.debug("消息体太长,被压缩,from {} bytes to {} bytes.", oldLen, byteLen);
// System.out.println("压缩时间:"
// + (System.currentTimeMillis() - time));
} buffer.position(0);
// 加入长度字段,长度=数据长度+标志位长度+(ptotobuf cmd)
buffer.putInt(byteLen + head - 4);
if (isProtoBuf) {
buffer.position(position); buffer.flip(); out.write(buffer); } // System.out.println("time="+(System.currentTimeMillis()-time)); } protected void getAmf3Bytes(IoBuffer buffer, Object message) throws Exception {
Amf3Output amf3out = null;
try {
amf3out = new Amf3Output(context);
amf3out.setOutputStream(new IoBufferOutputStream(buffer));
} finally {
if (amf3out != null) {
} protected void getJavaBytes(IoBuffer buffer, Object message) throws Exception {
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(new IoBufferOutputStream(buffer));
} finally {
if (oos != null) {
} private void compress(IoSession session, byte[] inputs, int offset, int byteLen, IoBuffer buffer) { GZIPOutputStream os = null;
ByteArrayOutputStream bs = null;
try {
bs = new ByteArrayOutputStream(byteLen / 2);
os = new GZIPOutputStream(bs, cachesize);
os.write(inputs, offset, byteLen); os.flush();
byte[] buf = (byte[]) bsBufField.get(bs);
int bufLen = bs.size();
buffer.put(buf, 0, bufLen);
} catch (IOException e) {
throw new RuntimeException("IO exception compress data");
} catch (IllegalArgumentException e) {
throw new RuntimeException("IllegalArgumentException compress data");
} catch (IllegalAccessException e) {
throw new RuntimeException("IllegalAccessException compress data");
} finally {
try {
if (os != null) {
} catch (Exception e) {
} private void compressJava(IoSession session, byte[] inputs, int offset, int byteLen, IoBuffer buffer) { Deflater deflater = null;
try {
deflater = new Deflater(); byte[] buf = new byte[cachesize]; for (int i = 0; i < byteLen; i += cachesize) {
deflater.setInput(inputs, offset + i, Math.min(cachesize, byteLen - i));
while (!deflater.needsInput()) {
int len = deflater.deflate(buf, 0, cachesize);
if (len > 0) {
buffer.put(buf, 0, len);
if (!deflater.finished()) {
while (!deflater.finished()) {
int len = deflater.deflate(buf, 0, cachesize);
if (len > 0) {
buffer.put(buf, 0, len);
} } finally {
} } }


这里解码对象都是protobuf 中Message接口的实现类的子类,所以我们只需要用反射取出message中cmd,在用cmd找到请求的处理方法,调用就可以了



