mina 2 心跳包

接收到消息后先由解码器解码,此时不会直接调用 messageReceived() 方法,而是先触发心跳工厂类 KeepAliveMessageFactoryImpl 中的 isRequest() 方法:当判断该消息是心跳请求时,过滤器会直接回复一个心跳响应,不再调用 messageReceived();当判断不是心跳时,才回调 messageReceived() 方法,输出内容。


//服务器


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;




import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import public_protocol.MsgProtocol;




import server.KeepAliveRequestTimeoutHandlerImpl;
import serverUntil.ServerInfo;




/**
 * Server bootstrap: configures the shared MINA socket acceptor (logging, codec and
 * heartbeat filters plus the event handler) and binds it to the configured port.
 *
 * @author Lhy
 */
public class ServerStart {
    /** Listening port, read once from the global server settings. */
    private static final int PORT = ServerInfo.getServerPort();
    /** Interval between heartbeat requests, in seconds. */
    private static final int HEARTBEAT_INTERVAL_SECONDS = 5;
    /** Heartbeat response timeout, in seconds; the timeout handler runs once it elapses. */
    private static final int REQUEST_TIMEOUT_SECONDS = 15;

    /** Lazily created acceptor shared by every caller of {@link #getAcceptor()}. */
    private static SocketAcceptor acceptor;

    private ServerStart() {}

    /**
     * Returns the shared acceptor, creating a non-blocking NIO acceptor on first use.
     *
     * @return the single acceptor instance held by this class
     */
    public static SocketAcceptor getAcceptor() {
        if (acceptor == null) {
            acceptor = new NioSocketAcceptor();
        }
        return acceptor;
    }

    /**
     * Configures the acceptor and binds it to {@code PORT}.
     *
     * <p>Filters are only added when no filter with the same name is registered yet, so
     * calling this method again (for example to retry after a failed bind) does not fail
     * on a duplicate filter name.
     *
     * @return {@code true} if the port was bound, {@code false} if binding failed
     */
    public static boolean serverStart() {
        IoAcceptor ioAcceptor = getAcceptor();
        // Read buffer size in bytes.
        ioAcceptor.getSessionConfig().setReadBufferSize(1024);

        // Logging filter.
        if (!ioAcceptor.getFilterChain().contains("logger")) {
            ioAcceptor.getFilterChain().addLast("logger", new LoggingFilter());
        }
        // Protocol codec filter.
        if (!ioAcceptor.getFilterChain().contains("codec")) {
            ioAcceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        }
        if (!ioAcceptor.getFilterChain().contains("heartbeat")) {
            // Factory that recognises and produces heartbeat messages.
            KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            // Invoked when a heartbeat response does not arrive in time.
            KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
            // BOTH_IDLE: the heartbeat is driven by the connection being idle for reads and writes.
            KeepAliveFilter heartBeat =
                    new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE, heartBeatHandler);
            // false: idle events are not forwarded to the handler's sessionIdle();
            // true would run sessionIdle() in addition to the heartbeat logic.
            heartBeat.setForwardEvent(false);
            heartBeat.setRequestInterval(HEARTBEAT_INTERVAL_SECONDS);
            heartBeat.setRequestTimeout(REQUEST_TIMEOUT_SECONDS);
            ioAcceptor.getFilterChain().addLast("heartbeat", heartBeat);
        }

        // Handler that receives the session/message callbacks.
        ioAcceptor.setHandler(new MyIoHandler());
        try {
            ioAcceptor.bind(new InetSocketAddress(PORT));
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("Failed to start server on port: " + PORT);
            return false;
        }
        // Report success only after bind() has actually succeeded.
        System.out.println("Server started on port: " + PORT);
        return true;
    }

}



//客户端


package lhy.client;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;


import lhy.client_util.ServerInfo;
import lhy.protocol.MsgProtocol;


import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;






/**
 * Client bootstrap: configures the shared MINA socket connector (codec and heartbeat
 * filters plus the event handler) and opens a connection to the configured server.
 *
 * @author Lhy
 */
public class ClientStart {

    /** Session obtained by the most recent successful {@link #clientStart()}; null before the first success. */
    private static IoSession is;
    /** Interval between heartbeat requests, in seconds. */
    private static final int HEARTBEAT_INTERVAL_SECONDS = 15;
    /** Heartbeat response timeout, in seconds. */
    private static final int REQUEST_TIMEOUT_SECONDS = 15;

    /** Lazily created connector shared by every caller of {@link #getConnector()}. */
    private static NioSocketConnector connector;

    /**
     * Returns the shared connector, creating a non-blocking NIO connector on first use.
     *
     * @return the single connector instance held by this class
     */
    public static NioSocketConnector getConnector() {
        if (connector == null) {
            connector = new NioSocketConnector();
        }
        return connector;
    }

    /**
     * Returns the session stored by the last successful connection attempt.
     *
     * @return the stored session, or {@code null} if no attempt has succeeded yet
     */
    public static IoSession getIoSession() {
        return is;
    }

    /**
     * Configures the connector and connects to the server address taken from
     * {@code ServerInfo}, waiting until the attempt completes.
     *
     * <p>Filters are only added when no filter with the same name is registered yet, so
     * calling this method again after a successful connection (for example to reconnect)
     * does not fail on a duplicate filter name.
     *
     * @return {@code true} if a session was obtained, {@code false} if the attempt failed
     */
    public static boolean clientStart() {
        // Server address comes from the global settings class.
        String serverIp = ServerInfo.getServerIp();
        int serverPort = ServerInfo.getServerPort();
        System.out.println(serverIp + " " + serverPort);

        NioSocketConnector ioConnector = getConnector();
        // Read buffer size in bytes.
        ioConnector.getSessionConfig().setReadBufferSize(1024);

        if (!ioConnector.getFilterChain().contains("codec")) {
            ioConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
        }
        if (!ioConnector.getFilterChain().contains("heartbeat")) {
            KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
            KeepAliveFilter heartBeat =
                    new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE, heartBeatHandler);
            heartBeat.setRequestInterval(HEARTBEAT_INTERVAL_SECONDS);
            heartBeat.setRequestTimeout(REQUEST_TIMEOUT_SECONDS);
            ioConnector.getFilterChain().addLast("heartbeat", heartBeat);
        }
        ioConnector.setHandler(new MyIoHandler());

        // Give up on the connection attempt after 5 seconds.
        ioConnector.setConnectTimeoutMillis(5000);
        ConnectFuture cf = ioConnector.connect(new InetSocketAddress(serverIp, serverPort));
        // Block until the attempt has either succeeded or failed.
        cf.awaitUninterruptibly();
        try {
            // getSession() throws when the connection attempt did not succeed.
            is = cf.getSession();
        } catch (RuntimeException e) {
            // Undo the filter registration, as the original code did on failure.
            ioConnector.getFilterChain().remove("codec");
            ioConnector.getFilterChain().remove("heartbeat");
            // NOTE(review): printed for every failure cause, not only for timeouts.
            System.out.println("连接超时");
            return false;
        }

        return true;
    }

}

//心跳发送接收类
package server;




import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;


import server_domain.HeartReq102;
import server_domain.HeartRes101;
import server_domain.MsgPack;








/**
 * Heartbeat message factory used by the keep-alive filter: recognises heartbeat
 * requests/responses by their message code and supplies the messages to send.
 */
public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {

    /** Templates that provide the code and data of the two heartbeat messages. */
    private static final HeartReq102 REQUEST_TEMPLATE = new HeartReq102();
    private static final HeartRes101 RESPONSE_TEMPLATE = new HeartRes101();

    /** Heartbeat request, built once and reused for every session. */
    private static final MsgPack HEARTBEATREQUEST =
            new MsgPack(REQUEST_TEMPLATE.getCode(), REQUEST_TEMPLATE.getData());
    /** Heartbeat response, built once and reused for every session. */
    private static final MsgPack HEARTBEATRESPONSE =
            new MsgPack(RESPONSE_TEMPLATE.getCode(), RESPONSE_TEMPLATE.getData());

    /**
     * Tells whether {@code message} is a heartbeat request.
     *
     * @param session the session the message belongs to (unused)
     * @param message the message to inspect; may be of any type
     * @return {@code true} only for a {@code MsgPack} carrying the heartbeat request code
     */
    @Override
    public boolean isRequest(IoSession session, Object message) {
        return hasMsgCode(message, HEARTBEATREQUEST.getMsgCode());
    }

    /**
     * Tells whether {@code message} is a heartbeat response.
     *
     * @param session the session the message belongs to (unused)
     * @param message the message to inspect; may be of any type
     * @return {@code true} only for a {@code MsgPack} carrying the heartbeat response code
     */
    @Override
    public boolean isResponse(IoSession session, Object message) {
        return hasMsgCode(message, HEARTBEATRESPONSE.getMsgCode());
    }

    /**
     * Supplies the heartbeat request to send.
     *
     * @param session the session the request is for (unused)
     * @return the shared heartbeat request message
     */
    @Override
    public Object getRequest(IoSession session) {
        return HEARTBEATREQUEST;
    }

    /**
     * Supplies the reply to a received heartbeat request.
     *
     * @param session the session the response is for (unused)
     * @param request the heartbeat request being answered (unused)
     * @return the shared heartbeat response message
     */
    @Override
    public Object getResponse(IoSession session, Object request) {
        return HEARTBEATRESPONSE;
    }

    /**
     * Checks the message code without assuming the message type, so a message that is
     * not a {@code MsgPack} is reported as "not a heartbeat" instead of raising a
     * {@code ClassCastException}. Taking the expected code as a primitive keeps the
     * comparison a value comparison.
     */
    private static boolean hasMsgCode(Object message, int expectedCode) {
        if (!(message instanceof MsgPack)) {
            return false;
        }
        return ((MsgPack) message).getMsgCode() == expectedCode;
    }

}




//事件触发类

package server;


import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;




import serverUntil.ClientRequestInfo;
import server_command.ServerCommandFactory;
import server_domain.MsgPack;


/**
 * IoHandler for connections: prints connection lifecycle events and hands every
 * received message to the server command factory.
 *
 * @author Lhy
 */
public class MyIoHandler extends IoHandlerAdapter {

    private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

    /** Called when a connection is created; prints its remote address and session id. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("创建一个新连接:" + session.getRemoteAddress() + "  id:  " + session.getId());
    }

    /** Called when a session is opened; intentionally does nothing. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        // Nothing to do here.
    }

    /**
     * Called for each received message: prints it, then passes its code and payload,
     * together with the session, to a new {@code ServerCommandFactory}.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MsgPack pack = (MsgPack) message;
        System.out.println("收到消息" + pack.toString());

        int msgCode = pack.getMsgCode();
        String payload = pack.getMsgPack();
        ServerCommandFactory commandFactory = new ServerCommandFactory();
        commandFactory.CodeToDo(session, msgCode, payload);
    }

    /** Called when the connection is idle; prints the remote address and the idle status. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("当前连接处于空闲状态:" + session.getRemoteAddress() + status);
    }

    /**
     * Called when the connection is closed: prints the remote address, removes the
     * entry stored for this session id and prints the remaining mapping.
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("当前连接已经关闭" + session.getRemoteAddress());
        // Drop the entry stored for this session id from the global mapping.
        long closedId = session.getId();
        ClientRequestInfo.deleteWithSessionId(closedId);
        System.out.println("保存的当前用于服务器定时刷新的映射为" + ClientRequestInfo.cReqInfo);
    }

}


上一篇:Android学习笔记--创建Dialog


下一篇:python 异常处理