grpc 使用
简介
参考文档
1. 安装
2. 简介
3. Protobuf转换
4. Protobuf语法
5.Hello gRPC
5.1 目录结构
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello# tree
.
├── client
│ └── main.go
└── server
├── main.go
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto/hello# tree
.
└── hello.proto
5.2 server.go
package main
import (
"fmt"
"net"
"os"
pb "grpc-hello/proto/hello" // 引入编译生成的包
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// helloService implements the Hello service interface from the generated pb package.
type helloService struct{}

// HelloService is the service value handed to pb.RegisterHelloServer.
var HelloService = helloService{}

// SayHello logs that a request arrived and answers with "Hello <in.Name>.".
// It never returns a non-nil error.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	grpclog.Println("收到请求grpclog")
	return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s.", in.Name)}, nil
}
// main starts the Hello gRPC server on Address without transport security.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	listen, err := net.Listen("tcp", Address)
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}

	// Create the gRPC server and register the Hello service on it.
	s := grpc.NewServer()
	pb.RegisterHelloServer(s, HelloService)

	grpclog.Println("Listen on " + Address)
	// Report a Serve failure instead of exiting silently with status 0.
	if err := s.Serve(listen); err != nil {
		grpclog.Fatalf("Failed to serve: %v", err)
	}
}
5.3 client.go
package main
import (
pb "grpc-hello/proto/hello" // 引入proto包
"os"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// main dials the Hello server at Address over an insecure connection, calls
// SayHello once with the name "gRPC" and logs the reply message.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	// Open the client connection.
	conn, dialErr := grpc.Dial(Address, grpc.WithInsecure())
	if dialErr != nil {
		grpclog.Fatalln(dialErr)
	}
	defer conn.Close()

	// Build the client stub and issue the call.
	client := pb.NewHelloClient(conn)
	reply, callErr := client.SayHello(context.Background(), &pb.HelloRequest{Name: "gRPC"})
	if callErr != nil {
		grpclog.Fatalln(callErr)
	}
	grpclog.Println(reply.Message)
}
5.4 hello.proto
syntax = "proto3"; // Use the proto3 syntax version.
package hello; // Default proto package name.
// Go package for the generated code.
option go_package = "./hello";
// Hello is the greeting service.
service Hello {
// SayHello takes a HelloRequest and returns a HelloResponse.
rpc SayHello(HelloRequest) returns (HelloResponse) {}
}
// HelloRequest is the request message.
message HelloRequest {
string name = 1;
}
// HelloResponse is the response message.
message HelloResponse {
string message = 1;
}
5.5 将proto转换为go
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# protoc -I . --go_out=plugins=grpc:. ./hello/hello.proto
5.6 示例
server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello# ./server/server
2022/01/10 19:05:30 INFO: Listen on 127.0.0.1:50052
2022/01/10 19:05:32 INFO: 收到请求grpclog
client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello# ./client/client
2022/01/10 19:05:32 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:05:32 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:05:32 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:05:32 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:05:32 INFO: [core] Channel authority set to "127.0.0.1:50052"
2022/01/10 19:05:32 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:05:32 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:05:32 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:05:32 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:05:32 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:05:32 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:05:32 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:05:32 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:05:32 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:05:32 INFO: Hello gRPC.
2022/01/10 19:05:32 INFO: [core] Channel Connectivity change to SHUTDOWN
2022/01/10 19:05:32 INFO: [core] Subchannel Connectivity change to SHUTDOWN
6 TLS认证
proto共用hello项目
6.1 生成证书
6.2 目录结构
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# tree hello
hello
├── hello.pb.go
└── hello.proto
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls# tree
.
├── client
│ └── main.go
├── keys
│ ├── hello.csr
│ ├── hello.key
│ ├── hello.pem
│ ├── openssl.cnf
│ ├── server.key
│ ├── server.pem
│ └── server.srl
└── server
├── main.go
3 directories, 11 files
6.3 server.go
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"os"
pb "grpc-hello/proto/hello" // 引入编译生成的包
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// helloService implements the Hello service interface from the generated pb package.
type helloService struct{}

// HelloService is the service value handed to pb.RegisterHelloServer.
var HelloService = helloService{}

// SayHello logs that a request arrived and answers with "Hello <in.Name>.".
// It never returns a non-nil error.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	grpclog.Println("收到请求grpclog")
	return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s.", in.Name)}, nil
}
// main starts the Hello gRPC server on Address with mutual TLS: it presents
// ../keys/hello.pem and requires client certificates signed by the CA in
// ../keys/server.pem.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	listen, err := net.Listen("tcp", Address)
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}

	// Load the TLS material. Fail fast on any error: continuing with a zero
	// certificate or an empty CA pool only produces an obscure handshake
	// failure later.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Failed to load server key pair: %v", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	})

	// Create the gRPC server with the TLS credentials and register the service.
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterHelloServer(s, HelloService)

	grpclog.Infoln("Listen on " + Address + " with TSL")
	// Report a Serve failure instead of exiting silently with status 0.
	if err := s.Serve(listen); err != nil {
		grpclog.Fatalf("Failed to serve: %v", err)
	}
}
6.3 client.go
package main
import (
"crypto/tls"
"crypto/x509"
pb "grpc-hello/proto/hello" // 引入proto包
"io/ioutil"
"os"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// main dials the Hello server at Address over mutual TLS, calls SayHello once
// with the name "gRPC" and logs the reply message.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	// Load the client certificate and the CA used to verify the server.
	// Fail fast on any error instead of dialing with empty credentials.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Failed to load client key pair: %v", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
	}
	// ServerName must match a name in the server certificate; change it to
	// the name your own certificate was issued for.
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ServerName:   "www.p-pp.cn",
		RootCAs:      certPool,
	})

	// Open the client connection.
	conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(creds))
	if err != nil {
		grpclog.Fatalln(err)
	}
	defer conn.Close()

	// Build the client stub and issue the call.
	c := pb.NewHelloClient(conn)
	req := &pb.HelloRequest{Name: "gRPC"}
	res, err := c.SayHello(context.Background(), req)
	if err != nil {
		grpclog.Fatalln(err)
	}
	grpclog.Println(res.Message)
}
6.4 示例
server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls/server# ./server
2022/01/10 19:07:15 INFO: Listen on 127.0.0.1:50052 with TSL
2022/01/10 19:07:19 INFO: 收到请求grpclog
client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls/client# ./client
2022/01/10 19:07:45 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:07:45 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:07:45 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:07:45 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:07:45 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:07:45 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:07:45 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:07:45 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:07:45 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:07:45 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:07:45 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:07:45 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:07:45 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:07:45 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:07:45 INFO: Hello gRPC.
2022/01/10 19:07:45 INFO: [core] Channel Connectivity change to SHUTDOWN
2022/01/10 19:07:45 INFO: [core] Subchannel Connectivity change to SHUTDOWN
6.5 TLS + Token认证
6.5.1 server.go
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"os"
pb "grpc-hello/proto/hello" // 引入编译生成的包
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" // grpc 响应状态码
"google.golang.org/grpc/credentials" // 引入grpc认证包
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata" // 引入grpc meta包
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// helloService implements the Hello service interface from the generated pb package.
type helloService struct{}

// HelloService is the service value handed to pb.RegisterHelloServer.
var HelloService = helloService{}

// SayHello validates the appid/appkey pair carried in the incoming request
// metadata and, when it matches, answers with "Hello <in.Name>.".
//
// It returns a codes.Unauthenticated error when the context carries no
// metadata or when the pair does not match the expected values.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	grpclog.Println("收到请求grpclog")

	// Extract the metadata sent by the client.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
	}

	var (
		appid  string
		appkey string
	)
	// Guard the index: a key that is present with an empty value slice would
	// otherwise panic on val[0].
	if val, ok := md["appid"]; ok && len(val) > 0 {
		appid = val[0]
	}
	if val, ok := md["appkey"]; ok && len(val) > 0 {
		appkey = val[0]
	}

	// NOTE(review): the rejected appkey is echoed back to the caller in the
	// error text; fine for a demo, avoid in production.
	if appid != "101010" || appkey != "i am key" {
		return nil, grpc.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s, appkey=%s", appid, appkey)
	}

	resp := new(pb.HelloResponse)
	resp.Message = fmt.Sprintf("Hello %s.", in.Name)
	return resp, nil
}
// main starts the Hello gRPC server on Address with mutual TLS. Token
// validation happens inside SayHello.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	listen, err := net.Listen("tcp", Address)
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}

	// Load the TLS material. Fail fast on any error instead of serving with
	// a zero certificate or an empty CA pool.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Failed to load server key pair: %v", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	})

	// Create the gRPC server with the TLS credentials and register the service.
	s := grpc.NewServer(grpc.Creds(creds))
	pb.RegisterHelloServer(s, HelloService)

	grpclog.Infoln("Listen on " + Address + " with TSL + Token")
	// Report a Serve failure instead of exiting silently with status 0.
	if err := s.Serve(listen); err != nil {
		grpclog.Fatalf("Failed to serve: %v", err)
	}
}
6.5.2 client.go
package main
import (
"crypto/tls"
"crypto/x509"
pb "grpc-hello/proto/hello" // 引入proto包
"io/ioutil"
"os"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
// OpenTLS 是否开启TLS认证
OpenTLS = true
)
// customCredential supplies the per-RPC credentials used by this client.
type customCredential struct{}

// GetRequestMetadata returns the fixed appid/appkey pair attached to each
// request. The ctx and uri arguments are not used, and the error is always nil.
func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 2)
	md["appid"] = "101010"
	md["appkey"] = "i am key"
	return md, nil
}
// RequireTransportSecurity reports whether this credential requires TLS;
// it returns the OpenTLS constant.
func (c customCredential) RequireTransportSecurity() bool {
return OpenTLS
}
// main dials the Hello server at Address, attaching the customCredential
// appid/appkey pair to every call, then calls SayHello once with the name
// "gRPC" and logs the reply message. Transport security is controlled by the
// OpenTLS constant.
func main() {
	var opts []grpc.DialOption
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	if OpenTLS {
		// Load the client certificate and the CA used to verify the server.
		// Fail fast on any error instead of dialing with empty credentials.
		cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
		if err != nil {
			grpclog.Fatalf("Failed to load client key pair: %v", err)
		}
		ca, err := ioutil.ReadFile("../keys/server.pem")
		if err != nil {
			grpclog.Fatalf("Failed to read CA certificate: %v", err)
		}
		certPool := x509.NewCertPool()
		if !certPool.AppendCertsFromPEM(ca) {
			grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
		}
		// ServerName must match a name in the server certificate; change it
		// to the name your own certificate was issued for.
		creds := credentials.NewTLS(&tls.Config{
			Certificates: []tls.Certificate{cert},
			ServerName:   "www.p-pp.cn",
			RootCAs:      certPool,
		})
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		// grpc.Dial rejects a connection that has neither transport
		// credentials nor an explicit insecure option, so the non-TLS mode
		// must opt in to an insecure transport.
		opts = append(opts, grpc.WithInsecure())
	}

	// Attach the custom per-RPC credentials.
	opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))

	// Open the client connection.
	conn, err := grpc.Dial(Address, opts...)
	if err != nil {
		grpclog.Fatalln(err)
	}
	defer conn.Close()

	// Build the client stub and issue the call.
	c := pb.NewHelloClient(conn)
	req := &pb.HelloRequest{Name: "gRPC"}
	res, err := c.SayHello(context.Background(), req)
	if err != nil {
		grpclog.Fatalln(err)
	}
	grpclog.Println(res.Message)
}
6.5.3 示例
server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls_token/server# ./server
2022/01/10 19:13:05 INFO: Listen on 127.0.0.1:50052 with TSL + Token
2022/01/10 19:13:08 INFO: 收到请求grpclog
client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls_token/client# ./client
2022/01/10 19:13:18 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:13:18 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:13:18 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:13:18 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:13:18 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:13:18 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:13:18 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:13:18 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:13:18 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:13:18 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:13:18 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:13:18 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:13:18 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:13:18 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:13:18 INFO: Hello gRPC.
2022/01/10 19:13:18 INFO: [core] Channel Connectivity change to SHUTDOWN
2022/01/10 19:13:18 INFO: [core] Subchannel Connectivity change to SHUTDOWN
Client修改appid后测试
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls_token/client# ./client
2022/01/10 19:13:41 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:13:41 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:13:41 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:13:41 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:13:41 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:13:41 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:13:41 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:13:41 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:13:41 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:13:41 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:13:41 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:13:41 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:13:41 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:13:41 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:13:41 FATAL: rpc error: code = Unauthenticated desc = Token认证信息无效: appid=1010101, appkey=i am key
2022/01/10 19:13:41 FATAL: rpc error: code = Unauthenticated desc = Token认证信息无效: appid=1010101, appkey=i am key
2022/01/10 19:13:41 FATAL: rpc error: code = Unauthenticated desc = Token认证信息无效: appid=1010101, appkey=i am key
6.6 TLS + Token + 拦截器
6.6.1 server.go
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"os"
pb "grpc-hello/proto/hello" // 引入编译生成的包
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" // grpc 响应状态码
"google.golang.org/grpc/credentials" // 引入grpc认证包
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata" // 引入grpc meta包
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// helloService implements the Hello service interface from the generated pb package.
type helloService struct{}

// HelloService is the service value handed to pb.RegisterHelloServer.
var HelloService = helloService{}

// SayHello logs that a request arrived and answers with "Hello <in.Name>.".
// It never returns a non-nil error.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
	grpclog.Println("收到请求grpclog")
	return &pb.HelloResponse{Message: fmt.Sprintf("Hello %s.", in.Name)}, nil
}
// main starts the Hello gRPC server on Address with mutual TLS and installs
// interceptor as the unary server interceptor.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	listen, err := net.Listen("tcp", Address)
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}

	var opts []grpc.ServerOption

	// Load the TLS material. Fail fast on any error instead of serving with
	// a zero certificate or an empty CA pool.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Failed to load server key pair: %v", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	})
	opts = append(opts, grpc.Creds(creds))

	// Register the unary interceptor.
	opts = append(opts, grpc.UnaryInterceptor(interceptor))

	// Create the gRPC server and register the service.
	s := grpc.NewServer(opts...)
	pb.RegisterHelloServer(s, HelloService)

	grpclog.Infoln("Listen on " + Address + " with TSL + Token")
	// Report a Serve failure instead of exiting silently with status 0.
	if err := s.Serve(listen); err != nil {
		grpclog.Fatalf("Failed to serve: %v", err)
	}
}
// auth validates the appid/appkey pair carried in the incoming request
// metadata of ctx.
//
// It returns nil when the pair matches the expected values, and a
// codes.Unauthenticated error when ctx carries no metadata or the pair does
// not match.
func auth(ctx context.Context) error {
	// Extract the metadata sent by the client.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
	}

	var (
		appid  string
		appkey string
	)
	// Guard the index: a key that is present with an empty value slice would
	// otherwise panic on val[0].
	if val, ok := md["appid"]; ok && len(val) > 0 {
		appid = val[0]
	}
	if val, ok := md["appkey"]; ok && len(val) > 0 {
		appkey = val[0]
	}

	// NOTE(review): the rejected appkey is echoed back to the caller in the
	// error text; fine for a demo, avoid in production.
	if appid != "101010" || appkey != "i am key" {
		return grpc.Errorf(codes.Unauthenticated, "Token认证信息无效: appid=%s, appkey=%s", appid, appkey)
	}
	return nil
}
// interceptor is the unary server interceptor. It logs the call, runs auth on
// the request context and passes the request to handler only when auth
// succeeds; otherwise it returns auth's error with a nil response.
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	grpclog.Println("收到请求grpclog, 拦截器interceptor")

	if authErr := auth(ctx); authErr != nil {
		return nil, authErr
	}

	// Authenticated: continue with the real handler.
	return handler(ctx, req)
}
6.6.2 client.go
package main
import (
"crypto/tls"
"crypto/x509"
pb "grpc-hello/proto/hello" // 引入proto包
"io/ioutil"
"os"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
// OpenTLS 是否开启TLS认证
OpenTLS = true
)
// customCredential supplies the per-RPC credentials used by this client.
type customCredential struct{}

// GetRequestMetadata returns the fixed appid/appkey pair attached to each
// request. The ctx and uri arguments are not used, and the error is always nil.
func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 2)
	md["appid"] = "101010"
	md["appkey"] = "i am key"
	return md, nil
}
// RequireTransportSecurity reports whether this credential requires TLS;
// it returns the OpenTLS constant.
func (c customCredential) RequireTransportSecurity() bool {
return OpenTLS
}
// main dials the Hello server at Address with the customCredential per-RPC
// credentials and the client-side unary interceptor installed, then calls
// SayHello once with the name "gRPC" and logs the reply message. Transport
// security is controlled by the OpenTLS constant.
func main() {
	var opts []grpc.DialOption
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	if OpenTLS {
		// Load the client certificate and the CA used to verify the server.
		// Fail fast on any error instead of dialing with empty credentials.
		cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
		if err != nil {
			grpclog.Fatalf("Failed to load client key pair: %v", err)
		}
		ca, err := ioutil.ReadFile("../keys/server.pem")
		if err != nil {
			grpclog.Fatalf("Failed to read CA certificate: %v", err)
		}
		certPool := x509.NewCertPool()
		if !certPool.AppendCertsFromPEM(ca) {
			grpclog.Fatalln("Failed to parse CA certificate ../keys/server.pem")
		}
		// ServerName must match a name in the server certificate; change it
		// to the name your own certificate was issued for.
		creds := credentials.NewTLS(&tls.Config{
			Certificates: []tls.Certificate{cert},
			ServerName:   "www.p-pp.cn",
			RootCAs:      certPool,
		})
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		// grpc.Dial rejects a connection that has neither transport
		// credentials nor an explicit insecure option, so the non-TLS mode
		// must opt in to an insecure transport.
		opts = append(opts, grpc.WithInsecure())
	}

	// Attach the custom per-RPC credentials.
	opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))
	// Install the client-side unary interceptor.
	opts = append(opts, grpc.WithUnaryInterceptor(interceptor))

	// Open the client connection.
	conn, err := grpc.Dial(Address, opts...)
	if err != nil {
		grpclog.Fatalln(err)
	}
	defer conn.Close()

	// Build the client stub and issue the call.
	c := pb.NewHelloClient(conn)
	req := &pb.HelloRequest{Name: "gRPC"}
	res, err := c.SayHello(context.Background(), req)
	if err != nil {
		grpclog.Fatalln(err)
	}
	grpclog.Println(res.Message)
}
// interceptor is the client-side unary interceptor. It forwards the call to
// invoker unchanged, then logs the method, request, reply, elapsed time and
// resulting error, and returns that error.
func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	grpclog.Println("interceptor, 客户端拦截器")

	began := time.Now()
	callErr := invoker(ctx, method, req, reply, cc, opts...)
	elapsed := time.Since(began)

	grpclog.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, elapsed, callErr)
	return callErr
}
6.6.3 示例
server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls_token_interceprot/server# ./server
2022/01/10 19:15:56 INFO: Listen on 127.0.0.1:50052 with TSL + Token
2022/01/10 19:16:00 INFO: 收到请求grpclog, 拦截器interceptor
2022/01/10 19:16:00 INFO: 收到请求grpclog
client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_tls_token_interceprot/client# ./client
2022/01/10 19:16:00 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:16:00 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:16:00 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:16:00 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:16:00 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:16:00 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:16:00 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:16:00 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:16:00 INFO: interceptor, 客户端拦截器
2022/01/10 19:16:00 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:16:00 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:16:00 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:16:00 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:16:00 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:16:00 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:16:00 INFO: method=/hello.Hello/SayHello req=name:"gRPC" rep=message:"Hello gRPC." duration=51.797747ms error=<nil>
2022/01/10 19:16:00 INFO: Hello gRPC.
2022/01/10 19:16:00 INFO: [core] Channel Connectivity change to SHUTDOWN
2022/01/10 19:16:00 INFO: [core] Subchannel Connectivity change to SHUTDOWN
7 内置trace
8 http网关
etcd3 API全面升级为gRPC后,同时要提供REST API服务,维护两个版本的服务显然不太合理,所以grpc-gateway诞生了。通过protobuf的自定义option实现了一个网关,服务端同时开启gRPC和HTTP服务,HTTP服务接收客户端请求后转换为grpc请求数据,获取响应后转为json数据返回给客户端。
安装
go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
8.1 目录结构
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# tree
.
├── google
│ └── api
│ ├── annotations.proto
│ └── http.proto
├── hello
│ ├── hello.pb.go
│ └── hello.proto
└── hello_http
└── hello_http.proto
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_http# tree
.
├── client
│ ├── client
│ └── main.go
├── grpcserver
│ ├── main.go
│ └── server
├── httpgrpcserver
│ ├── main.go
│ └── server
├── httpserver
│ ├── main.go
│ └── server
└── keys
├── hello.csr
├── hello.key
├── hello.pem
├── openssl.cnf
├── server.key
├── server.pem
└── server.srl
5 directories, 15 files
8.2 编译proto
# /usr/local/include/google/protobuf/ 目录是安装protoc时候,解压的include目录
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# ls /usr/local/include/google/protobuf/
any.proto compiler duration.proto field_mask.proto struct.proto type.proto
api.proto descriptor.proto empty.proto source_context.proto timestamp.proto wrappers.proto
# 编译google.api
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# protoc -I /usr/local/include -I . --go_out=plugins=grpc:. google/api/*.proto
# 编译hello_http.proto
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# protoc -I /usr/local/include -I . --go_out=plugins=grpc:. hello_http/hello_http.proto
# 编译hello_http.proto gateway
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/proto# protoc -I /usr/local/include -I . --grpc-gateway_out=logtostderr=true:. hello_http/hello_http.proto
8.3 使用http端口代理至grpc端口
目录:
8.3.1 grpcserver/server.go
package main
import (
"fmt"
"net"
"os"
pb "grpc-hello/proto/hello_http" // 引入编译生成的包
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
const (
// Address gRPC服务地址
Address = "127.0.0.1:50052"
)
// helloService implements the HelloHTTP service interface from the generated pb package.
type helloService struct{}

// HelloService is the service value handed to pb.RegisterHelloHTTPServer.
var HelloService = helloService{}

// SayHello logs that a request arrived and answers with "Hello <in.Name>.".
// It never returns a non-nil error.
func (h helloService) SayHello(ctx context.Context, in *pb.HelloHTTPRequest) (*pb.HelloHTTPResponse, error) {
	grpclog.Println("收到请求grpclog")
	return &pb.HelloHTTPResponse{Message: fmt.Sprintf("Hello %s.", in.Name)}, nil
}
// main starts the HelloHTTP gRPC server on Address without transport security.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	listen, err := net.Listen("tcp", Address)
	if err != nil {
		grpclog.Fatalf("Failed to listen: %v", err)
	}

	// Create the gRPC server and register the HelloHTTP service on it.
	s := grpc.NewServer()
	pb.RegisterHelloHTTPServer(s, HelloService)

	grpclog.Println("Listen on " + Address)
	// Report a Serve failure instead of exiting silently with status 0.
	if err := s.Serve(listen); err != nil {
		grpclog.Fatalf("Failed to serve: %v", err)
	}
}
8.3.2 httpserver/main.go
package main
import (
"net/http"
"os"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
gw "grpc-hello/proto/hello_http"
)
// main runs the HTTP gateway: it registers the HelloHTTP handlers on a
// grpc-gateway mux that forwards to the gRPC server at 127.0.0.1:50052 over
// an insecure connection, then serves that mux over HTTP on port 8080.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Address of the backing gRPC server.
	endpoint := "127.0.0.1:50052"
	mux := runtime.NewServeMux()
	opts := []grpc.DialOption{grpc.WithInsecure()}

	// Register the HTTP-to-gRPC translation handlers.
	if err := gw.RegisterHelloHTTPHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
		grpclog.Fatalf("Register handler err:%v\n", err)
	}

	grpclog.Println("HTTP Listen on 8080")
	// This failure comes from the HTTP listener, not from handler
	// registration, so it is reported with its own message.
	if err := http.ListenAndServe(":8080", mux); err != nil {
		grpclog.Fatalf("HTTP ListenAndServe err:%v\n", err)
	}
}
8.3.3 示例
Grpc Server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_http/grpcserver# ./server
2022/01/10 19:40:23 INFO: Listen on 127.0.0.1:50052
2022/01/10 19:40:30 INFO: 收到请求grpclog
HTTP Server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_http/httpserver# ./server
2022/01/10 19:36:36 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:36:36 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:36:36 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:36:36 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:36:36 INFO: [core] Channel authority set to "127.0.0.1:50052"
2022/01/10 19:36:36 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:36:36 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:36:36 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:36:36 INFO: HTTP Listen on 8080
2022/01/10 19:36:36 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:36:36 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:36:36 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:36:36 INFO: [core] Subchannel Connectivity change to TRANSIENT_FAILURE
2022/01/10 19:36:36 INFO: [core] Channel Connectivity change to TRANSIENT_FAILURE
2022/01/10 19:36:37 INFO: [core] Subchannel Connectivity change to IDLE
2022/01/10 19:36:37 INFO: [core] Channel Connectivity change to IDLE
Http Client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy# curl -X POST http://localhost:8080/example/echo -d '{"name": "gRPC-HTTP is working!"}'
{"message":"Hello gRPC-HTTP is working!."}
8.4 HTTP、GRPC使用同一端口
8.4.1 httpgrpcserver/server.go
package main
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
pb "grpc-hello/proto/hello_http"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
// helloHTTPService implements the HelloHTTP service interface from the generated pb package.
type helloHTTPService struct{}

// HelloHTTPService is the service value handed to pb.RegisterHelloHTTPServer.
var HelloHTTPService = helloHTTPService{}

// SayHello logs that a request arrived and answers with "Hello <in.Name>.".
// It never returns a non-nil error.
func (h helloHTTPService) SayHello(ctx context.Context, in *pb.HelloHTTPRequest) (*pb.HelloHTTPResponse, error) {
	grpclog.Infoln("收到请求grpclog")
	greeting := "Hello " + in.Name + "."
	return &pb.HelloHTTPResponse{Message: greeting}, nil
}
// main serves gRPC and the HTTP gateway on a single TLS listener at
// 127.0.0.1:50052. Requests are routed by grpcHandlerFunc: gRPC traffic goes
// to the gRPC server, everything else to the grpc-gateway mux, which in turn
// dials the same endpoint as a gRPC client.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	endpoint := "127.0.0.1:50052"
	conn, err := net.Listen("tcp", endpoint)
	if err != nil {
		grpclog.Fatalf("TCP Listen err:%v\n", err)
	}

	// Load the TLS material shared by all three configurations below. Fail
	// fast on any error instead of serving with a zero certificate or an
	// empty CA pool.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Load key pair err:%v\n", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Read CA certificate err:%v\n", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Parse CA certificate err: no certificates found in ../keys/server.pem")
	}

	// gRPC server with mutual-TLS credentials.
	grpcTLS := &tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	}
	grpcServer := grpc.NewServer(grpc.Creds(credentials.NewTLS(grpcTLS)))
	pb.RegisterHelloHTTPServer(grpcServer, HelloHTTPService)

	// Gateway: an HTTP mux whose handlers call the gRPC endpoint as a client.
	ctx := context.Background()
	gwTLS := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      certPool,
		ClientCAs:    certPool,
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ServerName:   "www.p-pp.cn",
	}
	dopts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(gwTLS))}
	gwmux := runtime.NewServeMux()
	if err = pb.RegisterHelloHTTPHandlerFromEndpoint(ctx, gwmux, endpoint, dopts); err != nil {
		grpclog.Fatalf("Failed to register gw server: %v\n", err)
	}

	// HTTP server that fronts both the gateway and the gRPC server.
	mux := http.NewServeMux()
	mux.Handle("/", gwmux)
	httpTLS := &tls.Config{
		Certificates: []tls.Certificate{cert},
		NextProtos:   []string{http2.NextProtoTLS}, // advertise HTTP/2 over TLS
		RootCAs:      certPool,
		ClientCAs:    certPool,
		// No client certificate is requested at the listener level.
		ClientAuth: tls.NoClientCert,
	}
	srv := &http.Server{
		Addr:      endpoint,
		Handler:   grpcHandlerFunc(grpcServer, mux),
		TLSConfig: httpTLS,
	}

	grpclog.Infof("gRPC and https listen on: %s\n", endpoint)
	if err = srv.Serve(tls.NewListener(conn, srv.TLSConfig)); err != nil {
		grpclog.Fatal("ListenAndServe: ", err)
	}
}
// getTLSConfig builds a TLS configuration from ../keys/hello.pem and
// ../keys/hello.key that advertises HTTP/2 via NextProtos. Any failure to read
// or parse the key pair is reported through grpclog.Fatalf.
func getTLSConfig() *tls.Config {
	// Check each read separately so the log names the file that failed,
	// rather than surfacing only as a key pair parse error on empty input.
	cert, err := ioutil.ReadFile("../keys/hello.pem")
	if err != nil {
		grpclog.Fatalf("TLS cert read err: %v\n", err)
	}
	key, err := ioutil.ReadFile("../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("TLS key read err: %v\n", err)
	}

	pair, err := tls.X509KeyPair(cert, key)
	if err != nil {
		grpclog.Fatalf("TLS KeyPair err: %v\n", err)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{pair},
		NextProtos:   []string{http2.NextProtoTLS}, // advertise HTTP/2 over TLS
	}
}
// grpcHandlerFunc returns an http.Handler that routes each request either to
// grpcServer or to otherHandler. A request goes to grpcServer when
// otherHandler is nil, or when it is HTTP/2 and its Content-Type contains
// "application/grpc"; every other request goes to otherHandler.
// Copied from cockroachdb.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if otherHandler == nil ||
			(r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc")) {
			grpcServer.ServeHTTP(w, r)
			return
		}
		otherHandler.ServeHTTP(w, r)
	})
}
8.4.2 client/client.go
package main
import (
"crypto/tls"
"crypto/x509"
pb "grpc-hello/proto/hello_http" // 引入proto包
"io/ioutil"
"os"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
const (
// Address is the gRPC service address that the client dials.
Address = "127.0.0.1:50052"
)
// main dials the Hello gRPC service at Address over TLS, presenting the
// client key pair from ../keys and verifying the server against
// ../keys/server.pem, then issues a single SayHello call and logs the reply.
// Every setup or call failure is reported through grpclog's fatal logger.
func main() {
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stdout, os.Stdout, os.Stdout))

	// TLS connection: remember to change ServerName below to the server
	// address you used for your own certificate.
	cert, err := tls.LoadX509KeyPair("../keys/hello.pem", "../keys/hello.key")
	if err != nil {
		grpclog.Fatalf("Failed to load client key pair: %v", err)
	}
	ca, err := ioutil.ReadFile("../keys/server.pem")
	if err != nil {
		grpclog.Fatalf("Failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed;
	// an empty pool would otherwise only show up as a handshake failure.
	if !certPool.AppendCertsFromPEM(ca) {
		grpclog.Fatalln("Failed to parse any certificate from ../keys/server.pem")
	}
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ServerName:   "www.p-pp.cn",
		RootCAs:      certPool,
	})

	// Connect.
	conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(creds))
	if err != nil {
		grpclog.Fatalln(err)
	}
	defer conn.Close()

	// Initialize the client.
	c := pb.NewHelloHTTPClient(conn)

	// Invoke the method.
	req := &pb.HelloHTTPRequest{Name: "gRPC"}
	res, err := c.SayHello(context.Background(), req)
	if err != nil {
		grpclog.Fatalln(err)
	}
	grpclog.Println(res.Message)
}
8.4.3 示例
Server
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_http/httpgrpcserver# ./server
2022/01/10 19:44:00 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:44:00 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:44:00 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:44:00 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:44:00 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:44:00 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:44:00 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:44:00 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:44:00 INFO: gRPC and https listen on: 127.0.0.1:50052
2022/01/10 19:44:00 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:44:00 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:44:00 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:44:00 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:44:00 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:44:03 INFO: 收到请求grpclog
gRPC Client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy/topgoer/grpc_hello/hello_http/client# ./client
2022/01/10 19:44:03 INFO: [core] original dial target is: "127.0.0.1:50052"
2022/01/10 19:44:03 INFO: [core] dial target "127.0.0.1:50052" parse failed: parse "127.0.0.1:50052": first path segment in URL cannot contain colon
2022/01/10 19:44:03 INFO: [core] fallback to scheme "passthrough"
2022/01/10 19:44:03 INFO: [core] parsed dial target is: {Scheme:passthrough Authority: Endpoint:127.0.0.1:50052 URL:{Scheme:passthrough Opaque: User: Host: Path:/127.0.0.1:50052 RawPath: ForceQuery:false RawQuery: Fragment: RawFragment:}}
2022/01/10 19:44:03 INFO: [core] Channel authority set to "www.p-pp.cn"
2022/01/10 19:44:03 INFO: [core] ccResolverWrapper: sending update to cc: {[{127.0.0.1:50052 <nil> <nil> 0 <nil>}] <nil> <nil>}
2022/01/10 19:44:03 INFO: [core] ClientConn switching balancer to "pick_first"
2022/01/10 19:44:03 INFO: [core] Channel switches to new LB policy "pick_first"
2022/01/10 19:44:03 INFO: [core] blockingPicker: the picked transport is not ready, loop back to repick
2022/01/10 19:44:03 INFO: [core] Subchannel Connectivity change to CONNECTING
2022/01/10 19:44:03 INFO: [core] Subchannel picks a new address "127.0.0.1:50052" to connect
2022/01/10 19:44:03 INFO: [core] Channel Connectivity change to CONNECTING
2022/01/10 19:44:03 INFO: [core] Subchannel Connectivity change to READY
2022/01/10 19:44:03 INFO: [core] Channel Connectivity change to READY
2022/01/10 19:44:03 INFO: Hello gRPC.
2022/01/10 19:44:03 INFO: [core] Channel Connectivity change to SHUTDOWN
2022/01/10 19:44:03 INFO: [core] Subchannel Connectivity change to SHUTDOWN
HTTP Client
root@www:/opt/go/src/gitee.com/dong9205/grpcStudy# curl -X POST -k https://localhost:50052/example/echo -d '{"name": "gRPC-HTTP is working!"}'
{"message":"Hello gRPC-HTTP is working!."}