Server1
// ---- Server.java (example 1) ----
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

/**
 * Netty server, example 1: the pipeline contains only {@code HelloHandler},
 * so the handler receives and sends raw {@code ChannelBuffer} messages.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 10101;

    /**
     * Configures the server bootstrap, binds it to {@link #PORT} and prints a
     * start-up message.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Bootstrap helper for the server side.
        ServerBootstrap bootstrap = new ServerBootstrap();

        // Boss threads listen on the port; worker threads do the reads and writes.
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // NIO socket channel factory.
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

        // Pipeline factory: one pipeline, with its own handler, per channel.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("helloHandler", new HelloHandler());
                return pipeline;
            }
        });

        bootstrap.bind(new InetSocketAddress(PORT));
        System.out.println("server1 start!!!");
    }
}

// ---- HelloHandler.java (example 1) ----
import java.nio.charset.Charset;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * Message handler: prints every received message and writes back a reply.
 */
public class HelloHandler extends SimpleChannelHandler {

    /**
     * Charset used for both decoding and encoding. The platform default is kept
     * (it is what the implicit {@code new String(byte[])} / {@code getBytes()}
     * calls used before) but is now stated explicitly.
     */
    private static final Charset CHARSET = Charset.defaultCharset();

    /**
     * Decodes the received buffer, prints it, and replies {@code "hi"} to
     * {@code "hello"} or a fallback sentence to anything else.
     *
     * @param ctx handler context of the receiving channel
     * @param e   event whose message is expected to be a {@code ChannelBuffer}
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer received = (ChannelBuffer) e.getMessage();

        // Decode only the readable bytes. array() would expose the whole backing
        // array (which may extend beyond the readable region) and throws
        // UnsupportedOperationException when the buffer has no accessible array.
        String text = received.toString(CHARSET);
        System.out.println(text);

        // Write the reply back to the peer.
        String reply = "hello".equals(text) ? "hi" : "I don't know what you said";
        ChannelBuffer sendBack = ChannelBuffers.copiedBuffer(reply.getBytes(CHARSET));
        ctx.getChannel().write(sendBack);

        super.messageReceived(ctx, e);
    }

    /** Called when an exception is caught; prints a marker and delegates to the superclass. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /** Called for a new connection. */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /** Fired on channel close only if the connection had actually been established. */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /** Fired when the channel is closed (see the client simulation). */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}
Server2
相较示例1中增加了StringEncoder和StringDecoder两个handler对象。
StringDecoder对接收的内容进行解码,实现了ChannelUpstreamHandler接口。
StringEncoder对发送的内容进行编码,实现了ChannelDownstreamHandler接口。
请参看本博客《Netty学习6-源码跟踪ChannelPipeline和ChannelHandler工作原理》
http://blog.csdn.net/woshixuye/article/details/53859776
// ---- Server.java (example 2) ----
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * Introductory Netty server, example 2: a string decoder and a string encoder
 * sit in front of {@code HelloHandler}, so the handler works with
 * {@code String} messages.
 */
public class Server {

    /**
     * Wires up the bootstrap, binds it to port 10101 and prints a start-up
     * message.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Boss threads listen on the port; worker threads do the reads and writes.
        ExecutorService bossPool = Executors.newCachedThreadPool();
        ExecutorService workerPool = Executors.newCachedThreadPool();

        // Bootstrap helper for the server side, backed by the NIO socket factory.
        ServerBootstrap server = new ServerBootstrap();
        server.setFactory(new NioServerSocketChannelFactory(bossPool, workerPool));
        server.setPipelineFactory(newPipelineFactory());

        InetSocketAddress listenAddress = new InetSocketAddress(10101);
        server.bind(listenAddress);
        System.out.println("server2 start!!!");
    }

    /** Creates the factory that assembles decoder, encoder and handler for each channel. */
    private static ChannelPipelineFactory newPipelineFactory() {
        return new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline channelPipeline = Channels.pipeline();
                channelPipeline.addLast("decoder", new StringDecoder());
                channelPipeline.addLast("encoder", new StringEncoder());
                channelPipeline.addLast("helloHandler", new HelloHandler());
                return channelPipeline;
            }
        };
    }
}

// ---- HelloHandler.java (example 2) ----
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * Message handler: prints each received string and writes a reply.
 */
public class HelloHandler extends SimpleChannelHandler {

    /**
     * Prints the incoming string and answers {@code "hi"} to {@code "hello"},
     * or a fallback sentence to anything else.
     *
     * @param ctx handler context of the receiving channel
     * @param e   event whose message is cast to {@code String}
     * @throws Exception propagated from the superclass implementation
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        String incoming = (String) e.getMessage();
        System.out.println(incoming);

        // Write the reply back to the peer.
        String answer = "hello".equals(incoming) ? "hi" : "I don't know what you said";
        ctx.getChannel().write(answer);

        super.messageReceived(ctx, e);
    }

    /** Called when an exception is caught; prints a marker and delegates to the superclass. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");
        super.exceptionCaught(ctx, e);
    }

    /** Called for a new connection. */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelConnected");
        super.channelConnected(ctx, e);
    }

    /** Fired on channel close only if the connection had actually been established. */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelDisconnected");
        super.channelDisconnected(ctx, e);
    }

    /** Fired when the channel is closed (see the client simulation). */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        System.out.println("channelClosed");
        super.channelClosed(ctx, e);
    }
}
Client
import java.net.InetSocketAddress; import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; /** * netty客户端入门 */ public class Client { @SuppressWarnings("resource") public static void main(String[] args) { // 服务类 ClientBootstrap bootstrap = new ClientBootstrap(); // 线程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); // socket工厂 bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker)); // 管道工厂 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("hiHandler", new HiHandler()); return pipeline; } }); // 连接服务端 ChannelFuture connect = bootstrap.connect(new InetSocketAddress( "127.0.0.1", 10101)); Channel channel = connect.getChannel(); System.out.println("client start"); Scanner scanner = new Scanner(System.in); while (true) { System.out.println("请输入"); channel.write(scanner.next()); } } } import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; /** * 消息接受处理类 */ public class HiHandler extends SimpleChannelHandler { // 接收消息 @Override public void 
messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { // 接收消息 String s = (String) e.getMessage(); System.out.println(s); super.messageReceived(ctx, e); } // 捕获异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println("exceptionCaught"); super.exceptionCaught(ctx, e); } // 新连接 @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelConnected"); super.channelConnected(ctx, e); } // 必须【连接已经建立】,关闭通道的时候才会触发 @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelDisconnected"); super.channelDisconnected(ctx, e); } // channel关闭的时候触发(比如服务端没有启动,客户端发起连接的话,就会报这个错) @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { System.out.println("channelClosed"); super.channelClosed(ctx, e); } }