1. 简介

在 Kotlin 中,协程(Coroutines)和 Flow 为处理异步操作和数据流编程提供了强大支持。而当我们需要协调多个异步数据源时,如何高效地合并多个 Flow 就成了关键技能。

实际开发中,我们经常遇到这样的场景:UI 需要同时监听网络请求和本地数据库更新、传感器数据需要与用户输入实时结合……这些都需要将多个 Flow 进行组合。本文将系统介绍 Kotlin 中常用的 Flow 合并方式,帮你避开“并发逻辑混乱”这类常见踩坑。


2. Flow 基础回顾

Flow 是 Kotlin 协程的一部分,用于表示可异步发射多个值的数据流。它类似于 Sequence,但专为异步环境设计 —— 每次 emit() 都可能是 suspend 的。

简单来说:

  • ✅ 单个值用 suspend fun
  • ✅ 多个异步值用 Flow<T>
  • ❌ 不要用 List<Deferred<T>> 去模拟流式处理,那是反模式

3. 示例 Flow 构建

为了演示不同合并策略的效果,先定义两个简单的整数发射 Flow:

suspend fun sampleFlow1(): Flow<Int> = flow {
    repeat(5) {
        delay(1000)
        emit(it)
    }
}

suspend fun sampleFlow2(): Flow<Int> = flow {
    repeat(5) {
        delay(1500)
        emit(it * it)
    }
}

说明:

  • sampleFlow1():每 1 秒发射一次,值为 0,1,2,3,4
  • sampleFlow2():每 1.5 秒发射一次,值为 0,1,4,9,16

接下来我们看几种典型的合并方式。

3.1. 使用 zip() —— 成对同步发射

suspend fun main() {
    val combinedFlow = sampleFlow1().zip(sampleFlow2()) { first, second ->
        "($first, $second)"
    }
    combinedFlow.collect { println(it) }
}

核心行为
zip() 会等待两个 Flow 各自发出一个值后才进行合并,像拉链一样一一配对。如果一方发得慢,另一方就得等着。

适用场景:

  • ⚠️ 需要严格按序配对的双源数据(如:用户ID + 对应头像URL)
  • ❌ 不适合响应式 UI 更新(因为会阻塞快的一方)

输出结果:

(0, 0)
(1, 1)
(2, 4)

💡 只有前三组能对上节奏,后续因 sampleFlow2 发射间隔更长,在程序结束前未能触发第四次 zip。


3.2. 使用 combine() —— 最新值动态组合

suspend fun main() {
    val combinedFlow = sampleFlow1().combine(sampleFlow2()) { first, second ->
        "($first, $second)"
    }
    combinedFlow.collect { println(it) }
}

核心行为
只要任意一个 Flow 发出新值,combine() 就立即使用双方最新的值生成一条新数据。这是典型的“反应式”思维。

适用场景:

  • ✅ 实时搜索框:关键词 Flow + 筛选条件 Flow → 动态查询
  • ✅ 订阅多个状态变化并联动更新(ViewModel 中很常见)

输出结果:

(0, 0)
(1, 0)
(2, 0)
(2, 1)
(2, 4)

解释:

  • sampleFlow1 第三次发射 2 时,sampleFlow2 还是 0(2,0)
  • 接着 sampleFlow2 发射 1 → 触发 (2,1)
  • 再发射 4 → 触发 (2,4)

📌 注意:combine 是 stateful 的,它记住了每个 Flow 的最新值。


3.3. 使用 flatMapConcat() —— 串行依赖处理

suspend fun main() {
    val combinedFlow = sampleFlow1().flatMapConcat { value1 ->
        sampleFlow2().map { value2 ->
            "($value1, $value2)"
        }
    }
    combinedFlow.collect { println(it) }
}

核心行为
对于 sampleFlow1 的每一个值,都会顺序执行一次完整的 sampleFlow2() 流程,且必须等前一个子流完成才会处理下一个主流程值。

适用场景:

  • ✅ 请求链式接口:获取商品列表 → 逐个查询库存
  • ✅ 数据迁移任务:每个数据库分片依次处理

输出结果:

(0, 0)
(0, 1)
(0, 4)
(1, 0)
(1, 1)
(1, 4)
(2, 0)
(2, 1)
(2, 4)

⚠️ 虽然叫 flatMapConcat,但它其实是“外层阻塞式遍历”,性能较低,慎用于高频事件。


3.4. 使用 flatMapMerge() —— 并发扁平化合并

suspend fun main() {
    val combinedFlow = sampleFlow1().flatMapMerge { value1 ->
        sampleFlow2().map { value2 ->
            "($value1, $value2)"
        }
    }
    combinedFlow.collect { println(it) }
}

核心行为
flatMapConcat 类似,但所有子 Flow 并发执行,最终结果可能交错出现。你可以理解为“非阻塞版 flatMapConcat”。

参数说明:

  • 默认并发数为 DEFAULT_CONCURRENCY(通常是 16)
  • 可手动传参控制最大并发量:flatMapMerge(concurrency = 3)

适用场景:

  • ✅ 批量拉取资源(如:多个 API 并行调用)
  • ✅ 提升吞吐量的关键手段

输出结果(顺序不定):

(0, 0)
(1, 0)
(0, 1)
(2, 0)
(1, 1)
(0, 4)
(2, 1)
(1, 4)
(2, 4)

💡 因为 sampleFlow1 发射更快,所以 (1,x)(2,x) 很早就启动了各自的 inner flow。


3.5. 使用 flattenConcat() —— 流的串行展平

suspend fun main() {
    val combinedFlow = listOf(sampleFlow1(), sampleFlow2()).flattenConcat()
    combinedFlow.collect { println(it) }
}

核心行为
将一个 List<Flow<T>> 按顺序连接成单个 Flow,前一个 Flow 完结后才开始下一个

等价写法:

listOf(flowA, flowB).fold(emptyFlow()) { acc, f -> acc + f }

适用场景:

  • ✅ 日志聚合:按模块顺序读取多个日志流
  • ✅ 初始化任务:数据库 → 缓存 → 配置中心,依次加载

输出结果:

0
1
2
0
1
4

⚠️ 注意这不是 merge!它是完全串行的,不要期望并发加速。


3.6. 使用 merge() —— 多流并发合并

suspend fun main() {
    val combinedFlow = merge(sampleFlow1(), sampleFlow2())
    combinedFlow.collect { println(it) }
}

核心行为
来自任一流的数据一旦可用,立即发射,不保证顺序,也不等待其他流。这是最“自由”的合并方式。

适用场景:

  • ✅ 实时消息总线:合并聊天室、通知、系统事件
  • ✅ 监控面板:CPU、内存、网络流量多指标统一展示

输出示例:

0
0
1
1
2
4

💡 典型的“谁快谁先上”。适合只关心“有没有新数据”,不在乎“来自哪”或“是否完整”的场景。


4. 总结对比表

方法 触发时机 是否并发 典型用途
zip() 双方各发一值后配对 成对同步
combine() 任一流发新值,用最新值组合 响应式联动
flatMapConcat() 外层每项触发内层完整流程 串行依赖任务
flatMapMerge() 外层每项触发并发内层流程 高吞吐批量处理
flattenConcat() 按列表顺序串行拼接所有流 有序流合并
merge() 任一流有值就发,完全自由 多源事件聚合

📌 选择建议

  • 想“等齐了再处理”?→ 用 zip
  • 想“有变动就响应”?→ 用 combine
  • 想“一个接一个跑完”?→ 用 flattenConcatflatMapConcat
  • 想“一起跑,越快越好”?→ 用 mergeflatMapMerge

掌握这些组合技巧,你就能游刃有余地构建复杂的异步数据管道,再也不怕“多个 LiveData 改来改去”那种历史遗留问题了 😄


原始标题:Combining Multiple Kotlin Flows