Basic Usage of Protobuf & gRPC

  • First, let's compare the size of the output produced by Java's built-in serialization with that produced by Protobuf serialization

    public static void main(String[] args) throws Exception {
        Person.User person = Person.User.newBuilder().setName("wangziqiang")
                .setAge(20)
                .setAddress("hebei")
                .setGender("男")
                .setTimestamp(System.currentTimeMillis()).build();
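        // Java built-in serialization (generated protobuf messages implement Serializable)
        try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("javaobj"))) {
            os.writeObject(person);
        }
        // Protobuf binary serialization
        try (FileOutputStream out = new FileOutputStream("protoobj")) {
            person.writeTo(out);
        }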
    }
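  • The resulting javaobj file is 217 bytes, while protoobj is only 34 bytes, an obvious difference in size. Below is the code for deserializing the protobuf file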

    public void test() throws Exception {
        // Close the input stream once parsing is done
        try (FileInputStream in = new FileInputStream("protoobj")) {
            Person.User user = Person.User.parseFrom(in);
            System.out.println(user);
        }
    }
    /**
     * name: "wangziqiang"
     * age: 20
     * address: "hebei"
     * gender: "\347\224\267"   (the UTF-8 bytes of "男", printed as octal escapes)
     * timestamp: 1546783663870
     */
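To see where the 34 bytes come from, here is a minimal sketch that hand-encodes the same values in the protobuf wire format using only the JDK. The Person proto file is not shown in this post, so the field numbers (1 to 5, in the order name, age, address, gender, timestamp) and the integer field types are assumptions; `WireSizeSketch` and `PlainUser` are hypothetical names used only for illustration. Under those assumptions the encoded length works out to 34 bytes, which is consistent with the file size above. The Java-serialized size of the plain stand-in class is printed only for comparison and will not equal the 217 bytes measured for the generated class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class WireSizeSketch {

    // Hypothetical plain-Java stand-in for the generated Person.User class.
    public static class PlainUser implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;
        int age;
        String address;
        String gender;
        long timestamp;
    }

    // Base-128 varint: 7 payload bits per byte, high bit set on every byte except the last.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Wire type 0 (varint): tag = (fieldNumber << 3) | 0, followed by the value.
    static void writeVarintField(ByteArrayOutputStream out, int fieldNumber, long value) {
        writeVarint(out, (long) fieldNumber << 3);
        writeVarint(out, value);
    }

    // Wire type 2 (length-delimited): tag, byte length, then the UTF-8 bytes.
    static void writeStringField(ByteArrayOutputStream out, int fieldNumber, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, ((long) fieldNumber << 3) | 2);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Assumed field numbers: name=1, age=2, address=3, gender=4, timestamp=5.
    public static byte[] encodeUser(String name, long age, String address,
                                    String gender, long timestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeStringField(out, 1, name);
        writeVarintField(out, 2, age);
        writeStringField(out, 3, address);
        writeStringField(out, 4, gender);
        writeVarintField(out, 5, timestamp);
        return out.toByteArray();
    }

    // Size of the same data written with Java's built-in serialization.
    public static int javaSerializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream os = new ObjectOutputStream(bytes)) {
            os.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        // "\u7537" is the character used for gender in the post; timestamp taken from the output above.
        byte[] encoded = encodeUser("wangziqiang", 20, "hebei", "\u7537", 1546783663870L);
        System.out.println("protobuf wire bytes: " + encoded.length); // 34

        PlainUser user = new PlainUser();
        user.name = "wangziqiang";
        user.age = 20;
        user.address = "hebei";
        user.gender = "\u7537";
        user.timestamp = 1546783663870L;
        System.out.println("java serialized bytes: " + javaSerializedSize(user));
    }
}
```

The difference comes from what each format writes: protobuf stores only a one-byte tag plus the value for each field here, whereas Java serialization also writes the class name, a serialVersionUID, and a descriptor for every field.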
  • Next, serialization and deserialization speed. The charts here were taken from the web

    [Figure: two serialization/deserialization speed comparison charts]

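  • Here is another protobuf benchmark post: 即时通讯网. After reading it I came to a single conclusion: using protobuf is basically a sure win
  • I had previously worked with Avro, a framework that can do both serialization and RPC, which raised a question for me: can protobuf do RPC directly? Most of the posts online are about C, and they misled me into thinking that protobuf by itself can be used to build RPC applications. That is wrong. After asking around, I got good answers both in the DingTalk group and in the Yunqi Community (云栖社区) Q&A. If you have the same question, see 我的云栖问答-protobuf
  • Since gRPC is also an RPC framework open-sourced by Google, I will use it to build a small demo. It essentially follows code posted online: get it working first, understand it later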
  • Maven

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.17.1</version>
        </dependency>
    </dependencies>
    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.4.1.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.0</version>
                <configuration>
                    <!-- keep protoc and the gRPC codegen plugin in line with the 1.17.1 runtime dependencies above -->
                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.17.1:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • The proto file

    syntax = "proto3";
    option java_package = "com.qidai.proto";
    option java_outer_classname = "MyThing";
    
    message Request {
        string name = 1;
    }
    message Response {
        string name = 2;
    }
    service MyRPC {
        rpc sayHi(Request) returns(Response);
    }
  • At this point, run the Maven plugin's compile goal to compile the proto file
  • Server

    import com.qidai.proto.MyRPCGrpc;
    import com.qidai.proto.MyThing;
    import io.grpc.ServerBuilder;
    import io.grpc.stub.StreamObserver;
    import java.io.IOException;
    public class Server {
        private static final int PORT = 2222;
        private final io.grpc.Server server;
        public Server() throws IOException {
            // Build and start the server
            this.server = ServerBuilder.forPort(PORT)
                    .addService(new MyRPCImpl())
                    .build()
                    .start();
            System.out.println("Server Started ...");
        }
        private void stop() {
            if (server != null) {
                server.shutdown();
            }
        }
        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }
        public static void main(String[] args) throws IOException, InterruptedException {
            Server server = new Server();
            // Block the main thread so the server does not exit
            server.blockUntilShutdown();
        }
        static class MyRPCImpl extends MyRPCGrpc.MyRPCImplBase{
            @Override  // sayHi is the rpc method name defined in the proto file; this is the logic that handles a client's rpc request
            public void sayHi(MyThing.Request request, StreamObserver<MyThing.Response> responseObserver) {
                System.out.println("request -> " + request.getName());
                MyThing.Response response = MyThing.Response.newBuilder()
                        .setName("Client Hello "+ request.getName())
                        .build();
                // Send the response message back to the client
                responseObserver.onNext(response);
                // Signal that this call is complete
                responseObserver.onCompleted();
            }
        }
    }
  • Client

    import com.qidai.proto.MyRPCGrpc;
    import com.qidai.proto.MyThing;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.concurrent.TimeUnit;
    public class Client {
        private final ManagedChannelBuilder<?> managedChannelBuilder;
        private final MyRPCGrpc.MyRPCBlockingStub blockingStub;
        private final ManagedChannel channel;
        public Client(String name, int port) {
            managedChannelBuilder = ManagedChannelBuilder.forAddress(name, port);
            channel = managedChannelBuilder.usePlaintext().build();
            blockingStub = MyRPCGrpc.newBlockingStub(channel);
        }
        public void shutdown() throws InterruptedException {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
        public  void sayHi(String name){
            MyThing.Request request = MyThing.Request.newBuilder().setName(name).build();
            MyThing.Response response = blockingStub.sayHi(request);
            System.out.println(response.getName());
        }
        public static void main(String[] args) throws Exception{
            Client client = new Client("localhost", 2222);
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000);
                // This is where the actual rpc call happens
                client.sayHi("Hello Server -> " + i);
            }
            client.shutdown();
        }
    }
  • That is all it takes. Start the server first, then the client. The output looks like this

    Server
      request -> Hello Server -> 0
      request -> Hello Server -> 1
      request -> Hello Server -> 2
      request -> Hello Server -> 3
      request -> Hello Server -> 4
      request -> Hello Server -> 5
      request -> Hello Server -> 6
      request -> Hello Server -> 7
      request -> Hello Server -> 8
      request -> Hello Server -> 9  
    Client
      Client Hello Hello Server -> 0
      Client Hello Hello Server -> 1
      Client Hello Hello Server -> 2
      Client Hello Hello Server -> 3
      Client Hello Hello Server -> 4
      Client Hello Hello Server -> 5
      Client Hello Hello Server -> 6
      Client Hello Hello Server -> 7
      Client Hello Hello Server -> 8
      Client Hello Hello Server -> 9
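To tie the demo back to the earlier question of whether protobuf alone can do RPC: protobuf only turns a message into bytes, and gRPC adds the transport around those bytes. As a rough sketch of one piece of that layer, each message gRPC sends over HTTP/2 is wrapped in a 5-byte prefix (a 1-byte compressed flag and a 4-byte big-endian length). The code below reproduces just that framing with the JDK; `GrpcFrameSketch`, `frame`, and `unframe` are hypothetical names for illustration, not part of the gRPC API, and the sketch leaves out everything else gRPC does (HTTP/2 streams, headers, status codes, compression).

```java
import java.nio.ByteBuffer;

public class GrpcFrameSketch {

    // Wrap an already-serialized protobuf message in gRPC's length-prefixed framing:
    // 1 byte compressed flag + 4 bytes big-endian length + message bytes.
    public static byte[] frame(byte[] message) {
        ByteBuffer buf = ByteBuffer.allocate(5 + message.length); // ByteBuffer is big-endian by default
        buf.put((byte) 0);            // 0 = message is not compressed
        buf.putInt(message.length);   // message length
        buf.put(message);             // protobuf payload
        return buf.array();
    }

    // Reverse of frame(): strip the 5-byte prefix and return the protobuf payload.
    public static byte[] unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        byte compressed = buf.get();
        if (compressed != 0) {
            throw new IllegalArgumentException("compressed messages are not handled in this sketch");
        }
        int length = buf.getInt();
        byte[] message = new byte[length];
        buf.get(message);
        return message;
    }

    public static void main(String[] args) {
        // Hand-encoded Request{name = "hi"}: tag 0x0A (field 1, wire type 2), length 2, then "hi".
        byte[] request = {0x0A, 0x02, 'h', 'i'};
        byte[] framed = frame(request);
        System.out.println(framed.length);          // 9 = 5-byte prefix + 4-byte payload
        System.out.println(unframe(framed).length); // 4
    }
}
```

This is the part protobuf does not provide by itself: something has to delimit messages, route them to a method such as sayHi, and carry them over a connection, which is the job gRPC takes on in the demo.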