1. 概述
Java的核心特性之一是并发编程,它允许多个线程并行执行任务。通过这种方式,我们可以实现异步和非阻塞的指令执行,从而优化资源利用率,特别是在多核CPU环境下。线程分为两种类型:有返回值和无返回值(后者通常指返回void的方法)。
本文将重点探讨如何在线程任务完成后获取返回值。
2. Thread与Runnable
Java线程本质上是一个轻量级进程。我们先看下Java程序的典型工作流程:
Java程序本身是一个运行中的进程,而线程是进程的子集,可以访问主内存并与同进程的其他线程通信。线程具有完整的生命周期和多种状态。实现线程的常见方式是通过Runnable
接口:
public class RunnableExample implements Runnable {
...
@Override
public void run() {
// 执行任务
}
}
启动线程的方式如下:
Thread thread = new Thread(new RunnableExample());
thread.start();
thread.join();
显然,Runnable
无法直接返回值。但我们可以通过wait()
和notify()
实现线程同步。join()
方法会阻塞当前线程直到目标线程执行完成——这在获取异步执行结果时至关重要。
3. Callable
Java 1.5引入了Callable
接口。下面是一个计算阶乘的异步任务示例,使用BigInteger
处理大数结果:
public class CallableFactorialTask implements Callable<BigInteger> {
// 字段和构造函数
@Override
public BigInteger call() throws Exception {
return factorial(BigInteger.valueOf(value));
}
}
配套的阶乘计算器:
public class FactorialCalculator {
public static BigInteger factorial(BigInteger end) {
BigInteger start = BigInteger.ONE;
BigInteger res = BigInteger.ONE;
for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
res = res.multiply(BigInteger.valueOf(i));
}
return res;
}
public static BigInteger factorial(BigInteger start, BigInteger end) {
BigInteger res = start;
for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
res = res.multiply(BigInteger.valueOf(i));
}
return res;
}
}
Callable
只有一个需要重写的call()
方法,该方法返回异步任务的结果。Callable
和Runnable
都是@FunctionalInterface
,但Callable
可以返回值并抛出异常,需要配合Future
使用。
4. 执行Callable
执行Callable
有两种主要方式:使用Future
或Fork/Join框架。
4.1. Callable与Future
Java 1.5引入了Future接口,用于封装异步处理的结果。可以类比为JavaScript中的Promise。
典型场景是同时调用多个接口获取数据,需要等待所有任务完成才能收集结果。Future
会包装响应并等待线程完成,但可能因超时或执行异常而中断。
Future
接口定义如下:
public interface Future<V> {
boolean cancel(boolean var1);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
get()
方法是我们等待并获取执行结果的关键。
启动Future
任务需要关联ThreadPool
。下面使用Executor
接口和ExecutorService
实现创建ThreadPoolExecutor
,这里选择缓存线程池演示:
public BigInteger execute(List<CallableFactorialTask> tasks) {
BigInteger result = BigInteger.ZERO;
ExecutorService cachedPool = Executors.newCachedThreadPool();
List<Future<BigInteger>> futures;
try {
futures = cachedPool.invokeAll(tasks);
} catch (InterruptedException e) {
// 异常处理示例
throw new RuntimeException(e);
}
for (Future<BigInteger> future : futures) {
try {
result = result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
// 异常处理示例
throw new RuntimeException(e);
}
}
return result;
}
执行流程示意图:
Executor
会调用所有任务并收集到Future
对象中,随后可从异步处理中获取结果。测试两个阶乘数求和:
@Test
void givenCallableExecutor_whenExecuteFactorial_thenResultOk() {
BigInteger result = callableExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3)));
assertEquals(BigInteger.valueOf(126), result);
}
4.2. Callable与Fork/Join
另一种选择是使用ForkJoinPool。它虽然继承自AbstractExecutorService
,但线程创建和组织方式不同。它会将任务拆分为更小的子任务,优化资源避免线程空闲。子任务划分示意图:
主任务被拆分为SubTask1、SubTask3和SubTask4等最小执行单元,最终合并结果。将前例改造为ForkJoinPool
版本:
public BigInteger execute(List<Callable<BigInteger>> forkFactorials) {
List<Future<BigInteger>> futures = forkJoinPool.invokeAll(forkFactorials);
BigInteger result = BigInteger.ZERO;
for (Future<BigInteger> future : futures) {
try {
result = result.add(future.get());
} catch (InterruptedException | ExecutionException e) {
// 异常处理示例
throw new RuntimeException(e);
}
}
return result;
}
只需更换线程池即可获取Future
。测试阶乘任务列表:
@Test
void givenForkExecutor_whenExecuteCallable_thenResultOk() {
assertEquals(BigInteger.valueOf(126),
forkExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3))));
}
更灵活的方式是自定义任务拆分逻辑(如基于输入参数或服务负载)。需将任务重写为ForkJoinTask
,使用RecursiveTask
:
public class ForkFactorialTask extends RecursiveTask<BigInteger> {
// 字段和构造函数
@Override
protected BigInteger compute() {
BigInteger factorial = BigInteger.ONE;
if (end - start > threshold) {
int middle = (end + start) / 2;
return factorial.multiply(new ForkFactorialTask(start, middle, threshold).fork()
.join()
.multiply(new ForkFactorialTask(middle + 1, end, threshold).fork()
.join()));
}
return factorial.multiply(factorial(BigInteger.valueOf(start), BigInteger.valueOf(end)));
}
}
当超过阈值时拆分任务,通过invoke()
获取结果:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int result = forkJoinPool.invoke(forkFactorialTask);
也可使用submit()
或execute()
,但必须调用join()
完成执行。测试任务拆分:
@Test
void givenForkExecutor_whenExecuteRecursiveTask_thenResultOk() {
assertEquals(BigInteger.valueOf(3628800), forkExecutor.execute(new ForkFactorialTask(10, 5)));
}
此例将10的阶乘拆分为1-5和6-10两个子任务。
5. CompletableFuture
**Java 8引入了CompletableFuture**,显著改进了多线程编程。它消除了Future
的样板代码,增加了链式调用和结果组合等特性。更重要的是,现在可以对任意方法执行异步计算,不再局限于Callable
,还能合并语义不同的多个Future
。
5.1. supplyAsync()
使用CompletableFuture
非常简单:
CompletableFuture<BigInteger> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
...
BigInteger result = future.get();
不再需要Callable
,可直接传递lambda表达式。测试阶乘计算:
@Test
void givenCompletableFuture_whenSupplyAsyncFactorial_thenResultOk() throws ExecutionException, InterruptedException {
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
assertEquals(BigInteger.valueOf(3628800), completableFuture.get());
}
未指定线程池时默认使用ForkJoinPool
。也可指定线程池(如固定大小线程池):
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)), Executors.newFixedThreadPool(1));
5.2. thenCompose()
可创建顺序执行的Future
链。假设第二个任务依赖第一个任务的输出:
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
.thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
BigInteger result = completableFuture.get();
thenCompose()
方法可将前一个CompletableFuture
的返回值传递给下一个任务。测试阶乘链式调用(3! = 6 → 6! = 720):
@Test
void givenCompletableFuture_whenComposeTasks_thenResultOk() throws ExecutionException, InterruptedException {
CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
.thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
assertEquals(BigInteger.valueOf(720), completableFuture.get());
}
5.3. allOf()
使用静态方法allOf()
可并行执行多个Future
。收集异步结果只需添加到allOf()
并调用join()
完成:
BigInteger result = allOf(asyncTask1, asyncTask2)
.thenApplyAsync(fn -> factorial(factorialTask1.join()).add(factorial(new BigInteger(factorialTask2.join()))), Executors.newFixedThreadPool(1)).join();
注意:allOf()返回void,需手动从单个Future
获取结果。且同一执行中可运行不同返回类型的Future
。
测试合并不同类型的阶乘任务(数值型和字符串型):
@Test
void givenCompletableFuture_whenAllOfTasks_thenResultOk() {
CompletableFuture<BigInteger> asyncTask1 = CompletableFuture.supplyAsync(() -> BigInteger.valueOf(5));
CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(() -> "3");
BigInteger result = allOf(asyncTask1, asyncTask2)
.thenApplyAsync(fn -> factorial(asyncTask1.join()).add(factorial(new BigInteger(asyncTask2.join()))), Executors.newFixedThreadPool(1))
.join();
assertEquals(BigInteger.valueOf(126), result);
}
6. 结论
本文探讨了从线程获取返回值的多种方法:
- ✅
Callable
配合Future
和线程池:Future
包装结果并等待任务完成 - ✅
ForkJoinPool
:将任务拆分为子任务优化执行 - ✅
CompletableFuture
(Java 8+):支持任意lambda表达式、链式调用和结果组合
通过阶乘计算示例,我们实践了Future
、Fork/Join和CompletableFuture
的使用。完整代码示例可在GitHub获取。