1. 概述

Kotlin 协程(Coroutines)提供了一种流畅、非阻塞地编写异步程序的方式。它的核心思想来源于 Continuation-passing style(CPS)编程范式。

Kotlin 语言本身提供了协程的基本语法支持,但更实用的功能则来自 kotlinx-coroutines-core 库。本文将从基础开始介绍协程的使用方式,并逐步过渡到使用该库构建更复杂的异步逻辑。


2. 使用 buildSequence 创建协程

我们可以使用 sequence 构建一个协程,用于生成斐波那契数列:

val fibonacciSeq = sequence {
    var a = 0
    var b = 1

    yield(1)

    while (true) {
        yield(a + b)

        val tmp = a + b
        a = b
        b = tmp
    }
}

yield() 是一个 suspend 函数,表示该函数可以挂起当前协程。挂起不会阻塞线程,而是将当前状态保存下来,以便稍后恢复。

注意suspend 函数只能在协程内部调用,否则编译器会报错。

我们可以通过如下方式测试生成器:

val res = fibonacciSeq.take(5).toList()
assertEquals(res, listOf(1, 1, 2, 3, 5))

3. 添加 kotlinx-coroutines 依赖

为了使用更高级的协程功能,我们需要引入 kotlinx-coroutines-core 库:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.3.9</version>
</dependency>

引入后,我们就可以使用 launchasyncrunBlocking 等协程构建器。


4. 使用 launch() 构建异步协程

假设我们有一个耗时函数:

suspend fun expensiveComputation(res: MutableList<String>) {
    delay(1000L)
    res.add("word!")
}

我们可以使用 launch 在非阻塞协程中调用它:

@Test
fun givenAsyncCoroutine_whenStartIt_thenShouldExecuteItInTheAsyncWay() {
    val res = mutableListOf<String>()

    runBlocking {
        launch { expensiveComputation(res) }
        res.add("Hello,")
    }

    assertEquals(res, listOf("Hello,", "word!"))
}

⚠️ runBlocking 是一个阻塞协程,用于测试。它确保 assertEquals 在协程执行完成后才执行。

虽然 launch 先执行,但由于 delay() 的存在,主线程会先执行 res.add("Hello,"),再执行 res.add("word!")


5. 协程非常轻量

我们可以轻松创建数十万个协程而不必担心内存溢出问题:

@Test
fun givenHugeAmountOfCoroutines_whenStartIt_thenShouldExecuteItWithoutOutOfMemory() {
    runBlocking<Unit> {
        val counter = AtomicInteger(0)
        val numberOfCoroutines = 100_000

        val jobs = List(numberOfCoroutines) {
            launch {
                delay(1L)
                counter.incrementAndGet()
            }
        }

        jobs.forEach { it.join() }
        assertEquals(counter.get(), numberOfCoroutines)
    }
}

协程底层使用线程池调度,因此不会创建过多线程,性能开销远低于传统线程模型。


6. 取消与超时机制

我们可以通过 JobisActive 属性来判断协程是否处于活动状态:

@Test
fun givenCancellableJob_whenRequestForCancel_thenShouldQuit() {
    runBlocking<Unit> {
        val job = launch(Dispatchers.Default) {
            while (isActive) {
                // println("is working")
            }
        }

        delay(1300L)
        job.cancel()
    }
}

也可以使用 withTimeout() 设置超时机制:

@Test(expected = CancellationException::class)
fun givenAsyncAction_whenDeclareTimeout_thenShouldFinishWhenTimedOut() {
    runBlocking<Unit> {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("Some expensive computation $i ...")
                delay(500L)
            }
        }
    }
}

⚠️ 如果不设置超时,长时间阻塞的协程可能会导致线程挂起,无法回收。


7. 并发执行异步任务

使用 async() 可以并发执行多个任务:

@Test
fun givenHaveTwoExpensiveAction_whenExecuteThemAsync_thenTheyShouldRunConcurrently() {
    runBlocking<Unit> {
        val delay = 1000L
        val time = measureTimeMillis {
            val one = async(Dispatchers.Default) { someExpensiveComputation(delay) }
            val two = async(Dispatchers.Default) { someExpensiveComputation(delay) }

            runBlocking {
                one.await()
                two.await()
            }
        }

        assertTrue(time < delay * 2)
    }
}

✅ 上述代码并发执行两个任务,总耗时接近 1 秒。

如果使用 CoroutineStart.LAZY,则任务会串行执行:

val one = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }
val two = async(Dispatchers.Default, CoroutineStart.LAZY) { someExpensiveComputation(delay) }

⚠️ 这是因为 await() 会阻塞主线程,直到前一个任务完成。


8. 总结

本文介绍了 Kotlin 协程的基本概念和使用方式:

  • 使用 sequence 创建协程,通过 yield() 挂起
  • 使用 launch()async() 构建异步任务
  • 协程轻量且易于并发
  • 支持取消与超时控制
  • 异步任务可以并发执行,但要注意懒加载可能带来的串行问题

所有示例代码都可以在 GitHub 项目 中找到。

建议:多用 async/await 实现并发逻辑,避免使用 runBlocking 阻塞主线程。


原始标题:Introduction to Kotlin Coroutines