gRPC拦截器
做过web开发的都知道拦截器,拦截器就是一个做事先或事后处理的一个机制,使得琐碎的其他业务得到自动化式的进行。在服务端和客户端,他们的拦截器的使用目的不同。
客户端:我们可能会在请求发送之前,对请求的内容做进一步的补充,比如:添加认证信息(如 JWT 令牌)到请求头中,这样服务端在接收到请求后,就可以根据请求头中的认证信息来验证客户端的身份,从而实现安全的服务访问。日志记录、服务发现等
服务端:拦截请求,对请求的数据校验?认证信息身份校验等等
下面,我们将看看在 gRPC 中,拦截器的使用:
简单的gRPC的一元调用
syntax = "proto3";option go_package = ".;protos";message User {string username = 1;string password = 2;
}message Msg {map<string, string> data = 1;
}message UserRequest {Msg msg = 1;
}message UserResponse {Msg msg = 1;
}service UserService {rpc Regist(UserRequest) returns (UserResponse);rpc Login(UserRequest) returns (UserResponse);
}
package mainimport ("context""fmt""mini/intercepter/protos""net""google.golang.org/grpc"
)// 介绍 gRPC 的拦截器的简单用法type UserService struct {protos.UnimplementedUserServiceServer
}func (*UserService) Regist(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("regist:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}func (*UserService) Login(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("login:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}func main(){listen, err := net.Listen("tcp", "127.0.0.1:8080")if err != nil {panic(err)}g := grpc.NewServer()//g.RegisterService(&protos.UserService_ServiceDesc, &UserService{})protos.RegisterUserServiceServer(g, new(UserService))g.Serve(listen)
}
package mainimport ("context""mini/intercepter/protos""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"
)func main() {dial, err:= grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {panic(err)}client := protos.NewUserServiceClient(dial)data := map[string]string{"username" : "admin","password" : "12345",}resp, err := client.Login(context.Background(), &protos.UserRequest{Msg: &protos.Msg{Data: data}})if err != nil {panic(err)}println(resp.Msg.Data["username"],"--->",resp.Msg.Data["password"])
}
服务端的拦截器:
package mainimport ("context""fmt""mini/intercepter/protos""net""google.golang.org/grpc"
)// 介绍 gRPC 的拦截器的简单用法type UserService struct {protos.UnimplementedUserServiceServer
}func (*UserService) Regist(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("regist:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}func (*UserService) Login(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("login:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}// 服务端的拦截器:
/*
参数:之前的上下文、请求、元数据、处理函数
返回值:响应(元数据)、错误
*/
func Login_Intercepter(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error){// 其他要求:1. 认证 2. 日志 3. 限流 4. 监控 。。。fmt.Println("服务端登录拦截器来了~~~")// 拦截器处理data, err := Login_Intercepter_Handler(ctx, req)if err != nil {return nil, err}fmt.Println(data)// 继续处理请求resp, err = handler(ctx, req)// ... 其他处理return resp, err
}func Login_Intercepter_Handler(ctx context.Context, req any) (any, error){data := req.(*protos.UserRequest).Msg.Dataif data["username"] == "admin" && data["password"] == "123456" {return "通过拦截器", nil}return nil, fmt.Errorf("用户名或密码错误")
}func main(){listen, err := net.Listen("tcp", "127.0.0.1:8080")if err != nil {panic(err)}var opts []grpc.ServerOption// 注册单个 拦截器opt := grpc.UnaryInterceptor(Login_Intercepter)// 将拦截器添加到服务端的拦截器集合中opts = append(opts, opt)// 创建服务端,并添加 拦截器g := grpc.NewServer(opts...)//g.RegisterService(&protos.UserService_ServiceDesc, &UserService{})protos.RegisterUserServiceServer(g, new(UserService))g.Serve(listen)
}
客户端拦截器
package mainimport ("context""fmt""mini/intercepter/protos""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"
)// 客户端登录拦截器
func Login_Intercepter (ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error{// ... 其他处理 : 增加日志、统计、需求等// 假设:我要统计 调用时间start := time.Now()err := invoker(ctx, method, req, reply, cc, opts...)fmt.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)return err
}
func main() {// 增加客户端 拦截器 : 实现 grpc.UnaryClientInterceptorvar opts []grpc.DialOptionopt := grpc.WithUnaryInterceptor(Login_Intercepter)opts = append(opts, opt)// grpc.WithTransportCredentials函数用于为 gRPC 客户端或服务端设置传输层安全凭证,但是:这里传输了一个不安全的凭证opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))dial, err:= grpc.Dial("127.0.0.1:8080",opts...)if err != nil {panic(err)}client := protos.NewUserServiceClient(dial)data := map[string]string{"username" : "admin","password" : "123456",}resp, err := client.Login(context.Background(), &protos.UserRequest{Msg: &protos.Msg{Data: data}})if err != nil {panic(err)}println(resp.Msg.Data["username"],"--->",resp.Msg.Data["password"])
}
附加:通过拦截器和metadata实现grpc的auth认证
通过使用metadata和拦截器做到对go代码的非侵入式的功能扩展:(直接作用于拦截器):下面为了可以整体使用,所以全部粘贴了。然后核心部分用两条--------分割好了。
package mainimport ("context""fmt""mini/grpc_auth/protos""net""google.golang.org/grpc"// 注意:导包正确"google.golang.org/grpc/metadata"//--------------------------------------------------------------------------
)// 介绍 gRPC 的拦截器的简单用法type UserService struct {protos.UnimplementedUserServiceServer
}func (*UserService) Regist(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("regist:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}func (*UserService) Login(ctx context.Context, req *protos.UserRequest) (*protos.UserResponse, error) {user := protos.User{Username: req.Msg.GetData()["username"], Password:req.Msg.GetData()["password"] }_ ,err:= fmt.Println("login:",user.Username,"-->",user.Password)if err != nil {return nil, err}return &protos.UserResponse{Msg: req.Msg}, nil
}func Login_Intercepter(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error){ fmt.Println("服务端登录拦截器来了~~~")data, err := Login_Intercepter_Handler(ctx, req)if err != nil {return nil, err}fmt.Println(data)resp, err = handler(ctx, req)return resp, err
}//--------------------------------------------------------------------------
func Token_Intercepter(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error){fmt.Println("服务端token拦截器来了~~~")md, ok := metadata.FromIncomingContext(ctx)if ok && md.Get("token")[0] == "123456"{resp,err = handler(ctx, req)return resp, err//return handler(ctx, req)}return nil, fmt.Errorf("token错误")
}
//--------------------------------------------------------------------------func Login_Intercepter_Handler(ctx context.Context, req any) (any, error){data := req.(*protos.UserRequest).Msg.Dataif data["username"] == "admin" && data["password"] == "123456" {return "通过拦截器", nil}return nil, fmt.Errorf("用户名或密码错误")
}func main(){listen, err := net.Listen("tcp", "127.0.0.1:8080")if err != nil {panic(err)}var opts []grpc.ServerOption
//--------------------------------------------------------------------------//在同一个 grpc 服务器实例上不能重复设置一元拦截器,需要按照之前介绍的方法将它们合理组合成一个总的拦截器后再添加到 opts 中combineInterceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(resp interface{}, err error){resp, err = Login_Intercepter(ctx, req, info, handler)if err != nil {return resp, err}return Token_Intercepter(ctx, req, info, handler)// 第三个也一样 。。。} // 链式 调用 组合拦截器 : 适合简单的顺序执行// 注册 拦截器opt := grpc.UnaryInterceptor(combineInterceptor)// 或者直接这样:创建一个拦截器调用链//opt := grpc.ChainUnaryInterceptor(Login_Intercepter, Token_Intercepter)//--------------------------------------------------------------------------// 追加opts = append(opts, opt)g := grpc.NewServer(opts...)g.RegisterService(&protos.UserService_ServiceDesc, &UserService{}) // 更底层、更灵活//protos.RegisterUserServiceServer(g, new(UserService)) // 更简单、更方便g.Serve(listen)
}
package mainimport ("context""fmt""mini/grpc_auth/protos""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"//--------------------------------------------------------------------------"google.golang.org/grpc/metadata"//--------------------------------------------------------------------------
)func Login_Intercepter (ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error{start := time.Now()err := invoker(ctx, method, req, reply, cc, opts...)fmt.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)return err
}
func main() {var opts []grpc.DialOptionopt := grpc.WithUnaryInterceptor(Login_Intercepter)opts = append(opts, opt)opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))dial, err:= grpc.Dial("127.0.0.1:8080",opts...)if err != nil {panic(err)}client := protos.NewUserServiceClient(dial)data := map[string]string{"username" : "admin","password" : "123456",}
//--------------------------------------------------------------------------// 将 metadata 添加到 context 中md := metadata.New(map[string]string{"token": "123456",})ctx := metadata.NewOutgoingContext(context.Background(), md)
//--------------------------------------------------------------------------resp, err := client.Login(ctx, &protos.UserRequest{Msg: &protos.Msg{Data: data}})if err != nil {panic(err)}println(resp.Msg.Data["username"],"--->",resp.Msg.Data["password"])
}