针对prototobuf的操作
(1)引入protobuf的mvn配置
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
(2)编写*.proto文件
syntax="proto3";//协议的版本
option java_outer_classname="StudentPOJO" //生成外部类名字 同时也是文件名字
message Student{
//会在StudentPOJO外部类 生成一个内部类Student 真正发送的POJO对象
int32 id=1; //1 表示属性序号 int32 是protobuf的类型
string name=2;
}
syntax="proto3";//协议的版本
option optimize_for=SPEED
option java_package=""//java文件生成到哪个路径
option java_outer_classname="MyData" //生成外部类名字 同时也是文件名字
//protobuferr 通过message管理
//传输多类型
message MyMessage{
//定义一个枚举
enum DataType{
StudentType=0;
WorkerType=1;
}
DataType dateType=1;//用DataType来标识传的是哪个枚举类型
//枚举类型最多只能出现1个
oneof dataBody{
Student student=2;
Worker worker=3;
}
}
message Student{
//会在StudentPOJO外部类 生成一个内部类Student 真正发送的POJO对象
int32 id=1; //1 表示属性序号 int32 是protobuf的类型
string name=2;
}
message Worker{
//会在StudentPOJO外部类 生成一个内部类Student 真正发送的POJO对象
int32 id=1; //1 表示属性序号 int32 是protobuf的类型
string name=2;
int32 age=3;
}
(3)使用proto.exec文件将上面的.proto文件生成.java文件
然后在netty的客户端与服务端进行数据通过,针对指定的对象传输。如果针对多个类型的对象传输,那么自定义枚举类型进行区分
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient() {
this(0);
}
public EchoClient(int port) {
this("localhost", port);
}
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group) // 注册线程池
.channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
.remoteAddress(new InetSocketAddress(this.host, this.port)) // 绑定连接端口和host信息
.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("connected...");
ChannelPipeline pipeline = ch.pipeline();
//加入一个protobuf的解码器
pipeline.addLast("encoder",new ProtobufEncoder());
pipeline.addLast(new EchoClientHandler());
}
});
System.out.println("created..");
ChannelFuture cf = b.connect().sync(); // 异步连接服务器
System.out.println("connected..."); // 连接完成
cf.channel().closeFuture().sync(); // 异步等待关闭连接channel
System.out.println("closed.."); // 关闭完成
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
}
}
public static void main(String[] args) throws Exception {
new EchoClient("127.0.0.1", 65535).start(); // 连接127.0.0.1/65535,并启动
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.nio.charset.Charset;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channelActive..");
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); // 必须有flush
//发送一个Student对象到服务端
StudentPOJO.Student student=StudentPOJO.Student.newBuilder().setId(4).setName("haha").build();
ctx.writeAndFlush(student);
// 必须存在flush
// ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
// ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client channelRead..");
ByteBuf buf = msg.readBytes(msg.readableBytes());
System.out.println("Client received:" + ByteBufUtil.hexDump(buf) + "; The value is:" + buf.toString(Charset.forName("utf-8")));
//ctx.channel().close().sync();// client关闭channel连接
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(group) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定使用的channel
.localAddress(this.port)// 绑定监听端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("connected...; Client:" + ch.remoteAddress());
ChannelPipeline pipeline = ch.pipeline();
//指定对哪种对象进行解码
pipeline.addLast("decoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new EchoServerHandler()); // 客户端触发操作
}
});
ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
System.out.println(EchoServer.class + " started and listen on " + cf.channel().localAddress());
cf.channel().closeFuture().sync(); // 关闭服务器通道
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
}
}
public static void main(String[] args) throws Exception {
new EchoServer(65535).start(); // 启动
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead...; received:" + msg);
StudentPOJO.Student student=(StudentPOJO.Student)msg;
System.out.println(student.getId()+"--"+student.getName());
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channelReadComplete..");
// 第一种方法:写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//ctx.flush(); // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。
//ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server occur exception:" + cause.getMessage());
cause.printStackTrace();
ctx.close(); // 关闭发生异常的连接
}
}