1. 概述

gRPC 是一个用于进程间远程过程调用(RPC)的高性能平台。它采用客户端-服务器模型,支持主流编程语言。关于 gRPC 的基础概念,可以参考我们的 gRPC 入门指南

本教程重点介绍 gRPC 的流式传输特性。流式传输允许服务器和客户端之间复用多个消息,实现高效灵活的进程间通信。

2. gRPC 流式传输基础

gRPC 基于 HTTP/2 协议进行服务间通信。HTTP/2 的关键优势之一是支持流(stream)——每个流可在单个连接上复用多个双向消息。

gRPC 支持三种流式调用类型:

  1. 服务端流式 RPC:客户端发送单个请求,服务器返回一系列顺序读取的消息
  2. 客户端流式 RPC:客户端发送消息序列,服务器处理完毕后返回单个响应
  3. 双向流式 RPC:客户端和服务器可互相发送多组消息,消息按发送顺序接收,但处理顺序可自由控制

我们将通过一个股票信息交换的示例演示这三种调用方式。

3. 服务定义

使用 stock_quote.proto 定义服务接口和消息结构:

service StockQuoteProvider {
  
  rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}

  rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
  
  rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
   string ticker_symbol = 1;
   string company_name = 2;
   string description = 3;
}
message StockQuote {
   double price = 1;
   int32 offer_number = 2;
   string description = 3;
}

StockQuoteProvider 服务包含三个支持流式传输的方法。客户端通过 Stock 消息查询,服务器用 StockQuote 消息响应。

通过 pom.xml 中的 protobuf-maven-plugin 可从该 IDL 文件生成 Java 代码:

  • 客户端存根和服务器代码生成在 target/generated-sources/protobuf/java*/grpc-java 目录
  • 后续实现将基于这些生成的代码

4. 服务器实现

StockServer 构造函数使用 gRPC Server 监听和分发请求:

public class StockServer {
    private int port;
    private io.grpc.Server server;

    public StockServer(int port) throws IOException {
        this.port = port;
        server = ServerBuilder.forPort(port)
          .addService(new StockService())
          .build();
    }
    //...
}

StockService 继承自 StockQuoteProviderImplBase(由 protobuf 插件生成),需覆盖三个流式服务方法:

4.1. 服务端流式传输

客户端发送单个报价请求,服务器返回多个不同价格的响应:

@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
    for (int i = 1; i <= 5; i++) {
        StockQuote stockQuote = StockQuote.newBuilder()
          .setPrice(fetchStockPriceBid(request))
          .setOfferNumber(i)
          .setDescription("Price for stock:" + request.getTickerSymbol())
          .build();
        responseObserver.onNext(stockQuote);
    }
    responseObserver.onCompleted();
}

关键点:

  • 通过 responseObserver.onNext() 发送每个报价
  • responseObserver.onCompleted() 标记 RPC 结束

4.2. 客户端流式传输

客户端发送多只股票,服务器返回单个统计结果:

@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        int count;
        double price = 0.0;
        StringBuffer sb = new StringBuffer();

        @Override
        public void onNext(Stock stock) {
            count++;
            price = +fetchStockPriceBid(stock);
            sb.append(":")
                .append(stock.getTickerSymbol());
        }

        @Override
        public void onCompleted() {
            responseObserver.onNext(StockQuote.newBuilder()
                .setPrice(price / count)
                .setDescription("Statistics-" + sb.toString())
                .build());
            responseObserver.onCompleted();
        }

        // handle onError() ...
    };
}

实现逻辑:

  1. 通过 onNext() 累计处理客户端消息
  2. 客户端完成发送后(onCompleted()),计算平均价格并返回
  3. 需覆盖 onError() 处理异常终止

4.3. 双向流式传输

客户端发送多只股票,服务器为每只股票返回多个报价:

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        @Override
        public void onNext(Stock request) {
            for (int i = 1; i <= 5; i++) {
                StockQuote stockQuote = StockQuote.newBuilder()
                  .setPrice(fetchStockPriceBid(request))
                  .setOfferNumber(i)
                  .setDescription("Price for stock:" + request.getTickerSymbol())
                  .build();
                responseObserver.onNext(stockQuote);
            }
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }

        //handle OnError() ...
    };
}

核心区别:

  • 收到每条消息后立即响应(无需等待全部消息)
  • 响应顺序与接收顺序一致,但可自由调整响应顺序

5. 客户端实现

StockClient 构造函数初始化 gRPC 生成的存根类:

public class StockClient {
    private StockQuoteProviderBlockingStub blockingStub;
    private StockQuoteProviderStub nonBlockingStub;

    public StockClient(Channel channel) {
        blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
        nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
    }
    // ...
}

两种存根类型:

  • StockQuoteProviderBlockingStub:同步调用
  • StockQuoteProviderStub:异步调用

5.1. 服务端流式传输的客户端调用

客户端发送单个请求,获取股票报价列表:

public void serverSideStreamingListOfStockPrices() {
    Stock request = Stock.newBuilder()
      .setTickerSymbol("AU")
      .setCompanyName("Austich")
      .setDescription("server streaming example")
      .build();
    Iterator<StockQuote> stockQuotes;
    try {
        logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
        stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
        for (int i = 1; stockQuotes.hasNext(); i++) {
            StockQuote stockQuote = stockQuotes.next();
            logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
        }
    } catch (StatusRuntimeException e) {
        logInfo("RPC failed: {0}", e.getStatus());
    }
}

关键点:

  • 使用阻塞存根发起同步请求
  • 通过 Iterator 遍历返回的 StockQuotes

5.2. 客户端流式传输的客户端调用

客户端发送股票流,获取统计结果:

public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote summary) {
            logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
        }

        // Override OnError ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

实现要点:

  1. 创建 responseObserver 处理服务器响应
  2. 使用非阻塞存根获取 requestObserver 发送消息流
  3. 发送完成后调用 onCompleted()

5.3. 双向流式传输的客户端调用

客户端发送股票流,获取每只股票的多条报价:

public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote stockQuote) {
            logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
        }

        //Override onError() ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
            Thread.sleep(200);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

与客户端流式传输的主要区别:

  • 响应是多条消息而非单条
  • 响应顺序与请求顺序独立(可乱序到达)
  • 添加 Thread.sleep(200) 模拟真实场景的请求间隔

6. 运行服务器和客户端

使用 Maven 编译后,打开两个命令窗口:

启动服务器:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer

启动客户端:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient

7. 总结

本文深入探讨了 gRPC 流式传输的实现:

  • 流式传输通过单连接复用多组消息,显著提升通信效率
  • 消息按发送顺序接收,但处理顺序可自由控制
  • 三种流式模式覆盖了不同业务场景需求

完整示例代码可在 GitHub 获取。实际开发中,需特别注意流的生命周期管理和错误处理,避免资源泄漏。


原始标题:Streaming with gRPC in Java