Netty之java序列化

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010741376/article/details/46367701

通过一个实例来说明:Netty服务端接受到客户端的用户订购请求消息,服务端接受到请求消息,对用户名进行合法性校验,则构造订购成功的应答消息返回给客户端。使用Netty的ObjectEncoder和ObjectDecoder对订购请求和应答消息进行序列化.

 订购请求POJO类的定义:

  

import java.io.Serializable;

/**
 * 客户端请求消息
 * @author Administrator
 *
 */
public class SubscribeReq implements Serializable{

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
     
	private int subReqID;
	private String userName;
	private String productName;
	private String phoneNumber;
	private String address;
	public int getSubReqID() {
		return subReqID;
	}
	public void setSubReqID(int subReqID) {
		this.subReqID = subReqID;
	}
	public String getUserName() {
		return userName;
	}
	public void setUserName(String userName) {
		this.userName = userName;
	}
	public String getProductName() {
		return productName;
	}
	public void setProductName(String productName) {
		this.productName = productName;
	}
	public String getPhoneNumber() {
		return phoneNumber;
	}
	public void setPhoneNumber(String phoneNumber) {
		this.phoneNumber = phoneNumber;
	}
	public String getAddress() {
		return address;
	}
	public void setAddress(String address) {
		this.address = address;
	}
	@Override
	public String toString() {
		return subReqID+"-"+userName+"-"+productName+"-"+phoneNumber+"-"+address;
		
	}
	
	
	
	
	
}
订购应答POJO类:

/**
 * 服务端应答消息
 * @author Administrator
 *
 */
public class SubscribeResp implements Serializable{

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
   
	private int subReqID;
	private int respCode;
	private String desc;
	public int getSubReqID() {
		return subReqID;
	}
	public void setSubReqID(int subReqID) {
		this.subReqID = subReqID;
	}
	public int getRespCode() {
		return respCode;
	}
	public void setRespCode(int respCode) {
		this.respCode = respCode;
	}
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	@Override
	public String toString() {
		return subReqID+"-"+respCode+"-"+desc;
	}
	
	
}
订购服务端主函数:

  

public class SubReqServer {
         public void bind(int port)throws Exception{
        	 //配置服务端的NIO线程组
        	 EventLoopGroup bossGroup=new NioEventLoopGroup();
        	 EventLoopGroup workerGroup=new NioEventLoopGroup();
        	 
        	 try {
				ServerBootstrap b=new ServerBootstrap();
				b.group(bossGroup,workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG,100)
				.handler(new LoggingHandler(LogLevel.INFO))
				.childHandler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch)
							throws Exception {
						ch.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
						ch.pipeline().addLast(new ObjectEncoder());
						ch.pipeline().addLast(new SubReqServerHandler());
											
					}
					
				});
        		 
        		 //绑定端口,同步等待成功
				ChannelFuture f=b.bind(port).sync();
        		//等待服务端监听端口关闭 
				
				f.channel().closeFuture().sync();
        		 
			} finally{
				bossGroup.shutdownGracefully();
				workerGroup.shutdownGracefully();
			}
 
         }
         
         public static void main(String[] args) throws Exception {
			int port=8088;
			if(args!=null&&args.length>0){
				try {
					port=Integer.valueOf(args[0]);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
        	 
        	 new SubReqServer().bind(port);
        	 
        	 
        	 
		}
}

上述代码中,我们首先创建一个新的ObjectDecoder,它负责对实现Serializable的POJO对象进行解码,它有多个构造函数,支持不同的ClassResolver,在此我们使用weakCachingConcurrentResolver创建线程安全的WeakReferenceMap对类加载器进行缓存,它支持多线程并发访问,当虚拟机内存不足时,会释放缓存中的内存,防止内存泄露。

订单处理类:

public class SubReqServerHandler extends ChannelHandlerAdapter{
    public void channelRead(ChannelHandlerContext ctx,Object msg){
    	SubscribeReq req=(SubscribeReq)msg;
    	if("yxs".equalsIgnoreCase(req.getUserName())){
    		System.out.println("Server accept client subscript req:["+req.toString()+"]");
    		ctx.writeAndFlush(resp(req.getSubReqID()));
    		
    	}
    	
    	
    }
	
	private SubscribeResp resp(int subReqID){
		SubscribeResp resp=new SubscribeResp();
		resp.setSubReqID(subReqID);
		resp.setRespCode(0);
		resp.setDesc("Netty book order succeed,3 days later ,sent to the designated address");
		return resp;
			
	}
	public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
		cause.printStackTrace();
		ctx.close();
		
	}
	
}
客户端开发:将Netty对象解码器和编码器添加到ChannelPipline中,链路被激活的时候构造订购请求消息发送,为了检验Netty的java序列化功能是否支持TCP粘包/拆包,客户端一次构造10条订购请求,最后一次性发送给服务端,客户端订购处理handler将接受到的订购响应消息打印出来.

产品订购客户端:

public class SubReqClient {
     public void connect(int port,String host)throws Exception{
    	 //配置客户端NIO 线程组
    	 EventLoopGroup group=new NioEventLoopGroup();
    	 
    	 try {
    		  Bootstrap b=new Bootstrap();
        	  b.group(group).channel(NioSocketChannel.class)
        	  .option(ChannelOption.TCP_NODELAY,true)
        	  .handler(new ChannelInitializer<SocketChannel>() {

    			@Override
    			protected void initChannel(SocketChannel arg0) throws Exception {
    				arg0.pipeline().addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
    				arg0.pipeline().addLast(new ObjectEncoder());
    				arg0.pipeline().addLast(new SubReqClientHandler());
    			}
        		  
    		});
        	  //发起异步链接操作
        	 ChannelFuture f=b.connect(host,port).sync();
        	  //等待客户端链路关闭
        	 f.channel().closeFuture().sync();
        	  
        	  
        	  
		} finally{
			//优雅的退出,释放NIO线程组
			group.shutdownGracefully();
		}
    	
    	 
    	 
    	 
    	 
     }
     public static void main(String[] args) throws Exception {
		int port=8088;
		if(args!=null&&args.length>0){
			try {
				port=Integer.valueOf(args[0]);
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
    	 
    	 new SubReqClient().connect(port, "127.0.0.1");
    	 
    	 
	}
     
}
客户端处理类:

public class SubReqClientHandler extends ChannelHandlerAdapter{
       public SubReqClientHandler(){}
       
       
       public void channelActive(ChannelHandlerContext ctx){
    	   for(int i=0;i<10;i++){
    		   ctx.write(subReq(i));//写入10条记录
    	   }
    	   ctx.flush();//最后一次性发送 
       }
       private SubscribeReq subReq(int i){
    	   SubscribeReq req=new SubscribeReq();
    	   req.setAddress("武汉东湖新技术开发区");
    	   req.setPhoneNumber("121324343535");
    	   req.setProductName("Netty权威指南");
    	   req.setSubReqID(i);
    	   req.setUserName("Mryang");
    	   return req;
       }
       public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
    	   System.out.println("Receive server response:["+msg+"]");
    	   
       }
       public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
    	   ctx.flush();
       }
       public void exceptionCaght(ChannelHandlerContext ctx,Throwable cause){
    	   cause.printStackTrace();
    	   ctx.close();
       }
}





上一篇:Docker技术入门与实战(第2版)2.4 推荐实践环境


下一篇:关于 Java 序列化你不知道的 5 件事