1. 概述
在 Kotlin 协程中,当我们需要异步地表示一系列数据流并对其进行处理时,Flow
是首选工具。它借鉴了响应式编程的思想,设计上类似于 Reactive Streams 中的 Publisher
。
你也可以把 Flow
理解为一个**异步版本的 Sequence
**:与 Sequence
在调用 next()
时阻塞不同,Flow
通过 suspend 机制挂起协程,让线程去做其他更有意义的事。
✅ 关键特性:
Flow
是“冷”流(cold flow),这意味着它本身不持有任何资源,也不会主动执行。- 只有当调用终端操作符(如
collect()
)时,数据流才会被触发执行。 - 相比之下,
Channel
是“热”的,会主动生产数据并可能占用资源。
本文将带你深入理解如何创建、使用和组合 Flow
,以及它的常见应用场景和注意事项。
2. 构建与消费 Flow
创建 Flow 的方式
最基础的方式是使用 flow {}
构建器,类似 sequence {}
,内部通过 emit()
发送数据:
val flowWithSingleValue = flow { emit(0) }
此外,集合、区间或序列可以通过 .asFlow()
扩展函数转换为 Flow
:
val flowFromList = listOf("mene", "mene", "tekel", "upharsin").asFlow()
消费 Flow
使用 collect()
是最常见的消费方式。下面这个例子将列表元素拼接成字符串:
val inscription = buildString {
listOf("mene", "mene", "tekel", "upharsin")
.asFlow()
.collect { append(it).append(", ") }
}.removeSuffix(", ")
assertEquals(inscription, "mene, mene, tekel, upharsin")
⚠️ 注意:collect()
是挂起函数,必须在协程作用域中调用。
其他终端操作符
除了 collect()
,还有多个常用的终端操作符,它们都会触发流的执行:
val data = flow { "PEACE".asSequence().forEach { emit(it.toString()) } }
val charList = data.toList() // ✅ 转为 List
assertEquals(5, charList.size)
val symbols = data.toSet() // ✅ 去重转 Set
assertEquals(4, symbols.size)
val word = data.reduce { acc, c -> acc + c } // ✅ 归约
assertEquals("PEACE", word)
val firstLetter = data.first() // ✅ 取第一个
assertEquals("P", firstLetter)
❗注意:上述代码中的
data
是同一个Flow
实例,但由于Flow
是冷流,每次调用终端操作都会重新执行发射逻辑。
3. 转换 Flow 数据
在最终消费前,我们通常会对 Flow
进行一系列变换。常用的操作符包括:
基础变换:map
与 filter
val secretCodes = setOf(65, 67, 69, 80)
val symbols = (60..100).asFlow()
.filter { it in secretCodes }
.map { Char(it) }
.toList()
通用变换:transform {}
所有变换操作底层都基于 transform {}
,它可以灵活控制何时 emit()
:
val monthsOfSorrow = Month.values().asFlow()
.transform {
if (it <= Month.MAY)
emit(it)
}
.toList()
限制数量:take(n)
如果你想只取前 N 个元素,可以用 take()
:
Month.values().asFlow().take(5).toList()
⚠️ 踩坑提醒:take(n)
内部通过抛出异常中断流,因此要确保上游能正确处理取消(CancellationException 不应被捕获)。
4. Flow 作为状态更新流
有些场景下,Flow
表示的是某个状态的持续更新,比如页面访问量、传感器数据等。这类流往往是无限的,我们只关心最新的值。
使用 conflate()
合并中间值
conflate()
会跳过收集器处理期间产生的中间值,只保留最新一个待处理:
val buffer = mutableListOf<Int>()
(1..10).asFlow()
.transform {
delay(10)
emit(it)
}
.conflate()
.collect{
delay(50)
buffer.add(it)
}
assertTrue { buffer.size < 10}
输出结果中 buffer.size
通常远小于 10,说明大量中间值被跳过。
使用 collectLatest {}
处理最新值
如果你只想处理每个新值,并且希望前一个未完成的任务自动取消,使用 collectLatest
:
var latest = -1
(1..10).asFlow()
.collectLatest {
latest = it
}
assertEquals(10, latest)
✅ collectLatest
会取消正在执行的 lambda,一旦新值到来。
类似操作符还有
transformLatest
、onEachLatest
等,统称*Latest
家族。
5. 组合多个 Flow
有时我们需要合并两个数据流,Kotlin 提供了两种主要方式。
zip
:按顺序配对
zip
将两个流按发射顺序一一对应组合:
val codes = listOf(80, 69, 65, 67).asFlow()
val symbols = listOf('P', 'A', 'C', 'E').asFlow()
val list = buildList {
codes.zip(symbols) { code, symbol -> add("$code -> $symbol") }.collect()
}
assertEquals(listOf("80 -> P", "69 -> A", "65 -> C", "67 -> E"), list)
输出长度由较短的那个流决定。
combine
:任一更新即重组
combine
更适合“状态合并”场景。只要任意一个流发出新值,就会立即重新计算:
val arrowsI = listOf("v", "<", "^", ">").asFlow().map { delay(30); it }
val arrowsII = listOf("v", ">", "^", "<").asFlow().map { delay(20); it }
arrowsI.combine(arrowsII) { one, two -> "$one $two" }
.collect { println(it) }
输出示例:
v v
v >
< >
< ^
^ <
> <
✅ 适用场景:UI 中两个动态数据源联动刷新。
6. 异常处理
Flow
中任何环节都可能抛出异常 —— 发射、转换、收集阶段都有风险。
错误做法:在 try-catch 中 emit
// ❌ 不推荐
flow {
try {
emit(fetchData())
} catch (e: Exception) {
emit(defaultValue)
}
}
这违反了 Flow
的异常透明原则,可能导致异常无法传递到下游。
正确做法:使用 catch
操作符
val result = (1..10).asFlow()
.transform {
if (it > 3) throw IllegalArgumentException("Too much")
emit(it)
}
.catch {
when (it) {
is IllegalArgumentException -> emit(3)
else -> throw it // 未知异常继续抛出
}
}
.toList()
assertEquals(listOf(1, 2, 3, 3), result)
✅ catch
只捕获其上游的异常,下游仍可继续抛出。
7. Flow 的背压机制
Flow
天然支持背压(Back-pressure),这是它相比 Channel
或传统回调的一大优势。
因为 emit()
是挂起函数,只有当消费者准备好接收下一个值时,生产者才会继续执行:
val fastFlow = flow {
(1..10).forEach {
println("Before waiting: $it")
delay(10)
println("About to emit: $it")
emit(Char(60 + it))
println("After emitting: $it")
}
}
fastFlow.collect {
Thread.sleep(100) // 模拟耗时操作
println("Collected $it")
}
输出节选:
Before waiting: 1
About to emit: 1
Collected =
After emitting: 1
Before waiting: 2
About to emit: 2
Collected >
After emitting: 2
✅ 可见:每发送一个值后,必须等 collect
结束才能继续下一轮循环。
8. Flow 上下文切换
虽然 Flow
本身不持有线程资源,但它的执行依赖于协程上下文。
默认行为:继承收集者上下文
flow {
assertTrue { "main" in Thread.currentThread().name } // 默认运行在 collector 的 context
emit("Hello")
}.collect {
println(it)
}
使用 flowOn()
切换发射上下文
若想让发射逻辑运行在 IO 线程,使用 flowOn()
:
flow {
(1..10).forEach { emit(it) }
assertTrue { "DefaultDispatcher-worker" in Thread.currentThread().name }
}
.flowOn(Dispatchers.IO)
.collect {
println(it)
assertTrue { "main" in Thread.currentThread().name }
}
✅ flowOn()
可多次调用,最近的一个生效。
⚠️ 踩坑提醒:flowOn()
影响的是其前面的所有操作。如果写在最后,只会影响前面的链式调用。
9. 总结
Flow
是 Kotlin 协程生态中用于异步数据流处理的核心抽象,具备以下特点:
✅ 冷流设计:无订阅则无执行,安全高效
✅ 类比 Sequence:API 设计熟悉,学习成本低
✅ 天然背压支持:生产者不会压垮消费者
✅ 丰富的操作符:支持 map、filter、zip、combine、catch 等
✅ 上下文可控:通过 flowOn()
灵活调度线程
📌 最佳实践建议:
- 避免在
try-catch
中直接emit
- 使用
*Latest
操作符处理高频更新 - 合理使用
conflate()
减少不必要的处理 - 多个数据源联动优先考虑
combine
掌握 Flow
,就等于掌握了 Kotlin 异步编程的“高级语法”,无论是网络请求、数据库监听还是 UI 状态管理,都能游刃有余。