1. 概述
在处理大文件或无限数据集时,如果一次性将所有数据加载到内存中,很容易导致堆溢出(OOM)问题。流式处理(Streaming)正是为了解决这一难题:它允许我们逐步加载并处理数据,而无需一次性将其全部载入内存。
FS2 是一个基于函数式编程(FP)范式的 Scala 流处理库,提供了完全函数式的方式来构建和操作流。
2. 流(Streams)
假设我们要处理一个超过 90GB 的大文件,统计其中每个单词出现的次数,并将结果写入另一个文件。一种天真的做法是直接将整个文件读入内存再进行统计:
def readAndWriteFile(readFrom: String, writeTo: String) = {
val counts = mutable.Map.empty[String, Int]
val fileSource = scala.io.Source.fromFile(readFrom)
try {
fileSource
.getLines()
.toList
.flatMap(_.split("\\W+"))
.foreach { word =>
counts += (word -> (counts.getOrElse(word, 0) + 1))
}
} finally {
fileSource.close()
}
val fileContent = counts.foldLeft("") {
case (accumulator, (word, count)) =>
accumulator + s"$word = $count\n"
}
val writer = new PrintWriter(new File(writeTo))
writer.write(fileContent)
writer.close()
}
运行这段代码,大概率会遇到如下错误:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
这是因为我们调用了 .toList
,强制将整个文件加载进内存。而使用流式处理,我们可以按需加载和处理数据,避免 OOM。
Scala 中的 Stream 本身就能实现这一点。但标准库中的 Stream
在处理异步、并行、节流等高级特性时力不从心。这时候就需要借助像 Akka Streams 或 FS2 这样的专业库。
✅ 流是惰性求值的,这意味着我们调用 map
等操作时,并不会立即执行,而是构建一个延迟执行的计算图。
⚠️ 如果想避免 OOM,应该使用 .toStream
替代 .toList
。
3. FS2 流
FS2 提供了一种函数式的方式来定义流,它支持资源管理、并行计算、组合性等高级特性。
FS2 的核心类型是:
Stream[+F[_], +O]
F[_]
:表示上下文环境(通常是 IO)O
:表示输出的数据类型
3.1. Source
Source 是流的起点,负责从文件、数据库、网络等地方逐步获取数据。
✅ FS2 是“拉模型”(pull-based):下游需要数据时,才向上游请求。
示例:创建一个简单的 Source
val intStream: Stream[Pure, Int] = Stream(1,2,3,4,5)
在我们的单词统计示例中,我们通过 Files[IO].readAll
创建了一个 Source:
val source: Stream[IO, Byte] = Files[IO].readAll(fs2Path)
这表示我们使用 IO
上下文,逐步读取文件字节流。
3.2. Pipe
Pipe 是一个流转换器,它接收一个流并输出另一个流。可以看作是函数:Stream[F, I] => Stream[F, O]
示例:定义一个简单的 Pipe
val add1Pipe : Pipe[Pure, Int, Int] = _.map(_ + 1)
在我们的示例中,Pipe 的作用是:
- 将字节解码为字符串
- 按行分割
- 按单词分割
- 统计词频
- 构造结果字符串
- 再次编码为字节流
val pipe : Pipe[IO, Byte, Byte] = src =>
src.through(text.utf8.decode)
.through(text.lines)
.flatMap(line => Stream.apply(line.split("\\W+"): _*))
.fold(Map.empty[String, Int]) { (count, word) =>
count + (word -> (count.getOrElse(word, 0) + 1))
}
.map(_.foldLeft("") {
case (accumulator, (word, count)) =>
accumulator + s"$word = $count\n"
})
.through(text.utf8.encode)
3.3. Sink
Sink 是流的终点,通常用于执行副作用操作,比如写入文件、数据库、打印等。
✅ Sink 是一种特殊的 Pipe,其输出类型为 Unit
。
val sink : Pipe[IO, Byte, Unit] = Files[IO].writeAll(Path(writeTo))
3.4. 连接流元素
通过 .through(pipe)
方法,我们可以将 Source、Pipe、Sink 串联起来,形成一个完整的流处理链:
val stream : Stream[IO, Unit] =
source
.through(pipe)
.through(sink)
4. 高级特性
4.1. 批处理(Batching)
FS2 中通过 Chunk
来实现批处理:
Stream((1 to 100) : _*)
.chunkN(10) // 每10个元素为一组
.map(println)
.compile
.drain
4.2. 异步处理(Asynchronicity)
FS2 支持并发处理,例如使用 parEvalMapUnordered
:
def writeToSocket[F[_] : Async](chunk: Chunk[String]): F[Unit] =
Async[F].async_ { callback =>
println(s"[thread: ${Thread.currentThread().getName}] :: Writing $chunk to socket")
callback(Right(()))
}
Stream((1 to 100).map(_.toString): _*)
.chunkN(10)
.covary[IO]
.parEvalMapUnordered(10)(writeToSocket[IO])
.compile
.drain
✅ parEvalMapUnordered
可以并行执行副作用,提升性能。
5. 总结
本文介绍了 FS2 如何通过函数式的方式优雅地处理流式数据。它不仅避免了 OOM,还支持异步、批处理、资源管理等高级特性,非常适合构建高性能、可组合的流处理应用。
源码示例可从 GitHub 获取。