当前位置: 首页 > news >正文

gRPC-4种通信模式

4种通信模式

1、简单模式(Simple RPC)

简单模式:也称简单 RPC,即客户端发起一次请求,服务端响应处理后返回一个结果给客户端。

在 proto 文件中可如下定义:

rpc SayHello(HelloRequest) returns (HelloResponse);
2、服务端数据流模式(Server-side streaming RPC)

服务端数据流模式:也称服务端流式 RPC,即客户端发起一次请求,服务端可以连续返回数据流。
比如:客户端向服务端发送了一个查询数据库的请求,服务端持续返回多次结果。(即客户端发送一次请求,服务端查询到数据库有一万条数据,服务端分批返回10次,每次返回1000条数据给客户端)。

在 proto 文件中可如下定义:

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

注意:在返回值前面加了stream

3、客户端数据流模式(Client-side streaming RPC)

客户端数据流模式:也称客户端流式 RPC,与服务端数据流模式相反,客户端持续向服务端发送数据流,在发送结束后,由服务端返回一个响应。
比如:客户端有一万条数据 ,分批多次请求服务端,服务端接收后把这些数据都存到数据库,然后返回一次结果给客户端。

在 proto 文件中可如下定义:

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

注意:在参数前面加了stream

4、双向数据流模式(Bidirectional streaming RPC)

双向数据流模式:也称双向流式 RPC,即客户端和服务端都可以向对方多次收发数据。

比如:客户端有一万条数据 ,分批多次请求服务端,服务端每次接收后存到数据库后都发送一次结果给客户端。

在 proto 文件中可如下定义:

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

注意:参数和返回前面都加了stream

代码示例

1、简单模式

product.proto

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.github.xjs.grpcapi";
option java_outer_classname = "ProductProto";package product;service ProductInfo {rpc addProduct (Product) returns (ProductId);rpc getProduct(ProductId) returns(Product);
}message Product {string id = 1;string name=2;string description=3;float price=4;
}message ProductId {string value = 1;
}

客户端:

public static void main(String[] args) {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().build();ProductInfoGrpc.ProductInfoBlockingStub stub = ProductInfoGrpc.newBlockingStub(channel);Product p = Product.newBuilder().setId("1").setPrice(100).setName("21天精通Java").setDescription("21天精通Java").build();ProductId productId = stub.addProduct(p);System.out.println("productId.getValue() = " + productId.getValue());Product product = stub.getProduct(ProductId.newBuilder().setValue("99999").build());System.out.println("product.getName() = " + product.getName());channel.shutdown();
}

服务端:

public class ProductInfoImpl extends ProductInfoGrpc.ProductInfoImplBase {@Overridepublic void addProduct(Product request, StreamObserver<ProductId> responseObserver) {// System.out.println("request.toString() = " + request.toString());System.out.println(TextFormat.printer().escapingNonAscii(false).printToString(request));ProductId productId = ProductId.newBuilder().setValue(request.getId()).build();responseObserver.onNext(productId);responseObserver.onCompleted();}@Overridepublic void getProduct(ProductId request, StreamObserver<Product> responseObserver) {System.out.println(TextFormat.printer().escapingNonAscii(false).printToString(request));Product product = Product.newBuilder().setId(request.getValue()).setName("三国演义").build();responseObserver.onNext(product);responseObserver.onCompleted();}
}
2、服务端数据流模式(Server-side streaming RPC)

product.proto

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.github.xjs.grpcapi.clientstream";
option java_outer_classname = "ProductProto";package product.clientstream;service ProductInfo {rpc getProductBatch (ProductGetBatchRequest) returns (stream Product);
}message ProductGetBatchRequest {int32 count = 10;}message Product {string id = 1;string name=2;string description=3;float price=4;
}

客户端:

public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().build();ProductInfoGrpc.ProductInfoStub stub = ProductInfoGrpc.newStub(channel);// 等待接收服务端的响应StreamObserver<Product> responseObserver = new StreamObserver<Product>() {@Overridepublic void onNext(Product result) {System.out.println("服务端返回:" + result.toString());}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("服务端响应完成");// 关闭channelchannel.shutdown();// 结束程序countDownLatch.countDown();}};ProductGetBatchRequest request = ProductGetBatchRequest.newBuilder().setCount(10).build();stub.getProductBatch(request, responseObserver);// 等待结束countDownLatch.await();}

服务端:

@Override
public void getProductBatch(ProductGetBatchRequest request, StreamObserver<Product> responseObserver) {int count = request.getCount();for(int i=0; i<count; i++){Product product = Product.newBuilder().setId(""+(1+1)).setName("product" + i) .setPrice(100+i).build();responseObserver.onNext(product);System.out.println("发送数据:" + product);}responseObserver.onCompleted();System.out.println("发送完成");
}
3、客户端数据流模式(Client-side streaming RPC)

product.proto

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.github.xjs.grpcapi.clientstream";
option java_outer_classname = "ProductProto";package product.clientstream;service ProductInfo {rpc addProductBatch (stream Product) returns (ProductAddResult);
}message Product {string id = 1;string name=2;string description=3;float price=4;
}message ProductAddResult {int32 count = 1;
}

客户端:

public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().build();ProductInfoGrpc.ProductInfoStub stub = ProductInfoGrpc.newStub(channel);// 等待接收服务端的响应StreamObserver<ProductAddResult> responseObserver = new StreamObserver<ProductAddResult>() {@Overridepublic void onNext(ProductAddResult result) {System.out.println("服务端返回:" + result.toString());}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("服务端响应完成");// 关闭channelchannel.shutdown();// 结束程序countDownLatch.countDown();}};StreamObserver<Product> requestObserver = stub.addProductBatch(responseObserver);for(int i=0;i<10;i++){Product p = Product.newBuilder().setId("" + (i+1)).setPrice(100).setName("21天精通Java").setDescription("21天精通Java").build();requestObserver.onNext(p);}requestObserver.onCompleted();// 等待结束countDownLatch.await();
}

服务端:

public io.grpc.stub.StreamObserver<Product> addProductBatch(io.grpc.stub.StreamObserver<ProductAddResult> responseObserver) {return new StreamObserver<Product>() {List<Product> products = new ArrayList<>();@Overridepublic void onNext(Product product) {// 接收客户端请求System.out.println(TextFormat.printer().escapingNonAscii(false).printToString(product));products.add(product);}@Overridepublic void onError(Throwable throwable) {// 错误处理throwable.printStackTrace();}@Overridepublic void onCompleted() {// 客户端请求结束,发送响应ProductAddResult result = ProductAddResult.newBuilder().setCount(products.size()).build();responseObserver.onNext(result);responseObserver.onCompleted();System.out.println("服务端响应结束");}};
}
4、双向数据流模式(Bidirectional streaming RPC)

product.proto

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.github.xjs.grpcapi.bidirectionalstream";
option java_outer_classname = "ProductProto";package product.clientstream;service ProductInfo {rpc saveProductBatch (stream Product) returns (stream ProductSaveResult);
}message ProductSaveResult {bool success = 1;
}message Product {string id = 1;string name=2;string description=3;float price=4;
}

客户端:

public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(1);ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50050).usePlaintext().build();ProductInfoGrpc.ProductInfoStub stub = ProductInfoGrpc.newStub(channel);// 等待接收服务端的响应StreamObserver<ProductSaveResult> responseObserver = new StreamObserver<ProductSaveResult>() {@Overridepublic void onNext(ProductSaveResult result) {System.out.println("服务端返回:" + result.toString());}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("服务端响应完成");// 关闭channelchannel.shutdown();// 结束程序countDownLatch.countDown();}};StreamObserver<Product> requestObserver = stub.saveProductBatch(responseObserver);for(int i=0; i<10; i++){Product p = Product.newBuilder().setId(""+(i+1)).setName("product"+i).setPrice(100+i).build();requestObserver.onNext(p);System.out.println("客户端发送:" + p.toString());}requestObserver.onCompleted();System.out.println("客户端发送完成");// 等待结束countDownLatch.await();
}

服务端:

@Override
public StreamObserver<Product> saveProductBatch(StreamObserver<ProductSaveResult> responseObserver) {return new StreamObserver<Product>() {@Overridepublic void onNext(Product product) {System.out.println("收到客户端请求:" + product);ProductSaveResult result = ProductSaveResult.newBuilder().setSuccess(true).build();System.out.println("发送响应");responseObserver.onNext(result);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("客户端请求完成");responseObserver.onCompleted();System.out.println("服务端响应完成");}};
}

完成的源码下载:https://github.com/xjs1919/learning-demo/tree/master/grpc-demo


http://www.mrgr.cn/news/65305.html

相关文章:

  • 编写第一个 Appium 测试脚本:从安装到运行!
  • Docker篇(基础命令)
  • Web组件之 Listener (监听器)
  • 基于MATLAB DCT域图像水印技术
  • three.js 智慧城市扫光效果
  • 终于弄懂了Python中元组与列表的区别
  • ChatGPT:真如吹的那般神乎其神吗?
  • pdf 添加页眉页脚,获取前五页
  • SpringBoot新闻稿件管理系统:架构与实现
  • 编程模拟生产者和消费者问题(java)
  • Qt6 CMake 中引入 Qt Linguist 翻译功能
  • LeetCode每日一题633---平方数之和
  • runner,hook介绍
  • 在Java中如何创建一个类和对象?
  • Chromium127编译指南 Mac篇(一)- 环境准备详解
  • [实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响
  • 每日一问:什么是SQL注入?注入方式和预防方法有哪些?
  • 100种算法【Python版】第35篇——PageRank算法
  • Java中的排序
  • 爱普生SG-8101CA可编程晶振应用在工业自动化机器人
  • 从0开始学习Linux——文本编辑器
  • java动态导入导出excel,javassist动态创建类
  • C/C++ stackful 有栈协同程式的一些缺点。
  • django电商易购系统-计算机设计毕业源码61059
  • JAVA通过AOP自定义注解记录日志
  • 100种算法【Python版】第38篇—— Tarjan算法