1. 简介
在 Kotlin 协程中,Flow
是一种异步数据流,用于按顺序发射多个值。它类似于响应式流(Reactive Streams)中的“冷发布者”(cold Publisher),也就是说:只有当调用终端操作符(terminal operator)如 collect()
或 reduce()
时,Flow 才会真正开始执行。
除了终端操作符,Flow 还提供了一系列中间操作符(intermediate operators),例如 map()
、filter()
、take()
、zip()
等,用于对数据流进行链式转换处理。
本文重点关注一组特殊的中间操作符 —— 扁平化操作符(flattening operators),它们的作用是将一个 Flow<Flow<T>>
类型的嵌套流展平为单层 Flow<T>
,便于后续统一处理。
这类操作符看似功能相近,实则行为差异显著,稍不注意就容易踩坑 ❌。下面我们就来深入对比 flatMapConcat()
、flatMapMerge()
和 flatMapLatest()
的使用场景与核心区别。
2. Flow 扁平化操作符详解
当我们处理异步业务逻辑时,经常会遇到这样的情况:每个上游发射的值都会触发一个新的异步请求,从而生成一个新的 Flow。此时原始 Flow 就变成了 Flow<Flow<T>>
结构。
为了简化下游处理,我们需要把这些“流中流”合并成一个单一的、可消费的 Flow。这就是扁平化操作符存在的意义。
⚠️ 注意:虽然标准库中
Collection
和Sequence
提供了flatten()
和flatMap()
,但由于 Flow 具备异步特性,其扁平化方式更为复杂,因此 kotlinx.coroutines 提供了专门的并发安全版本。
接下来我们逐一分析三个关键操作符的行为特点。
2.1. flatMapConcat()
—— 串行合并,保证顺序 ✅
flatMapConcat()
的作用是对每个元素应用变换函数生成新的 Flow,并按顺序依次收集这些内部 Flow,前一个完成后再启动下一个。
它是 map()
+ flattenConcat()
的语法糖,适用于需要严格保持发射顺序的场景。
示例代码:
val result = flow { // flow builder
for (i in 1..3) {
delay(1) // pretend we are doing something useful here
emit(i) // emit next value
}
}.flatMapConcat { it ->
flow {
emit(it * 2)
delay(2)
emit(it * 2 + 1)
}
}.toList()
输出结果:
[2, 3, 4, 5, 6, 7]
行为解析:
- 第一个值
1
被映射为flow { emit(2); delay(2); emit(3) }
- 完整发射完
2 → 3
后,才继续处理2
和3
- 所以最终输出完全有序
✅ 适用场景:分页加载、依赖顺序执行的任务链、日志归集等要求顺序一致性的业务。
❌ 缺点:吞吐量低,整体耗时较长,因为必须等待前一个子流结束。
2.2. flatMapMerge()
—— 并发合并,提升性能 ✅
flatMapMerge()
会同时启动多个内部 Flow 并并发地收集它们的结果,然后将所有结果合并到一个输出流中。
它是 map()
+ flattenMerge()
的组合,支持通过 concurrency
参数控制最大并发数,默认值为 16(即 DEFAULT_CONCURRENCY
)。
示例代码:
val result = flow { // flow builder
for (i in 1..3) {
delay(1) // time consuming logic
emit(i)
}
}.flatMapMerge { it ->
flow {
emit(it * 2)
delay(2) // time consuming logic
emit(it * 2 + 1)
}
}.toList()
输出结果:
[2, 4, 3, 6, 5, 7]
行为解析:
- 值
1
触发子流:2 → [delay 2ms] → 3
- 在等待
3
发射期间,主 Flow 已经发出2
和3
,对应的子流也立即启动 - 所有子流并行运行,结果交错输出
📌 特别说明:
如果设置 concurrency = 1
,flatMapMerge()
的行为将退化为 flatMapConcat()
,即串行执行。
✅ 适用场景:并行网络请求、独立任务批量处理、追求高吞吐的后台任务。
⚠️ 注意事项:
- 输出无序,不能依赖顺序
- 高并发可能增加系统负载,建议根据实际资源限制调整
concurrency
值
2.3. flatMapLatest()
—— 只保留最新,自动取消旧任务 ✅
flatMapLatest()
是最特殊的一个操作符。每当上游发射新值时,它会立即取消之前正在运行的内部 Flow,并启动一个基于新值的新 Flow。
这种“取最新、舍旧”的语义非常适合处理用户频繁交互导致的过期请求问题。
示例代码:
val result = flow { // flow builder
for (i in 1..3) {
delay(1) // time consuming logic
emit(i) // emit next value
}
}.flatMapLatest { it ->
flow {
emit(it * 2)
delay(2)
emit(it * 2 + 1)
}
}.toList()
输出结果:
[2, 4, 6, 7]
行为解析:
- 发射
1
→ 启动子流 A:准备输出2 → 3
- 1ms 后发射
2
→ 取消子流 A,启动子流 B:输出4 → 5
- 再 1ms 后发射
3
→ 取消子流 B,启动子流 C:输出6 → 7
- 子流 C 正常完成,最终输出
[2, 4, 6, 7]
🔍 为什么没有
3
和5
?因为它们所属的 Flow 被提前取消了!
✅ 适用场景:
- 搜索框实时搜索(防抖)
- 页面刷新/重试机制
- 用户输入预览(如代码编辑器智能提示)
⚠️ 踩坑提醒:
不要在 flatMapLatest()
中执行不可中断的操作(如非 suspend 函数、阻塞 IO),否则 cancellation 可能无法及时生效。
3. 总结对比
操作符 | 执行模式 | 是否保持顺序 | 是否并发 | 是否取消旧任务 | 推荐使用场景 |
---|---|---|---|---|---|
flatMapConcat() |
串行 | ✅ 是 | ❌ 否 | ❌ 否 | 顺序敏感任务 |
flatMapMerge() |
并发(可控) | ❌ 否 | ✅ 是 | ❌ 否 | 高吞吐、独立任务并行处理 |
flatMapLatest() |
最新优先 | ❌ 否 | ⚠️ 局部 | ✅ 是 | 实时交互、防重复提交、取消过期请求 |
📌 核心选择建议:
- 要顺序?→ 选
flatMapConcat()
- 要性能?→ 选
flatMapMerge(concurrency)
- 要响应最新事件?→ 选
flatMapLatest()
💡 小技巧:结合
onEach { println("emitted: $it") }
和delay()
调试 Flow 行为非常有效。
源码示例地址
文中所有示例代码均可在 GitHub 获取:
👉 https://github.com/Baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency-2
建议动手跑一遍,加深理解。毕竟 Flow 的并发行为光看文字容易懵 😅。