Netty之Protobuf使用与实战

Protobuf

netty自带的编码/解码器

数据再网络中传输都是二进制字节码数据、发送数据需要编码、接收数据需要解码

codec的组成部分由两个:decoderencoder

Netty之Protobuf使用与实战

缺点:

1、无法跨语言

2、底层采用java序列化技术、序列化后体积大(二进制编码的五倍多)、性能低

Protobuf介绍

1、高效的结构化数据存储格式,可用于结构化数据串行化(序列化)、适合做数据存储或者==RPC【远程过程调用 remote procedure call】数据交换格式==

2、支持跨平台、跨语言

3、高性能、高可靠性

4、将类的定义使用.proto文件进行描述

5、通过protoc.exe.proto进行编译城java文件

Netty之Protobuf使用与实战

使用案例

Netty之Protobuf使用与实战

创建一个 .proto 文件

syntax="proto3";
option optimize_for=SPEED;//加快解析
option java_package="com.cyurs.netty.codec2"; //指定生成到那个包下
option java_outer_classname="MyDataInfo"; //指定外部类名称
//protobuf 可以使用message 管理其他message
message MyMessage{
  //定义一个枚举
  enum DataType{
    StudentType=0; //在proto3 要求enum编号从0开始
    WorkerType=1;
  }
  DataType data_type=1;  //用data_type标识传的是哪一个枚举类型
  //oneof表示每次枚举类型最多只能出现定义的message(Student、Worker)的其中一个
  oneof dataBody{
    Student student =2;
    Worker worker =3;
  }

}
message Student{
  int32 id=1; //Student类的属性
  string name =2;
}
message Worker{
  string name=1; //Student类的属性
  int32 age =2;
}

使用protoc.exe生成.java文件

Netty之Protobuf使用与实战

Netty之Protobuf使用与实战

配置客户端Handler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当通道就绪就会触发
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //随机发送student或者worker对象
        int random = new Random().nextInt(2);
        MyDataInfo.MyMessage myMessage = null;
        if (random == 0) {
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
                    .setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟").build()).build();
        }else{
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
                .setWorker(MyDataInfo.Worker.newBuilder().setAge(6).setName("黑旋风").build()).build();
        }
        ctx.writeAndFlush(myMessage);
    }
    /**
     * 当通道有读取事件时,会触发
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意:客户端使用的不是 ServerBootStrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) //设置客户端通道的实现类(使用反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("encoder", new ProtobufEncoder());
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                        }
                    });
            System.out.println("客户端 OK...");

            //启动客户端去连接服务器端
            //关于 channelFuture 涉及到 netty 的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

配置服务器端Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {	
    /**
     *读取客户端发送过来的消息
     * @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
     * @param msg 就是客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {

        //根据dataType显示不同信息
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
        if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
            MyDataInfo.Student student = msg.getStudent();
            System.out.println("学生id="+student.getId()+"学生姓名="+student.getName());

        } else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
            MyDataInfo.Worker worker = msg.getWorker();

            System.out.println("工人年龄="+worker.getAge()+"工人姓名="+worker.getName());
        } else {
            System.out.println("传输的类型不正确");
        }


    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //它是 write + flush,将数据写入到缓存buffer,并将buffer中的数据flush进通道
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
    }

    //处理异常,一般是关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

服务器端

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //创建BossGroup 和 WorkerGroup
        //1、创建两个线程组,bossGroup 和 workerGroup
        //2、bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
        //3、两个都是无限循环
        //4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)个数为实际 cpu 核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来进行设置,配置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { //为accept channel的pipeline预添加的handler
                        //给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));


                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); //给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
            System.out.println("........服务器 is ready...");
            //绑定一个端口并且同步,生成了一个ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();
            cf.addListener(future -> {
                if(cf.isDone()){
                    System.out.println("操作完成");
                    if (cf.isSuccess()) {
                        System.out.println("绑定成功");
                    } else {
                        System.out.println("绑定失败");
                    }
                }
            });
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
上一篇:google protocol buffer——protobuf的问题及改进一


下一篇:Centos7 yum 安装nginx php7