因为最近要从公司离职,害怕用nio写的网络程序没有人能看懂(或许是因为写的不好吧),就调整成了mina(这样大家接触起来非常方便,即使没有socket基础,用起来也不难),所以之前基于nio写的网络程序就开放出来好了!
写的比较挫,大家见谅!
首先是PollServer类,主要处理select,做网络事件的监听和基于FutureTask的数据发送,代码如下:
package gs.gate;
import gs.gate.handle.ClientHandle;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.FutureTask;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.apache.log4j.Logger;
import door.HeartTimer;
public class PollServer implements Runnable
{
private Logger log = Logger.getLogger(getClass());
private Selector select = null;
private ServerSocketChannel serverSocketChannel = null;
private HeartTimer writerExpire = null;
private volatile boolean run = true;
private List<FutureTask<Integer>> writeTasks =
new Vector<FutureTask<Integer>>();
public PollServer(String host,int port) throws IOException
{
select = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(host, port));
serverSocketChannel.register(select, SelectionKey.OP_ACCEPT);
}
public Selector getSelector()
{
return this.select;
}
public void stop()
{
this.run = false;
}
public void run()
{
if(writerExpire == null)
{
writerExpire = new HeartTimer(50);
}
while(this.run)
{
try
{
this.listen();
}
catch (Exception e)
{
log.info("PollServer listen() err! " + e.toString());
}
try
{
List<FutureTask<Integer>> writeTasks_ = null;
synchronized (writeTasks)
{
writeTasks_ = new ArrayList<FutureTask<Integer>>(this.writeTasks);
this.writeTasks.clear();
}
for(FutureTask<Integer> task : writeTasks_)
{
task.run();
}
}
catch (Exception e)
{
log.error("PollServer processOutput() err" ,e);
}
}
}
public void listen() throws IOException
{
select.select(10);
Set<SelectionKey> readyKeys = select.selectedKeys();
Iterator<SelectionKey> itr = readyKeys.iterator();
//处理接受
while(itr.hasNext())
{
SelectionKey key = itr.next();
itr.remove();
if(key.isAcceptable())
{
SocketChannel newConnection = serverSocketChannel.accept();
this.addClient(newConnection);
}
else if(key.isReadable())
{
ClientHandle handle = (ClientHandle)key.attachment();
try
{
if(handle.handleRead() <= 0)
{
log.info("if handleRead < 0");
this.removeClient(handle);
}
}
catch (Exception e)
{
log.error("exception",e);
this.removeClient(handle);
}
}
else if(key.isWritable())
{
ClientHandle handle = (ClientHandle)key.attachment();
try
{
handle.handleWrite();
if(handle.hasRemaining() == false)
{
key.cancel();
}
}
catch (Exception e)
{
this.removeClient(handle);
log.error("if handleWrite error",e);
}
}
}
}
public void addWriteTask(FutureTask<Integer> future)
{
this.writeTasks.add(future);
}
public void addClient(SocketChannel socket)
{
ClientHandle handle = new ClientHandle(socket,this);
try
{
socket.socket().setTcpNoDelay(run);
socket.configureBlocking(false);
socket.register(select, SelectionKey.OP_READ,handle);
}
catch (Exception e)
{
try
{
log.error("create client err",e);
socket.close();
}
catch (Exception err)
{}
}
}
public void removeClient(ClientHandle handle)
{
if(handle == null)
{
return ;
}
log.info(" remove Client ");
handle.handleDisConnected();
}
}
主要函数: listen();作用:基于网络事件处理接受新链接和消息的接收!
主要函数: processOutput(); 作用: 做统一的发送处理,在这篇 浅谈游戏服务器的发送数据处理 中有讲解!每个连接在发送的时候,将数据和连接封装成FutureTask,然后投递到Pollserver中的安全队列中,在这里统一将安全队列中的任务执行完毕! 如果有数据没有发送完毕,就监听写时间,直到这个链接成为可写事件(即:写缓冲区中有空闲)。
下面是ClientHandle类的代码,做每个连接的处理,比如拆包分包,代码如下
package gs.gate.handle;
import gs.gate.PollServer;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.FutureTask;
import org.apache.log4j.Logger;
import dc.control.DCThread;
import dc.util.DcTask;
import door.IPlayer;
import senv.server.ServerKit;
import slib.net.ISession;
import slib.util.ByteBuffer;
public class ClientHandle implements ISession
{
private Logger log = Logger.getLogger(getClass());
public final static int RW_BUFFER_SIZE = 1024;
private SocketChannel socket = null;
private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(8*RW_BUFFER_SIZE);
private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(10*RW_BUFFER_SIZE);
private volatile IPlayer player = null;
private PollServer poll = null;
private boolean active = false;
private boolean tgwProcessed = false;
private String tgw = "tgw_l7_forward\r\nHost:" + ServerKit.ip+ ":" + ServerKit.port + "\r\n\r\n";
public ClientHandle(SocketChannel socket,PollServer poll)
{
this.socket = socket;
this.writer.limit(this.writer.capacity());
this.active = true;
this.poll = poll;
try
{
this.socket.socket().setSendBufferSize(10*RW_BUFFER_SIZE);
this.socket.socket().setReceiveBufferSize(8*RW_BUFFER_SIZE);
this.socket.socket().setTcpNoDelay(true);
this.socket.socket().setSoLinger(true, 3600);
}
catch (Exception e)
{
log.error("err",e);
}
}
public SocketChannel getSocketChannel()
{
return this.socket;
}
public int handleWrite() throws IOException
{
if (!this.isActive())
{
return -1;
}
this.writer.flip();
this.socket.write(writer);
if(this.writer.hasRemaining())
{
this.writer.compact();
}
else
{
this.writer.clear();
}
return 0;
}
public boolean hasRemaining()
{
return this.writer.position() > 0;
}
public Integer writeData(ByteBuffer data) throws IOException
{
if (!this.isActive())
{
return -1;
}
if(data == null)
{
return 0;
}
this.writer.putInt(data.length());
this.writer.put(data.toByteArray(), 0, data.length());
this.writer.flip();
int result = this.socket.write(this.writer);
if(this.writer.hasRemaining())
{
this.writer.compact();
}
else
{
this.writer.clear();
}
return result;
}
@SuppressWarnings("deprecation")
public int handleRead() throws IOException
{
if(socket == null)
{
return -1;
}
if(!this.isActive())
{
return -1;
}
int r = this.socket.read(this.reader);
if(r <= 0)
{
return r;
}
if(this.tgwProcessed == false)
{
//腾讯平台 你mb
this.reader.flip();
byte bytes[] = new byte[tgw.length()];
this.reader.get(bytes);
String vali = new String(bytes,"UTF-8");
if(vali.equals(tgw))
{
log.info("tgw 校验成功");
}
else
{
log.info("tgw 校验失败");
}
this.tgwProcessed = true;
this.reader.compact();
}
while(true)
{
this.reader.flip();
ByteBuffer data = this.createBuffer();
if(data == null)
{
break;
}
this.reader.get(data.getByteArray(), data.top(), data.capacity());
this.processData(data);
if(this.reader.hasRemaining())
{
this.reader.compact();
}
else
{
this.reader.clear();
break;
}
}
return 1;
}
public void processData(ByteBuffer data)
{
if(player == null)
{
DcTask task = new DcTask();
task.object = this;
task.data = data;
DCThread.getInstance().insertTask(task);
}
else
{
player.insertData(data);
}
}
public void handleDisConnected()
{
if(!this.isActive())
{
return ;
}
if(player != null)
{
this.player.setSession(null);
this.player.logOut();
}
this.close();
this.player = null;
}
private ByteBuffer createBuffer()
{
if(reader.remaining() < 4)
{
return null;
}
int len = reader.getInt();
if(len > reader.remaining())
{
this.reader.rewind();
this.reader.compact();
return null;
}
if (len > 0 && len <= 10 * 1024)
{
return new ByteBuffer(len);
}
return null;
}
@Override
public void close()
{
setActive(false);
try
{
log.error("socket close !");
this.socket.close();
this.socket.keyFor(this.poll.getSelector()).cancel();
}
catch (Exception e)
{
log.error("err" , e);
}
}
@Override
public long getActiveTime()
{
return 0;
}
@Override
public InetAddress getAddress()
{
return null;
}
@Override
public String getCode()
{
return null;
}
@Override
public int getPing()
{
return 0;
}
@Override
public long getPingTime()
{
return 0;
}
@Override
public int getPort()
{
return 0;
}
@Override
public int getServerId()
{
return 0;
}
@Override
public int getSessionId()
{
return 0;
}
@Override
public Object getSource()
{
return this.player;
}
@Override
public int getTimeout()
{
return 0;
}
@Override
public boolean isActive()
{
return this.active;
}
@Override
public void send(ByteBuffer data)
{
WriteTask task = new WriteTask(this,data);
FutureTask<Integer> future = new FutureTask<Integer>(task);
poll.addWriteTask(future);
}
@Override
public void send(ByteBuffer arg0, ByteBuffer arg1)
{
}
@Override
public void setCode(String arg0)
{
}
@Override
public void setPing(int arg0)
{
}
@Override
public void setPingTime(long arg0)
{
}
public void enableWriteEvent()
{
try
{
this.socket.register(this.poll.getSelector(), SelectionKey.OP_WRITE, this);
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void shutdownWriteEvent()
{
}
@Override
public void setSource(Object arg0)
{
this.player = (IPlayer) arg0;
}
protected void setActive(boolean b)
{
this.active = b;
}
}
重要的几个函数: send(ByteBuffer data) 将发送处理包装成FutureTask,投递到PollServer中进行处理,就是PollServer::processOutput中处理
handleRead() 这里处理接受数据事件,做了拆包,将二进制数据,按照 长度-内容 的格式进行解析,拆分成一个个ByteBuffer(定义见下文)包,然后进行处理。
ClientHandle继承自ISession接口,其实这个无所谓,大家可以自己定义。我这里因为要和之前的系统兼容,所以才继承了这个。这里一不小心居然用到了适配器模式,我以为这辈子只会用到创建者模式呢? 个人还是觉得,这些设计模式还是为了解决问题用到的,而不是为了多变的需求而想太多用到的;设计模式用得多用的频繁,反而增加代码的可读性!
最后看下WriteTask的封装
package gs.gate.handle;
import java.util.concurrent.Callable;
import slib.util.ByteBuffer;
public class WriteTask implements Callable<Integer>
{
private ClientHandle client = null;
private ByteBuffer data = null;
public WriteTask(ClientHandle handle,ByteBuffer data)
{
client = handle;
this.data = data;
}
@Override
public Integer call() throws Exception
{
return this.client.writeData(data);
}
}
这个就不多解析了!
缺陷1: 没有做空闲连接的处理,后来的mina库,提供了这个功能!有兴趣的同学自己写个吧!
缺陷2: 自定义的消息包,用了ByteBuffer类,和nio提供的ByteBuffer 重复!
给出自定义的ByteBuffer的处理:
package slib.util;
/**
* 类说明:字节缓存类,字节操作高位在前,低位在后
*
* @version 1.0
* @author fxxxysh <hanshuang@linekong.com>
*/
public class ByteBuffer
{
/* static fields */
/** 默认的初始容量大小 */
public static final int CAPACITY = 32;
/** 默认的动态数据或文字的最大长度,400k */
public static final int MAX_DATA_LENGTH = 400 * 1024;
/* fields */
/** 字节数组 */
byte[] bytes;
/** 字节缓存的长度 */
int top;
/** 字节缓存的偏移量 */
int offset;
/* constructors */
/** 按默认的大小构造一个字节缓存对象 */
public ByteBuffer()
{
this(CAPACITY);
}
/** 按指定的大小构造一个字节缓存对象 */
public ByteBuffer(int capacity)
{
if (capacity < 1)
throw new IllegalArgumentException(getClass().getName()
+ " <init>, invalid capatity:" + capacity);
bytes = new byte[capacity];
top = 0;
offset = 0;
}
/** 按指定的字节数组构造一个字节缓存对象 */
public ByteBuffer(byte[] data)
{
if (data == null)
throw new IllegalArgumentException(getClass().getName()
+ " <init>, null data");
bytes = data;
top = data.length;
offset = 0;
}
/** 按指定的字节数组构造一个字节缓存对象 */
public ByteBuffer(byte[] data, int index, int length)
{
if (data == null)
throw new IllegalArgumentException(getClass().getName()
+ " <init>, null data");
if (index < 0 || index > data.length)
throw new IllegalArgumentException(getClass().getName()
+ " <init>, invalid index:" + index);
if (length < 0 || data.length < index + length)
throw new IllegalArgumentException(getClass().getName()
+ " <init>, invalid length:" + length);
bytes = data;
top = index + length;
offset = index;
}
/* properties */
/** 得到字节缓存的容积 */
public int capacity()
{
return bytes.length;
}
/** 设置字节缓存的容积,只能扩大容积 */
public void setCapacity(int len)
{
int c = bytes.length;
if (len <= c)
return;
for (; c < len; c = (c << 1) + 1)
;
byte[] temp = new byte[c];
System.arraycopy(bytes, 0, temp, 0, top);
bytes = temp;
}
/** 得到字节缓存的长度 */
public int top()
{
return top;
}
/** 设置字节缓存的长度 */
public void setTop(int top)
{
if (top < offset)
throw new IllegalArgumentException(this + " setTop, invalid top:"
+ top);
if (top > bytes.length)
setCapacity(top);
this.top = top;
}
/** 得到字节缓存的偏移量 */
public int offset()
{
return offset;
}
/** 设置字节缓存的偏移量 */
public void setOffset(int offset)
{
if (offset < 0 || offset > top)
throw new IllegalArgumentException(this
+ " setOffset, invalid offset:" + offset);
this.offset = offset;
}
/** 得到字节缓存的使用长度 */
public int length()
{
return top - offset;
}
/** 得到字节缓存的字节数组,一般使用toArray()方法 */
public byte[] getByteArray()
{
return bytes;
}
/* methods */
/* byte methods */
/** 得到指定偏移位置的字节 */
public byte read(int pos)
{
return bytes[pos];
}
/** 设置指定偏移位置的字节 */
public void write(int b, int pos)
{
bytes[pos] = (byte) b;
}
/* read methods */
/**
* 按当前偏移位置读入指定的字节数组
*
* @param data
* 指定的字节数组
* @param pos
* 指定的字节数组的起始位置
* @param len
* 读入的长度
*/
public void read(byte[] data, int pos, int len)
{
System.arraycopy(bytes, offset, data, pos, len);
offset += len;
}
/** 读出一个布尔值 */
public boolean readBoolean()
{
return (bytes[offset++] != 0);
}
/** 读出一个字节 */
public byte readByte()
{
return bytes[offset++];
}
/** 读出一个无符号字节 */
public int readUnsignedByte()
{
return bytes[offset++] & 0xff;
}
/** 读出一个字符 */
public char readChar()
{
return (char) readUnsignedShort();
}
/** 读出一个短整型数值 */
public short readShort()
{
return (short) readUnsignedShort();
}
/** 读出一个无符号的短整型数值 */
public int readUnsignedShort()
{
int pos = offset;
offset += 2;
return (bytes[pos + 1] & 0xff) + ((bytes[pos] & 0xff) << 8);
}
/** 读出一个整型数值 */
public int readInt()
{
int pos = offset;
offset += 4;
return (bytes[pos + 3] & 0xff) + ((bytes[pos + 2] & 0xff) << 8)
+ ((bytes[pos + 1] & 0xff) << 16) + ((bytes[pos] & 0xff) << 24);
}
/** 读出一个浮点数值 */
public float readFloat()
{
return Float.intBitsToFloat(readInt());
}
/** 读出一个长整型数值 */
public long readLong()
{
int pos = offset;
offset += 8;
return (bytes[pos + 7] & 0xffL) + ((bytes[pos + 6] & 0xffL) << 8)
+ ((bytes[pos + 5] & 0xffL) << 16)
+ ((bytes[pos + 4] & 0xffL) << 24)
+ ((bytes[pos + 3] & 0xffL) << 32)
+ ((bytes[pos + 2] & 0xffL) << 40)
+ ((bytes[pos + 1] & 0xffL) << 48)
+ ((bytes[pos] & 0xffL) << 56);
}
/** 读出一个双浮点数值 */
public double readDouble()
{
return Double.longBitsToDouble(readLong());
}
/**
* 读出动态长度, 数据大小采用动态长度,整数类型下,最大为512M 1xxx,xxxx表示(0~0x80) 0~128B
* 01xx,xxxx,xxxx,xxxx表示(0~0x4000) 0~16K
* 001x,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx表示(0~0x20000000) 0~512M
*/
public int readLength()
{
int n = bytes[offset] & 0xff;
if (n >= 0x80)
{
offset++;
return n - 0x80;
}
else if (n >= 0x40)
return readUnsignedShort() - 0x4000;
else if (n >= 0x20)
return readInt() - 0x20000000;
else
throw new IllegalArgumentException(this
+ " readLength, invalid number:" + n);
}
/** 读出一个指定长度的字节数组,可以为null */
public byte[] readData()
{
int len = readLength() - 1;
if (len < 0)
return null;
if (len > MAX_DATA_LENGTH)
throw new IllegalArgumentException(this
+ " readData, data overflow:" + len);
byte[] data = new byte[len];
read(data, 0, len);
return data;
}
/** 读出一个短字节数组,长度不超过254 */
public byte[] readShortData()
{
int len = readUnsignedByte();
if (len == 255)
return null;
byte[] data = new byte[len];
if (len != 0)
read(data, 0, len);
return data;
}
/** 读出一个指定长度的字符串 */
public String readString(int len)
{
byte[] data = new byte[len];
if (len == 0)
return "";
read(data, 0, len);
return new String(data);
}
/** 读出一个短字符串,长度不超过254 */
public String readShortString()
{
int len = readUnsignedByte();
if (len == 255)
return null;
return readString(len);
}
/** 读出一个字符串,长度不超过65534 */
public String readString()
{
int len = readUnsignedShort();
if (len == 65535)
return null;
return readString(len);
}
/** 读出一个指定长度和编码类型的字符串 */
public String readUTF(String charsetName)
{
int len = readLength() - 1;
if (len < 0)
return null;
if (len > MAX_DATA_LENGTH)
throw new IllegalArgumentException(this
+ " readUTF, data overflow:" + len);
byte[] data = new byte[len];
read(data, 0, len);
if (charsetName == null)
return new String(data);
try
{
return new String(data, charsetName);
}
catch (Exception e)
{
throw new IllegalArgumentException(this
+ " readUTF, invalid charsetName:" + charsetName);
}
}
/** 读出一个指定长度的utf字符串 */
public String readUTF()
{
int len = readLength() - 1;
if (len < 0)
return null;
if (len == 0)
return "";
if (len > MAX_DATA_LENGTH)
throw new IllegalArgumentException(this
+ " readUTF, data overflow:" + len);
StringBuffer sb = new StringBuffer(len);
int pos = ByteKit.readUTF(bytes, offset, len, sb);
if (pos > 0)
throw new IllegalArgumentException(this
+ " readUTF, format err, len=" + len + ", pos:" + pos);
offset += len;
return sb.toString();
}
/* write methods */
/**
* 写入指定字节数组
*
* @param data
* 指定的字节数组
* @param pos
* 指定的字节数组的起始位置
* @param len
* 写入的长度
*/
public void write(byte[] data, int pos, int len)
{
if (bytes.length < top + len)
setCapacity(top + len);
System.arraycopy(data, pos, bytes, top, len);
top += len;
}
/** 写入一个布尔值 */
public void writeBoolean(boolean b)
{
if (bytes.length < top + 1)
setCapacity(top + CAPACITY);
bytes[top++] = (byte) (b ? 1 : 0);
}
/** 写入一个字节 */
public void writeByte(int b)
{
if (bytes.length < top + 1)
setCapacity(top + CAPACITY);
bytes[top++] = (byte) b;
}
/** 写入一个字符 */
public void writeChar(int c)
{
writeShort(c);
}
/** 写入一个短整型数值 */
public void writeShort(int s)
{
int pos = top;
if (bytes.length < pos + 2)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) (s >>> 8);
bytes[pos + 1] = (byte) s;
top += 2;
}
/** 在指定位置写入一个短整型数值,length不变 */
public void writeShort(int s, int pos)
{
if (bytes.length < pos + 2)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) (s >>> 8);
bytes[pos + 1] = (byte) s;
}
/** 写入一个整型数值 */
public void writeInt(int i)
{
int pos = top;
if (bytes.length < pos + 4)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) (i >>> 24);
bytes[pos + 1] = (byte) (i >>> 16);
bytes[pos + 2] = (byte) (i >>> 8);
bytes[pos + 3] = (byte) i;
top += 4;
}
/** 在指定位置写入一个整型数值,length不变 */
public void writeInt(int i, int pos)
{
if (bytes.length < pos + 4)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) (i >>> 24);
bytes[pos + 1] = (byte) (i >>> 16);
bytes[pos + 2] = (byte) (i >>> 8);
bytes[pos + 3] = (byte) i;
}
/** 写入一个浮点数值 */
public void writeFloat(float f)
{
writeInt(Float.floatToIntBits(f));
}
/** 写入一个长整型数值 */
public void writeLong(long l)
{
int pos = top;
if (bytes.length < pos + 8)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) (l >>> 56);
bytes[pos + 1] = (byte) (l >>> 48);
bytes[pos + 2] = (byte) (l >>> 40);
bytes[pos + 3] = (byte) (l >>> 32);
bytes[pos + 4] = (byte) (l >>> 24);
bytes[pos + 5] = (byte) (l >>> 16);
bytes[pos + 6] = (byte) (l >>> 8);
bytes[pos + 7] = (byte) l;
top += 8;
}
/** 写入一个双浮点数值 */
public void writeDouble(double d)
{
writeLong(Double.doubleToLongBits(d));
}
/** 写入动态长度 */
public void writeLength(int len)
{
if (len >= 0x20000000 || len < 0)
throw new IllegalArgumentException(this
+ " writeLength, invalid len:" + len);
if (len >= 0x4000)
writeInt(len + 0x20000000);
else if (len >= 0x80)
writeShort(len + 0x4000);
else
writeByte(len + 0x80);
}
/** 写入一个字节数组,可以为null */
public void writeData(byte[] data)
{
writeData(data, 0, (data != null) ? data.length : 0);
}
/** 写入一个字节数组,可以为null */
public void writeData(byte[] data, int pos, int len)
{
if (data == null)
{
writeLength(0);
return;
}
writeLength(len + 1);
write(data, pos, len);
}
/** 写入一个字符串,可以为null */
public void writeString(String s)
{
if (s != null)
{
byte[] temp = s.getBytes();
if (temp.length > 65534)
throw new IllegalArgumentException(getClass().getName()
+ " writeString, invalid s:" + s);
writeShort(temp.length);
if (temp.length != 0)
write(temp, 0, temp.length);
}
else
writeShort(65535);
}
/** 写入一个字符串,以指定的字符进行编码 */
public void writeUTF(String str, String charsetName)
{
if (str == null)
{
writeLength(0);
return;
}
byte[] data;
if (charsetName != null)
{
try
{
data = str.getBytes(charsetName);
}
catch (Exception e)
{
throw new IllegalArgumentException(this
+ " writeUTF, invalid charsetName:" + charsetName);
}
}
else
data = str.getBytes();
writeLength(data.length + 1);
write(data, 0, data.length);
}
/** 写入一个utf字符串,可以为null */
public void writeUTF(String str)
{
writeUTF(str, 0, (str != null) ? str.length() : 0);
}
/** 写入一个utf字符串中指定的部分,可以为null */
public void writeUTF(String str, int index, int length)
{
if (str == null)
{
writeLength(0);
return;
}
int len = ByteKit.getUTFLength(str, index, length);
writeLength(len + 1);
int pos = top;
if (bytes.length < pos + len)
setCapacity(pos + len);
ByteKit.writeUTF(str, index, length, bytes, pos);
top += len;
}
/** 检查是否为相同类型的实例 */
public boolean checkClass(Object obj)
{
return (obj instanceof ByteBuffer);
}
/** 在指定位置写入一个字节,length不变 */
public void writeByte(int b, int pos)
{
if (bytes.length < pos + 1)
setCapacity(pos + CAPACITY);
bytes[pos] = (byte) b;
}
/** 得到字节缓存当前长度的字节数组 */
public byte[] toByteArray()
{
byte[] data = new byte[top - offset];
System.arraycopy(bytes, offset, data, 0, data.length);
return data;
}
/** 清除字节缓存对象 */
public void clear()
{
top = 0;
offset = 0;
}
/* common methods */
public int hashCode()
{
int hash = 17;
for (int i = top - 1; i >= 0; i--)
hash = 65537 * hash + bytes[i];
return hash;
}
public boolean equals(Object obj)
{
if (this == obj)
return true;
if (!checkClass(obj))
return false;
ByteBuffer bb = (ByteBuffer) obj;
if (bb.top != top)
return false;
if (bb.offset != offset)
return false;
for (int i = top - 1; i >= 0; i--)
{
if (bb.bytes[i] != bytes[i])
return false;
}
return true;
}
public String toString()
{
return super.toString() + "[" + top + "," + offset + "," + bytes.length
+ "] ";
}
}
相应的ByteKit类代码:
/**
* Copyright 2001 by seasky <www.seasky.cn>.
*/
package slib.util;
/**
* 类说明: 字节及字节数组的方法操作库
*
* @version 1.0
* @author zminleo <zmin@seasky.cn>
*/
public final class ByteKit
{
/* static fields */
/** 库信息 */
public static final String toString=ByteKit.class.getName();
/* static methods */
/** 在字节数组中指定位置读出一个布尔值 */
public static boolean readBoolean(byte[] bytes,int pos)
{
return bytes[pos]!=0;
}
/** 在字节数组中指定位置读出一个字节 */
public static byte readByte(byte[] bytes,int pos)
{
return bytes[pos];
}
/** 在字节数组中指定位置读出一个无符号字节 */
public static int readUnsignedByte(byte[] bytes,int pos)
{
return bytes[pos]&0xff;
}
/** 在字节数组中指定位置读出一个字符 */
public static char readChar(byte[] bytes,int pos)
{
return (char)readUnsignedShort(bytes,pos);
}
/** 在字节数组中指定位置读出一个字符,低位在前,高位在后 */
public static char readChar_(byte[] bytes,int pos)
{
return (char)readUnsignedShort_(bytes,pos);
}
/** 在字节数组中指定位置读出一个短整型数值 */
public static short readShort(byte[] bytes,int pos)
{
return (short)readUnsignedShort(bytes,pos);
}
/** 在字节数组中指定位置读出一个短整型数值,低位在前,高位在后 */
public static short readShort_(byte[] bytes,int pos)
{
return (short)readUnsignedShort_(bytes,pos);
}
/** 在字节数组中指定位置读出一个无符号短整型数值 */
public static int readUnsignedShort(byte[] bytes,int pos)
{
return (bytes[pos+1]&0xff)+((bytes[pos]&0xff)<<8);
}
/** 在字节数组中指定位置读出一个无符号短整型数值,低位在前,高位在后 */
public static int readUnsignedShort_(byte[] bytes,int pos)
{
return ((bytes[pos+1]&0xff)<<8)+(bytes[pos]&0xff);
}
/** 在字节数组中指定位置读出一个整型数值 */
public static int readInt(byte[] bytes,int pos)
{
return ((bytes[pos+3]&0xff))+((bytes[pos+2]&0xff)<<8)
+((bytes[pos+1]&0xff)<<16)+((bytes[pos]&0xff)<<24);
}
/** 在字节数组中指定位置读出一个整型数值,低位在前,高位在后 */
public static int readInt_(byte[] bytes,int pos)
{
return ((bytes[pos+3]&0xff)<<24)+((bytes[pos+2]&0xff)<<16)
+((bytes[pos+1]&0xff)<<8)+((bytes[pos]&0xff));
}
/** 在字节数组中指定位置读出一个浮点数值 */
public static float readFloat(byte[] bytes,int pos)
{
return Float.intBitsToFloat(readInt(bytes,pos));
}
/** 在字节数组中指定位置读出一个浮点数值,低位在前,高位在后 */
public static float readFloat_(byte[] bytes,int pos)
{
return Float.intBitsToFloat(readInt_(bytes,pos));
}
/** 在字节数组中指定位置读出一个长整型数值 */
public static long readLong(byte[] bytes,int pos)
{
return (bytes[pos+7]&0xffL)+((bytes[pos+6]&0xffL)<<8)
+((bytes[pos+5]&0xffL)<<16)+((bytes[pos+4]&0xffL)<<24)
+((bytes[pos+3]&0xffL)<<32)+((bytes[pos+2]&0xffL)<<40)
+((bytes[pos+1]&0xffL)<<48)+((bytes[pos]&0xffL)<<56);
}
/** 在字节数组中指定位置读出一个长整型数值,低位在前,高位在后 */
public static long readLong_(byte[] bytes,int pos)
{
return ((bytes[pos+7]&0xffL)<<56)+((bytes[pos+6]&0xffL)<<48)
+((bytes[pos+5]&0xffL)<<40)+((bytes[pos+4]&0xffL)<<32)
+((bytes[pos+3]&0xffL)<<24)+((bytes[pos+2]&0xffL)<<16)
+((bytes[pos+1]&0xffL)<<8)+(bytes[pos]&0xffL);
}
/** 在字节数组中指定位置读出一个双浮点数值 */
public static double readDouble(byte[] bytes,int pos)
{
return Double.longBitsToDouble(readLong(bytes,pos));
}
/** 在字节数组中指定位置读出一个双浮点数值,低位在前,高位在后 */
public static double readDouble_(byte[] bytes,int pos)
{
return Double.longBitsToDouble(readLong_(bytes,pos));
}
/** 写入一个布尔值在字节数组中指定位置 */
public static void writeBoolean(boolean b,byte[] bytes,int pos)
{
bytes[pos]=(byte)(b?1:0);
}
/** 写入一个字节在字节数组中指定位置 */
public static void writeByte(int b,byte[] bytes,int pos)
{
bytes[pos]=(byte)b;
}
/** 在字节数组中指定位置写入一个字符 */
public static void writeChar(int c,byte[] bytes,int pos)
{
writeShort(c,bytes,pos);
}
/** 写入一个字符在字节数组中指定位置,低位在前,高位在后 */
public static void writeChar_(int c,byte[] bytes,int pos)
{
writeShort_(c,bytes,pos);
}
/** 写入一个短整型数值在字节数组中指定位置 */
public static void writeShort(int s,byte[] bytes,int pos)
{
bytes[pos]=(byte)(s>>>8);
bytes[pos+1]=(byte)s;
}
/** 写入一个短整型数值在字节数组中指定位置,低位在前,高位在后 */
public static void writeShort_(int s,byte[] bytes,int pos)
{
bytes[pos]=(byte)s;
bytes[pos+1]=(byte)(s>>>8);
}
/** 写入一个整型数值在字节数组中指定位置 */
public static void writeInt(int i,byte[] bytes,int pos)
{
bytes[pos]=(byte)(i>>>24);
bytes[pos+1]=(byte)(i>>>16);
bytes[pos+2]=(byte)(i>>>8);
bytes[pos+3]=(byte)i;
}
/** 在字节数组中指定位置写入一个整型数值,低位在前,高位在后 */
public static void writeInt_(int i,byte[] bytes,int pos)
{
bytes[pos]=(byte)i;
bytes[pos+1]=(byte)(i>>>8);
bytes[pos+2]=(byte)(i>>>16);
bytes[pos+3]=(byte)(i>>>24);
}
/** 写入一个浮点数值在字节数组中指定位置 */
public static void writeFloat(float f,byte[] bytes,int pos)
{
writeInt(Float.floatToIntBits(f),bytes,pos);
}
/** 写入一个浮点数值在字节数组中指定位置,低位在前,高位在后 */
public static void writeFloat_(float f,byte[] bytes,int pos)
{
writeInt_(Float.floatToIntBits(f),bytes,pos);
}
/** 写入一个长整型数值在字节数组中指定位置 */
public static void writeLong(long l,byte[] bytes,int pos)
{
bytes[pos]=(byte)(l>>>56);
bytes[pos+1]=(byte)(l>>>48);
bytes[pos+2]=(byte)(l>>>40);
bytes[pos+3]=(byte)(l>>>32);
bytes[pos+4]=(byte)(l>>>24);
bytes[pos+5]=(byte)(l>>>16);
bytes[pos+6]=(byte)(l>>>8);
bytes[pos+7]=(byte)l;
}
/** 写入一个长整型数值在字节数组中指定位置,低位在前,高位在后 */
public static void writeLong_(long l,byte[] bytes,int pos)
{
bytes[pos]=(byte)l;
bytes[pos+1]=(byte)(l>>>8);
bytes[pos+2]=(byte)(l>>>16);
bytes[pos+3]=(byte)(l>>>24);
bytes[pos+4]=(byte)(l>>>32);
bytes[pos+5]=(byte)(l>>>40);
bytes[pos+6]=(byte)(l>>>48);
bytes[pos+7]=(byte)(l>>>56);
}
/** 写入一个双浮点数值在字节数组中指定位置 */
public static void writeDouble(double d,byte[] bytes,int pos)
{
writeLong(Double.doubleToLongBits(d),bytes,pos);
}
/** 写入一个双浮点数值在字节数组中指定位置,低位在前,高位在后 */
public static void writeDouble_(double d,byte[] bytes,int pos)
{
writeLong_(Double.doubleToLongBits(d),bytes,pos);
}
/** 将指定的字节数据转换为ISO-8859-1格式的字符串 */
public static String readISO8859_1(byte[] data)
{
return readISO8859_1(data,0,data.length);
}
/** 将指定的字节数据转换为ISO-8859-1格式的字符串 */
public static String readISO8859_1(byte[] data,int pos,int len)
{
char[] array=new char[len];
for(int i=pos+len-1,j=array.length-1;i>=pos;i--,j--)
array[j]=(char)data[i];
return new String(array);
}
/** 将指定的字符串转换为ISO-8859-1格式的字节数据 */
public static byte[] writeISO8859_1(String str)
{
return writeISO8859_1(str,0,str.length());
}
/** 将指定的字符串转换为ISO-8859-1格式的字节数据 */
public static byte[] writeISO8859_1(String str,int index,int len)
{
byte[] data=new byte[len];
writeISO8859_1(str,index,len,data,0);
return data;
}
/** 将指定的字符串转换为ISO-8859-1格式的字节数据 */
public static void writeISO8859_1(String str,int index,int len,
byte[] data,int pos)
{
int c;
for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--)
{
c=str.charAt(i);
data[j]=(c>256)?63:(byte)c;
}
}
/** 将指定的字符数组转换为ISO-8859-1格式的字节数据 */
public static void writeISO8859_1(char[] chars,int index,int len,
byte[] data,int pos)
{
int c;
for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--)
{
c=chars[i];
data[j]=(c>256)?63:(byte)c;
}
}
/** 将指定的UTF8格式的字节数据转换为字符串,返回null表示失败 */
public static String readUTF(byte[] data)
{
StringBuffer sb=new StringBuffer(data.length);
int pos=readUTF(data,0,data.length,sb);
return (pos==0)?sb.toString():null;
}
/**
* 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置
*/
public static int readUTF(byte[] data,StringBuffer sb)
{
return readUTF(data,0,data.length,sb);
}
/**
* 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置
*/
public static int readUTF(byte[] data,int pos,int len,StringBuffer sb)
{
int i,c,cc,ccc;
int end=pos+len;
while(pos<end)
{
c=data[pos]&0xff;
i=c>>4;
if(i<8)
{
// 0xxx xxxx
pos++;
sb.append((char)c);
}
else if(i==12||i==13)
{
// 110x xxxx 10xx xxxx
pos+=2;
if(pos>end) return pos;
cc=data[pos-1];
if((cc&0xC0)!=0x80) return pos;
sb.append((char)(((c&0x1f)<<6)|(cc&0x3f)));
}
else if(i==14)
{
// 1110 xxxx 10xx xxxx 10xx
// xxxx
pos+=3;
if(pos>end) return pos;
cc=data[pos-2];
ccc=data[pos-1];
if(((cc&0xC0)!=0x80)||((ccc&0xC0)!=0x80)) return pos;
sb.append((char)(((c&0x0f)<<12)|((cc&0x3f)<<6)|(ccc&0x3f)));
}
else
// 10xx xxxx 1111 xxxx
return pos;
}
return 0;
}
/** 获得指定的字符串转换为UTF8格式的字节数据的长度 */
public static int getUTFLength(String str,int index,int len)
{
int utfLen=0;
int c;
for(int i=index;i<len;i++)
{
c=str.charAt(i);
if((c>=0x0001)&&(c<=0x007f))
utfLen++;
else if(c>0x07ff)
utfLen+=3;
else
utfLen+=2;
}
return utfLen;
}
/** 在字节数组中指定位置写入一个短整型数值 */
public static void writeShort(byte[] bytes,int pos,int s)
{
bytes[pos]=(byte)(s>>>8);
bytes[pos+1]=(byte)s;
}
/** 在字节数组中指定位置写入一个字节 */
public static void writeByte(byte[] bytes,int pos,int b)
{
bytes[pos]=(byte)b;
}
/** 获得指定的字符数组转换为UTF8格式的字节数据的长度 */
public static int getUTFLength(char[] chars,int index,int len)
{
int utfLen=0;
int c;
for(int i=index;i<len;i++)
{
c=chars[i];
if((c>=0x0001)&&(c<=0x007f))
utfLen++;
else if(c>0x07ff)
utfLen+=3;
else
utfLen+=2;
}
return utfLen;
}
/** 将指定的字符串转换为UTF8格式的字节数据 */
public static byte[] writeUTF(String str)
{
return writeUTF(str,0,str.length());
}
/** 将指定的字符串转换为UTF8格式的字节数据 */
public static byte[] writeUTF(String str,int index,int len)
{
byte[] data=new byte[getUTFLength(str,index,len)];
writeUTF(str,index,len,data,0);
return data;
}
/** 将指定的字符串转换为UTF8格式的字节数据 */
public static void writeUTF(String str,int index,int len,byte[] data,
int pos)
{
int c;
for(int i=index;i<len;i++)
{
c=str.charAt(i);
if((c>=0x0001)&&(c<=0x007f))
{
data[pos++]=(byte)c;
}
else if(c>0x07ff)
{
data[pos++]=(byte)(0xe0|((c>>12)&0x0f));
data[pos++]=(byte)(0x80|((c>>6)&0x3f));
data[pos++]=(byte)(0x80|(c&0x3f));
}
else
{
data[pos++]=(byte)(0xc0|((c>>6)&0x1f));
data[pos++]=(byte)(0x80|(c&0x3f));
}
}
}
/** 将指定的字符数组转换为UTF8格式的字节数据 */
public static void writeUTF(char[] chars,int index,int len,byte[] data,
int pos)
{
int c;
for(int i=index;i<len;i++)
{
c=chars[i];
if((c>=0x0001)&&(c<=0x007f))
{
data[pos++]=(byte)c;
}
else if(c>0x07ff)
{
data[pos++]=(byte)(0xe0|((c>>12)&0x0f));
data[pos++]=(byte)(0x80|((c>>6)&0x3f));
data[pos++]=(byte)(0x80|(c&0x3f));
}
else
{
data[pos++]=(byte)(0xc0|((c>>6)&0x1f));
data[pos++]=(byte)(0x80|(c&0x3f));
}
}
}
/* constructors */
private ByteKit()
{
}
}
使用方法:
for(Map.Entry<String, Map<String, String>> entry : gateConfig.entrySet())
{
String host = entry.getValue().get("host");
String port = entry.getValue().get("port");
PollServer poll = new PollServer(host,Integer.parseInt(port));
Thread gateThread = new Thread(poll);
gateThread.setName(entry.getKey());
gateThread.start();
}
可能调整成mina库,还有其他的一个原因,就是在服务器端会无辜收到一个rst标识导致服务器断开。起初以为是代码问题,后来经过很长时间的排查和咨询,发现服务器用的是南方电信的网络,而一些北方网通的客户端在访问的时候,就会随机出现rst 连接复位现象!查询了好久,最后还是运维的大哥给的思维!当然在起初解决这个问题的时候,我还是本着代码的问题;顺便还去专门研究了Tcp/ip协议详解,翻出了大学里面学的计算机网络这本书。无论如何解决了就好!欢迎拍砖!