1. 概述

在 Kotlin 协程中,当我们需要异步地表示一系列数据流并对其进行处理时,Flow 是首选工具。它借鉴了响应式编程的思想,设计上类似于 Reactive Streams 中的 Publisher

你也可以把 Flow 理解为一个**异步版本的 Sequence**:与 Sequence 在调用 next() 时阻塞不同,Flow 通过 suspend 机制挂起协程,让线程去做其他更有意义的事。

关键特性:

  • Flow 是“冷”流(cold flow),这意味着它本身不持有任何资源,也不会主动执行。
  • 只有当调用终端操作符(如 collect())时,数据流才会被触发执行。
  • 相比之下,Channel 是“热”的,会主动生产数据并可能占用资源。

本文将带你深入理解如何创建、使用和组合 Flow,以及它的常见应用场景和注意事项。


2. 构建与消费 Flow

创建 Flow 的方式

最基础的方式是使用 flow {} 构建器,类似 sequence {},内部通过 emit() 发送数据:

val flowWithSingleValue = flow { emit(0) }

此外,集合、区间或序列可以通过 .asFlow() 扩展函数转换为 Flow

val flowFromList = listOf("mene", "mene", "tekel", "upharsin").asFlow()

消费 Flow

使用 collect() 是最常见的消费方式。下面这个例子将列表元素拼接成字符串:

val inscription = buildString {
    listOf("mene", "mene", "tekel", "upharsin")
      .asFlow()
      .collect { append(it).append(", ") }
}.removeSuffix(", ")
assertEquals(inscription, "mene, mene, tekel, upharsin")

⚠️ 注意:collect()挂起函数,必须在协程作用域中调用。

其他终端操作符

除了 collect(),还有多个常用的终端操作符,它们都会触发流的执行:

val data = flow { "PEACE".asSequence().forEach { emit(it.toString()) } }

val charList = data.toList()        // ✅ 转为 List
assertEquals(5, charList.size)

val symbols = data.toSet()          // ✅ 去重转 Set
assertEquals(4, symbols.size)

val word = data.reduce { acc, c -> acc + c }  // ✅ 归约
assertEquals("PEACE", word)

val firstLetter = data.first()      // ✅ 取第一个
assertEquals("P", firstLetter)

❗注意:上述代码中的 data 是同一个 Flow 实例,但由于 Flow 是冷流,每次调用终端操作都会重新执行发射逻辑。


3. 转换 Flow 数据

在最终消费前,我们通常会对 Flow 进行一系列变换。常用的操作符包括:

基础变换:mapfilter

val secretCodes = setOf(65, 67, 69, 80)
val symbols = (60..100).asFlow()
    .filter { it in secretCodes }
    .map { Char(it) }
    .toList()

通用变换:transform {}

所有变换操作底层都基于 transform {},它可以灵活控制何时 emit()

val monthsOfSorrow = Month.values().asFlow()
    .transform {
        if (it <= Month.MAY)
            emit(it)
    }
    .toList()

限制数量:take(n)

如果你想只取前 N 个元素,可以用 take()

Month.values().asFlow().take(5).toList()

⚠️ 踩坑提醒:take(n) 内部通过抛出异常中断流,因此要确保上游能正确处理取消(CancellationException 不应被捕获)。


4. Flow 作为状态更新流

有些场景下,Flow 表示的是某个状态的持续更新,比如页面访问量、传感器数据等。这类流往往是无限的,我们只关心最新的值。

使用 conflate() 合并中间值

conflate() 会跳过收集器处理期间产生的中间值,只保留最新一个待处理:

val buffer = mutableListOf<Int>()
(1..10).asFlow()
    .transform {
        delay(10)
        emit(it)
    }
    .conflate()
    .collect{
        delay(50)
        buffer.add(it)
    }
assertTrue { buffer.size < 10}

输出结果中 buffer.size 通常远小于 10,说明大量中间值被跳过。

使用 collectLatest {} 处理最新值

如果你只想处理每个新值,并且希望前一个未完成的任务自动取消,使用 collectLatest

var latest = -1
(1..10).asFlow()
    .collectLatest { 
        latest = it 
    }
assertEquals(10, latest)

collectLatest 会取消正在执行的 lambda,一旦新值到来。

类似操作符还有 transformLatestonEachLatest 等,统称 *Latest 家族。


5. 组合多个 Flow

有时我们需要合并两个数据流,Kotlin 提供了两种主要方式。

zip:按顺序配对

zip 将两个流按发射顺序一一对应组合:

val codes = listOf(80, 69, 65, 67).asFlow()
val symbols = listOf('P', 'A', 'C', 'E').asFlow()
val list = buildList {
    codes.zip(symbols) { code, symbol -> add("$code -> $symbol") }.collect()
}
assertEquals(listOf("80 -> P", "69 -> A", "65 -> C", "67 -> E"), list)

输出长度由较短的那个流决定。

combine:任一更新即重组

combine 更适合“状态合并”场景。只要任意一个流发出新值,就会立即重新计算:

val arrowsI = listOf("v", "<", "^", ">").asFlow().map { delay(30); it }
val arrowsII = listOf("v", ">", "^", "<").asFlow().map { delay(20); it }
arrowsI.combine(arrowsII) { one, two -> "$one $two" }
    .collect { println(it) }

输出示例:

v v
v >
< >
< ^
^ <
> <

✅ 适用场景:UI 中两个动态数据源联动刷新。


6. 异常处理

Flow 中任何环节都可能抛出异常 —— 发射、转换、收集阶段都有风险。

错误做法:在 try-catch 中 emit

// ❌ 不推荐
flow {
    try {
        emit(fetchData())
    } catch (e: Exception) {
        emit(defaultValue)
    }
}

这违反了 Flow 的异常透明原则,可能导致异常无法传递到下游。

正确做法:使用 catch 操作符

val result = (1..10).asFlow()
    .transform {
        if (it > 3) throw IllegalArgumentException("Too much")
        emit(it)
    }
    .catch {
        when (it) {
            is IllegalArgumentException -> emit(3)
            else -> throw it // 未知异常继续抛出
        }
    }
    .toList()
assertEquals(listOf(1, 2, 3, 3), result)

catch 只捕获其上游的异常,下游仍可继续抛出。


7. Flow 的背压机制

Flow 天然支持背压(Back-pressure),这是它相比 Channel 或传统回调的一大优势。

因为 emit() 是挂起函数,只有当消费者准备好接收下一个值时,生产者才会继续执行:

val fastFlow = flow {
    (1..10).forEach {
        println("Before waiting: $it")
        delay(10)
        println("About to emit: $it")
        emit(Char(60 + it))
        println("After emitting: $it")
    }
}

fastFlow.collect {
    Thread.sleep(100) // 模拟耗时操作
    println("Collected $it")
}

输出节选:

Before waiting: 1
About to emit: 1
Collected =
After emitting: 1
Before waiting: 2
About to emit: 2
Collected >
After emitting: 2

✅ 可见:每发送一个值后,必须等 collect 结束才能继续下一轮循环。


8. Flow 上下文切换

虽然 Flow 本身不持有线程资源,但它的执行依赖于协程上下文。

默认行为:继承收集者上下文

flow {
    assertTrue { "main" in Thread.currentThread().name } // 默认运行在 collector 的 context
    emit("Hello")
}.collect {
    println(it)
}

使用 flowOn() 切换发射上下文

若想让发射逻辑运行在 IO 线程,使用 flowOn()

flow {
    (1..10).forEach { emit(it) }
    assertTrue { "DefaultDispatcher-worker" in Thread.currentThread().name }
}
.flowOn(Dispatchers.IO)
.collect {
    println(it)
    assertTrue { "main" in Thread.currentThread().name }
}

flowOn() 可多次调用,最近的一个生效。

⚠️ 踩坑提醒:flowOn() 影响的是其前面的所有操作。如果写在最后,只会影响前面的链式调用。


9. 总结

Flow 是 Kotlin 协程生态中用于异步数据流处理的核心抽象,具备以下特点:

冷流设计:无订阅则无执行,安全高效
类比 Sequence:API 设计熟悉,学习成本低
天然背压支持:生产者不会压垮消费者
丰富的操作符:支持 map、filter、zip、combine、catch 等
上下文可控:通过 flowOn() 灵活调度线程

📌 最佳实践建议:

  • 避免在 try-catch 中直接 emit
  • 使用 *Latest 操作符处理高频更新
  • 合理使用 conflate() 减少不必要的处理
  • 多个数据源联动优先考虑 combine

掌握 Flow,就等于掌握了 Kotlin 异步编程的“高级语法”,无论是网络请求、数据库监听还是 UI 状态管理,都能游刃有余。


原始标题:Introduction to the Kotlin Flow Class