1. 概述

在处理大文件或无限数据集时,如果一次性将所有数据加载到内存中,很容易导致堆溢出(OOM)问题。流式处理(Streaming)正是为了解决这一难题:它允许我们逐步加载并处理数据,而无需一次性将其全部载入内存。

FS2 是一个基于函数式编程(FP)范式的 Scala 流处理库,提供了完全函数式的方式来构建和操作流。

2. 流(Streams)

假设我们要处理一个超过 90GB 的大文件,统计其中每个单词出现的次数,并将结果写入另一个文件。一种天真的做法是直接将整个文件读入内存再进行统计:

def readAndWriteFile(readFrom: String, writeTo: String) = {
  val counts = mutable.Map.empty[String, Int]
  val fileSource = scala.io.Source.fromFile(readFrom)
  try {
    fileSource
      .getLines()
      .toList
      .flatMap(_.split("\\W+"))
      .foreach { word =>
        counts += (word -> (counts.getOrElse(word, 0) + 1))
      }
  } finally {
    fileSource.close()
  }
  val fileContent = counts.foldLeft("") {
   case (accumulator, (word, count)) =>
     accumulator + s"$word = $count\n"
 }
  val writer = new PrintWriter(new File(writeTo))
  writer.write(fileContent)
  writer.close()
}

运行这段代码,大概率会遇到如下错误:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

这是因为我们调用了 .toList,强制将整个文件加载进内存。而使用流式处理,我们可以按需加载和处理数据,避免 OOM。

Scala 中的 Stream 本身就能实现这一点。但标准库中的 Stream 在处理异步、并行、节流等高级特性时力不从心。这时候就需要借助像 Akka StreamsFS2 这样的专业库。

流是惰性求值的,这意味着我们调用 map 等操作时,并不会立即执行,而是构建一个延迟执行的计算图。

⚠️ 如果想避免 OOM,应该使用 .toStream 替代 .toList

3. FS2 流

FS2 提供了一种函数式的方式来定义流,它支持资源管理、并行计算、组合性等高级特性。

FS2 的核心类型是:

Stream[+F[_], +O]
  • F[_]:表示上下文环境(通常是 IO)
  • O:表示输出的数据类型

3.1. Source

Source 是流的起点,负责从文件、数据库、网络等地方逐步获取数据

✅ FS2 是“拉模型”(pull-based):下游需要数据时,才向上游请求。

示例:创建一个简单的 Source

val intStream: Stream[Pure, Int] = Stream(1,2,3,4,5)

在我们的单词统计示例中,我们通过 Files[IO].readAll 创建了一个 Source:

val source: Stream[IO, Byte] = Files[IO].readAll(fs2Path)

这表示我们使用 IO 上下文,逐步读取文件字节流。

3.2. Pipe

Pipe 是一个流转换器,它接收一个流并输出另一个流。可以看作是函数:Stream[F, I] => Stream[F, O]

示例:定义一个简单的 Pipe

val add1Pipe : Pipe[Pure, Int, Int] = _.map(_ + 1)

在我们的示例中,Pipe 的作用是:

  • 将字节解码为字符串
  • 按行分割
  • 按单词分割
  • 统计词频
  • 构造结果字符串
  • 再次编码为字节流
val pipe : Pipe[IO, Byte, Byte] = src =>
     src.through(text.utf8.decode)
     .through(text.lines)
     .flatMap(line => Stream.apply(line.split("\\W+"): _*))
     .fold(Map.empty[String, Int]) { (count, word) =>
       count + (word -> (count.getOrElse(word, 0) + 1))
     }
     .map(_.foldLeft("") {
         case (accumulator, (word, count)) =>
           accumulator + s"$word = $count\n"
       })
     .through(text.utf8.encode)

3.3. Sink

Sink 是流的终点,通常用于执行副作用操作,比如写入文件、数据库、打印等。

✅ Sink 是一种特殊的 Pipe,其输出类型为 Unit

val sink : Pipe[IO, Byte, Unit] = Files[IO].writeAll(Path(writeTo))

3.4. 连接流元素

通过 .through(pipe) 方法,我们可以将 Source、Pipe、Sink 串联起来,形成一个完整的流处理链:

val stream : Stream[IO, Unit] = 
   source
    .through(pipe)
    .through(sink)

4. 高级特性

4.1. 批处理(Batching)

FS2 中通过 Chunk 来实现批处理:

Stream((1 to 100) : _*)
  .chunkN(10) // 每10个元素为一组
  .map(println)
  .compile
  .drain

4.2. 异步处理(Asynchronicity)

FS2 支持并发处理,例如使用 parEvalMapUnordered

def writeToSocket[F[_] : Async](chunk: Chunk[String]): F[Unit] =
  Async[F].async_ { callback =>
     println(s"[thread: ${Thread.currentThread().getName}] :: Writing $chunk to socket")
     callback(Right(()))
  }

Stream((1 to 100).map(_.toString): _*)
  .chunkN(10)
  .covary[IO]
  .parEvalMapUnordered(10)(writeToSocket[IO])
  .compile
  .drain

parEvalMapUnordered 可以并行执行副作用,提升性能。

5. 总结

本文介绍了 FS2 如何通过函数式的方式优雅地处理流式数据。它不仅避免了 OOM,还支持异步、批处理、资源管理等高级特性,非常适合构建高性能、可组合的流处理应用。

源码示例可从 GitHub 获取。


原始标题:Introduction to FS2: Functional Streams for Scala