1. 概述

gRPC 是一个高性能的进程间远程过程调用(RPC)框架,可在任何环境中运行。本教程将聚焦 Java 中的 gRPC 错误处理机制。gRPC 凭借低延迟和高吞吐量特性,特别适合微服务架构等复杂场景。在这些分布式系统中,理解网络组件的状态、性能和故障至关重要,因此健壮的错误处理实现是监控系统的核心保障。

2. gRPC 错误处理基础

gRPC 中的错误是一等公民,每次调用要么返回正常载荷消息,要么返回状态错误消息

错误通过状态消息编码,并在所有支持语言中统一实现

关键原则:

  • 避免在响应载荷中包含错误信息,应始终使用 StreamObserver::onError,它会自动将错误状态添加到尾部元数据
  • 唯一例外是流式调用场景(后文详述)
  • 所有 gRPC 库都支持官方错误模型

Java 通过 io.grpc.Status 类封装基础错误模型,需提供标准状态码和可选错误描述。此模型独立于数据编码(如 Protobuf/REST),但无法附带详细错误信息

若使用 Protobuf 编码,可采用更丰富的 Google API 错误模型,由 com.google.rpc.Status 类实现:

  • 提供 com.google.rpc.Code 状态码
  • 支持错误描述
  • 支持附加 Protobuf 格式的错误详情
  • 预定义常用错误类型(在 error_details.proto 中):
    // 常用错误类位于 com.google.rpc 包
    RetryInfo, DebugInfo, QuotaFailure, ErrorInfo, 
    PreconditionFailure, BadRequest, RequestInfo, ResourceInfo, Help
    

补充方案:可通过键值对形式向 RPC 元数据添加自定义错误信息。

我们将通过一个商品定价服务演示错误处理:客户端发送商品名称,服务器返回价格信息。

3. 一元 RPC 调用错误处理

定义服务接口(commodity_price.proto):

service CommodityPriceProvider {
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message CommodityQuote {
    string commodity_name = 1;
    string producer_name = 2;
    double price = 3;
}

message ErrorResponse {
    string commodity_name = 1;
    string access_token = 2;
    string expected_token = 3;
    string expected_value = 4;
}

服务要求客户端提供 access_tokencommodity_name,服务器返回包含商品名、生产商和价格的 CommodityQuoteErrorResponse 是自定义错误消息示例。

3.1 使用 io.grpc.Status 响应错误

服务器端实现:

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {

    if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
 
        Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
        ErrorResponse errorResponse = ErrorResponse.newBuilder()
          .setCommodityName(request.getCommodityName())
          .setAccessToken(request.getAccessToken())
          .setExpectedValue("Only Commodity1, Commodity2 are supported")
          .build();
        Metadata metadata = new Metadata();
        metadata.put(errorResponseKey, errorResponse);
        responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
          .asRuntimeException(metadata));
    } 
    // ...
}

关键步骤:

  1. 构建自定义 ErrorResponse
  2. 创建元数据键值对并添加到 Metadata
  3. 通过 asRuntimeException(metadata)Status 转为 Throwable
  4. 调用 responseObserver.onError 发送错误

客户端测试:

@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("123validToken")
      .setCommodityName("Commodity5")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));

    assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
    assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
    Metadata metadata = Status.trailersFromThrowable(thrown);
    ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
    assertEquals("Commodity5",errorResponse.getCommodityName());
    assertEquals("123validToken", errorResponse.getAccessToken());
    assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}

踩坑点:使用 Status.trailersFromThrowable 从异常提取元数据,通过 ProtoUtils.keyForProto 获取自定义错误类型的键。

3.2 使用 com.google.rpc.Status 响应错误

服务器端实现:

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
    // ...
    if (request.getAccessToken().equals("123validToken") == false) {

        com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
          .setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
          .setMessage("The access token not found")
          .addDetails(Any.pack(ErrorInfo.newBuilder()
            .setReason("Invalid Token")
            .setDomain("com.baeldung.grpc.errorhandling")
            .putMetadata("insertToken", "123validToken")
            .build()))
          .build();
        responseObserver.onError(StatusProto.toStatusRuntimeException(status));
    }
    // ...
}

优势:

  • 支持结构化错误详情(示例使用预定义 ErrorInfo
  • 通过 Any.pack() 序列化错误详情
  • 使用 StatusProto.toStatusRuntimeException 转换为异常

客户端测试:

@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("invalidToken")
      .setCommodityName("Commodity1")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
      () -> blockingStub.getBestCommodityPrice(request));
    com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
    assertNotNull(status);
    assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
    assertEquals("The access token not found", status.getMessage());
    for (Any any : status.getDetailsList()) {
        if (any.is(ErrorInfo.class)) {
            ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
            assertEquals("Invalid Token", errorInfo.getReason());
            assertEquals("com.baeldung.grpc.errorhandling", errorInfo.getDomain());
            assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
        }
    }
}

关键技巧:通过 StatusProto.fromThrowable 直接解析异常,遍历 getDetailsList() 提取结构化错误详情。

4. gRPC 流式调用错误处理

gRPC 流支持单次 RPC 中传输多条消息。前述错误处理方式在流式场景中不适用,因为:

  • onError() 必须是 RPC 最后调用的方法
  • 调用后框架会立即切断连接

解决方案:将错误嵌入消息本身。修改 commodity_price.proto

service CommodityPriceProvider {
  
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
  
    rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message StreamingCommodityQuote{
    oneof message{
        CommodityQuote comodity_quote = 1;
        google.rpc.Status status = 2;
   }   
}

核心设计StreamingCommodityQuote 使用 oneof 关键字,使消息可承载正常数据或错误状态。

服务器端实现:

public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {

    return new StreamObserver<Commodity>() {
        @Override
        public void onNext(Commodity request) {

            if (request.getAccessToken().equals("123validToken") == false) {

                com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
                  .setCode(Code.NOT_FOUND.getNumber())
                  .setMessage("The access token not found")
                  .addDetails(Any.pack(ErrorInfo.newBuilder()
                    .setReason("Invalid Token")
                    .setDomain("com.baeldung.grpc.errorhandling")
                    .putMetadata("insertToken", "123validToken")
                    .build()))
                  .build();
                StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
                  .setStatus(status)
                  .build();
                responseObserver.onNext(streamingCommodityQuote);
            }
            // ...
        }
    }
}

关键区别:通过 responseObserver.onNext() 发送错误消息,**不调用 onError()**,保持连接持续可用。

客户端处理:

public void onNext(StreamingCommodityQuote streamingCommodityQuote) {

    switch (streamingCommodityQuote.getMessageCase()) {
        case COMODITY_QUOTE:
            CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
            logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
            break;
        case STATUS:
            com.google.rpc.Status status = streamingCommodityQuote.getStatus();
            logger.info("Status code:" + Code.forNumber(status.getCode()));
            logger.info("Status message:" + status.getMessage());
            for (Any any : status.getDetailsList()) {
                if (any.is(ErrorInfo.class)) {
                    ErrorInfo errorInfo;
                    try {
                        errorInfo = any.unpack(ErrorInfo.class);
                        logger.info("Reason:" + errorInfo.getReason());
                        logger.info("Domain:" + errorInfo.getDomain());
                        logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
                    } catch (InvalidProtocolBufferException e) {
                        logger.error(e.getMessage());
                    }
                }
            }
            break;
        // ...
    }
}

处理逻辑:通过 switch 判断消息类型,分别处理正常响应和错误状态。

5. 总结

本教程演示了 gRPC 中一元调用和流式调用的错误处理实现。gRPC 作为分布式系统通信框架,健壮的错误处理对系统监控至关重要,尤其在微服务架构中。

核心要点:

  1. 基础错误模型:io.grpc.Status(简单但功能有限)
  2. 增强错误模型:com.google.rpc.Status(支持结构化详情)
  3. 流式调用特殊处理:错误嵌入消息体
  4. 自定义错误:通过元数据扩展

示例源码见 GitHub 仓库


原始标题:Error Handling in gRPC | Baeldung