1. 概述
gRPC 是一个用于进程间远程过程调用(RPC)的高性能平台。它采用客户端-服务器模型,支持主流编程语言。关于 gRPC 的基础概念,可以参考我们的 gRPC 入门指南。
本教程重点介绍 gRPC 的流式传输特性。流式传输允许服务器和客户端之间复用多个消息,实现高效灵活的进程间通信。
2. gRPC 流式传输基础
gRPC 基于 HTTP/2 协议进行服务间通信。HTTP/2 的关键优势之一是支持流(stream)——每个流可在单个连接上复用多个双向消息。
gRPC 支持三种流式调用类型:
- 服务端流式 RPC:客户端发送单个请求,服务器返回一系列顺序读取的消息
- 客户端流式 RPC:客户端发送消息序列,服务器处理完毕后返回单个响应
- 双向流式 RPC:客户端和服务器可互相发送多组消息,消息按发送顺序接收,但处理顺序可自由控制
我们将通过一个股票信息交换的示例演示这三种调用方式。
3. 服务定义
使用 stock_quote.proto
定义服务接口和消息结构:
service StockQuoteProvider {
rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}
rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
string ticker_symbol = 1;
string company_name = 2;
string description = 3;
}
message StockQuote {
double price = 1;
int32 offer_number = 2;
string description = 3;
}
StockQuoteProvider
服务包含三个支持流式传输的方法。客户端通过 Stock
消息查询,服务器用 StockQuote
消息响应。
通过 pom.xml
中的 protobuf-maven-plugin
可从该 IDL 文件生成 Java 代码:
- 客户端存根和服务器代码生成在
target/generated-sources/protobuf/java
和*/grpc-java
目录 - 后续实现将基于这些生成的代码
4. 服务器实现
StockServer
构造函数使用 gRPC Server
监听和分发请求:
public class StockServer {
private int port;
private io.grpc.Server server;
public StockServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new StockService())
.build();
}
//...
}
StockService
继承自 StockQuoteProviderImplBase
(由 protobuf 插件生成),需覆盖三个流式服务方法:
4.1. 服务端流式传输
客户端发送单个报价请求,服务器返回多个不同价格的响应:
@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
responseObserver.onCompleted();
}
关键点:
- 通过
responseObserver.onNext()
发送每个报价 - 用
responseObserver.onCompleted()
标记 RPC 结束
4.2. 客户端流式传输
客户端发送多只股票,服务器返回单个统计结果:
@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
int count;
double price = 0.0;
StringBuffer sb = new StringBuffer();
@Override
public void onNext(Stock stock) {
count++;
price = +fetchStockPriceBid(stock);
sb.append(":")
.append(stock.getTickerSymbol());
}
@Override
public void onCompleted() {
responseObserver.onNext(StockQuote.newBuilder()
.setPrice(price / count)
.setDescription("Statistics-" + sb.toString())
.build());
responseObserver.onCompleted();
}
// handle onError() ...
};
}
实现逻辑:
- 通过
onNext()
累计处理客户端消息 - 客户端完成发送后(
onCompleted()
),计算平均价格并返回 - 需覆盖
onError()
处理异常终止
4.3. 双向流式传输
客户端发送多只股票,服务器为每只股票返回多个报价:
@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
@Override
public void onNext(Stock request) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
//handle OnError() ...
};
}
核心区别:
- 收到每条消息后立即响应(无需等待全部消息)
- 响应顺序与接收顺序一致,但可自由调整响应顺序
5. 客户端实现
StockClient
构造函数初始化 gRPC 生成的存根类:
public class StockClient {
private StockQuoteProviderBlockingStub blockingStub;
private StockQuoteProviderStub nonBlockingStub;
public StockClient(Channel channel) {
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
}
// ...
}
两种存根类型:
StockQuoteProviderBlockingStub
:同步调用StockQuoteProviderStub
:异步调用
5.1. 服务端流式传输的客户端调用
客户端发送单个请求,获取股票报价列表:
public void serverSideStreamingListOfStockPrices() {
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
.setDescription("server streaming example")
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
}
}
关键点:
- 使用阻塞存根发起同步请求
- 通过
Iterator
遍历返回的StockQuotes
5.2. 客户端流式传输的客户端调用
客户端发送股票流,获取统计结果:
public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
}
// Override OnError ...
};
StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
实现要点:
- 创建
responseObserver
处理服务器响应 - 使用非阻塞存根获取
requestObserver
发送消息流 - 发送完成后调用
onCompleted()
5.3. 双向流式传输的客户端调用
客户端发送股票流,获取每只股票的多条报价:
public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
}
//Override onError() ...
};
StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
与客户端流式传输的主要区别:
- 响应是多条消息而非单条
- 响应顺序与请求顺序独立(可乱序到达)
- 添加
Thread.sleep(200)
模拟真实场景的请求间隔
6. 运行服务器和客户端
使用 Maven 编译后,打开两个命令窗口:
启动服务器:
mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer
启动客户端:
mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient
7. 总结
本文深入探讨了 gRPC 流式传输的实现:
- 流式传输通过单连接复用多组消息,显著提升通信效率
- 消息按发送顺序接收,但处理顺序可自由控制
- 三种流式模式覆盖了不同业务场景需求
完整示例代码可在 GitHub 获取。实际开发中,需特别注意流的生命周期管理和错误处理,避免资源泄漏。