1. 概述
在这个教程中,我们将深入探讨如何使用Java的响应式编程来解决一个有趣的问题:如何将Flux<DataBuffer>
读入单个InputStream
。
2. 请求设置
要解决将Flux<DataBuffer>
读入单个InputStream
的问题,首先我们会使用Spring的响应式WebClient进行GET请求。此外,我们可以使用gorest.co.in提供的其中一个公共API端点来进行测试场景:
String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";
接下来,定义getWebClient()
方法以获取WebClient
类的新实例:
static WebClient getWebClient() {
WebClient.Builder webClientBuilder = WebClient.builder();
return webClientBuilder.build();
}
现在我们已经准备好向*/public/v2/users*端点发送GET请求。然而,我们需要得到响应体作为Flux<DataBuffer>
对象。让我们进入下一节,了解如何通过BodyExtractors
来实现这一点。
3. BodyExtractors
和DataBufferUtils
我们可以使用spring-webflux
中的BodyExtractors
类的toDataBuffers()
方法将响应体提取为Flux<DataBuffer>
。
创建一个名为body
的Flux<DataBuffer>
类型的实例:
Flux<DataBuffer> body = client
.get(
.uri(REQUEST_ENDPOINT)
.exchangeToFlux( clientResponse -> {
return clientResponse.body(BodyExtractors.toDataBuffers());
});
为了将这些DataBuffer
流收集到单个InputStream
中,一个有效的策略是使用PipedInputStream
和PipedOutputStream
。我们打算向PipedOutputStream
写入,并最终从PipedInputStream
读取。让我们看看如何创建这两个连接的流:
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);
请注意,默认大小为1024字节。但预期从Flux<DataBuffer>
收集的结果可能会超过默认值,因此我们需要显式指定一个较大的值,这里设为1024*10。
最后,我们使用DataBufferUtils
类的write()
实用方法将body
作为发布者写入outputStream
:
DataBufferUtils.write(body, outputStream).subscribe();
在声明时,我们已将inputStream
连接到outputStream
。现在我们可以从inputStream
读取了。让我们进入下一节,实际操作一下。
4. 从PipedInputStream
读取内容
首先,定义一个辅助方法readContent()
,将InputStream
读取为String
对象:
String readContent(InputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
byte[] tmp = new byte[stream.available()];
int byteCount = stream.read(tmp, 0, tmp.length);
contentStringBuffer.append(new String(tmp));
return String.valueOf(contentStringBuffer);
}
由于通常在不同的线程中读取PipedInputStream
,我们创建一个名为readContentFromPipedInputStream()
的方法,它内部启动一个新的线程,通过调用readContent()
方法从PipedInputStream
读取内容并将其转换为String
对象:
String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
try {
Thread pipeReader = new Thread(() -> {
try {
contentStringBuffer.append(readContent(stream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
pipeReader.start();
pipeReader.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stream.close();
}
return String.valueOf(contentStringBuffer);
}
至此,我们的代码已经准备就绪用于模拟。让我们实际运行看看:
WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));
由于我们正在处理一个异步系统,我们在从流中读取之前会延迟3秒,以便能看到完整的响应。同时,在日志记录时插入换行符,以将长输出拆分为多行。
最后,让我们检查代码执行产生的输出:
20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content:
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]
就这样!看起来一切都正确。
5. 总结
在这篇文章中,我们利用了管道流的概念以及BodyExtractors
和DataBufferUtils
类中的工具方法,将Flux<DataBuffer>
读入单个InputStream
。
如往常一样,完整的教程源代码可以在GitHub上找到:eugenp/tutorials。