1. 简介
在 Kotlin 中,协程(Coroutines)和 Flow 为处理异步操作和数据流编程提供了强大支持。而当我们需要协调多个异步数据源时,如何高效地合并多个 Flow 就成了关键技能。
实际开发中,我们经常遇到这样的场景:UI 需要同时监听网络请求和本地数据库更新、传感器数据需要与用户输入实时结合……这些都需要将多个 Flow 进行组合。本文将系统介绍 Kotlin 中常用的 Flow 合并方式,帮你避开“并发逻辑混乱”这类常见踩坑。
2. Flow 基础回顾
Flow 是 Kotlin 协程的一部分,用于表示可异步发射多个值的数据流。它类似于 Sequence
,但专为异步环境设计 —— 每次 emit()
都可能是 suspend 的。
简单来说:
- ✅ 单个值用
suspend fun
- ✅ 多个异步值用
Flow<T>
- ❌ 不要用
List<Deferred<T>>
去模拟流式处理,那是反模式
3. 示例 Flow 构建
为了演示不同合并策略的效果,先定义两个简单的整数发射 Flow:
suspend fun sampleFlow1(): Flow<Int> = flow {
repeat(5) {
delay(1000)
emit(it)
}
}
suspend fun sampleFlow2(): Flow<Int> = flow {
repeat(5) {
delay(1500)
emit(it * it)
}
}
说明:
sampleFlow1()
:每 1 秒发射一次,值为0,1,2,3,4
sampleFlow2()
:每 1.5 秒发射一次,值为0,1,4,9,16
接下来我们看几种典型的合并方式。
3.1. 使用 zip() —— 成对同步发射
suspend fun main() {
val combinedFlow = sampleFlow1().zip(sampleFlow2()) { first, second ->
"($first, $second)"
}
combinedFlow.collect { println(it) }
}
✅ 核心行为:zip()
会等待两个 Flow 各自发出一个值后才进行合并,像拉链一样一一配对。如果一方发得慢,另一方就得等着。
适用场景:
- ⚠️ 需要严格按序配对的双源数据(如:用户ID + 对应头像URL)
- ❌ 不适合响应式 UI 更新(因为会阻塞快的一方)
输出结果:
(0, 0)
(1, 1)
(2, 4)
💡 只有前三组能对上节奏,后续因
sampleFlow2
发射间隔更长,在程序结束前未能触发第四次 zip。
3.2. 使用 combine() —— 最新值动态组合
suspend fun main() {
val combinedFlow = sampleFlow1().combine(sampleFlow2()) { first, second ->
"($first, $second)"
}
combinedFlow.collect { println(it) }
}
✅ 核心行为:
只要任意一个 Flow 发出新值,combine()
就立即使用双方最新的值生成一条新数据。这是典型的“反应式”思维。
适用场景:
- ✅ 实时搜索框:关键词 Flow + 筛选条件 Flow → 动态查询
- ✅ 订阅多个状态变化并联动更新(ViewModel 中很常见)
输出结果:
(0, 0)
(1, 0)
(2, 0)
(2, 1)
(2, 4)
解释:
sampleFlow1
第三次发射2
时,sampleFlow2
还是0
→(2,0)
- 接着
sampleFlow2
发射1
→ 触发(2,1)
- 再发射
4
→ 触发(2,4)
📌 注意:combine 是 stateful 的,它记住了每个 Flow 的最新值。
3.3. 使用 flatMapConcat() —— 串行依赖处理
suspend fun main() {
val combinedFlow = sampleFlow1().flatMapConcat { value1 ->
sampleFlow2().map { value2 ->
"($value1, $value2)"
}
}
combinedFlow.collect { println(it) }
}
✅ 核心行为:
对于 sampleFlow1
的每一个值,都会顺序执行一次完整的 sampleFlow2()
流程,且必须等前一个子流完成才会处理下一个主流程值。
适用场景:
- ✅ 请求链式接口:获取商品列表 → 逐个查询库存
- ✅ 数据迁移任务:每个数据库分片依次处理
输出结果:
(0, 0)
(0, 1)
(0, 4)
(1, 0)
(1, 1)
(1, 4)
(2, 0)
(2, 1)
(2, 4)
⚠️ 虽然叫
flatMapConcat
,但它其实是“外层阻塞式遍历”,性能较低,慎用于高频事件。
3.4. 使用 flatMapMerge() —— 并发扁平化合并
suspend fun main() {
val combinedFlow = sampleFlow1().flatMapMerge { value1 ->
sampleFlow2().map { value2 ->
"($value1, $value2)"
}
}
combinedFlow.collect { println(it) }
}
✅ 核心行为:
与 flatMapConcat
类似,但所有子 Flow 并发执行,最终结果可能交错出现。你可以理解为“非阻塞版 flatMapConcat”。
参数说明:
- 默认并发数为
DEFAULT_CONCURRENCY
(通常是 16) - 可手动传参控制最大并发量:
flatMapMerge(concurrency = 3)
适用场景:
- ✅ 批量拉取资源(如:多个 API 并行调用)
- ✅ 提升吞吐量的关键手段
输出结果(顺序不定):
(0, 0)
(1, 0)
(0, 1)
(2, 0)
(1, 1)
(0, 4)
(2, 1)
(1, 4)
(2, 4)
💡 因为
sampleFlow1
发射更快,所以(1,x)
和(2,x)
很早就启动了各自的 inner flow。
3.5. 使用 flattenConcat() —— 流的串行展平
suspend fun main() {
val combinedFlow = listOf(sampleFlow1(), sampleFlow2()).flattenConcat()
combinedFlow.collect { println(it) }
}
✅ 核心行为:
将一个 List<Flow<T>>
按顺序连接成单个 Flow,前一个 Flow 完结后才开始下一个。
等价写法:
listOf(flowA, flowB).fold(emptyFlow()) { acc, f -> acc + f }
适用场景:
- ✅ 日志聚合:按模块顺序读取多个日志流
- ✅ 初始化任务:数据库 → 缓存 → 配置中心,依次加载
输出结果:
0
1
2
0
1
4
⚠️ 注意这不是 merge!它是完全串行的,不要期望并发加速。
3.6. 使用 merge() —— 多流并发合并
suspend fun main() {
val combinedFlow = merge(sampleFlow1(), sampleFlow2())
combinedFlow.collect { println(it) }
}
✅ 核心行为:
来自任一流的数据一旦可用,立即发射,不保证顺序,也不等待其他流。这是最“自由”的合并方式。
适用场景:
- ✅ 实时消息总线:合并聊天室、通知、系统事件
- ✅ 监控面板:CPU、内存、网络流量多指标统一展示
输出示例:
0
0
1
1
2
4
💡 典型的“谁快谁先上”。适合只关心“有没有新数据”,不在乎“来自哪”或“是否完整”的场景。
4. 总结对比表
方法 | 触发时机 | 是否并发 | 典型用途 |
---|---|---|---|
zip() |
双方各发一值后配对 | 否 | 成对同步 |
combine() |
任一流发新值,用最新值组合 | 否 | 响应式联动 |
flatMapConcat() |
外层每项触发内层完整流程 | 否 | 串行依赖任务 |
flatMapMerge() |
外层每项触发并发内层流程 | ✅ | 高吞吐批量处理 |
flattenConcat() |
按列表顺序串行拼接所有流 | 否 | 有序流合并 |
merge() |
任一流有值就发,完全自由 | ✅ | 多源事件聚合 |
📌 选择建议:
- 想“等齐了再处理”?→ 用
zip
- 想“有变动就响应”?→ 用
combine
- 想“一个接一个跑完”?→ 用
flattenConcat
或flatMapConcat
- 想“一起跑,越快越好”?→ 用
merge
或flatMapMerge
掌握这些组合技巧,你就能游刃有余地构建复杂的异步数据管道,再也不怕“多个 LiveData 改来改去”那种历史遗留问题了 😄