1. 简介

在 Kotlin 协程中,Flow 是一种异步数据流,用于按顺序发射多个值。它类似于响应式流(Reactive Streams)中的“冷发布者”(cold Publisher),也就是说:只有当调用终端操作符(terminal operator)如 collect()reduce() 时,Flow 才会真正开始执行

除了终端操作符,Flow 还提供了一系列中间操作符(intermediate operators),例如 map()filter()take()zip() 等,用于对数据流进行链式转换处理。

本文重点关注一组特殊的中间操作符 —— 扁平化操作符(flattening operators),它们的作用是将一个 Flow<Flow<T>> 类型的嵌套流展平为单层 Flow<T>,便于后续统一处理。

这类操作符看似功能相近,实则行为差异显著,稍不注意就容易踩坑 ❌。下面我们就来深入对比 flatMapConcat()flatMapMerge()flatMapLatest() 的使用场景与核心区别。


2. Flow 扁平化操作符详解

当我们处理异步业务逻辑时,经常会遇到这样的情况:每个上游发射的值都会触发一个新的异步请求,从而生成一个新的 Flow。此时原始 Flow 就变成了 Flow<Flow<T>> 结构。

为了简化下游处理,我们需要把这些“流中流”合并成一个单一的、可消费的 Flow。这就是扁平化操作符存在的意义。

⚠️ 注意:虽然标准库中 CollectionSequence 提供了 flatten()flatMap(),但由于 Flow 具备异步特性,其扁平化方式更为复杂,因此 kotlinx.coroutines 提供了专门的并发安全版本。

接下来我们逐一分析三个关键操作符的行为特点。

2.1. flatMapConcat() —— 串行合并,保证顺序 ✅

flatMapConcat() 的作用是对每个元素应用变换函数生成新的 Flow,并按顺序依次收集这些内部 Flow,前一个完成后再启动下一个。

它是 map() + flattenConcat() 的语法糖,适用于需要严格保持发射顺序的场景。

示例代码:

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}.flatMapConcat { it ->
    flow {
        emit(it * 2)
        delay(2)
        emit(it * 2 + 1)
    }
}.toList()

输出结果:

[2, 3, 4, 5, 6, 7]

行为解析:

  • 第一个值 1 被映射为 flow { emit(2); delay(2); emit(3) }
  • 完整发射完 2 → 3 后,才继续处理 23
  • 所以最终输出完全有序

✅ 适用场景:分页加载、依赖顺序执行的任务链、日志归集等要求顺序一致性的业务。

❌ 缺点:吞吐量低,整体耗时较长,因为必须等待前一个子流结束。


2.2. flatMapMerge() —— 并发合并,提升性能 ✅

flatMapMerge() 会同时启动多个内部 Flow 并并发地收集它们的结果,然后将所有结果合并到一个输出流中。

它是 map() + flattenMerge() 的组合,支持通过 concurrency 参数控制最大并发数,默认值为 16(即 DEFAULT_CONCURRENCY)。

示例代码:

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) // time consuming logic
        emit(i)
    }
}.flatMapMerge { it ->
    flow {
        emit(it * 2)
        delay(2) // time consuming logic
        emit(it * 2 + 1)
    }
}.toList()

输出结果:

[2, 4, 3, 6, 5, 7]

行为解析:

  • 1 触发子流:2 → [delay 2ms] → 3
  • 在等待 3 发射期间,主 Flow 已经发出 23,对应的子流也立即启动
  • 所有子流并行运行,结果交错输出

📌 特别说明:
如果设置 concurrency = 1flatMapMerge() 的行为将退化为 flatMapConcat(),即串行执行。

✅ 适用场景:并行网络请求、独立任务批量处理、追求高吞吐的后台任务。

⚠️ 注意事项:

  • 输出无序,不能依赖顺序
  • 高并发可能增加系统负载,建议根据实际资源限制调整 concurrency

2.3. flatMapLatest() —— 只保留最新,自动取消旧任务 ✅

flatMapLatest() 是最特殊的一个操作符。每当上游发射新值时,它会立即取消之前正在运行的内部 Flow,并启动一个基于新值的新 Flow。

这种“取最新、舍旧”的语义非常适合处理用户频繁交互导致的过期请求问题。

示例代码:

val result = flow { // flow builder
    for (i in 1..3) {
        delay(1) // time consuming logic
        emit(i) // emit next value
    }
}.flatMapLatest { it ->
    flow {
        emit(it * 2)
        delay(2)
        emit(it * 2 + 1)
    }
}.toList()

输出结果:

[2, 4, 6, 7]

行为解析:

  1. 发射 1 → 启动子流 A:准备输出 2 → 3
  2. 1ms 后发射 2取消子流 A,启动子流 B:输出 4 → 5
  3. 再 1ms 后发射 3取消子流 B,启动子流 C:输出 6 → 7
  4. 子流 C 正常完成,最终输出 [2, 4, 6, 7]

🔍 为什么没有 35?因为它们所属的 Flow 被提前取消了!

✅ 适用场景:

  • 搜索框实时搜索(防抖)
  • 页面刷新/重试机制
  • 用户输入预览(如代码编辑器智能提示)

⚠️ 踩坑提醒:
不要在 flatMapLatest() 中执行不可中断的操作(如非 suspend 函数、阻塞 IO),否则 cancellation 可能无法及时生效。


3. 总结对比

操作符 执行模式 是否保持顺序 是否并发 是否取消旧任务 推荐使用场景
flatMapConcat() 串行 ✅ 是 ❌ 否 ❌ 否 顺序敏感任务
flatMapMerge() 并发(可控) ❌ 否 ✅ 是 ❌ 否 高吞吐、独立任务并行处理
flatMapLatest() 最新优先 ❌ 否 ⚠️ 局部 ✅ 是 实时交互、防重复提交、取消过期请求

📌 核心选择建议:

  • 要顺序?→ 选 flatMapConcat()
  • 要性能?→ 选 flatMapMerge(concurrency)
  • 要响应最新事件?→ 选 flatMapLatest()

💡 小技巧:结合 onEach { println("emitted: $it") }delay() 调试 Flow 行为非常有效。


源码示例地址

文中所有示例代码均可在 GitHub 获取:
👉 https://github.com/Baeldung/kotlin-tutorials/tree/master/core-kotlin-modules/core-kotlin-concurrency-2

建议动手跑一遍,加深理解。毕竟 Flow 的并发行为光看文字容易懵 😅。


原始标题:Difference Between flatMapMerge(), flatMapConcat() and flatMapLatest() in Kotlin