收到消息后先由解码器解码,此时不会直接回调messageReceived()方法,而是先触发心跳工厂类KeepAliveMessageFactoryImpl中的isRequest()方法:若判断为心跳请求,则回复一个心跳响应,不再回调messageReceived();若判断不是心跳,则回调messageReceived()方法,输出内容。
//服务器
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import public_protocol.MsgProtocol;
import server.KeepAliveRequestTimeoutHandlerImpl;
import serverUntil.ServerInfo;
/**
 * Server bootstrap: configures the shared MINA acceptor (logging, codec and
 * keep-alive filters plus the event handler) and binds it to the server port.
 *
 * @author Lhy
 */
public class ServerStart {
    /** Listening port, read once from {@code ServerInfo.getServerPort()}. */
    private static final int PORT = ServerInfo.getServerPort();
    /** Heartbeat request interval handed to {@code KeepAliveFilter.setRequestInterval}. */
    private static final int HEARTBEATRATE = 5;
    /** Heartbeat timeout handed to {@code KeepAliveFilter.setRequestTimeout}. */
    private static final int RequestTimeout = 15;
    /** Lazily created acceptor shared by every caller of {@link #getAcceptor()}. */
    private static SocketAcceptor acceptor;

    private ServerStart() {}

    /**
     * Returns the shared acceptor, creating a {@link NioSocketAcceptor} on first use.
     *
     * @return the single acceptor instance held by this class
     */
    public static SocketAcceptor getAcceptor() {
        if (null == acceptor) {
            // Non-blocking server-side socket acceptor.
            acceptor = new NioSocketAcceptor();
        }
        return acceptor;
    }

    /**
     * Installs the filter chain and handler on the shared acceptor, then binds
     * it to {@link #PORT}.
     *
     * <p>NOTE(review): every call registers the filter names "logger", "codec"
     * and "heartbeat" again on the same acceptor - confirm callers invoke this
     * only once.
     *
     * @return {@code true} when the bind succeeded, {@code false} when it
     *         failed with an {@link IOException}
     */
    public static boolean serverStart() {
        IoAcceptor acceptor = getAcceptor();
        // Read buffer size.
        acceptor.getSessionConfig().setReadBufferSize(1024);
        // Logging filter.
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        // Protocol encoder/decoder filter.
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        // Factory that recognises and produces heartbeat messages.
        KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
        // Callback invoked when a heartbeat request times out.
        KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
        // Keep-alive filter driven by the BOTH_IDLE status of the connection.
        KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE, heartBeatHandler);
        // false: the idle event is not passed on to MyIoHandler.sessionIdle();
        // true would run both the heartbeat check and sessionIdle().
        heartBeat.setForwardEvent(false);
        // How often a heartbeat is sent.
        heartBeat.setRequestInterval(HEARTBEATRATE);
        // How long to wait before KeepAliveRequestTimeoutHandlerImpl is invoked.
        heartBeat.setRequestTimeout(RequestTimeout);
        acceptor.getFilterChain().addLast("heartbeat", heartBeat);
        // Handler that receives the session and message callbacks.
        acceptor.setHandler(new MyIoHandler());
        try {
            acceptor.bind(new InetSocketAddress(PORT));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        // Reached only after a successful bind; the original printed this
        // line on the failure path instead.
        System.out.println("Server started on port: " + PORT);
        return true;
    }
}
//客户端
package lhy.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import lhy.client_util.ServerInfo;
import lhy.protocol.MsgProtocol;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Client bootstrap: configures the shared MINA connector and opens the
 * connection to the server.
 *
 * @author Lhy
 */
public class ClientStart {
    /** Heartbeat request interval handed to {@code KeepAliveFilter.setRequestInterval}. */
    private static final int HEARTBEATRATE = 15;
    /** Heartbeat timeout handed to {@code KeepAliveFilter.setRequestTimeout}. */
    private static final int RequestTimeout = 15;
    /** Lazily created connector shared by every caller of {@link #getConnector()}. */
    private static NioSocketConnector connector;
    /** Session obtained by the most recent successful {@link #clientStart()}. */
    private static IoSession is;

    /**
     * Returns the shared connector, creating it on first use.
     *
     * @return the single connector instance held by this class
     */
    public static NioSocketConnector getConnector() {
        if (connector == null) {
            connector = new NioSocketConnector();
        }
        return connector;
    }

    /**
     * Returns the session stored by {@link #clientStart()}.
     *
     * @return the stored session, or {@code null} if none has been stored yet
     */
    public static IoSession getIoSession() {
        return is;
    }

    /**
     * Configures the shared connector and connects to the address supplied by
     * {@code ServerInfo}.
     *
     * @return {@code true} when a session was obtained; {@code false} when
     *         fetching the session threw an exception, in which case the
     *         "codec" and "heartbeat" filters are removed again
     */
    public static boolean clientStart() {
        // Server address comes from the global settings class.
        final String host = ServerInfo.getServerIp();
        final int port = ServerInfo.getServerPort();
        System.out.println(host+" "+port);

        final NioSocketConnector client = getConnector();
        client.getSessionConfig().setReadBufferSize(1024);
        installFilters(client);
        client.setHandler(new MyIoHandler());
        // Connection timeout in milliseconds.
        client.setConnectTimeoutMillis(5000);

        final ConnectFuture pending = client.connect(new InetSocketAddress(host, port));
        // Block until the connection attempt has finished.
        pending.awaitUninterruptibly();
        try {
            is = pending.getSession();
            return true;
        } catch (Exception e) {
            // Undo the filter registration so that a later attempt can add them again.
            client.getFilterChain().remove("codec");
            client.getFilterChain().remove("heartbeat");
            System.out.println("连接超时");
            return false;
        }
    }

    /** Adds the protocol codec filter followed by the keep-alive filter. */
    private static void installFilters(NioSocketConnector client) {
        client.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        final KeepAliveMessageFactory messageFactory = new KeepAliveMessageFactoryImpl();
        final KeepAliveRequestTimeoutHandler timeoutHandler = new KeepAliveRequestTimeoutHandlerImpl();
        final KeepAliveFilter keepAlive =
                new KeepAliveFilter(messageFactory, IdleStatus.BOTH_IDLE, timeoutHandler);
        keepAlive.setRequestInterval(HEARTBEATRATE);
        keepAlive.setRequestTimeout(RequestTimeout);
        client.getFilterChain().addLast("heartbeat", keepAlive);
    }
}
//心跳发送接收类
package server;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import server_domain.HeartReq102;
import server_domain.HeartRes101;
import server_domain.MsgPack;
/**
 * Heartbeat message factory: recognises heartbeat requests/responses by their
 * message code and supplies the heartbeat packets to send.
 */
public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {
    // Prototype objects that provide the code and data of each heartbeat packet.
    private static final HeartReq102 hq = new HeartReq102();
    private static final HeartRes101 hs = new HeartRes101();
    /** Shared heartbeat request packet. */
    private static final MsgPack HEARTBEATREQUEST = new MsgPack(hq.getCode(), hq.getData());
    /** Shared heartbeat response packet. */
    private static final MsgPack HEARTBEATRESPONSE = new MsgPack(hs.getCode(), hs.getData());

    /**
     * Tells whether {@code message} is a heartbeat request.
     *
     * @param session the session the message belongs to (unused)
     * @param message the message to inspect
     * @return {@code true} if the message is a {@code MsgPack} carrying the
     *         heartbeat request code; {@code false} otherwise, including for
     *         {@code null} or any other message type
     */
    @Override
    public boolean isRequest(IoSession session, Object message) {
        return hasSameCode(message, HEARTBEATREQUEST);
    }

    /**
     * Tells whether {@code message} is a heartbeat response.
     *
     * @param session the session the message belongs to (unused)
     * @param message the message to inspect
     * @return {@code true} if the message is a {@code MsgPack} carrying the
     *         heartbeat response code; {@code false} otherwise, including for
     *         {@code null} or any other message type
     */
    @Override
    public boolean isResponse(IoSession session, Object message) {
        return hasSameCode(message, HEARTBEATRESPONSE);
    }

    /**
     * Supplies the heartbeat request to send.
     *
     * @param session the target session (unused)
     * @return the shared heartbeat request packet
     */
    @Override
    public Object getRequest(IoSession session) {
        return HEARTBEATREQUEST;
    }

    /**
     * Supplies the reply to a received heartbeat request.
     *
     * @param session the target session (unused)
     * @param request the received heartbeat request (unused)
     * @return the shared heartbeat response packet
     */
    @Override
    public Object getResponse(IoSession session, Object request) {
        return HEARTBEATRESPONSE;
    }

    /**
     * Compares message codes after a type check, so that a message that is not
     * a {@code MsgPack} is reported as "not a heartbeat" rather than failing
     * with a ClassCastException.
     */
    private static boolean hasSameCode(Object message, MsgPack expected) {
        return message instanceof MsgPack
                && ((MsgPack) message).getMsgCode() == expected.getMsgCode();
    }
}
//事件触发类
package server;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import serverUntil.ClientRequestInfo;
import server_command.ServerCommandFactory;
import server_domain.MsgPack;
/**
 * Connection event handler: prints session lifecycle events and hands every
 * received message to the server command factory.
 *
 * @author Lhy
 */
public class MyIoHandler extends IoHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    /** Prints the remote address and id of a newly created session. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("创建一个新连接:"+ session.getRemoteAddress()+" id: "+session.getId());
    }

    /** Intentionally does nothing when a session is opened. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
    }

    /** Prints the remote address and idle status of an idle session. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("当前连接处于空闲状态:"+ session.getRemoteAddress()+ status);
    }

    /**
     * Casts the message to {@code MsgPack}, prints it, and passes its code and
     * payload to a new {@code ServerCommandFactory}.
     *
     * @param session the session the message arrived on
     * @param message the received message; must be a {@code MsgPack}
     */
    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        final MsgPack packet = (MsgPack) message;
        System.out.println("收到消息"+packet.toString());
        final int msgCode = packet.getMsgCode();
        final String payload = packet.getMsgPack();
        new ServerCommandFactory().CodeToDo(session, msgCode, payload);
    }

    /**
     * Prints the closed connection, drops the entry kept for this session id
     * in {@code ClientRequestInfo}, then prints the remaining entries.
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("当前连接已经关闭"+session.getRemoteAddress());
        // Remove the global mapping keyed by this session id.
        ClientRequestInfo.deleteWithSessionId(session.getId());
        System.out.println("保存的当前用于服务器定时刷新的映射为"+ClientRequestInfo.cReqInfo);
    }
}