Netty:使用protobuf发送和接收消息

1.下载protobuf3
2.编写.proto文件

// Subscription request message, using proto3 syntax.
syntax="proto3";
// Proto package; with no java_package option it is also used as the Java package
// of the generated code (the Java sources import cn.edu.tju.nt.SubscribeReqProto).
package cn.edu.tju.nt;

// Name of the generated Java wrapper class that contains the message class.
option java_outer_classname = "SubscribeReqProto";

// Request sent by the client to the server.
message SubscribeReq{
    // Request identifier; the server handler copies it into the response.
    int32 subReqID = 1 ;
     // Presumably the name of the subscribing user - confirm with the author.
     string userName = 2;
     // Presumably the name of the product being subscribed to - confirm with the author.
     string productName = 3;
     // Zero or more address strings.
     repeated string address = 4;
}
// Subscription response message, using proto3 syntax.
syntax="proto3";
// Proto package; with no java_package option it is also used as the Java package
// of the generated code (the Java sources import cn.edu.tju.nt.SubscribeRespProto).
package cn.edu.tju.nt;

// Name of the generated Java wrapper class that contains the message class.
option java_outer_classname = "SubscribeRespProto";

// Response sent by the server back to the client.
message SubscribeResp{
    // Identifier of the request this response answers.
    int32 subReqID = 1;
    // Result code; the example server always sets 0.
    int32 respCode = 2;
    // Free-text description attached to the response.
    string desc = 3;
}



3.进入.proto文件所在路径,执行命令

protoc SubscribeReq.proto --java_out=.
protoc SubscribeResp.proto --java_out=.

4.创建maven项目,并把步骤3中生成的文件夹拷贝到项目中
5.增加Netty和protobuf相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the netty-protobuf example project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. -->
    <groupId>cn.edu.tju</groupId>
    <artifactId>netty-protobuf</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <!-- Compile with Java 8 source and target levels. -->
            <!-- NOTE(review): no plugin <version> is pinned here; Maven will pick a default. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
    <dependencies>
        <!-- Netty: provides the bootstrap, channel and protobuf codec classes used by the example. -->
        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.0.Final</version>
        </dependency>

        <!-- Protobuf Java runtime needed by the protoc-generated message classes. -->
        <!-- NOTE(review): presumably this should match the protoc version used to generate the classes - confirm. -->
        <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.0.0</version>
        </dependency>




    </dependencies>



</project>

6.创建服务器端通道处理器和客户端通道处理器:

package cn.edu.tju.handler;

import cn.edu.tju.nt.SubscribeReqProto;
import cn.edu.tju.nt.SubscribeRespProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * Server-side inbound handler: prints every received {@code SubscribeReq} and
 * answers it with a {@code SubscribeResp} carrying the same request id.
 */
public class ObjectServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the failure message and closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        final String failureText = cause.getMessage();
        System.out.println(failureText);
        ctx.close();
    }

    /**
     * Handles one inbound message, which is cast to {@code SubscribeReq}.
     *
     * @param ctx context used to write the response back
     * @param msg the inbound message
     * @throws Exception if handling fails (e.g. the message has a different type)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final SubscribeReqProto.SubscribeReq request = (SubscribeReqProto.SubscribeReq) msg;
        final String logLine = "Server received : " + request.toString();
        System.out.println(logLine);

        final SubscribeRespProto.SubscribeResp reply = resp(request.getSubReqID());
        ctx.writeAndFlush(reply);
    }

    /**
     * Builds the fixed response (code 0, fixed description) for a request id.
     *
     * @param subReqID id of the request being answered
     * @return the response message
     */
    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        return SubscribeRespProto.SubscribeResp.newBuilder()
                .setSubReqID(subReqID)
                .setRespCode(0)
                .setDesc("Java in action")
                .build();
    }
}

package cn.edu.tju.handler;



import java.util.ArrayList;
import java.util.List;

import cn.edu.tju.nt.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side inbound handler: sends the example {@code SubscribeReq} message(s)
 * once the channel becomes active and prints whatever the server sends back.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter
{
    /** Number of requests written when the channel becomes active. */
    private static final int REQUEST_COUNT = 1;

    public ClientHandler()
    {

    }

    /**
     * Writes the example requests and flushes them in one batch.
     *
     * @param ctx context of the channel that just became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        for (int i = 0; i < REQUEST_COUNT; i++) {
            ctx.write(subReq(i));
        }
        // A single flush after all writes sends the queued requests together.
        ctx.flush();
    }

    /**
     * Builds an example subscription request.
     *
     * @param i value used as the request id
     * @return the populated request message
     */
    private SubscribeReqProto.SubscribeReq subReq(int i)
    {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("amadeus");
        builder.setProductName("javaStudy");
        List<String> address = new ArrayList<>();
        address.add("beijing");
        address.add("tianjin");
        address.add("shanghai");
        builder.addAllAddress(address);
        return builder.build();
    }

    /**
     * Prints a message received from the server.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
    {
        System.out.println("Client received: " + msg);
    }

    /**
     * Flushes any pending outbound data once the current read batch is done.
     *
     * @param ctx context of the channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {
        ctx.flush();
    }

    /**
     * Reports the error and closes the channel.
     *
     * @param ctx context of the channel on which the error occurred
     * @param e   the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
    {
        // Print the throwable itself: getMessage() alone may be null and hides the exception type.
        System.err.println("Client handler error: " + e);
        // Close the broken channel (as the server handler does) so that a caller
        // waiting on the channel's close future is released instead of hanging.
        ctx.close();
    }

}

7.创建服务器端启动类和客户端启动类:

package cn.edu.tju;

import cn.edu.tju.handler.ObjectServerHandler;
import cn.edu.tju.nt.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * Starts the example Netty server, which decodes {@code SubscribeReq} messages
 * and hands them to {@link ObjectServerHandler}.
 */
public class NettyServer {
    /** TCP port the server listens on. */
    private static final int PORT = 9095;

    /**
     * Binds the server and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            // Inbound: length-prefixed frame -> SubscribeReq message.
                            channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            channel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                            // Outbound: protobuf message -> bytes -> length-prefixed frame.
                            channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            channel.pipeline().addLast(new ProtobufEncoder());
                            // Application logic sits last in the pipeline.
                            channel.pipeline().addLast(new ObjectServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Netty server interrupted: " + ex);
        } catch (Exception ex) {
            // sync() rethrows the cause of a failed bind; report it instead of swallowing it.
            System.err.println("Netty server failed on port " + PORT + ": " + ex);
        } finally {
            // Release the event loop threads so the JVM can exit.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

package cn.edu.tju;

import cn.edu.tju.handler.ClientHandler;
import cn.edu.tju.nt.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import java.net.InetAddress;

/**
 * Starts the example Netty client, which connects to the local server and
 * decodes {@code SubscribeResp} messages for {@link ClientHandler}.
 */
public class NettyClient {
    /** TCP port of the server to connect to. */
    private static final int PORT = 9095;

    /**
     * Connects to {@code localhost} and blocks until the channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Inbound: length-prefixed frame -> SubscribeResp message.
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                    // Outbound: protobuf message -> bytes -> length-prefixed frame.
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    socketChannel.pipeline().addLast(new ProtobufEncoder());
                    // Application logic sits last in the pipeline.
                    socketChannel.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", PORT).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Netty client interrupted: " + ex);
        } catch (Exception ex) {
            // sync() rethrows the cause of a failed connect; report it instead of swallowing it.
            System.err.println("Netty client failed to reach localhost:" + PORT + ": " + ex);
        } finally {
            // Release the event loop threads so the JVM can exit.
            workerGroup.shutdownGracefully();
        }
    }
}


8.启动服务器端和客户端,服务器端和客户端分别输出:
Netty:使用protobuf发送和接收消息
Netty:使用protobuf发送和接收消息

上一篇:go 使用 Protobuf


下一篇:RPC基础