定义.proto文件
// Selects proto3 syntax; if this statement is omitted, the file is treated as proto2.
syntax = "proto3";
// Java package of the generated classes.
option java_package = "com.zjw.grpc.gen";
// Name of the generated outer (wrapper) class. If not specified, the name is
// derived from the .proto file name converted to camel case.
option java_outer_classname = "GreeterEntity";
// Generate each top-level message/service as its own Java file
// (the Java code imports them directly, e.g. com.zjw.grpc.gen.HelloReply).
option java_multiple_files = true;
// Response of the unary RequestResponse RPC: a single text message.
message HelloReply {
string message = 1;
}
// Request carrying a student name; used by the unary, server-streaming and
// client-streaming RPCs below.
message StudentRequest {
string name = 1;
}
// A single student record (name and age).
message StudentResponse {
string name = 1;
int32 age = 2;
}
// Aggregate response of the client-streaming RPC: a list of student records.
message StudentResponseList {
repeated StudentResponse studentResponse = 1;
}
// One inbound element of the bidirectional streaming RPC.
message StreamRequest {
string request_info = 1;
}
// One outbound element of the bidirectional streaming RPC.
message StreamResponse {
string response_info = 1;
}
// Demo service exposing one RPC for each of the four gRPC call styles.
service PersonService {
// A simple RPC
rpc RequestResponse (StudentRequest) returns (HelloReply) {}
// A server-side streaming RPC
rpc RequestStreamResponse(StudentRequest) returns (stream StudentResponse) {}
// A client-side streaming RPC
rpc StreamRequestResponse(stream StudentRequest) returns (StudentResponseList) {}
// A bidirectional streaming RPC
rpc StreamRequestStreamResponse(stream StreamRequest) returns (stream StreamResponse) {}
}
引入Maven插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build definition for the gRPC demo: gRPC runtime dependencies plus
     protoc-based code generation from the .proto files. -->
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>grpc-study</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Java source/target level used by maven-compiler-plugin below. -->
<java.version>1.8</java.version>
<!-- Version of the protoc compiler binary downloaded by protobuf-maven-plugin. -->
<protoc.version>3.7.1</protoc.version>
<!-- Shared version for all io.grpc artifacts and the protoc-gen-grpc-java plugin. -->
<grpc.version>1.20.0</grpc.version>
<!-- NOTE(review): protobuf.version is not referenced anywhere in this POM, and
     2.6.1 does not match protoc 3.7.1; confirm whether it should be removed or aligned. -->
<protobuf.version>2.6.1</protobuf.version>
</properties>
<dependencies>
<!-- Protobuf marshalling support for gRPC. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- Stub base classes and StreamObserver used by the generated service code. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- Netty-based transport for the server and the client channel. -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<!-- Provides the ${os.detected.classifier} property used below to select the
     platform-specific protoc and grpc-java plugin executables. -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<!-- protoc compiler executable, resolved as a Maven artifact for the current OS. -->
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<!-- protoc plugin that generates the gRPC service classes (e.g. PersonServiceGrpc). -->
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<!-- compile: generate the protobuf message classes. -->
<goal>compile</goal>
<!-- compile-custom: run the grpc-java plugin configured above. -->
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
生成gRPC代码
略
Server端开发
package com.zjw.grpc;
import com.zjw.grpc.gen.HelloReply;
import com.zjw.grpc.gen.PersonServiceGrpc;
import com.zjw.grpc.gen.StreamRequest;
import com.zjw.grpc.gen.StreamResponse;
import com.zjw.grpc.gen.StudentRequest;
import com.zjw.grpc.gen.StudentResponse;
import com.zjw.grpc.gen.StudentResponseList;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Date;
import java.util.UUID;
/**
* Created by zjwblog<co.zjwblog@gmail.com> on 2021/7/17
*/
public class GRPCServerDemo {
private Server server;
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new PersonServiceImpl())
.build()
.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
GRPCServerDemo.this.stop();
System.err.println("*** server shut down");
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final GRPCServerDemo server = new GRPCServerDemo();
server.start();
server.blockUntilShutdown();
}
static class PersonServiceImpl extends PersonServiceGrpc.PersonServiceImplBase {
/**
* 接收一次请求,一次响应
*/
@Override
public void requestResponse(StudentRequest request,
StreamObserver<HelloReply> responseObserver) {
System.out.println("客户端消息: " + request.getName());
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* 接收一次请求,流式响应
*/
@Override
public void requestStreamResponse(StudentRequest request,
StreamObserver<StudentResponse> responseObserver) {
System.out.println("客户端消息: " + request.getName());
responseObserver.onNext(StudentResponse.newBuilder().setName("Kerry")
.setAge(30)
.build());
responseObserver.onNext(StudentResponse.newBuilder().setName("John")
.setAge(40)
.build());
responseObserver.onNext(StudentResponse.newBuilder().setName("Jerry")
.setAge(50)
.build());
responseObserver.onCompleted();
}
/**
* 接收流式请求,一次响应
*/
@Override
public StreamObserver<StudentRequest> streamRequestResponse(
StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentRequest>() {
public void onNext(StudentRequest studentRequest) {
System.out.println("onNext:" + studentRequest.getName());
}
public void one rror(Throwable throwable) {
System.out.println(throwable.getMessage());
}
public void onCompleted() {
StudentResponse studentResponse1 = StudentResponse.newBuilder()
.setName("Tom")
.setAge(20)
.build();
StudentResponse studentResponse2 = StudentResponse.newBuilder()
.setName("Lea")
.setAge(30)
.build();
StudentResponseList studentResponseList = StudentResponseList.newBuilder()
.addStudentResponse(studentResponse1).addStudentResponse(studentResponse2).build();
responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}
/**
* 接收流式请求,流式响应
*/
@Override
public StreamObserver<StreamRequest> streamRequestStreamResponse(
StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
@Override
public void onNext(StreamRequest streamRequest) {
System.out.println("onNext:" + streamRequest.getRequestInfo());
responseObserver.onNext(
StreamResponse.newBuilder()
.setResponseInfo(Long.toString(new Date().getTime()))
.build()
);
}
@Override
public void one rror(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
Client端代码
package com.zjw.grpc;
import com.zjw.grpc.gen.HelloReply;
import com.zjw.grpc.gen.PersonServiceGrpc;
import com.zjw.grpc.gen.StreamRequest;
import com.zjw.grpc.gen.StreamResponse;
import com.zjw.grpc.gen.StudentRequest;
import com.zjw.grpc.gen.StudentResponse;
import com.zjw.grpc.gen.StudentResponseList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDate;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
public class GRPCClientDemo {
public static void main(String[] args) throws Exception {
test0();
test1();
test2();
test3();
}
public static void test0() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceBlockingStub stub = PersonServiceGrpc.newBlockingStub(channel);
System.out.println("-----------------------------");
System.out.println("请求-响应");
StudentRequest request = StudentRequest.newBuilder().setName("Kerry").build();
HelloReply response = stub.requestResponse(request);
System.out.println("返回结果: " + response.getMessage());
sleep(5000);
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void test1() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceBlockingStub stub = PersonServiceGrpc.newBlockingStub(channel);
System.out.println("-----------------------------");
System.out.println("请求-流式响应");
Iterator<StudentResponse> iter = stub.requestStreamResponse(
StudentRequest.newBuilder().setName("Kerry").build());
while (iter.hasNext()) {
StudentResponse resp = iter.next();
System.out.println("[" + resp.getName() + "," + resp.getAge() + "]");
}
sleep(5000);
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void test2() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(channel);
System.out.println("-----------------------------");
System.out.println("流式请求-响应");
StreamObserver<StudentResponseList> listObserver = new StreamObserver<StudentResponseList>() {
public void onNext(StudentResponseList studentResponseList) {
studentResponseList.getStudentResponseList().forEach(resp -> {
System.out.println("[" + resp.getName() + "," + resp.getAge() + "]");
});
}
public void one rror(Throwable throwable) {
System.out.println(throwable.getMessage());
}
public void onCompleted() {
System.out.println("请求发送完成");
}
};
StreamObserver<StudentRequest> observer = stub.streamRequestResponse(listObserver);
observer.onNext(StudentRequest.newBuilder().setName("Tom").build());
observer.onNext(StudentRequest.newBuilder().setName("Jerry").build());
observer.onNext(StudentRequest.newBuilder().setName("John").build());
observer.onCompleted();
sleep(5000);
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public static void test3() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext().build();
PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(channel);
System.out.println("-----------------------------");
System.out.println("流式请求-流式响应");
StreamObserver<StreamRequest> observer = stub.streamRequestStreamResponse(
new StreamObserver<StreamResponse>() {
@Override
public void onNext(StreamResponse streamResponse) {
System.out.println(streamResponse.getResponseInfo());
}
@Override
public void one rror(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("请求发送完成");
}
}
);
for (int i = 0; i < 10; i++) {
observer.onNext(
StreamRequest.newBuilder().setRequestInfo(Long.toString(i)).build()
);
sleep(1000);
}
sleep(5000);
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
private static void sleep(long timeout) {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}