gRPC案例

定义.proto文件

// Selects the proto3 syntax; without this declaration protoc falls back to proto2.
syntax = "proto3";
// Java package of the generated classes.
option java_package = "com.zjw.grpc.gen";
// Name of the generated outer (wrapper) class. If omitted, it is derived from the
// .proto file name converted to camel case.
option java_outer_classname = "GreeterEntity";
// Emit each message/service as its own top-level Java file instead of nesting
// everything inside the outer class.
option java_multiple_files = true;

// Response of the unary RequestResponse RPC.
message HelloReply {
  string message = 1;
}

// Request used by RequestResponse and RequestStreamResponse, and streamed by the
// client in StreamRequestResponse.
message StudentRequest {
  string name = 1;
}

// A single student record; streamed by RequestStreamResponse and aggregated in
// StudentResponseList.
message StudentResponse {
  string name = 1;
  int32 age = 2;
}

// Response of the client-streaming StreamRequestResponse RPC: a list of students.
message StudentResponseList {
  repeated StudentResponse studentResponse = 1;
}

// Element of the client-to-server stream in StreamRequestStreamResponse.
message StreamRequest {
  string request_info = 1;
}

// Element of the server-to-client stream in StreamRequestStreamResponse.
message StreamResponse {
  string response_info = 1;
}

// Demo service exposing one RPC for each of the four gRPC call styles.
service PersonService {
  // Unary RPC: one StudentRequest in, one HelloReply out.
  rpc RequestResponse (StudentRequest) returns (HelloReply) {}
  // Server-side streaming RPC: one StudentRequest in, a stream of StudentResponse out.
  rpc RequestStreamResponse(StudentRequest) returns (stream StudentResponse) {}
  // Client-side streaming RPC: a stream of StudentRequest in, one StudentResponseList out.
  rpc StreamRequestResponse(stream StudentRequest) returns (StudentResponseList) {}
  // Bidirectional streaming RPC: StreamRequest and StreamResponse messages flow
  // independently in both directions.
  rpc StreamRequestStreamResponse(stream StreamRequest) returns (stream StreamResponse) {}
}

引入Maven插件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the gRPC demo: pulls in the gRPC runtime and generates Java sources
     from the .proto files at build time. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>grpc-study</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <java.version>1.8</java.version>
    <!-- Version of the protoc compiler binary downloaded by protobuf-maven-plugin. -->
    <protoc.version>3.7.1</protoc.version>
    <!-- Shared by the gRPC runtime dependencies and the protoc-gen-grpc-java plugin. -->
    <grpc.version>1.20.0</grpc.version>
    <!-- NOTE(review): protobuf.version is not referenced anywhere in this POM; confirm
         whether it can be removed. -->
    <protobuf.version>2.6.1</protobuf.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>${grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>${grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>${grpc.version}</version>
    </dependency>


  </dependencies>


  <build>
    <extensions>
      <!-- Supplies the ${os.detected.classifier} property used below to select the
           platform-specific protoc and grpc-java plugin executables. -->
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.4.1.Final</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>

      <!-- Generates Java code from the .proto files during the build. -->
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <!-- compile: message classes; compile-custom: gRPC service stubs via the
                   grpc-java protoc plugin configured above. -->
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

生成gRPC代码

Server端开发

package com.zjw.grpc;

import com.zjw.grpc.gen.HelloReply;
import com.zjw.grpc.gen.PersonServiceGrpc;
import com.zjw.grpc.gen.StreamRequest;
import com.zjw.grpc.gen.StreamResponse;
import com.zjw.grpc.gen.StudentRequest;
import com.zjw.grpc.gen.StudentResponse;
import com.zjw.grpc.gen.StudentResponseList;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Date;
import java.util.UUID;

/**
 * Created by zjwblog<co.zjwblog@gmail.com> on 2021/7/17
 */
public class GRPCServerDemo {

  private Server server;

  private void start() throws IOException {
    int port = 50051;
    server = ServerBuilder.forPort(port)
        .addService(new PersonServiceImpl())
        .build()
        .start();
    System.out.println("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      System.err.println("*** shutting down gRPC server since JVM is shutting down");
      GRPCServerDemo.this.stop();
      System.err.println("*** server shut down");
    }));
  }

  private void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    final GRPCServerDemo server = new GRPCServerDemo();
    server.start();
    server.blockUntilShutdown();
  }

  static class PersonServiceImpl extends PersonServiceGrpc.PersonServiceImplBase {

    /**
     * 接收一次请求,一次响应
     */
    @Override
    public void requestResponse(StudentRequest request,
        StreamObserver<HelloReply> responseObserver) {
      System.out.println("客户端消息: " + request.getName());

      HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }

    /**
     * 接收一次请求,流式响应
     */
    @Override
    public void requestStreamResponse(StudentRequest request,
        StreamObserver<StudentResponse> responseObserver) {
      System.out.println("客户端消息: " + request.getName());

      responseObserver.onNext(StudentResponse.newBuilder().setName("Kerry")
          .setAge(30)
          .build());
      responseObserver.onNext(StudentResponse.newBuilder().setName("John")
          .setAge(40)
          .build());
      responseObserver.onNext(StudentResponse.newBuilder().setName("Jerry")
          .setAge(50)
          .build());

      responseObserver.onCompleted();
    }

    /**
     * 接收流式请求,一次响应
     */
    @Override
    public StreamObserver<StudentRequest> streamRequestResponse(
        StreamObserver<StudentResponseList> responseObserver) {
      return new StreamObserver<StudentRequest>() {
        public void onNext(StudentRequest studentRequest) {
          System.out.println("onNext:" + studentRequest.getName());
        }

        public void one rror(Throwable throwable) {
          System.out.println(throwable.getMessage());
        }

        public void onCompleted() {
          StudentResponse studentResponse1 = StudentResponse.newBuilder()
              .setName("Tom")
              .setAge(20)
              .build();
          StudentResponse studentResponse2 = StudentResponse.newBuilder()
              .setName("Lea")
              .setAge(30)
              .build();

          StudentResponseList studentResponseList = StudentResponseList.newBuilder()
              .addStudentResponse(studentResponse1).addStudentResponse(studentResponse2).build();

          responseObserver.onNext(studentResponseList);
          responseObserver.onCompleted();

        }
      };
    }

    /**
     * 接收流式请求,流式响应
     */
    @Override
    public StreamObserver<StreamRequest> streamRequestStreamResponse(
        StreamObserver<StreamResponse> responseObserver) {
      return new StreamObserver<StreamRequest>() {
        @Override
        public void onNext(StreamRequest streamRequest) {
          System.out.println("onNext:" + streamRequest.getRequestInfo());

          responseObserver.onNext(
              StreamResponse.newBuilder()
                  .setResponseInfo(Long.toString(new Date().getTime()))
                  .build()
          );
        }

        @Override
        public void one rror(Throwable throwable) {
          System.out.println(throwable.getMessage());
        }

        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }
  }
}

Client端代码

package com.zjw.grpc;

import com.zjw.grpc.gen.HelloReply;
import com.zjw.grpc.gen.PersonServiceGrpc;
import com.zjw.grpc.gen.StreamRequest;
import com.zjw.grpc.gen.StreamResponse;
import com.zjw.grpc.gen.StudentRequest;
import com.zjw.grpc.gen.StudentResponse;
import com.zjw.grpc.gen.StudentResponseList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDate;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

public class GRPCClientDemo {

  public static void main(String[] args) throws Exception {
    test0();
    test1();
    test2();
    test3();
  }

  public static void test0() throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext().build();
    PersonServiceGrpc.PersonServiceBlockingStub stub = PersonServiceGrpc.newBlockingStub(channel);

    System.out.println("-----------------------------");
    System.out.println("请求-响应");
    StudentRequest request = StudentRequest.newBuilder().setName("Kerry").build();
    HelloReply response = stub.requestResponse(request);
    System.out.println("返回结果: " + response.getMessage());

    sleep(5000);
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void test1() throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext().build();
    PersonServiceGrpc.PersonServiceBlockingStub stub = PersonServiceGrpc.newBlockingStub(channel);

    System.out.println("-----------------------------");
    System.out.println("请求-流式响应");
    Iterator<StudentResponse> iter = stub.requestStreamResponse(
        StudentRequest.newBuilder().setName("Kerry").build());
    while (iter.hasNext()) {
      StudentResponse resp = iter.next();
      System.out.println("[" + resp.getName() + "," + resp.getAge() + "]");
    }
    sleep(5000);
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void test2() throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext().build();

    PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(channel);

    System.out.println("-----------------------------");
    System.out.println("流式请求-响应");
    StreamObserver<StudentResponseList> listObserver = new StreamObserver<StudentResponseList>() {
      public void onNext(StudentResponseList studentResponseList) {
        studentResponseList.getStudentResponseList().forEach(resp -> {
          System.out.println("[" + resp.getName() + "," + resp.getAge() + "]");
        });
      }

      public void one rror(Throwable throwable) {
        System.out.println(throwable.getMessage());
      }

      public void onCompleted() {
        System.out.println("请求发送完成");
      }
    };
    StreamObserver<StudentRequest> observer = stub.streamRequestResponse(listObserver);
    observer.onNext(StudentRequest.newBuilder().setName("Tom").build());
    observer.onNext(StudentRequest.newBuilder().setName("Jerry").build());
    observer.onNext(StudentRequest.newBuilder().setName("John").build());
    observer.onCompleted();

    sleep(5000);
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  public static void test3() throws InterruptedException {
    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext().build();
    PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(channel);

    System.out.println("-----------------------------");
    System.out.println("流式请求-流式响应");

    StreamObserver<StreamRequest> observer = stub.streamRequestStreamResponse(
        new StreamObserver<StreamResponse>() {
          @Override
          public void onNext(StreamResponse streamResponse) {
            System.out.println(streamResponse.getResponseInfo());
          }

          @Override
          public void one rror(Throwable throwable) {
            System.out.println(throwable.getMessage());
          }

          @Override
          public void onCompleted() {
            System.out.println("请求发送完成");
          }
        }
    );

    for (int i = 0; i < 10; i++) {
      observer.onNext(
          StreamRequest.newBuilder().setRequestInfo(Long.toString(i)).build()
      );
      sleep(1000);
    }

    sleep(5000);
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  private static void sleep(long timeout) {
    try {
      Thread.sleep(timeout);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

上一篇:docker容器的创建与管理过程


下一篇:go 中的grpc的stream 使用