1. 概述

Java的核心特性之一是并发编程,它允许多个线程并行执行任务。通过这种方式,我们可以实现异步和非阻塞的指令执行,从而优化资源利用率,特别是在多核CPU环境下。线程分为两种类型:有返回值和无返回值(后者通常指返回void的方法)。

本文将重点探讨如何在线程任务完成后获取返回值

2. Thread与Runnable

Java线程本质上是一个轻量级进程。我们先看下Java程序的典型工作流程:

Java线程示意图

Java程序本身是一个运行中的进程,而线程是进程的子集,可以访问主内存并与同进程的其他线程通信。线程具有完整的生命周期和多种状态。实现线程的常见方式是通过Runnable接口:

public class RunnableExample implements Runnable {
    ...
    @Override
    public void run() {
        // 执行任务
    }
}

启动线程的方式如下:

Thread thread = new Thread(new RunnableExample());
thread.start();
thread.join();

显然,Runnable无法直接返回值。但我们可以通过wait()notify()实现线程同步。join()方法会阻塞当前线程直到目标线程执行完成——这在获取异步执行结果时至关重要。

3. Callable

Java 1.5引入了Callable接口。下面是一个计算阶乘的异步任务示例,使用BigInteger处理大数结果:

public class CallableFactorialTask implements Callable<BigInteger> {
    // 字段和构造函数
    @Override
    public BigInteger call() throws Exception {
        return factorial(BigInteger.valueOf(value));
    }
}

配套的阶乘计算器:

public class FactorialCalculator {

    public static BigInteger factorial(BigInteger end) {
        BigInteger start = BigInteger.ONE;
        BigInteger res = BigInteger.ONE;

        for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
            res = res.multiply(BigInteger.valueOf(i));
        }

        return res;
    }

    public static BigInteger factorial(BigInteger start, BigInteger end) {
        BigInteger res = start;

        for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
            res = res.multiply(BigInteger.valueOf(i));
        }

        return res;
    }
}

Callable只有一个需要重写的call()方法,该方法返回异步任务的结果。CallableRunnable都是@FunctionalInterface,但Callable可以返回值并抛出异常,需要配合Future使用。

4. 执行Callable

执行Callable有两种主要方式:使用Future或Fork/Join框架。

4.1. Callable与Future

Java 1.5引入了Future接口,用于封装异步处理的结果。可以类比为JavaScript中的Promise

典型场景是同时调用多个接口获取数据,需要等待所有任务完成才能收集结果。Future会包装响应并等待线程完成,但可能因超时或执行异常而中断。

Future接口定义如下:

public interface Future<V> {
    boolean cancel(boolean var1);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

get()方法是我们等待并获取执行结果的关键

启动Future任务需要关联ThreadPool。下面使用Executor接口和ExecutorService实现创建ThreadPoolExecutor,这里选择缓存线程池演示:

public BigInteger execute(List<CallableFactorialTask> tasks) {

    BigInteger result = BigInteger.ZERO;

    ExecutorService cachedPool = Executors.newCachedThreadPool();

    List<Future<BigInteger>> futures;

    try {
        futures = cachedPool.invokeAll(tasks);
    } catch (InterruptedException e) {
        // 异常处理示例
        throw new RuntimeException(e);
    }

    for (Future<BigInteger> future : futures) {
        try {
            result = result.add(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // 异常处理示例
            throw new RuntimeException(e);
        }
    }

    return result;
}

执行流程示意图:

Callable与Future交互图

Executor会调用所有任务并收集到Future对象中,随后可从异步处理中获取结果。测试两个阶乘数求和:

@Test
void givenCallableExecutor_whenExecuteFactorial_thenResultOk() {
    BigInteger result = callableExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3)));
    assertEquals(BigInteger.valueOf(126), result);
}

4.2. Callable与Fork/Join

另一种选择是使用ForkJoinPool。它虽然继承自AbstractExecutorService,但线程创建和组织方式不同。它会将任务拆分为更小的子任务,优化资源避免线程空闲。子任务划分示意图:

Fork任务拆分图

主任务被拆分为SubTask1、SubTask3和SubTask4等最小执行单元,最终合并结果。将前例改造为ForkJoinPool版本:

public BigInteger execute(List<Callable<BigInteger>> forkFactorials) {
    List<Future<BigInteger>> futures = forkJoinPool.invokeAll(forkFactorials);

    BigInteger result = BigInteger.ZERO;

    for (Future<BigInteger> future : futures) {
        try {
            result = result.add(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // 异常处理示例
            throw new RuntimeException(e);
        }
    }

    return result;
}

只需更换线程池即可获取Future。测试阶乘任务列表:

@Test
void givenForkExecutor_whenExecuteCallable_thenResultOk() {
    assertEquals(BigInteger.valueOf(126), 
      forkExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3))));
}

更灵活的方式是自定义任务拆分逻辑(如基于输入参数或服务负载)。需将任务重写为ForkJoinTask,使用RecursiveTask

public class ForkFactorialTask extends RecursiveTask<BigInteger> {
    // 字段和构造函数

    @Override
    protected BigInteger compute() {

        BigInteger factorial = BigInteger.ONE;

        if (end - start > threshold) {

            int middle = (end + start) / 2;

            return factorial.multiply(new ForkFactorialTask(start, middle, threshold).fork()
              .join()
              .multiply(new ForkFactorialTask(middle + 1, end, threshold).fork()
                .join()));
        }

        return factorial.multiply(factorial(BigInteger.valueOf(start), BigInteger.valueOf(end)));
    }
}

当超过阈值时拆分任务,通过invoke()获取结果:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); 
int result = forkJoinPool.invoke(forkFactorialTask);

也可使用submit()execute(),但必须调用join()完成执行。测试任务拆分:

@Test
void givenForkExecutor_whenExecuteRecursiveTask_thenResultOk() {
    assertEquals(BigInteger.valueOf(3628800), forkExecutor.execute(new ForkFactorialTask(10, 5)));
}

此例将10的阶乘拆分为1-5和6-10两个子任务。

5. CompletableFuture

**Java 8引入了CompletableFuture**,显著改进了多线程编程。它消除了Future的样板代码,增加了链式调用和结果组合等特性。更重要的是,现在可以对任意方法执行异步计算,不再局限于Callable,还能合并语义不同的多个Future

5.1. supplyAsync()

使用CompletableFuture非常简单:

CompletableFuture<BigInteger> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
...
BigInteger result = future.get();

不再需要Callable,可直接传递lambda表达式。测试阶乘计算:

@Test
void givenCompletableFuture_whenSupplyAsyncFactorial_thenResultOk() throws ExecutionException, InterruptedException {
    CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
    assertEquals(BigInteger.valueOf(3628800), completableFuture.get());
}

未指定线程池时默认使用ForkJoinPool。也可指定线程池(如固定大小线程池):

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)), Executors.newFixedThreadPool(1));

5.2. thenCompose()

可创建顺序执行的Future链。假设第二个任务依赖第一个任务的输出:

CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
   .thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));

BigInteger result = completableFuture.get();

thenCompose()方法可将前一个CompletableFuture的返回值传递给下一个任务。测试阶乘链式调用(3! = 6 → 6! = 720):

@Test
void givenCompletableFuture_whenComposeTasks_thenResultOk() throws ExecutionException, InterruptedException {
    CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
      .thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
    assertEquals(BigInteger.valueOf(720), completableFuture.get());
}

5.3. allOf()

使用静态方法allOf()可并行执行多个Future。收集异步结果只需添加到allOf()并调用join()完成:

BigInteger result = allOf(asyncTask1, asyncTask2)
  .thenApplyAsync(fn -> factorial(factorialTask1.join()).add(factorial(new BigInteger(factorialTask2.join()))), Executors.newFixedThreadPool(1)).join();

注意:allOf()返回void,需手动从单个Future获取结果。且同一执行中可运行不同返回类型的Future

测试合并不同类型的阶乘任务(数值型和字符串型):

@Test
void givenCompletableFuture_whenAllOfTasks_thenResultOk() {
    CompletableFuture<BigInteger> asyncTask1 = CompletableFuture.supplyAsync(() -> BigInteger.valueOf(5));
    CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(() -> "3");

    BigInteger result = allOf(asyncTask1, asyncTask2)
      .thenApplyAsync(fn -> factorial(asyncTask1.join()).add(factorial(new BigInteger(asyncTask2.join()))), Executors.newFixedThreadPool(1))
        .join();

    assertEquals(BigInteger.valueOf(126), result);
}

6. 结论

本文探讨了从线程获取返回值的多种方法:

  • Callable配合Future和线程池:Future包装结果并等待任务完成
  • ForkJoinPool:将任务拆分为子任务优化执行
  • CompletableFuture(Java 8+):支持任意lambda表达式、链式调用和结果组合

通过阶乘计算示例,我们实践了Future、Fork/Join和CompletableFuture的使用。完整代码示例可在GitHub获取。


原始标题:Returning a Value After Finishing Thread’s Job in Java