1. 概述

在这个教程中,我们将深入探讨如何使用Java的响应式编程来解决一个有趣的问题:如何将Flux<DataBuffer>读入单个InputStream

2. 请求设置

要解决将Flux<DataBuffer>读入单个InputStream的问题,首先我们会使用Spring的响应式WebClient进行GET请求。此外,我们可以使用gorest.co.in提供的其中一个公共API端点来进行测试场景:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

接下来,定义getWebClient()方法以获取WebClient类的新实例:

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

现在我们已经准备好向*/public/v2/users*端点发送GET请求。然而,我们需要得到响应体作为Flux<DataBuffer>对象。让我们进入下一节,了解如何通过BodyExtractors来实现这一点。

3. BodyExtractorsDataBufferUtils

我们可以使用spring-webflux中的BodyExtractors类的toDataBuffers()方法将响应体提取为Flux<DataBuffer>

创建一个名为bodyFlux<DataBuffer>类型的实例:

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

为了将这些DataBuffer流收集到单个InputStream中,一个有效的策略是使用PipedInputStreamPipedOutputStream。我们打算向PipedOutputStream写入,并最终从PipedInputStream读取。让我们看看如何创建这两个连接的流:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

请注意,默认大小为1024字节。但预期从Flux<DataBuffer>收集的结果可能会超过默认值,因此我们需要显式指定一个较大的值,这里设为1024*10。

最后,我们使用DataBufferUtils类的write()实用方法将body作为发布者写入outputStream

DataBufferUtils.write(body, outputStream).subscribe();

在声明时,我们已将inputStream连接到outputStream。现在我们可以从inputStream读取了。让我们进入下一节,实际操作一下。

4. 从PipedInputStream读取内容

首先,定义一个辅助方法readContent(),将InputStream读取为String对象:

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

由于通常在不同的线程中读取PipedInputStream,我们创建一个名为readContentFromPipedInputStream()的方法,它内部启动一个新的线程,通过调用readContent()方法从PipedInputStream读取内容并将其转换为String对象:

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

至此,我们的代码已经准备就绪用于模拟。让我们实际运行看看:

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

由于我们正在处理一个异步系统,我们在从流中读取之前会延迟3秒,以便能看到完整的响应。同时,在日志记录时插入换行符,以将长输出拆分为多行。

最后,让我们检查代码执行产生的输出:

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]

就这样!看起来一切都正确。

5. 总结

在这篇文章中,我们利用了管道流的概念以及BodyExtractorsDataBufferUtils类中的工具方法,将Flux<DataBuffer>读入单个InputStream

如往常一样,完整的教程源代码可以在GitHub上找到:eugenp/tutorials


« 上一篇: Java Weekly, 第448期
» 下一篇: 测试Spring JMS