-
首先对比一下Java自带的序列化与Protobuf的序列化的大小差异
public static void main(String[] args) throws Exception { Person.User person = Person.User.newBuilder().setName("wangziqiang") .setAge(20) .setAddress("hebei") .setGender("男") .setTimestamp(System.currentTimeMillis()).build(); ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream("javaobj")); os.writeObject(person); os.close(); person.writeTo(new FileOutputStream("protoobj")); }
-
之后的javaobj是217字节,而proto是34字节,很明显的大小差异,下面是proto的反序列化代码
public void test() throws Exception { Person.User user = Person.User.parseFrom(new FileInputStream("protoobj")); System.out.println(user); } /** * name: "wangziqiang" * age: 20 * address: "hebei" * gender: "\347\224\267" * timestamp: 1546783663870 */
- 下面说一下序列化和反序列化的速度,直接上图,图来自网上
- 另附一篇protobuf的测试帖子: 即时通讯网,看完后只有一个结论:使用protobuf基本上是稳赚不亏的
- 之前自己接触过AVRO,也是可以进行序列化和RPC的框架,然后自己就产生了一个疑问:protobuf可以直接做RPC吗?网上的帖子大多是C的,自己就产生了一些误导,认为protobuf可以直接来做RPC的应用,但是这是错误的,经过请教,在钉钉群和云栖社区的问答都给了我很好的答案,如果你也有这个疑问,请移步到 我的云栖问答-protobuf
- 由于gRPC也是Google开源的RPC框架,所以我将使用这个框架做一个小demo,本质上是跟着网上贴的代码,先会用再说
-
maven
<dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>1.17.1</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.17.1</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.17.1</version> </dependency> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.4.1.Final</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.0</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.0.0:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.0:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
-
proto文件
syntax = "proto3"; option java_package = "com.qidai.proto"; option java_outer_classname = "MyThing"; message Request { string name = 1; } message Response { string name = 2; } service MyRPC { rpc sayHi(Request) returns(Response); }
- 到这后maven插件compile,进行编译proto文件
-
Server
import com.qidai.proto.MyRPCGrpc; import com.qidai.proto.MyThing; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; public class Server { private static final int PORT = 2222; private final io.grpc.Server server; public Server() throws IOException { //这个部分启动server this.server = ServerBuilder.forPort(PORT) .addService(new MyRPCImpl()) .build() .start(); System.out.println("Server Started ..."); } private void stop() { if (server != null) { server.shutdown(); } } private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } public static void main(String[] args) throws IOException, InterruptedException { Server server = new Server(); //block Server防止关闭 server.blockUntilShutdown(); } static class MyRPCImpl extends MyRPCGrpc.MyRPCImplBase{ @Override //这个方法是在proto中定义的rpc方法名,client传来的rpc请求以怎样的逻辑处理 public void sayHi(MyThing.Request request, StreamObserver<MyThing.Response> responseObserver) { System.out.println("request -> " + request.getName()); MyThing.Response response = MyThing.Response.newBuilder() .setName("Client Hello "+ request.getName()) .build(); //包装返回信息 responseObserver.onNext(response); //等待结束一次请求 responseObserver.onCompleted(); } } }
-
Client
import com.qidai.proto.MyRPCGrpc; import com.qidai.proto.MyThing; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.concurrent.TimeUnit; public class Client { private final ManagedChannelBuilder<?> managedChannelBuilder; private final MyRPCGrpc.MyRPCBlockingStub blockingStub; private final ManagedChannel channel; public Client(String name, int port) { managedChannelBuilder = ManagedChannelBuilder.forAddress(name, port); channel = managedChannelBuilder.usePlaintext().build(); blockingStub = MyRPCGrpc.newBlockingStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } public void sayHi(String name){ MyThing.Request request = MyThing.Request.newBuilder().setName(name).build(); MyThing.Response response = blockingStub.sayHi(request); System.out.println(response.getName()); } public static void main(String[] args) throws Exception{ Client client = new Client("localhost", 2222); for (int i = 0; i < 10; i++) { Thread.sleep(1000); //进行rpc调用的真正逻辑 client.sayHi("Hello Server -> " + i); } client.shutdown(); } }
-
到这了就可以使用了,先启动server然后启动client,效果如下
Server request -> Hello Server -> 0 request -> Hello Server -> 1 request -> Hello Server -> 2 request -> Hello Server -> 3 request -> Hello Server -> 4 request -> Hello Server -> 5 request -> Hello Server -> 6 request -> Hello Server -> 7 request -> Hello Server -> 8 request -> Hello Server -> 9 Client Client Hello Hello Server -> 0 Client Hello Hello Server -> 1 Client Hello Hello Server -> 2 Client Hello Hello Server -> 3 Client Hello Hello Server -> 4 Client Hello Hello Server -> 5 Client Hello Hello Server -> 6 Client Hello Hello Server -> 7 Client Hello Hello Server -> 8 Client Hello Hello Server -> 9