4、Golang——gRPC+protobuf
1、gRPC引入原因
(传输+序列化)
微服务架构中需要进行进程与进程、服务器与服务器之间的通信,需要发起网络调用
在微服务架构中,http虽然便捷,但是其性能较低,所以引入了RPC(远程过程调度),通过自定义协议发起TCP调度,来加快传输效率。
2、gRPC介绍
数据在网络中传输的时候,需要被序列化,序列化协议有很多种,比如xml、json、protobuf等,其中gRPC默认使用protobuf,这是google开源的一套成熟的结构数据序列化机制。
序列化的目的:便于传输以及存储。
3、protobuf
4、protobuf安装
4.1 下载通用编译器:https://github.com/protocolbuffers/protobuf
4.2 配置环境变量
4.3 安装对应语言的protoc的生成器
4.4 protobuf使用
5、protobuf简单例子
文件目录:
.
├── go.mod
├── go.sum
├── main.go
├── pbfile
│ └── user.proto
└── service
└── user.pb.go user.proto
//指定语法版本
syntax ="proto3";
//对应生成的目录
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
//生成文件之后,它的包名
package service;
//消息 传输的对象
message User{
string username=1;
int32 age=2;
} 执行
protoc --go_out=生成文件的位路径 proto文件的路径
protoc --go_out=./ ./user.proto
main.go
package main
import (
"fmt"
"google.golang.org/protobuf/proto"
"myproto/service"
)
func main() {
user := &service.User{
Username: "hyh",
Age: 18,
}
//序列化的过程
marshal, err := proto.Marshal(user)
if err != nil {
panic(err)
}
//反序列化的过程
newUser := &service.User{}
err = proto.Unmarshal(marshal, newUser)
if err != nil {
panic(err)
}
fmt.Println(newUser.String())
} 6、proto文件说明
message:
5.1 关键字 optional repeated
//指定语法版本
syntax ="proto3";
//对应生成的目录
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的包名
//生成文件之后,它的包名
package service;
//消息 传输的对象
message User{
string username=1;
int32 age=2;
optional string password=3; //optional 关键字 生成指针
repeated string addresses=4; //repeated 关键字 生成切片
} 5.2 字段映射
5.3 默认值
5.4 标识号
标识号:在消息体的定义中,每个字段都必须有一个唯一的标识号,标识号[0,2^29-1]范围内的一个整数。
5.5 定义多个消息类型
一个proto文件定义多个消息类型:
5.6 定义嵌套消息
5.7 自定义服务
如果想让将消息类型用在RPC系统中,可以在.proto文件中定义一个RPC服务接口,protocol buffer编译器会根据所选择的不同语言生成服务接口代码以及存根(stub)
以上代码在go语言中相当于定义了一个接口。
5.8 protobuf 序列化与反序列化
main.go:
package main
import (
"fmt"
"google.golang.org/protobuf/proto"
"log"
"test/user"
)
func main() {
article := &user.Article{
Aid: 1,
Title: "protobuf",
Views: 100,
}
//序列化 结构体转二进制 bytes, _ := proto.Marshal(article) fmt.Printf("bytes: %v\n", bytes)
//反序列化 二进制转结构体 other := &user.Article{} err := proto.Unmarshal(bytes, other) if err != nil {
log.Fatal("失败", err)
}
fmt.Println("other: %v %v %v", other.Aid, other.Title, other.Views)
} 5.9 protobuf与json相互切换
user.proto:
syntax="proto3";
option go_package="./;blog";
package blog;
message User{
int32 uid=1;
string uname =2;
int32 age=3;
}
message Article{
int32 aid=1;
string title=2;
int32 views=3;
} main.go
package main
import (
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"test/blog"
)
func main() {
article := &blog.Article{
Aid: 1,
Title: "protobuf",
Views: 100,
}
//message to json
s := protojson.Format(article.ProtoReflect().Interface())
fmt.Printf("s:%v\n", s)
//json to message
m := article.ProtoReflect().Interface()
protojson.Unmarshal([]byte(s), m)
fmt.Printf("m:%v\n", m)
} 6、gRPC
暂时无法在飞书文档外展示此内容
6.1 RPC与gRPC
官方文档:https://grpc.io/
底层协议:
HTTP2:
6.2 gRPC实例
代码结构:
myproto
├── client
│ ├── grpc_client.go
│ └── service
│ └── product.pb.go
├── go.mod
├── go.sum
├── grpc_server.gp.go
├── pbfile
│ └── product.proto
└── service
├── product.go
└── product.pb.go 说明:myproto/client/service/product.pb.go与myproto/service/product.pb.go内容完全相同,但是前者是为grpc_client.go提供grpc服务,后者是为grpc_server.gp.go提供grpc服务。
/myproto/pbfile/product.proto
syntax ="proto3";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message ProductResponse{
int32 prod_stock=1;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse);
} 执行命令:
protoc --go_out=plugins=grpc:./ ./product.proto
/myproto/server/product.go
package service
import "context"
//定义一个结构体
type productService struct {
}
//定义一个结构体实例
var ProductService = &productService{}
//实现该变量的ProServiceServer接口方法,
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
//todo 具体业务逻辑 stock := p.GetStockById(request.ProdId)
return &ProductResponse{ProdStock: stock}, nil
}
func (p *productService) GetStockById(id int32) int32 {
return 110
} /myproto/client/client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"myproto/client/service"
)
func main() {
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
requst := &service.ProductRequest{
ProdId: 123,
}
stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
if err != nil {
log.Fatal("查询库存失败", err)
}
fmt.Println("查询成功,输出:", stockResponse.ProdStock)
} /myproto/grpc_server.go
package main
import (
"fmt"
"google.golang.org/grpc"
"log"
"myproto/service"
"net"
)
func main() {
//new一个服务端
recServer := grpc.NewServer()
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
service.RegisterProServiceServer(recServer, service.ProductService)
listener, err := net.Listen("tcp", ":8002")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
} 7、gRPC安全传输
7.1 生成自签名证书
1) 下载安装openssl
2)生成密钥文件
openssl genrsa -des3 -out ca.key 2048
3)创建证书请求(对应的公钥生成)
openssl req -new -key ca.key -out ca.csr
4)ca.crt
openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt
5} 找到openssl.cnf 文件
- 打开copy_extensions = copy
- 打开 req_extensions = v3_req
- 找到[ v3_req ],添加 subjectAltName = @alt_names
- 添加新的标签 [ alt_names ] , 和标签字段
[ alt_names ]DNS.1 = *.zjtd.com
6)生成证书私钥server.key
openssl genpkey -algorithm RSA -out server.key
7)通过私钥server.key生成证书请求文件server.csr
openssl req -new -nodes -key server.key -out server.csr -days 3650 -config ./openssl.cnf -extensions v3_req
8)生成SAN证书
openssl x509 -req -days 365 -in server.csr -out server.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req
- key: 服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密。
- csr: 证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名。
- crt: 由证书颁发机构(CA)签名后的证书,或者是开发者自签名的证书,包含证书持有人的信息,持有人的公钥,以及签署者的签名等信息。
- pem: 是基于Base64编码的证书格式,扩展名包括PEM、CRT和CER。
Q :什么是 SAN?
A :SAN(Subject Alternative Name)是 SSL 标准 x509 中定义的一个扩展。使用了 SAN 字段的 SSL 证书,可以扩展此证书支持的域名,使得一个证书可以支持多个不同域名的解析。
9)实例代码
目录:
.
├── cert
│ ├── ca.crt
│ ├── ca.csr
│ ├── ca.key
│ ├── ca.srl
│ ├── openssl.cnf
│ ├── server.csr
│ ├── server.key
│ └── server.pem
├── client
│ ├── grpc_client.go
│ └── service
│ └── product.pb.go
├── go.mod
├── go.sum
├── grpc_server.gp
├── grpc_server.gp.go
├── pbfile
│ └── product.proto
└── service
├── product.go
└── product.pb.go grpc_server.gp.go:服务端
package main
import (
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
"myproto/service"
"net"
)
func main() {
//添加证书
creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
if err2 != nil {
log.Fatal("证书生成错误", err2)
}
//new一个服务端
recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
} grpc_client.go:客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
"myproto/client/service"
)
func main() {
//添加证书
creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
if err2 != nil {
log.Fatal("证书错误", err2)
}
//无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
requst := &service.ProductRequest{
ProdId: 123,
}
stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
if err != nil {
log.Fatal("查询库存失败", err)
}
fmt.Println("查询成功,输出:", stockResponse.ProdStock)
} 上述认证方式为单向认证:
中间人攻击
7.2 双向认证
上面的server.pem和server.key 是服务端的 公钥和私钥。
如果双向认证,客户端也需要生成对应的公钥和私钥。
1)私钥:
openssl genpkey -algorithm RSA -out client.key
2)证书:
openssl req -new -nodes -key client.key -out client.csr -days 3650 -config ./openssl.cnf -extensions v3_req
3)SAN证书:
openssl x509 -req -days 365 -in client.csr -out client.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req
4)实例代码
目录:
.
├── cert
│ ├── ca.crt
│ ├── ca.csr
│ ├── ca.key
│ ├── ca.srl
│ ├── client.csr
│ ├── client.key
│ ├── client.pem
│ ├── openssl.cnf
│ ├── server.csr
│ ├── server.key
│ └── server.pem
├── client
│ ├── grpc_client.go
│ └── service
│ └── product.pb.go
├── go.mod
├── go.sum
├── grpc_server.gp
├── grpc_server.gp.go
├── pbfile
│ └── product.proto
└── service
├── product.go
└── product.pb.go grpc_server.gp.go:
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"myproto/service"
"net"
)
func main() {
////添加单向证书
//creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
//if err2 != nil {
// log.Fatal("证书生成错误", err2)
//}
//添加双向证书
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatal("证书读取错误", err)
}
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("cert/ca.crt")
if err != nil {
log.Fatal("ca证书读取错误", err)
}
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})
//new一个服务端
recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
fmt.Println("认证成功")
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
} grpc_client.go:
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
requst := &service.ProductRequest{
ProdId: 123,
}
stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
if err != nil {
log.Fatal("查询库存失败", err)
}
fmt.Println("查询成功,输出:", stockResponse.ProdStock)
} 7.5 Token 认证
jwt等等
目录:
─ cert
│ ├── ca.crt
│ ├── ca.csr
│ ├── ca.key
│ ├── ca.srl
│ ├── client.csr
│ ├── client.key
│ ├── client.pem
│ ├── openssl.cnf
│ ├── server.csr
│ ├── server.key
│ └── server.pem
├── client
│ ├── auth
│ │ └── auth.go
│ ├── grpc_client.go
│ └── service
│ └── product.pb.go
├── go.mod
├── go.sum
├── grpc_server.gp
├── grpc_server.gp.go
├── pbfile
│ └── product.proto
└── service
├── product.go
└── product.pb.go 1)服务器添加用户名密码的校验
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io/ioutil"
"log"
"myproto/service"
"net"
)
func main() {
////添加单向证书
//creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
//if err2 != nil {
// log.Fatal("证书生成错误", err2)
//}
//添加双向证书
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatal("证书读取错误", err)
}
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("cert/ca.crt")
if err != nil {
log.Fatal("ca证书读取错误", err)
}
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})
//实现token认证,需要合法的用户名和密码
//实现一个拦截器
var AuthInterceptor grpc.UnaryServerInterceptor
AuthInterceptor = func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
//拦截普通方法请求,验证tokrn
err = Auth(ctx)
if err != nil {
return
}
return handler(ctx, req)
}
recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor))
//new一个服务端
//recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
fmt.Println("认证成功")
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
}
func Auth(ctx context.Context) error {
//实际上拿到传输的用户名和密码
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var user string
var password string
if val, ok := md["user"]; ok {
user = val[0]
}
if val, ok := md["password"]; ok {
password = val[0]
}
if user != "hyh" || password != "123456" {
return status.Error(codes.Unauthenticated, "token不合法")
}
return nil
} 2)auth.go
package auth
import "context"
type Authentication struct {
User string
Password string
}
func (a *Authentication) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{"user": a.User, "password": a.Password}, nil
}
func (a *Authentication) RequireTransportSecurity() bool {
return false
} 3)客户端
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"myproto/client/auth"
"myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
//Token认证
token := &auth.Authentication{
User: "hyh",
Password: "123456",
}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
requst := &service.ProductRequest{
ProdId: 123,
}
stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
if err != nil {
log.Fatal("查询库存失败", err)
}
fmt.Println("查询成功,输出:", stockResponse.ProdStock)
} 8、更换protoc-gen-go生成器
前面的课程中,我们使用的proto的go生成器,使用的命令是
go get github.com/golang/protobuf/protoc-gen-go,在https://www.grpc.io/docs/languages/go/quickstart/ 中,我们发现
官方使用的和我们使用的并不一致。
github的方式,需要使用--go_out=plugins=grpc来去进行生成,而在golang.org方式中,弃用了这种方式,使用protoc-gen-go将不再支持gRPC service的定义,需要使用新的插件protoc-gen-go-grpc。
1. 使用google.golang.org/protobuf
- 安装插件
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
- 安装完成后会在gopath下的bin目录下生成
- 利用proto文件重新生成go文件
#在myproto文件下运行 protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
- 修改之前的service实现
package service
import (
"context"
)
var ProductService = &productService{}
type productService struct {
}
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
return &ProductResponse{ProdStock: request.ProdId}, nil
}
func (p *productService) mustEmbedUnimplementedProdServiceServer() {} 9、import使用
1、import
文件目录:
.
├── cert
│ ├── ca.crt
│ ├── ca.csr
│ ├── ca.key
│ ├── ca.srl
│ ├── client.csr
│ ├── client.key
│ ├── client.pem
│ ├── openssl.cnf
│ ├── server.csr
│ ├── server.key
│ └── server.pem
├── client
│ ├── auth
│ │ └── auth.go
│ ├── grpc_client.go
│ └── service
│ ├── product.pb.go
│ ├── product_grpc.pb.go
│ └── user.pb.go
├── go.mod
├── go.sum
├── grpc_server.gp
├── grpc_server.gp.go
├── pbfile
│ ├── product.proto
│ └── user.proto
└── service
├── product.go
├── product.pb.go
├── product_grpc.pb.go
└── user.pb.go product.proto:
syntax ="proto3";
import "pbfile/user.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse);
} user.proto:
syntax = "proto3";
option go_package="../service";
package service;
message User{
string username=1;
int32 age=2;
optional string password=3;
repeated string addresses=4;
} 编译proto文件:
protoc --go_out=./service --go-grpc_out=./service pbfile/user.proto protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
product.go:
package service
import (
"context"
)
//定义一个结构体
type productService struct {
}
//定义一个结构体实例
var ProductService = &productService{}
//实现该变量的ProServiceServer接口方法,
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
//todo 具体业务逻辑 stock := p.GetStockById(request.ProdId)
user := &User{Username: "hyh"}
return &ProductResponse{ProdStock: stock, User: user}, nil
}
func (p *productService) GetStockById(id int32) int32 {
return id
}
func (p *productService) mustEmbedUnimplementedProServiceServer() {} grpc_client.go:
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"zjtd.com/myproto/client/auth"
"zjtd.com/myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
//Token认证
token := &auth.Authentication{
User: "hyh",
Password: "123456",
}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
requst := &service.ProductRequest{
ProdId: 123,
}
stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
if err != nil {
log.Fatal("查询库存失败", err)
}
fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User)
} 2、任意类型
// 使用any类型,需要导入这个 import "google/protobuf/any.proto"; // 定义入参消息 message HelloParam{ // any,代表可以是任何类型 暂时不知道定义为什么类型 google.protobuf.Any data = 1;
} // 这个就是protobuf的中间文件// 指定的当前proto语法的版本,有2和3 syntax = "proto3"; //从执行 protoc这个命令的当前目录开始算起, import "user.proto"; // 使用any类型,需要导入这个 import "google/protobuf/any.proto";
option go_package="../service"; // 指定等会文件生成出来的package package service; // 定义request model message ProductRequest{
int32 prod_id = 1; // 1代表顺序 }
message Content {
string msg = 1;
} // 定义response model message ProductResponse{
int32 prod_stock = 1; // 1代表顺序 User user = 2;
google.protobuf.Any data = 3;
} // 定义服务主体 service ProdService{ // 定义方法 rpc GetProductStock(ProductRequest) returns(ProductResponse);
} func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductResponse, error) { //实现具体的业务逻辑 stock := p.GetStockById(request.ProdId)
user := User{Username: "mszlu"}
content := Content{Msg: "mszlu msg..."} //转换成any类型 any, _ := anypb.New(&content)
return &ProductResponse{ProdStock: stock, User: &user, Data: any}, nil
} 10、客户端流
在 HTTP/1.1 的时代,同一个时刻只能对一个请求进行处理或者响应,换句话说,下一个请求必须要等当前请求处理完才能继续进行。
HTTP/1.1需要注意的是,在服务端没有response的时候,客户端是可以发起多个request的,但服务端依旧是顺序对请求进行处理, 并按照收到请求的次序予以返回。
HTTP/2 的时代,多路复用的特性让一次同时处理多个请求成为了现实,并且同一个 TCP 通道中的请求不分先后、不会阻塞,HTTP/2 中引入了流(Stream) 和 帧(Frame) 的概念,当 TCP 通道建立以后,后续的所有操作都是以流的方式发送的,而二进制帧则是组成流的最小单位,属于协议层上的流式传输。
HTTP/2 在一个 TCP 连接的基础上虚拟出多个 Stream, Stream 之间可以并发的请求和处理, 并且 HTTP/2 以二进制帧 (frame) 的方式进行数据传送, 并引入了头部压缩 (HPACK), 大大提升了交互效率
10.1 流定义
1 // 普通 RPC2 rpc SimplePing(PingRequest) returns (PingReply); 34 // 客户端流式 RPC5 rpc ClientStreamPing(stream PingRequest) returns (PingReply); 67 // 服务器端流式 RPC8 rpc ServerStreamPing(PingRequest) returns (stream PingReply); 910 // 双向流式 RPC11 rpc BothStreamPing(stream PingRequest) returns (stream PingReply);
stream关键字,当该关键字修饰参数时,表示这是一个客户端流式的 gRPC 接口;当该参数修饰返回值时,表示这是一个服务器端流式的 gRPC 接口;当该关键字同时修饰参数和返回值时,表示这是一个双向流式的 gRPC 接口。
10.2 客户端流定义:
客户端可以源源不断的给服务端发送信息
rpc UpdateStockClientStream(stream ProductRequest) returns(ProductResponse);
product.proto:
定义客户端流rpc方法
syntax ="proto3";
import "pbfile/user.proto";
import "google/protobuf/any.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message Content{
string msg=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
google.protobuf.Any data=3;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse); rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse); } 执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
实现对应接口
package service
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/anypb"
)
//定义一个结构体
type productService struct {
}
//定义一个结构体实例
var ProductService = &productService{}
//实现该变量的ProServiceServer接口方法,
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
//todo 具体业务逻辑 stock := p.GetStockById(request.ProdId)
user := &User{Username: "hyh"}
contnet := &Content{Msg: "zjtd msg..."}
//转换为any类型
any, _ := anypb.New(contnet)
return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil
} func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error { count := 0 for { //源源不断的接受客户端发来的信息 recv, err := stream.Recv() if err != nil { return err } fmt.Println("服务端接受到到流", recv.ProdId, count) count++ if count > 10 { rsp := &ProductResponse{ProdStock: recv.ProdId} err := stream.SendAndClose(rsp) if err != nil { return err } return nil } } } func (p *productService) GetStockById(id int32) int32 {
return id
}
func (p *productService) mustEmbedUnimplementedProServiceServer() {} /myproto/grpc_server.go
服务端代码 不需要做修改
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io/ioutil"
"log"
"net"
"zjtd.com/myproto/service"
)
func main() {
////添加单向证书
//creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
//if err2 != nil {
// log.Fatal("证书生成错误", err2)
//}
//添加双向证书
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatal("证书读取错误", err)
}
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("cert/ca.crt")
if err != nil {
log.Fatal("ca证书读取错误", err)
}
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})
//实现token认证,需要合法的用户名和密码
//实现一个拦截器
var AuthInterceptor grpc.UnaryServerInterceptor
AuthInterceptor = func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
//拦截普通方法请求,验证tokrn
err = Auth(ctx)
if err != nil {
return
}
return handler(ctx, req)
}
recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor))
//new一个服务端
//recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
fmt.Println("认证成功")
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
}
func Auth(ctx context.Context) error {
//实际上拿到传输的用户名和密码
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var user string
var password string
if val, ok := md["user.proto"]; ok {
user = val[0]
}
if val, ok := md["password"]; ok {
password = val[0]
}
if user != "hyh" || password != "123456" {
return status.Error(codes.Unauthenticated, "token不合法")
}
return nil
} /myproto/client/grpc_client.go
需要远程调用服务端的 UpdateProductStockClientStream 函数
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"time"
"zjtd.com/myproto/client/auth"
"zjtd.com/myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
//Token认证
token := &auth.Authentication{
User: "hyh",
Password: "123456",
}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
//声明一个客户端
prodClient := service.NewProServiceClient(conn)
//requst := &service.ProductRequest{
// ProdId: 123,
//}
//stockResponse, err := prodClient.GetProductStock(context.Background(), requst)
//if err != nil {
// log.Fatal("查询库存失败", err)
//}
//fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data)
//客户端流
stream, err := prodClient.UpdateProductStockClientStream(context.Background())
if err != nil {
log.Fatal("获取流出错", err)
}
rsp := make(chan struct{}, 1)
//声明函数处理 stream go prodRequest(stream, rsp) select {
case <-rsp:
recv, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
stock := recv.ProdStock
fmt.Println("客户端收到响应", stock)
}
}
func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) {
count := 0
for true {
requst := &service.ProductRequest{
ProdId: 123,
}
err := stream.Send(requst)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
count++
if count > 10 {
rsp <- struct{}{}
break
}
}
} 10.3 服务端流
定义:客户端一次发送,服务端可以源源不断的回消息
rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse);
product.proto:
syntax ="proto3";
import "pbfile/user.proto";
import "google/protobuf/any.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message Content{
string msg=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
google.protobuf.Any data=3;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse);
//客户端流定义方法
rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse);
//服务端流定义方法 rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse); } 执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
package service
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/anypb"
"time"
)
//定义一个结构体
type productService struct {
}
//定义一个结构体实例
var ProductService = &productService{}
//实现该变量的ProServiceServer接口方法,
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
//todo 具体业务逻辑 stock := p.GetStockById(request.ProdId)
user := &User{Username: "hyh"}
contnet := &Content{Msg: "zjtd msg..."}
//转换为any类型
any, _ := anypb.New(contnet)
return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil
}
func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error {
count := 0
for {
//源源不断的接受客户端发来的信息
recv, err := stream.Recv()
if err != nil {
return err
}
fmt.Println("服务端接受到到流", recv.ProdId, count)
count++
if count > 10 {
rsp := &ProductResponse{ProdStock: recv.ProdId}
err := stream.SendAndClose(rsp)
if err != nil {
return err
}
return nil
}
}
} func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProService_GetProductStockServerStreamServer) error { count := 0 for { rsp := &ProductResponse{ProdStock: request.ProdId} err := stream.Send(rsp) if err != nil { return err } time.Sleep(time.Second) count++ if count > 10 { return nil } } } func (p *productService) GetStockById(id int32) int32 {
return id
}
func (p *productService) mustEmbedUnimplementedProServiceServer() {} /myproto/grpc_server.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io/ioutil"
"log"
"net"
"zjtd.com/myproto/service"
)
func main() {
////添加单向证书
//creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
//if err2 != nil {
// log.Fatal("证书生成错误", err2)
//}
//添加双向证书
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatal("证书读取错误", err)
}
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("cert/ca.crt")
if err != nil {
log.Fatal("ca证书读取错误", err)
}
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})
//实现token认证,需要合法的用户名和密码
//实现一个拦截器
var AuthInterceptor grpc.UnaryServerInterceptor
AuthInterceptor = func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
//拦截普通方法请求,验证tokrn
err = Auth(ctx)
if err != nil {
return
}
return handler(ctx, req)
}
recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor))
//new一个服务端
//recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
fmt.Println("认证成功")
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
}
func Auth(ctx context.Context) error {
//实际上拿到传输的用户名和密码
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var user string
var password string
if val, ok := md["user.proto"]; ok {
user = val[0]
}
if val, ok := md["password"]; ok {
password = val[0]
}
if user != "hyh" || password != "123456" {
return status.Error(codes.Unauthenticated, "token不合法")
}
return nil
} /myproto/client/grpc_client.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io"
"io/ioutil"
"log"
"zjtd.com/myproto/client/auth"
"zjtd.com/myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
//Token认证
token := &auth.Authentication{
User: "hyh",
Password: "123456",
}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
//requst := &service.ProductRequest{
// ProdId: 123,
//}
//stockResponse, err := prodClient.GetProductStoGetProductStockck(context.Background(), requst)
//if err != nil {
// log.Fatal("查询库存失败", err)
//}
//fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data)
//客户端流
// stream, err := prodClient.UpdateProductStockClientStream(context.Background())
// if err != nil {
// log.Fatal("获取流出错", err)
// }
// rsp := make(chan struct{}, 1)
// go prodRequest(stream, rsp)
// select {
// case <-rsp:
// recv, err := stream.CloseAndRecv()
// if err != nil {
// log.Fatal(err)
// }
// stock := recv.ProdStock
// fmt.Println("客户端收到响应", stock)
// }
//服务端流 requst := &service.ProductRequest{ ProdId: 123, } stream, err := prodClient.GetProductStockServerStream(context.Background(), requst) if err != nil { log.Fatal("获取流出错") } for { recv, err := stream.Recv() if err != nil { if err == io.EOF { fmt.Println("客户端数据接受完成") err := stream.CloseSend() if err != nil { log.Fatal(err) } break } log.Fatal(err) } fmt.Println("客户端收到的流:", recv.ProdStock) } }
//func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) {
// count := 0
// for true {
// requst := &service.ProductRequest{
// ProdId: 123,
// }
// err := stream.Send(requst)
// if err != nil {
// log.Fatal(err)
// }
// time.Sleep(time.Second)
// count++
// if count > 10 {
// rsp <- struct{}{}
// break
// }
// }
//} 10.4 双向流
双向通信使用双向流,资源消耗过高-》websocket
双向流用于心跳检测还是可以的
定义:互为服务端
product.proto:
syntax ="proto3";
import "pbfile/user.proto";
import "google/protobuf/any.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message Content{
string msg=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
google.protobuf.Any data=3;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse);
//客户端流定义方法
rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse);
//服务端流定义方法
rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse); //双向流定义方法 rpc SayHelloStream(stream ProductRequest) returns(stream ProductResponse); } 执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
package service
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/anypb"
"time"
)
//定义一个结构体
type productService struct {
}
//定义一个结构体实例
var ProductService = &productService{}
//实现该变量的ProServiceServer接口方法,
func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
//todo 具体业务逻辑 stock := p.GetStockById(request.ProdId)
user := &User{Username: "hyh"}
contnet := &Content{Msg: "zjtd msg..."}
//转换为any类型
any, _ := anypb.New(contnet)
return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil
}
func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error {
count := 0
for {
//源源不断的接受客户端发来的信息
recv, err := stream.Recv()
if err != nil {
return err
}
fmt.Println("服务端接受到到流", recv.ProdId, count)
count++
if count > 10 {
rsp := &ProductResponse{ProdStock: recv.ProdId}
err := stream.SendAndClose(rsp)
if err != nil {
return err
}
return nil
}
}
}
func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProService_GetProductStockServerStreamServer) error {
count := 0
for {
rsp := &ProductResponse{ProdStock: request.ProdId}
err := stream.Send(rsp)
if err != nil {
return err
}
time.Sleep(time.Second)
count++
if count > 10 {
return nil
}
}
} func (p *productService) SayHelloStream(stream ProService_SayHelloStreamServer) error { for { recv, err := stream.Recv() if err != nil { return nil } fmt.Println("服务端收到客户端端消息", recv.ProdId) time.Sleep(time.Second) rsp := &ProductResponse{ProdStock: recv.ProdId} err = stream.Send(rsp) if err != nil { return nil } } } func (p *productService) GetStockById(id int32) int32 {
return id
}
func (p *productService) mustEmbedUnimplementedProServiceServer() {} /myproto/grpc_server.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io/ioutil"
"log"
"net"
"zjtd.com/myproto/service"
)
func main() {
////添加单向证书
//creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")
//if err2 != nil {
// log.Fatal("证书生成错误", err2)
//}
//添加双向证书
cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key")
if err != nil {
log.Fatal("证书读取错误", err)
}
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile("cert/ca.crt")
if err != nil {
log.Fatal("ca证书读取错误", err)
}
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ClientAuth: tls.RequireAndVerifyClientCert,
// 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式
ClientCAs: certPool,
})
//实现token认证,需要合法的用户名和密码
//实现一个拦截器
var AuthInterceptor grpc.UnaryServerInterceptor
AuthInterceptor = func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
//拦截普通方法请求,验证tokrn
err = Auth(ctx)
if err != nil {
return
}
return handler(ctx, req)
}
recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor))
//new一个服务端
//recServer := grpc.NewServer(grpc.Creds(creds))
//service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务
//接口实现service.ProductService被注册上了
// service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{})
service.RegisterProServiceServer(recServer, service.ProductService)
fmt.Println("认证成功")
listener, err := net.Listen("tcp", ":8008")
if err != nil {
log.Fatalln("启动监听失败", err)
}
err = recServer.Serve(listener)
if err != nil {
log.Fatalln("启动服务失败", err)
}
fmt.Println("启动gRPC服务成功")
}
func Auth(ctx context.Context) error {
//实际上拿到传输的用户名和密码
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return fmt.Errorf("missing credentials")
}
var user string
var password string
if val, ok := md["user.proto"]; ok {
user = val[0]
}
if val, ok := md["password"]; ok {
password = val[0]
}
if user != "hyh" || password != "123456" {
return status.Error(codes.Unauthenticated, "token不合法")
}
return nil
} /myproto/client/grpc_client.go
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io/ioutil"
"log"
"time"
"zjtd.com/myproto/client/auth"
"zjtd.com/myproto/client/service"
)
func main() {
////添加证书
//creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com")
//if err2 != nil {
// log.Fatal("证书错误", err2)
//}
////无证书方式
//conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials()))
//if err != nil {
// log.Fatalln("服务端出错,连接不上!", err)
//}
//添加双向认证
// 证书认证-双向认证
// 从证书相关文件中读取和解析信息,得到证书公钥、密钥对
cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key")
// 创建一个新的、空的 CertPool
certPool := x509.NewCertPool()
ca, _ := ioutil.ReadFile("cert/ca.crt")
// 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用
certPool.AppendCertsFromPEM(ca)
// 构建基于 TLS 的 TransportCredentials 选项
creds := credentials.NewTLS(&tls.Config{
// 设置证书链,允许包含一个或多个
Certificates: []tls.Certificate{cert},
// 要求必须校验客户端的证书。可以根据实际情况选用以下参数
ServerName: "*.zjtd.com",
RootCAs: certPool,
})
//Token认证
token := &auth.Authentication{
User: "hyh",
Password: "123456",
}
conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))
if err != nil {
log.Fatalln("服务端出错,连接不上!", err)
}
defer conn.Close()
prodClient := service.NewProServiceClient(conn)
//requst := &service.ProductRequest{
// ProdId: 123,
//}
//stockResponse, err := prodClient.GetProductStoGetProductStockck(context.Background(), requst)
//if err != nil {
// log.Fatal("查询库存失败", err)
//}
//fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data)
//客户端流
// stream, err := prodClient.UpdateProductStockClientStream(context.Background())
// if err != nil {
// log.Fatal("获取流出错", err)
// }
// rsp := make(chan struct{}, 1)
// go prodRequest(stream, rsp)
// select {
// case <-rsp:
// recv, err := stream.CloseAndRecv()
// if err != nil {
// log.Fatal(err)
// }
// stock := recv.ProdStock
// fmt.Println("客户端收到响应", stock)
// }
////服务端流
//requst := &service.ProductRequest{
// ProdId: 123,
//}
//stream, err := prodClient.GetProductStockServerStream(context.Background(), requst)
//if err != nil {
// log.Fatal("获取流出错")
//}
//for {
// recv, err := stream.Recv()
// if err != nil {
// if err == io.EOF {
// fmt.Println("客户端数据接受完成")
// err := stream.CloseSend()
// if err != nil {
// log.Fatal(err)
// }
// break
// }
// log.Fatal(err)
// }
// fmt.Println("客户端收到的流:", recv.ProdStock)
//}
//双向流 stream, err := prodClient.SayHelloStream(context.Background()) for { requst := &service.ProductRequest{ ProdId: 123, } err = stream.Send(requst) if err != nil { log.Fatal(err) } time.Sleep(time.Second) recv, err := stream.Recv() if err != nil { log.Fatal(err) } fmt.Println("客户端收到流的消息:", recv.ProdStock) } }
//func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) {
// count := 0
// for true {
// requst := &service.ProductRequest{
// ProdId: 123,
// }
// err := stream.Send(requst)
// if err != nil {
// log.Fatal(err)
// }
// time.Sleep(time.Second)
// count++
// if count > 10 {
// rsp <- struct{}{}
// break
// }
// }
//} 11、Grpc 拦截器
通常客户端请求到达服务端的时候不会立即进行业务处理,而是进行一些预处理操作,比如监控数据采集(统计 QPS),链路追踪,身份信息校验,必传参数校验等等。gRPC 为此提供了一个拦截器(Interceptor)功能来实现这一系列的操作。按照通信方式可以分为一元拦截器(Unary Interceptor)和流拦截器(Streaming Interceptor),按照应用角色可以分为客户端拦截器(Client-Side Interceptor)和服务端拦截器(Server-Side Interceptor),具体类型如下
// grpc interceptor.go grpc.UnaryClientInterceptor grpc.UnaryServerInterceptor grpc.StreamClientInterceptor grpc.StreamServerInterceptor
1、介绍
grpc服务端和客户端都提供了拦截器interceptor功能,功能类似中间件middleware,很适合在这里处理验证、日志等流程。
grpc针对不同的rpc类型为我们设计了两种类型的拦截器:
- 一元拦截器 UnaryInterceptor,可以拦截一元rpc请求
- 流式拦截器 StreamInterceptor,可以拦截服务端流式rpc、客户端流式、双向流失rpc请求
2、拦截器的声明
2.1、 一元RPC拦截器
func UnaryServerInterceptor(
ctx context.Context, // 请求上下文,可以做一些超时处理
req interface{}, // gRPC 请求参数
info *UnaryServerInfo, // gRPC 服务接口信息
handler UnaryHandler, // gRPC 实际调用方法
) (resp interface{}, err error) ctx context.Context 请求上下文 req interface{} 用户请求的参数 info *UnaryServerInfo RPC方法的所有信息,如:方法名 handler UnaryHandler 执行RPC的方法本身 type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) resp interface{} 执行RPC方法后的响应结果 2.2.、流式RPC拦截器
func StreamServerInterceptor(
srv interface{}, // 请求参数
ss ServerStream, // gRPC 服务端流信息
info *StreamServerInfo, // gRPC 服务接口信息
handler StreamHandler // gRPC 实际调用方法
) error 3、拦截器作用?
我们可以通过拦截器执行以下操作:
- 在传递之前更新原始 gRPC 请求——例如,注入额外的信息,例如 token
- 操纵原始调用者函数的行为,例如绕过调用以便我们可以使用缓存的结果
- 在返回给客户端之前更新响应
接下来我们将实现日志记录拦截器示例。
4、拦截器的实现
4.1、服务端定义一元RPC拦截器
下面给出的代码说明了执行以下操作的一元拦截器:
- 它在 执行RPC 之前记录了请求的时间+请求的方法。
- 请求方式异常后将打印异常信息。
- 请求正常结束时,将记录结束的时间和具体的方法。
我们需要定义一个2.1中说明的 UnaryServerInterceptor 类型的函数,表示它为一元RPC拦截器:
func UnaryLoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// 记录开始时间和请求的方法
log.Println("start:" + time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod)
// 执行具体rpc方法
resp, err = handler(ctx, req)
if err != nil {
// 记录异常日志信息
log.Println("error:" + time.Now().Format("2006-01-02 15:04:05") + " " + err.Error())
return resp, err
}
// 正常结束,记录结束时间和方法
log.Println("end:" + time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod)
return resp, nil
} 4.2 服务器定义流式RPC拦截器
func StreamLoggerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod)
err := handler(srv, ss)
if err != nil {
log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + err.Error())
return err
}
log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod)
return nil
} 4.3服务端注册拦截器
// 注册一元RPC拦截器 server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.UnaryLoggerInterceptor)) // 注册流式RPC拦截器 server := grpc.NewServer(grpc.StreamInterceptor(interceptor.StreamLoggerInterceptor))
5、如何将多个同类型拦截器串联
grpc中为我们提供了将多个同一类型的拦截器串起来的方法,当然也可以使用第三方框架go-grpc-middleware
// 添加多个一元rpc拦截器 unaryInterceptorChain := grpc.ChainUnaryInterceptor(interceptor.UnaryLoggerInterceptor,interceptor.UnaryLoggerInterceptor) // 添加多个流式rpc拦截器 streamInterceptorChain := grpc.ChainStreamInterceptor(interceptor.StreamLoggerInterceptor,interceptor.StreamLoggerInterceptor) // 注册拦截器 server := grpc.NewServer(unaryInterceptorChain,streamInterceptorChain)
6、拦截器的执行
7、客户端拦截器
7.1 定义拦截器
// 定义客户端一元拦截器 func UnaryClientInterceptor(
ctx context.Context, // 请求上下文,可以做一些超时处理
method string, // 请求方法
req, reply interface{}, // 请求和响应
cc *ClientConn, // 连接信息
invoker UnaryInvoker, // 调用的 gRPC 方法
opts ...CallOption // gRPC 调用预处理接***后处理接口
) error // 定义客户端流式拦截器 func StreamClientInterceptor(
ctx context.Context, // 请求上下文
desc *StreamDesc, // 调用 gRPC 方法流信息
cc *ClientConn, // 连接信息
method string, // 调用方法
streamer Streamer, // 流对象,通过 desc 初始化
opts ...CallOption // gRPC 调用预处理接***后处理接口
) (ClientStream, error) 7.2注册拦截器
// 注册一元拦截器
clientUnaryLoggerInterceptor := grpc.WithUnaryInterceptor(ClientUnaryLoggerInterceptor)
// 注册流式拦截器
clientStreamLoggerInterceptor := grpc.WithStreamInterceptor(ClientStreamLoggerInterceptor)
grpc.Dial("127.0.0.1:8000",grpc.WithInsecure(),clientUnaryLoggerInterceptor,clientStreamLoggerInterceptor) 7.3客户端添加多个拦截器
// 串联拦截器 grpc.WithChainUnaryInterceptor(ClientUnaryLoggerInterceptor) grpc.WithChainStreamInterceptor(ClientStreamLoggerInterceptor)
8、拦截器实例
tree:
. ├── client.go ├── go.mod ├── go.sum ├── pb │ ├── user.pb.go │ └── user_grpc.pb.go ├── server │ └── server.go └── user.proto
user.proto:
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package pb;
option go_package = "./pb";
service ToDoService {
rpc DoWork (TodoRequest) returns (TodoResponse);
}
enum Week { Sunday = 0; Monday = 1; Tuesday = 2; Wednesday = 3; Thursday = 4; Friday = 5; Saturday = 6;
}
message TodoRequest{
string todo = 1;
Week week = 2;
map <string,string> bookMap = 3; // BookMap map[string]string
google.protobuf.Timestamp doneTime = 4;
}
message TodoResponse {
bool done = 1;
} srever/server.go:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"net"
"reflect"
pb2 "test/pb"
"time"
)
//自定义拦截器
//
func MyInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
//从context 获取数据
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
fmt.Println("拦截器 :metadata nil")
return nil, status.Error(codes.Unauthenticated, "认证失败[metadata]")
}
token, ok := md["token"]
if !ok {
fmt.Println("拦截器 :token nil")
return nil, status.Error(codes.Unauthenticated, "认证失败[token]")
}
servertoken := []string{"123456789"}
if !reflect.DeepEqual(servertoken, token) {
return nil, status.Error(codes.Unauthenticated, "密码错误[token]")
}
//fmt.Println("token:", token)
now := time.Now()
resp, err = handler(ctx, req) //前面的先执行; 下面的最后执行类似gin 中 next() 中间件
fmt.Println("req:", req)
lastTime := time.Now().Sub(now)
fmt.Println("执行时间:", lastTime.Milliseconds())
return
}
type TodoInfo struct {
}
//拦截器handle函数
func (t *TodoInfo) DoWork(ctx context.Context, td_req *pb2.TodoRequest) (*pb2.TodoResponse, error) {
//接受context 参数
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
fmt.Println("metadata false")
}
for k, v := range md {
fmt.Println("k:", k, "== v:", v)
}
fmt.Println(td_req.Todo)
fmt.Println(td_req.Week)
fmt.Println(td_req.BookMap)
return &pb2.TodoResponse{
Done: true,
}, nil
}
func main() {
//注册拦截器
serviceOption := grpc.UnaryInterceptor(MyInterceptor)
//实例化grpc
grpcServer := grpc.NewServer(serviceOption)
//注册服务
pb2.RegisterToDoServiceServer(grpcServer, &TodoInfo{})
listen, err := net.Listen("tcp", "127.0.0.1:8081")
if err != nil {
panic(err)
}
err = grpcServer.Serve(listen)
if err != nil {
println(err)
}
} client.go:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
pb2 "test/pb"
)
//继承PerRPCCredentials 实现认证
//重写GetRequestMetadata 和 RequireTransportSecurity
type MyCredentials struct {
}
func (c *MyCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"token": "123456897",
}, nil
}
func (c *MyCredentials) RequireTransportSecurity() bool {
return false
}
func main() {
//拦截器
//clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error{
// md2 := metadata.Pairs("token","99663322poiopui")
// ctx = metadata.NewOutgoingContext(context.Background(), md2)
//
// now := time.Now()
// err := invoker(ctx, method, req, reply, cc, opts ...)
// lastTime := time.Now().Sub(now)
// fmt.Println("客户端执行时间:",lastTime.Milliseconds())
// return err
//}
//opt := grpc.WithUnaryInterceptor(clientInterceptor)
//认证
opt := grpc.WithPerRPCCredentials(&MyCredentials{})
conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), opt)
defer conn.Close()
if err != nil {
panic(err)
}
//实例化 ToDo 服务 client := pb2.NewToDoServiceClient(conn)
md1 := metadata.New(map[string]string{
"name": "hello grpc",
})
//从过context 传递参数
ctx := metadata.NewOutgoingContext(context.Background(), md1)
//调用服务端DoWork方法
rep, err := client.DoWork(ctx, &pb2.TodoRequest{
Todo: "拦截器",
Week: pb2.Week_Monday,
BookMap: map[string]string{
"age": "20",
},
//DoneTime: time.Now(),
})
if err != nil {
panic(err)
}
fmt.Println("rep:", rep)
} 
美团成长空间 2641人发布
查看9道真题和解析