1 Generated-code
首先本地开发环境需要安装grpc和protobuf,这里以macos为例:brew install grpc protobuf。
无论使用什么编程语言实现GRPC协议的服务,都需要将protobuf定义转换为该语言的代码。
- Java的构建工具Maven提供了自动转换插件
protobuf-maven-plugin
,执行mvn package
会自动使用protoc-gen-grpc-java创建grpc的模板代码。详见hello-grpc-java/pom.xml
。 - Go需要执行
go get github.com/golang/protobuf/protoc-gen-go
安装,然后使用protoc
命令生成grpc代码。详见hello-grpc-go/proto2go.sh
。 - NodeJs需要执行
npm install -g grpc-tools
安装grpc_tools_node_protoc
,然后使用protoc
命令生成grpc代码。详见hello-grpc-nodejs/proto2js.sh
。 - Python需要执行
pip install grpcio-tools
安装grpcio-tools
,然后使用protoc
命令生成grpc代码。详见hello-grpc-python/proto2py.sh
。
在示例工程中,每种语言的代码目录中都有一个proto目录,其中的landing.proto
文件是示例根目录下proto/landing.proto
文件的软链接,这样有利于统一更新protobuf的定义。
2 通信实现
hello数组
private final List<String> HELLO_LIST = Arrays.asList("Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요");
kv.put("data", HELLO_LIST.get(index));
var helloList = []string{"Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"}
kv["data"] = helloList[index]
let hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
kv.set("data", hellos[index])
hellos = ["Hello", "Bonjour", "Hola", "こんにちは", "Ciao", "안녕하세요"]
result.kv["data"] = hellos[index]
talk
Unary RPC类型的实现是最经典的,阻塞式一发一收。
// Unary RPC client: delegate to the blocking stub and wait for the single response.
public TalkResponse talk(TalkRequest talkRequest) {
    return blockingStub.talk(talkRequest);
}
//服务端处理请求后触发StreamObserver实例的两个事件onNext和onCompleted
public void talk(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
...
responseObserver.onNext(response);
responseObserver.onCompleted();
}
func talk(client pb.LandingServiceClient, request *pb.TalkRequest) {
r, err := client.Talk(context.Background(), request)
}
// Talk implements the unary RPC: build a single result from the request
// payload and return it wrapped in a status-200 response.
func (s *ProtoServer) Talk(ctx context.Context, request *pb.TalkRequest) (*pb.TalkResponse, error) {
	results := []*pb.TalkResult{s.buildResult(request.Data)}
	response := &pb.TalkResponse{
		Status:  200,
		Results: results,
	}
	return response, nil
}
function talk(client, request) {
client.talk(request, function (err, response) {
...
})
}
function talk(call, callback) {
const talkResult = buildResult(call.request.getData())
...
callback(null, response)
}
def talk(stub):
response = stub.talk(request)
def talk(self, request, context):
result = build_result(request.data)
...
return response
talkOneAnswerMore
Server streaming RPC类型的实现重点是客户端在发送请求后如何处理流式响应数据,以及服务端的流式返回响应。
public List<TalkResponse> talkOneAnswerMore(TalkRequest request) {
Iterator<TalkResponse> talkResponses = blockingStub.talkOneAnswerMore(request);
talkResponses.forEachRemaining(talkResponseList::add);
return talkResponseList;
}
public void talkOneAnswerMore(TalkRequest request, StreamObserver<TalkResponse> responseObserver) {
String[] datas = request.getData().split(",");
for (String data : datas) {...}
talkResponses.forEach(responseObserver::onNext);
responseObserver.onCompleted();
}
func talkOneAnswerMore(client pb.LandingServiceClient, request *pb.TalkRequest) {
stream, err := client.TalkOneAnswerMore(context.Background(), request)
for {
r, err := stream.Recv()
if err == io.EOF {
break
}
...
}
}
func (s *ProtoServer) TalkOneAnswerMore(request *pb.TalkRequest, stream pb.Landing..Server) error {
datas := strings.Split(request.Data, ",")
for _, d := range datas {
stream.Send(&pb.TalkResponse{...})
}
function talkOneAnswerMore(client, request) {
let call = client.talkOneAnswerMore(request)
call.on('data', function (response) {
...
})
}
function talkOneAnswerMore(call) {
let datas = call.request.getData().split(",")
for (const data in datas) {
...
call.write(response)
}
call.end()
}
def talk_one_answer_more(stub):
responses = stub.talkOneAnswerMore(request)
for response in responses:
logger.info(response)
def talkOneAnswerMore(self, request, context):
datas = request.data.split(",")
for data in datas:
yield response
talkMoreAnswerOne
Client streaming RPC类型的实现重点是客户端以流式发送请求后,告诉服务端请求结束,服务端会将多次请求分别处理,在收到结束时一次返回给客户端。
public void talkMoreAnswerOne(List<TalkRequest> requests) throws InterruptedException {
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
@Override
public void onNext(TalkResponse talkResponse) {
log.info("Response=\n{}", talkResponse);
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
};
final StreamObserver<TalkRequest> requestObserver = asyncStub.talkMoreAnswerOne(responseObserver);
try {
requests.forEach(request -> {
if (finishLatch.getCount() > 0) {
requestObserver.onNext(request);
});
requestObserver.onCompleted();
}
/**
 * Client-streaming RPC: buffer every incoming request, then reply exactly
 * once when the client signals completion.
 * NOTE(review): no onError override in this excerpt — confirm stream errors
 * are handled elsewhere. talkRequests is declared outside this excerpt;
 * presumably a per-call collection — verify it is not shared across calls.
 */
public StreamObserver<TalkRequest> talkMoreAnswerOne(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            // Buffer each request until the client finishes the stream.
            talkRequests.add(request);
        }
        @Override
        public void onCompleted() {
            // Send the single aggregated response, then close the stream.
            responseObserver.onNext(buildResponse(talkRequests));
            responseObserver.onCompleted();
        }
    };
}
func talkMoreAnswerOne(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
stream, err := client.TalkMoreAnswerOne(context.Background())
for _, request := range requests {
stream.Send(request)
}
r, err := stream.CloseAndRecv()
}
func (s *ProtoServer) TalkMoreAnswerOne(stream pb.LandingService_TalkMoreAnswerOneServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
talkResponse := &pb.TalkResponse{
Status: 200,
Results: rs,
}
stream.SendAndClose(talkResponse)
return nil
}
rs = append(rs, s.buildResult(in.Data))
}
}
function talkMoreAnswerOne(client, requests) {
let call = client.talkMoreAnswerOne(function (err, response) {
...
})
requests.forEach(request => {
call.write(request)
})
call.end()
}
// Client-streaming RPC: accumulate one result per incoming request and
// reply exactly once when the client ends the stream.
function talkMoreAnswerOne(call, callback) {
    const results = []
    call.on('data', (request) => {
        results.push(buildResult(request.getData()))
    })
    call.on('end', () => {
        const response = new messages.TalkResponse()
        response.setStatus(200)
        response.setResultsList(results)
        callback(null, response)
    })
}
def talk_more_answer_one(stub):
response_summary = stub.talkMoreAnswerOne(request_iterator)
def generate_request():
for _ in range(0, 3):
yield request
def talkMoreAnswerOne(self, request_iterator, context):
for request in request_iterator:
response.results.append(build_result(request.data))
return response
talkBidirectional
Bidirectional streaming RPC类型的实现重点是客户端以流式发送请求后,告诉服务端请求结束,服务端会在每次请求后将结果返回,并在收到结束时,告诉客户端结束。
public void talkBidirectional(List<TalkRequest> requests) throws InterruptedException {
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<TalkResponse> responseObserver = new StreamObserver<TalkResponse>() {
@Override
public void onNext(TalkResponse talkResponse) {
log.info("Response=\n{}", talkResponse);
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
};
final StreamObserver<TalkRequest> requestObserver = asyncStub.talkBidirectional(responseObserver);
try {
requests.forEach(request -> {
if (finishLatch.getCount() > 0) {
requestObserver.onNext(request);
...
requestObserver.onCompleted();
}
/**
 * Bidirectional streaming RPC: answer each request as soon as it arrives,
 * and close the response stream when the client completes its side.
 * NOTE(review): no onError override in this excerpt — confirm stream errors
 * are handled elsewhere.
 */
public StreamObserver<TalkRequest> talkBidirectional(StreamObserver<TalkResponse> responseObserver) {
    return new StreamObserver<TalkRequest>() {
        @Override
        public void onNext(TalkRequest request) {
            // One response per request: status 200 plus a single result
            // built from the request payload.
            responseObserver.onNext(TalkResponse.newBuilder()
                    .setStatus(200)
                    .addResults(buildResult(request.getData())).build());
        }
        @Override
        public void onCompleted() {
            // Client finished sending; close our side of the stream.
            responseObserver.onCompleted();
        }
    };
}
func talkBidirectional(client pb.LandingServiceClient, requests []*pb.TalkRequest) {
stream, err := client.TalkBidirectional(context.Background())
waitc := make(chan struct{})
go func() {
for {
r, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
}
}()
for _, request := range requests {
stream.Send(request)
}
stream.CloseSend()
<-waitc
}
func (s *ProtoServer) TalkBidirectional(stream pb.LandingService_TalkBidirectionalServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
stream.Send(talkResponse)
}
}
function talkBidirectional(client, requests) {
let call = client.talkBidirectional()
call.on('data', function (response) {
...
})
requests.forEach(request => {
call.write(request)
})
call.end()
}
function talkBidirectional(call) {
call.on('data', function (request) {
call.write(response)
})
call.on('end', function () {
call.end()
})
}
def talk_bidirectional(stub):
responses = stub.talkBidirectional(request_iterator)
for response in responses:
logger.info(response)
def talkBidirectional(self, request_iterator, context):
for request in request_iterator:
yield response
3 要点实现
环境变量
// Resolve the gRPC server host from the GRPC_SERVER environment variable,
// falling back to "localhost" when it is unset.
// NOTE(review): the name looks like a typo for getGrpcServer — kept as-is
// to avoid breaking callers; confirm before renaming.
private static String getGrcServer() {
    final String configured = System.getenv("GRPC_SERVER");
    return configured != null ? configured : "localhost";
}
func grpcServer() string {
server := os.Getenv("GRPC_SERVER")
if len(server) == 0 {
return "localhost"
} else {
return server
}
}
// Resolve the gRPC server host from the GRPC_SERVER environment variable,
// defaulting to "localhost" when it is not set.
function grpcServer() {
    const configured = process.env.GRPC_SERVER;
    if (configured === undefined || configured === null) {
        return "localhost"
    }
    return configured
}
def grpc_server():
    """Resolve the gRPC server host.

    Returns the GRPC_SERVER environment variable when it is set to a
    non-empty value, otherwise falls back to "localhost".
    """
    return os.getenv("GRPC_SERVER") or "localhost"
随机数
/**
 * Returns a random id as a string, using the historical bound of 5.
 * Kept for backward compatibility: delegates to {@link #getRandomId(int)}.
 * NOTE(review): the greeting lists in this project have 6 entries while the
 * bound here is 5, so index 5 is never chosen — confirm the intended range
 * against the other language implementations.
 */
public static String getRandomId() {
    return getRandomId(5);
}

/**
 * Returns a random integer in [0, max) rendered as a string, mirroring the
 * randomId(max) helpers in the Go/NodeJs implementations.
 *
 * @param max exclusive upper bound; must be positive
 */
public static String getRandomId(int max) {
    return String.valueOf(random.nextInt(max));
}
func randomId(max int) string {
return strconv.Itoa(rand.Intn(max))
}
// Produce a pseudo-random integer id in [0, max), returned as a string.
function randomId(max) {
    const bound = Math.floor(max)
    const value = Math.floor(Math.random() * bound)
    return String(value)
}
def random_id(end):
    """Return a random integer in [0, end] (inclusive) as a string.

    NOTE(review): random.randint is inclusive of both bounds, so this yields
    end + 1 possible values, while the Go/NodeJs randomId(max) helpers yield
    values in [0, max). Confirm callers pass matching bounds across languages.
    """
    return str(random.randint(0, end))
时间戳
TalkResult.newBuilder().setId(System.nanoTime())
result.Id = time.Now().UnixNano()
result.setId(Math.round(Date.now() / 1000))
result.id = int((time.time()))
UUID
kv.put("id", UUID.randomUUID().toString());
import (
"github.com/google/uuid"
)
kv["id"] = uuid.New().String()
kv.set("id", uuid.v1())
result.kv["id"] = str(uuid.uuid1())
Sleep
TimeUnit.SECONDS.sleep(1);
time.Sleep(2 * time.Millisecond)
let sleep = require('sleep')
sleep.msleep(2)
time.sleep(random.uniform(0.5, 1.5))
4 验证
功能验证
完成功能开发后,我们在一个终端启动GRPC服务,在另一个终端启动客户端。客户端分别对4个通信接口进行请求。
java
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.server.ProtoServer"
mvn exec:java -Dexec.mainClass="org.feuyeux.grpc.client.ProtoClient"
go
go run server.go
go run client/proto_client.go
nodejs
node proto_server.js
node proto_client.js
python
python server/protoServer.py
python client/protoClient.py
交叉通信
在功能验证基础上,我们启动任意一种编程语言实现的Server端,然后使用其他3种客户端进行验证。以确保不同编程语言实现的GRPC通信行为一致。这步验证是后续容器化和网格化的基础,因为每种编程语言的Server端都会作为同一个Kubernetes Service的一个版本的Deployment发布,它们的行为必须一致,以保证路由到不同版本,结果是一致的。
5 构建和分发
构建
通信功能验证完毕后,接下来是将4个工程编译、构建、打包。这一步的输出是制作镜像的输入。
java
分别构建服务端和客户端的jar,将其拷贝到docker目录备用。
mvn clean install -DskipTests -f server_pom
cp target/hello-grpc-java.jar ../docker/
mvn clean install -DskipTests -f client_pom
cp target/hello-grpc-java.jar ../docker/
go
go编译的二进制是平台相关的,因为我们最终要部署到linux上,因此构建命令如下。然后将二进制拷贝到docker目录备用。
env GOOS=linux GOARCH=amd64 go build -o proto_server server.go
mv proto_server ../docker/
env GOOS=linux GOARCH=amd64 go build -o proto_client client/proto_client.go
mv proto_client ../docker/
nodejs
node需要在docker镜像中进行构建,才能支持运行时所需的各种c++依赖。因此这一步主要是拷贝备用。
cp ../hello-grpc-nodejs/proto_server.js node
cp ../hello-grpc-nodejs/package.json node
cp -R ../hello-grpc-nodejs/common node
cp -R ../proto node
cp ../hello-grpc-nodejs/*_client.js node
python
python无需编译,直接拷贝备用即可。
cp -R ../hello-grpc-python/server py
cp ../hello-grpc-python/start_server.sh py
cp -R ../proto py
cp ../hello-grpc-python/proto2py.sh py
cp -R ../hello-grpc-python/client py
cp ../hello-grpc-python/start_client.sh py
dockerfile
构建完毕后,docker路径下存储了dockerfile所需的全部文件。这里将dockerfile中重点信息说明如下。
- 基础镜像我们尽量选择
alpine
,因为尺寸最小。python的基础镜像注意选择2.7版本的python:2
,因为示例使用的是python2。如果对python3有强需求,请基于示例代码酌情修改。 - nodejs需要安装c++及编译器make,npm包需要安装grpc-tools。
这里以nodejs server的镜像作为示例,说明构建镜像的过程。
grpc-server-node.dockerfile
# Small Alpine-based Node image keeps the final image size down.
FROM node:14.11-alpine
# grpc's native addon is compiled at install time, so the image needs
# python, make and g++; the apk cache is removed afterwards to stay small.
RUN apk add --update \
python \
make \
g++ \
&& rm -rf /var/cache/apk/*
# Use a China-mainland npm mirror, then install the build/codegen tooling
# globally. --unsafe-perm lets npm lifecycle scripts run as root in Docker.
RUN npm config set registry https://registry.npm.taobao.org && npm install -g node-pre-gyp grpc-tools --unsafe-perm
# Install dependencies first so this layer stays cached until package.json changes.
COPY node/package.json .
RUN npm install --unsafe-perm
# Then copy the application sources on top.
COPY node .
ENTRYPOINT ["node","proto_server.js"]
docker build
docker build -f grpc-server-node.dockerfile -t registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0 .
镜像列表
最终我们会构建出8个镜像,使用push命令将镜像分发到阿里云ACR服务,作为下一篇kube的基础。
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_java:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_java:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_go:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_go:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_node:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_node:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_server_python:1.0.0
- registry.cn-beijing.aliyuncs.com/asm_repo/grpc_client_python:1.0.0