GRPC 学习笔记

admin 2023年3月5日11:12:53评论23 views字数 19184阅读63分56秒阅读模式

简介

gRPC 是 Google 开源的一个远程过程调用(Remote Procedure Call) 框架,在 gRPC 中,客户端应用程序可以直接调用不同机器上的服务器应用程序上的方法,就像它是本地对象一样,更容易创建分布式应用程序和服务。默认情况下,gRPC 使用 协议缓冲区 通信, 并支持以下语言:

  • C#
  • C++
  • Dart
  • Go
  • Java
  • Kotlin
  • Node
  • Objective-C
  • PHP
  • Python
  • Ruby
GRPC 学习笔记
image.png

协议缓冲区 (protocol buffers)

协议缓冲区提供了一种语言中立、平台中立、可扩展的机制,用于以向前兼容和向后兼容的方式序列化结构化数据。它类似于 JSON,只是它更小更快,并且生成本地语言绑定。 协议缓冲区是定义语言(在 .proto文件中创建)、proto 编译器生成的与数据接口的代码、特定于语言的运行时库以及写入文件(或通过网络连接)。

使用协议缓冲区的第一步是定义要在_proto 文件_中序列化的数据的结构:这是一个带有.proto扩展名的普通文本文件。协议缓冲区数据被构造为_消息_,其中每条消息都是一个小的信息逻辑记录,包含一系列称为_字段_的名称-值对。这是一个简单的例子:

// 指定使用 proto3 语法
syntax = "proto3";

// 声明包名,防止冲突
package helloworld;

// 消息, 指定字段类型,并为字段分配一个唯一的编号
message HelloRequest{
string name = 1;
int64 id = 2;
}

更多语法请看 https://developers.google.com/protocol-buffers/docs/proto3

生成 gRPC 代码

// 指定使用 proto3 语法
syntax = "proto3";

// 指定最后生成的go文件是处在哪个目录哪个包中,.代表在当前目录生成,helloworld 代表了生成的go文件的包名是 helloworld。
option go_package = ".;helloworld";

// 声明包名,防止冲突
package helloworld;

// 远程调用定义的消息类型
service Greeter {
// Greeter 服务有一个 SayHello 方法,可以让服务端从远程客户端接收一个 HelloRequest 消息后,返回 HelloReply 消息
// 服务端必须实现该方法
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// 消息, 指定字段类型,并为字段分配一个唯一的编号
message HelloRequest{
string name = 1;
int64 id = 2;
}

message HelloReply {
string message = 1;
}

一旦定义好服务,我们可以使用 protocol buffer 编译器 protoc 来生成创建应用所需的特定客户端和服务端的代码。 首先运行 brew install protobuf 安装 protoc  https://github.com/protocolbuffers/protobuf/releases

> protoc --version
libprotoc 3.19.4

其次安装 go 的插件

go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

默认下载到$GOPATH/bin下。 指定源目录(应用程序的源代码所在的位置——如果不提供值,则使用当前目录)、目标目录(希望生成的代码所在的位置;通常与$SRC_DIR相同) ,以及.protp文件.

protoc  --go_out=helloworld --go-grpc_out=helloworld helloworld/helloworld.proto

生成两个文件:

  • helloworld.pb.go  文件 ,包含用于填充、序列化和检索请求和响应消息类型的所有协议缓冲区代码。
  • helloworld_grpc.pb.go 文件, 其中包含以下内容:
    • 客户端使用Greeter服务中定义的方法调用的接口类型(或存根)。
    • 服务器要实现的接口类型,也可以使用Greeter服务中定义的方法。

服务端

package main

import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpcTest/helloworld"
"log"
"net"
)

// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// 必须实现 SayHello 方法

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v %d", in.GetName(), in.GetId())
return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %dn", in.GetName(), in.GetId())}, nil
}

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/

func main() {
// 监听
lis, err := net.Listen("tcp", ":8999")

if err != nil {
log.Fatalf("failed to listen: %v", err)
return
}

s := grpc.NewServer()

pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
err = s.Serve(lis)
if err != nil {
fmt.Println(err)
return
}

}

客户端

package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpcTest/helloworld"
"log"
"time"
)

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/


func main() {
conn, err := grpc.Dial("0.0.0.0:8999", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "yhy",
Id: 1,
})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}

service 方法的4 种请求和响应模式类型

  • 一个_简单的 RPC_,其中客户端使用存根向服务器发送请求并等待响应返回,就像正常的函数调用一样。

前面的SayHello方法 就是一个简单RPC

rpc GetFeature(Point) returns (Feature) {}
  • 服务器端流式 RPC ,客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入stream关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle.  Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

身份验证

gRPC 底层是基于 HTTP/2 协议的,HTTP 本身不带任何加密传输功能,基于 SSL 的 HTTPS 协议才是加密传输。gRPC 使用了 HTTP/2 协议但是并未使用 HTTPS,即少了加密传输的部分。 对于加密传输的部分 gRPC 将它抽出来作为一个组件,可以由用户自由选择。gRPC 内默认提供了两种 内置的认证方式:

  • 基于 CA 证书的 SSL/TLS 认证方式;
  • 基于 Token 的认证方式。

gRPC 中的连接类型一共有以下 3 种:

  • insecure connection:不使用 TLS 加密;
  • server-side TLS:仅服务端 TLS 加密;
  • mutual TLS:客户端、服务端都使用 TLS 加密。

基于 TLS/SSL 认证

1.ca根证书生成

新建ca.conf,填入以下内容:

[ req ]
default_bits = 4096
distinguished_name = req_distinguished_name

[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = JiangSu
localityName = Locality Name (eg, city)
localityName_default = NanJing
organizationName = Organization Name (eg, company)
organizationName_default = Step
commonName = CommonName (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = XXX(自定义)

生成ca.key:

openssl genrsa -out ca.key 4096

生成ca.csr:(直接回车,采用default默认配置值)

openssl req -new -sha256 -out ca.csr -key ca.key -config ca.conf

生成ca.crt:

openssl x509 -req -days 3650 -in ca.csr -signkey ca.key -out ca.crt

2.server证书生成

新建server.conf,填入以下内容:

[ req ]
default_bits = 2048
distinguished_name = req_distinguished_name

[ req_distinguished_name ]
countryName = Country Name (2 letter code)
countryName_default = CN
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = JiangSu
localityName = Locality Name (eg, city)
localityName_default = NanJing
organizationName = Organization Name (eg, company)
organizationName_default = Step
commonName = CommonName (e.g. server FQDN or YOUR name)
commonName_max = 64
commonName_default = XXX(自定义,客户端需要此字段做匹配)
[ req_ext ]
subjectAltName = @alt_names
[alt_names]
DNS.1 = XXX(自定义)
IP = 127.0.0.1

生成server.key:

openssl genrsa -out server.key 2048

生成server.csr:(直接回车,采用default默认配置值)

openssl req -new -sha256 -out server.csr -key server.key -config server.conf

生成server.crt:

openssl x509 -req -days 3650 -CA ca.crt -CAkey ca.key -CAcreateserial -in server.csr -out server.pem -extensions req_ext -extfile server.conf

最终只需要 server.pemserver.key两个文件

package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
pb "grpcTest/helloworld"
"log"
"net"
)

// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// 必须实现 SayHello 方法

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v %d", in.GetName(), in.GetId())
return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %dn", in.GetName(), in.GetId())}, nil
}

// LoggingInterceptor 拦截器 - 打印日志
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler)
(interface{}, error)
{
fmt.Printf("gRPC method: %s, %v", info.FullMethod, req)
resp, err := handler(ctx, req)
fmt.Printf("gRPC method: %s, %v", info.FullMethod, resp)
return resp, err
}

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/

func main() {
// 监听
lis, err := net.Listen("tcp", ":8999")

if err != nil {
log.Fatalf("failed to listen: %v", err)
return
}

// TLS认证
creds, err := credentials.NewServerTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem", "/Users/yhy/go/workplace/grpcTest/ca/server.key")

if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}

//创建gRPC服务器,开启TLS认证
s := grpc.NewServer(grpc.Creds(creds))
// 在gRPC服务端注册服务
pb.RegisterGreeterServer(s, &server{})

log.Printf("server listening at %v", lis.Addr())
err = s.Serve(lis)
if err != nil {
fmt.Println(err)
return
}

}

package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
pb "grpcTest/helloworld"
"log"
"time"
)

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/


func main() {
// TLS认证
creds, err := credentials.NewClientTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem", "*.yhy.com") // 指定的 DNS.1

if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}

conn, err := grpc.Dial("0.0.0.0:8999", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "yhy",
Id: 1,
})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}

这样就可以对客户端和服务端之间交互的所有数据进行加密。

Token认证

客户端发请求时,添加Token到上下文context.Context中,服务器接收到请求,先从上下文中获取Token验证,验证通过才进行下一步处理。 gRPC 中默认定义了 PerRPCCredentials,是提供用于自定义认证的接口,它的作用是将所需的安全认证信息添加到每个RPC方法的上下文中。其包含 2 个方法:

  • GetRequestMetadata:获取当前请求认证所需的元数据
  • RequireTransportSecurity:是否需要基于 TLS 认证进行安全传输
type PerRPCCredentials interface {
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
RequireTransportSecurity() bool
}

使用 token 认证必须实现这两个方法。 这里以 JWT作为 Token 认证为例,新建jwt.go文件

package token

import (
"context"
"fmt"
"github.com/dgrijalva/jwt-go"
"time"

"google.golang.org/grpc/metadata"
)

/**
@author: yhy
@since: 2022/8/9
@desc: https://www.cnblogs.com/rickiyang/p/14989375.html
**/


const JwtSecret = "yhy" // jwt 秘钥

func GenerateToken(userName string) (string, error) {
claims := jwt.MapClaims{
"iss": "test",
"aud": "test",
"nbf": time.Now().Unix(),
"exp": time.Now().Add(24 * time.Hour).Unix(), // 过期时间
"sub": "user",
"username": userName,
}
// sha256
tokenClaims := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
token, err := tokenClaims.SignedString([]byte(JwtSecret))
return token, err
}

// AuthToken 自定义认证
type AuthToken struct {
Token string
}

// 必须实现以下两个方法

// GetRequestMetadata 获取当前请求认证所需的元数据
func (c AuthToken) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": c.Token,
}, nil
}

// RequireTransportSecurity 是否需要基于 TLS 认证进行安全传输
func (c AuthToken) RequireTransportSecurity() bool {
return false
}

// Claims defines the struct containing the token claims.
type Claims struct {
jwt.StandardClaims

// Username defines the identity of the user.
Username string `json:"username"`
}

// Step1. 从 context 的 metadata 中,取出 token

func getTokenFromContext(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", fmt.Errorf("Err NoMetadata In Context")
}
// md 的类型是 type MD map[string][]string
token, ok := md["authorization"]
if !ok || len(token) == 0 {
return "", fmt.Errorf("Err NoAuthorization In Metadata")
}
// 因此,token 是一个字符串数组,我们只用了 token[0]
return token[0], nil
}

func CheckAuth(ctx context.Context) (username string, err error) {
tokenStr, err := getTokenFromContext(ctx)
if err != nil {
panic("get token from context error")
}
var clientClaims Claims
token, err := jwt.ParseWithClaims(tokenStr, &clientClaims, func(token *jwt.Token) (interface{}, error) {
if token.Header["alg"] != "HS256" {
panic("ErrInvalidAlgorithm")
}
return []byte(JwtSecret), nil
})
if err != nil {
return "", err
}

if !token.Valid {
return "", err
}

return clientClaims.Username, nil
}


helloworld.proto增加以下代码


// 远程调用定义的消息类型
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
// 服务端必须实现该方法
rpc Login (LoginRequest) returns (LoginResp) {} //新增的 login 方法
}

// 消息, 指定字段类型,并为字段分配一个唯一的编号
message LoginRequest{
string username = 1;
string password = 2;
}
message LoginResp{
string status = 1;
string token = 2;
}

执行 protoc --go_out=helloworld --go-grpc_out=helloworld helloworld/helloworld.proto重新生成对应的文件server.go 实现 Login方法

func (s *server) Login(ctx context.Context, request *pb.LoginRequest) (resp *pb.LoginResp, err error) {
if request.Username == "yhy" && request.Password == "123456" {
jwtToken, err := token.GenerateToken(request.Username)
if err != nil {
return nil, err
}
return &pb.LoginResp{Status: "200", Token: jwtToken}, nil
}
return &pb.LoginResp{Status: "401", Token: ""}, nil
}

然后服务端代码中,每个服务的方法都需要添加CheckAuth(ctx)来验证Token,这样十分麻烦。这里使用gRPC拦截器,能够很好地解决这个问题。gRPC拦截器功能类似中间件,拦截器收到请求后,先进行一些操作,然后才进入服务的代码处理。 服务端完整代码

package main

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/


import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
pb "grpcTest/helloworld"
"grpcTest/token"
"log"
"net"
)

// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// 必须实现 SayHello 方法

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v %d", in.GetName(), in.GetId())
return &pb.HelloReply{Message: fmt.Sprintf("Hello %s %dn", in.GetName(), in.GetId())}, nil
}

func (s *server) Login(ctx context.Context, request *pb.LoginRequest) (resp *pb.LoginResp, err error) {
if request.Username == "yhy" && request.Password == "123456" {
jwtToken, err := token.GenerateToken(request.Username)
if err != nil {
return nil, err
}
return &pb.LoginResp{Status: "200", Token: jwtToken}, nil
}
return &pb.LoginResp{Status: "401", Token: ""}, nil
}

// Interceptor 拦截器
func Interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Printf("gRPC method1: %s, %vn", info.FullMethod, req)
if info.FullMethod != "/helloworld.Greeter/Login" {
//拦截普通方法请求,验证Token
username, err := token.CheckAuth(ctx)
if err != nil {
log.Printf("err : %vn", err)
return resp, err
}
log.Printf("用户 %s 登录n", username)
}

resp, err = handler(ctx, req)
fmt.Printf("gRPC method2: %s, %vn", info.FullMethod, resp)

// 继续处理请求
return resp, err
}

func main() {
// 监听
lis, err := net.Listen("tcp", ":8999")

if err != nil {
log.Fatalf("failed to listen: %v", err)
return
}

// TLS认证
creds, err := credentials.NewServerTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem", "/Users/yhy/go/workplace/grpcTest/ca/server.key")

if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}

//创建gRPC服务器,并开启TLS认证和Token认证
s := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(Interceptor))

// 在gRPC服务端注册服务
pb.RegisterGreeterServer(s, &server{})

log.Printf("server listening at %v", lis.Addr())
err = s.Serve(lis)
if err != nil {
fmt.Println(err)
return
}

}

客户端代码

package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
pb "grpcTest/helloworld"
"grpcTest/token"
"log"
"time"
)

/**
@author: yhy
@since: 2022/8/8
@desc: //TODO
**/


func main() {
// TLS认证
creds, err := credentials.NewClientTLSFromFile("/Users/yhy/go/workplace/grpcTest/ca/server.pem", "*.kuafu.com")

if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
//连接服务端
conn, err := grpc.Dial("0.0.0.0:8999", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// 调用 login 方法进行验证
login, err := c.Login(context.Background(), &pb.LoginRequest{Username: "yhy", Password: "123456"})
if err != nil {
log.Fatalf("did not connect: %v", err)
}

// 获取 token
requestToken := new(token.AuthToken)
requestToken.Token = login.Token

// 加上 token 连接服务端
conn, err = grpc.Dial(":8999", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(requestToken))
if err != nil {
log.Fatalf("faild to connect: %v", err)
}
defer conn.Close()
c = pb.NewGreeterClient(conn)

// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "yhy",
Id: 1,
})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}

双向流

普通RPC 就像正常调用方法一样,主要就是流式的使用,这里以双向流为例

// 指定使用 proto3 语法
syntax = "proto3";

// 指定最后生成的go文件是处在哪个目录哪个包中,.代表在当前目录生成,kuafu 代表了生成的go文件的包名是 kuafu。
option go_package = "./historyPb";

// 远程调用定义的消息类型
service History {
// 双向流
rpc User(stream UserHistory) returns (stream UserHistory) {}
}

message UserHistory {
string msg = 1;
}

服务端实现

// User 用户历史记录 客户端输入啥就返回啥
func (h *History) User(stream historyPb.History_UserServer) error {
for {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = stream.Send(&historyPb.UserHistory{Msg: res.Msg})
if err != nil {
return err
}
}
}

客户端

func main() {
... 之前的一样
historyStream := historyPb.NewHistoryClient(conn)
streaming(historyStream)
}

// 客户端流式处理
func streaming(client historyPb.HistoryClient) error {
stream, err := client.User(context.Background())
if err != nil {
logging.Logger.Errorf("stream err %v", err)
}
for n := 0; n < 10; n++ {
err := stream.Send(&historyPb.UserHistory{Msg: strconv.Itoa(n)})
if err != nil {
return err
}
fmt.Println("Streaming Send: ", n)
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Println("Streaming Recv: ", res.Msg)
}
stream.CloseSend()
return nil
}

流式拦截器

流拦截器过程和一元拦截器有所不同,同样可以分为 3 个阶段:

  • 预处理(pre-processing)
  • 调用 RPC 方法(invoking RPC method)
  • 后处理(post-processing)

预处理阶段的拦截只是在流式请求第一次 发起的时候进行拦截,后面的流式请求不会再进入到处理逻辑。 后面两种情况对应着 Streamer api 提供的两个扩展方法来进行,分别是 SendMsg 和 RecvMsg 方法。 正常情况下实现一个流式拦截器与普通拦截器一样,实现这个已经定义好的拦截器方法即可:

func StreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

}

如果是想在发消息之前和之进行处理, 则实现 SendMsg 和 RecvMsg

type wrappedStream struct {
grpc.ServerStream
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
fmt.Printf("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
fmt.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}

使用流式拦截器服务端修改, 下面就是同时使用流式和普通拦截器

//创建gRPC服务器,并开启TLS认证和Token认证
s := grpc.NewServer(grpc.Creds(creds),grpc.UnaryInterceptor(service.Commonnterceptor), grpc.StreamInterceptor(service.StreamInterceptor))

流式拦截器认证

// StreamInterceptor 流拦截器
func StreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
logging.Logger.Infof("gRPC stream method1: %s ", info.FullMethod)

// 流式验证Token
username, err := middleware.CheckAuth(ss.Context())
if err != nil {
logging.Logger.Warnf("err : %vn", err)
return err
}

err = handler(srv, newWrappedStream(ss))

if err != nil {
logging.Logger.Warnf("err11 : %vn", err)
return err
}
logging.Logger.Infof("gRPC stream method2: %s ", info.FullMethod)
logging.Logger.Infof("用户 %s ", username)

return err
}

发布订阅模式

服务端的信息更改时,通知客户端

syntax="proto3";
package pb;
option go_package="./pb";

message Msg{
string value=1;
}

service PubsubService {
// 发布是rpc的普通方法
rpc Publish (Msg) returns (Msg);
// 订阅则是一个单向的流服务,服务端返回的数据可能很大
rpc Subscribe (Msg) returns (stream Msg);
}

package service

import (
"context"
"github.com/ZhuriLab/KuaFuServer/grpc/pb/pb"
"github.com/docker/docker/pkg/pubsub"
"strings"
"time"
)

/**
@author: yhy
@since: 2022/8/11
@desc: //TODO
**/


type PubsubService struct {
pub *pubsub.Publisher
pb.UnimplementedPubsubServiceServer
}

func NewPubsubService() *PubsubService {
return &PubsubService{
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}

// Publish 实现发布方法
func (p *PubsubService) Publish(ctx context.Context, arg *pb.Msg) (*pb.Msg, error) {
// 发布消息
p.pub.Publish(arg.GetValue())
return &pb.Msg{}, nil
}

// Subscribe 实现订阅方法
func (p *PubsubService) Subscribe(arg *pb.Msg, stream pb.PubsubService_SubscribeServer) error {
// SubscribeTopic 增加一个使用函数过滤器的订阅者
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
// 接收数据是string,并且key是以arg为前缀的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, arg.GetValue()) {
return true
}
}
return false
})

// 服务器遍历chan,并将其中信息发送给订阅客户端
for v := range ch {
if err := stream.Send(&pb.Msg{Value: v.(string)}); err != nil {
return err
}
}

return nil
}

服务端

pb.RegisterPubsubServiceServer(s, service.NewPubsubService())

客户端

func main(){
....
go Subscribe(conn)

client := pb.NewPubsubServiceClient(conn)
// 客户端发布信息 golang :hello Go
_, err := client.Publish(context.Background(), &pb.Msg{Value: "golang: hello Go"})
if err != nil {
log.Fatal(err)
}
}

// 订阅
func Subscribe(conn *grpc.ClientConn) {
// 新建一个客户端
client := pb.NewPubsubServiceClient(conn)
// 订阅服务,传入参数是 golang:
// 会想过滤器函数,订阅者应该收到的信息为 golang: hello Go
stream, err := client.Subscribe(context.Background(), &pb.Msg{Value: "golang: "})
if err != nil {
log.Fatal("123 ", err)
}

// 阻塞遍历流,输出结果
for {
reply, err := stream.Recv()

if err != nil {
if err == io.EOF {
break
}
fmt.Println("=======1111=========")
log.Fatal("22 ", err)
}
fmt.Println("================")
fmt.Println(reply.GetValue())
pkg.UserMsgContent += reply.GetValue()
}
}


原文始发于微信公众号(谁不想当剑仙):GRPC 学习笔记

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年3月5日11:12:53
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   GRPC 学习笔记https://cn-sec.com/archives/1588186.html

发表评论

匿名网友 填写信息