1. 概述
gRPC 是一个高性能的进程间远程过程调用(RPC)框架,可在任何环境中运行。本教程将聚焦 Java 中的 gRPC 错误处理机制。gRPC 凭借低延迟和高吞吐量特性,特别适合微服务架构等复杂场景。在这些分布式系统中,理解网络组件的状态、性能和故障至关重要,因此健壮的错误处理实现是监控系统的核心保障。
2. gRPC 错误处理基础
gRPC 中的错误是一等公民,每次调用要么返回正常载荷消息,要么返回状态错误消息。
错误通过状态消息编码,并在所有支持语言中统一实现。
关键原则:
- 避免在响应载荷中包含错误信息,应始终使用
StreamObserver::onError
,它会自动将错误状态添加到尾部元数据 - 唯一例外是流式调用场景(后文详述)
- 所有 gRPC 库都支持官方错误模型
Java 通过 io.grpc.Status
类封装基础错误模型,需提供标准状态码和可选错误描述。此模型独立于数据编码(如 Protobuf/REST),但无法附带详细错误信息。
若使用 Protobuf 编码,可采用更丰富的 Google API 错误模型,由 com.google.rpc.Status
类实现:
- 提供
com.google.rpc.Code
状态码 - 支持错误描述
- 支持附加 Protobuf 格式的错误详情
- 预定义常用错误类型(在
error_details.proto
中):// 常用错误类位于 com.google.rpc 包 RetryInfo, DebugInfo, QuotaFailure, ErrorInfo, PreconditionFailure, BadRequest, RequestInfo, ResourceInfo, Help
补充方案:可通过键值对形式向 RPC 元数据添加自定义错误信息。
我们将通过一个商品定价服务演示错误处理:客户端发送商品名称,服务器返回价格信息。
3. 一元 RPC 调用错误处理
定义服务接口(commodity_price.proto
):
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message CommodityQuote {
string commodity_name = 1;
string producer_name = 2;
double price = 3;
}
message ErrorResponse {
string commodity_name = 1;
string access_token = 2;
string expected_token = 3;
string expected_value = 4;
}
服务要求客户端提供 access_token
和 commodity_name
,服务器返回包含商品名、生产商和价格的 CommodityQuote
。ErrorResponse
是自定义错误消息示例。
3.1 使用 io.grpc.Status
响应错误
服务器端实现:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
ErrorResponse errorResponse = ErrorResponse.newBuilder()
.setCommodityName(request.getCommodityName())
.setAccessToken(request.getAccessToken())
.setExpectedValue("Only Commodity1, Commodity2 are supported")
.build();
Metadata metadata = new Metadata();
metadata.put(errorResponseKey, errorResponse);
responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
.asRuntimeException(metadata));
}
// ...
}
关键步骤:
- 构建自定义
ErrorResponse
- 创建元数据键值对并添加到
Metadata
- 通过
asRuntimeException(metadata)
将Status
转为Throwable
- 调用
responseObserver.onError
发送错误
客户端测试:
@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("123validToken")
.setCommodityName("Commodity5")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));
assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
Metadata metadata = Status.trailersFromThrowable(thrown);
ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
assertEquals("Commodity5",errorResponse.getCommodityName());
assertEquals("123validToken", errorResponse.getAccessToken());
assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}
踩坑点:使用 Status.trailersFromThrowable
从异常提取元数据,通过 ProtoUtils.keyForProto
获取自定义错误类型的键。
3.2 使用 com.google.rpc.Status
响应错误
服务器端实现:
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
// ...
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.baeldung.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
// ...
}
优势:
- 支持结构化错误详情(示例使用预定义
ErrorInfo
) - 通过
Any.pack()
序列化错误详情 - 使用
StatusProto.toStatusRuntimeException
转换为异常
客户端测试:
@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
Commodity request = Commodity.newBuilder()
.setAccessToken("invalidToken")
.setCommodityName("Commodity1")
.build();
StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
() -> blockingStub.getBestCommodityPrice(request));
com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
assertNotNull(status);
assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
assertEquals("The access token not found", status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
assertEquals("Invalid Token", errorInfo.getReason());
assertEquals("com.baeldung.grpc.errorhandling", errorInfo.getDomain());
assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
}
}
}
关键技巧:通过 StatusProto.fromThrowable
直接解析异常,遍历 getDetailsList()
提取结构化错误详情。
4. gRPC 流式调用错误处理
gRPC 流支持单次 RPC 中传输多条消息。前述错误处理方式在流式场景中不适用,因为:
onError()
必须是 RPC 最后调用的方法- 调用后框架会立即切断连接
解决方案:将错误嵌入消息本身。修改 commodity_price.proto
:
service CommodityPriceProvider {
rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}
message Commodity {
string access_token = 1;
string commodity_name = 2;
}
message StreamingCommodityQuote{
oneof message{
CommodityQuote comodity_quote = 1;
google.rpc.Status status = 2;
}
}
核心设计:StreamingCommodityQuote
使用 oneof
关键字,使消息可承载正常数据或错误状态。
服务器端实现:
public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {
return new StreamObserver<Commodity>() {
@Override
public void onNext(Commodity request) {
if (request.getAccessToken().equals("123validToken") == false) {
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.NOT_FOUND.getNumber())
.setMessage("The access token not found")
.addDetails(Any.pack(ErrorInfo.newBuilder()
.setReason("Invalid Token")
.setDomain("com.baeldung.grpc.errorhandling")
.putMetadata("insertToken", "123validToken")
.build()))
.build();
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
.setStatus(status)
.build();
responseObserver.onNext(streamingCommodityQuote);
}
// ...
}
}
}
关键区别:通过 responseObserver.onNext()
发送错误消息,**不调用 onError()
**,保持连接持续可用。
客户端处理:
public void onNext(StreamingCommodityQuote streamingCommodityQuote) {
switch (streamingCommodityQuote.getMessageCase()) {
case COMODITY_QUOTE:
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
break;
case STATUS:
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
logger.info("Status code:" + Code.forNumber(status.getCode()));
logger.info("Status message:" + status.getMessage());
for (Any any : status.getDetailsList()) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo;
try {
errorInfo = any.unpack(ErrorInfo.class);
logger.info("Reason:" + errorInfo.getReason());
logger.info("Domain:" + errorInfo.getDomain());
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage());
}
}
}
break;
// ...
}
}
处理逻辑:通过 switch
判断消息类型,分别处理正常响应和错误状态。
5. 总结
本教程演示了 gRPC 中一元调用和流式调用的错误处理实现。gRPC 作为分布式系统通信框架,健壮的错误处理对系统监控至关重要,尤其在微服务架构中。
核心要点:
- 基础错误模型:
io.grpc.Status
(简单但功能有限) - 增强错误模型:
com.google.rpc.Status
(支持结构化详情) - 流式调用特殊处理:错误嵌入消息体
- 自定义错误:通过元数据扩展
示例源码见 GitHub 仓库