1. Overview
Streaming enables us to incrementally load big or even infinite data sets that wouldn’t fit in memory, and to operate on them without exhausting the heap and running into an OutOfMemoryError.
The FS2 streaming library helps achieve this goal in a fully functional way by taking advantage of functional programming (FP) paradigms.
2. Streams
Let’s say we have a large data file of over 90 GB, and we’re tasked with writing a program that counts the total number of words in the file and writes the result to another file. A naive approach would be to load the entire file into memory and then count the words.
Here’s an example of trying to count the words in our large file in a non-functional way:
import java.io.{File, PrintWriter}
import scala.collection.mutable

def readAndWriteFile(readFrom: String, writeTo: String) = {
  val counts = mutable.Map.empty[String, Int]
  val fileSource = scala.io.Source.fromFile(readFrom)
  try {
    fileSource
      .getLines()
      .toList
      .flatMap(_.split("\\W+"))
      .foreach { word =>
        counts += (word -> (counts.getOrElse(word, 0) + 1))
      }
  } finally {
    fileSource.close()
  }

  val fileContent = counts.foldLeft("") { case (accumulator, (word, count)) =>
    accumulator + s"$word = $count\n"
  }

  val writer = new PrintWriter(new File(writeTo))
  writer.write(fileContent)
  writer.close()
}
If we run this code, we’ll get a runtime error:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
This error tells us that our file was too large and couldn’t fit into memory. We called toList on the iterator, which fetched all the data from the file into memory.
Streams in Scala help to solve this problem by incrementally loading the data and providing APIs to help process each chunk of data. Now, rather than loading the entire file into memory, we load data in configurable chunks and process them as they come in.
Streaming helps us ingest, process, analyze, and store data in a quick and responsive manner.
Streams typically define APIs similar to those of regular Scala collections such as List or Vector. Higher-order functions such as map, flatMap, and collect are defined on these streams for data processing.
Streams are also useful when interacting with other data sources. Imagine we have a REST API that is used to download the large data we just described above. If we attempt to load all the data into our server before pushing it to the client, we could easily run out of memory on our server or slow down other clients due to the excessive amount of memory being used to service one client. But if we stream the file in small chunks, we can easily service multiple clients equally with a lower chance of running out of memory.
It’s important to know that streams are lazily evaluated. When we apply a higher-order function such as map, we aren’t computing the whole stream immediately; elements are only produced as they’re demanded.
One way we could have prevented memory errors in our above example would have been to keep the data lazy, calling toStream (or, since Scala 2.13, to(LazyList)) instead of toList on the iterator.
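As a sketch of that fix (using a hypothetical countWords helper, and assuming Scala 2.13, where toStream is deprecated in favor of LazyList), only the conversion changes, so lines are pulled one at a time instead of being materialized all at once:

import scala.collection.mutable

def countWords(readFrom: String): mutable.Map[String, Int] = {
  val counts = mutable.Map.empty[String, Int]
  val fileSource = scala.io.Source.fromFile(readFrom)
  try {
    fileSource
      .getLines()
      .to(LazyList) // instead of toList; use toStream on Scala 2.12 and earlier
      .flatMap(_.split("\\W+"))
      .foreach(word => counts += (word -> (counts.getOrElse(word, 0) + 1)))
  } finally {
    fileSource.close()
  }
  counts
}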
There are some advanced features that are needed in day-to-day programming that the standard library’s Stream doesn’t provide, such as ease of asynchronous and parallel programming, or throttling. This is where we leverage the power of libraries that offer this functionality, such as Akka Streams, or FS2.
Let’s see how we can use the power of FS2 streams to solve our large-data problem, among other things, in a functional way.
3. FS2 Stream
FS2 provides a functional way to define a stream that involves incrementally loading and transforming data.
An FS2 stream is defined by the type Stream[+F[_], +O], which describes a stream that requires an environment (also called a context) of type F and emits values of type O.
FS2 streams equip us with the ability to build streams that incorporate resource acquisition and release, parallel computation, and other things. They are also designed to provide modularity such that we can gradually build smaller elements of the stream that are easily composable and reusable.
We can use FS2 streams to define a stream that performs our word count example:
import cats.effect.IO
import fs2.{Pipe, Stream, text}
import fs2.io.file.{Files, Path}

def readAndWriteFile(readFrom: String, writeTo: String): Stream[IO, Unit] = {
  val fs2Path = Path.fromNioPath(java.nio.file.Paths.get(readFrom))

  val source: Stream[IO, Byte] = Files[IO].readAll(fs2Path)

  val pipe: Pipe[IO, Byte, Byte] = src =>
    src
      .through(text.utf8.decode)
      .through(text.lines)
      .flatMap(line => Stream.apply(line.split("\\W+"): _*))
      .fold(Map.empty[String, Int]) { (count, word) =>
        count + (word -> (count.getOrElse(word, 0) + 1))
      }
      .map(_.foldLeft("") { case (accumulator, (word, count)) =>
        accumulator + s"$word = $count\n"
      })
      .through(text.utf8.encode)

  val sink: Pipe[IO, Byte, Unit] = Files[IO].writeAll(Path(writeTo))

  val stream: Stream[IO, Unit] =
    source
      .through(pipe)
      .through(sink)

  stream
}
The beauty of using FS2 is that we were able to achieve the same goal in a functional manner — there are no mutable variables anywhere.
This example can look a little daunting, especially if this is the first time we’re using FS2. So, we’ll go over each element of the stream.
A good way to understand streams is to think of a stream in terms of a Source, Pipe, and Sink, similar to Akka Streams, that form a well-connected ETL pipeline.
3.1. Source
We can think of a Source as the starting or entry point of our stream, with one open output, responsible for incrementally fetching data from a file, database, socket, or other external system. It can comprise any number of internal sources and transformations that are wired together.
The FS2 stream model is a “pull” model, which means that “downstream” functions or parts call “upstream” functions to obtain data when needed.
What this means is that the Source, which is responsible for loading the data, loads data if and only if data is needed further down in the processing pipeline. For example, if an error occurs downstream or an exception is thrown and not handled, the Source stops loading any more data and typically releases acquired resources.
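We can see this pull model in a small example with an infinite source: even though the stream below never ends, only the five elements that are actually pulled are ever produced:

// Stream.iterate describes an infinite stream; take(5) bounds what gets pulled
val firstFive: List[Int] = Stream.iterate(0)(_ + 1).take(5).toList
// List(0, 1, 2, 3, 4)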
The easiest way of defining a Source is by using its apply method:
val intStream: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5)
We just defined a stream that doesn’t need a context, hence the Pure F type, and emits Ints.
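Because a Pure stream involves no effects, FS2 lets us materialize it directly, without compiling it into an effect type:

val ints: List[Int] = intStream.toList // List(1, 2, 3, 4, 5)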
In our word count example, we defined a Source using the readAll() method, which takes the path of the file we want to read and then continuously produces Bytes, reading a chunk of the file each time the downstream pulls for more data:
val source: Stream[IO, Byte] = Files[IO].readAll(fs2Path)
Looking at the type signature of the Source we defined, we’ll see that the first type parameter, IO, is the context or environment type, and the second, Byte, is the type of data this Source emits downstream.
There must be at least one Source in every stream.
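Besides apply, FS2 provides several other constructors for defining a Source. A few common ones:

val single: Stream[Pure, Int] = Stream.emit(42)           // a single element
val many: Stream[Pure, Int] = Stream.emits(List(1, 2, 3)) // from a sequence
val effectful: Stream[IO, Long] = Stream.eval(IO(System.currentTimeMillis())) // from an effect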
3.2. Pipe
A Pipe is a streaming element with open input and output. It is used as a processing step to transform data that typically comes from a Source. It can consist of one or more processing steps.
A Pipe has the type signature Pipe[F[_], -I, +O], which is a type alias for Stream[F, I] => Stream[F, O], where F represents the effect type, I represents the type of the input elements, and O represents the type of the output elements.
Simply put, a Pipe in FS2 is a function that takes a stream of a certain type and returns another stream of the same or different type.
Let’s define a simple Pipe that takes a Stream of Ints and adds 1 to them:
val add1Pipe: Pipe[Pure, Int, Int] = stream => stream.map(_ + 1)
We can easily rewrite this as:
val add1Pipe: Pipe[Pure, Int, Int] = _.map(_ + 1)
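Since a Pipe is just a function between streams, pipes compose with ordinary function composition, which is what makes stream fragments easy to reuse:

// andThen chains two pipes into a single pipe that adds 2 overall
val add2Pipe: Pipe[Pure, Int, Int] = add1Pipe.andThen(add1Pipe)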
In our word count example, we put our counting logic into a pipe that:
- takes the stream of bytes
- converts them to Strings
- splits them by lines
- splits lines into words that are flattened into the stream
- saves words and number of occurrences into a Map
- creates a String that represents words and their number of occurrences
- transforms the created String into a stream of Bytes
val pipe: Pipe[IO, Byte, Byte] = src =>
  src
    .through(text.utf8.decode)                             // bytes to Strings
    .through(text.lines)                                   // split Strings into lines
    .flatMap(line => Stream.apply(line.split("\\W+"): _*)) // split lines into words and flatten
    .fold(Map.empty[String, Int]) { (count, word) =>       // accumulate word counts into a Map
      count + (word -> (count.getOrElse(word, 0) + 1))
    }
    .map(_.foldLeft("") { case (accumulator, (word, count)) => // render the Map as a String
      accumulator + s"$word = $count\n"
    })
    .through(text.utf8.encode)                             // encode the String back to bytes
The type signature of our pipe is Pipe[IO, Byte, Byte], which is a stream transformation represented as a function from a stream of bytes to another stream of bytes.
If a Pipe is connected to a Source, the result is a new Source. Likewise, a Pipe connected to a Sink creates a new Sink.
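For example, connecting our add1Pipe to a simple Source yields a new Source that emits the incremented values:

val incremented: Stream[Pure, Int] = Stream(1, 2, 3).through(add1Pipe) // emits 2, 3, 4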
3.3. Sink
A Sink can be thought of as a stream processing element that mostly represents the end of a stream. It is typically where effectful operations are performed such as writing to a file, database, or socket, or printing to a console.
In FS2, a sink used to be represented by Sink[F[_], -I], a type alias for Pipe[F, I, Unit]; the alias has since been deprecated in favor of using Pipe directly. A Sink is a specialized pipe that converts a stream into a Stream[F, Unit].
It’s a function that takes input values, typically runs an effect with each of them, and outputs Unit to indicate that its work is purely effectful.
In our word count example, we defined a sink that takes a stream of Bytes and writes those Bytes to a file:
val sink: Pipe[IO, Byte, Unit] = Files[IO].writeAll(Path(writeTo))
It’s important to note that a sink is just a regular pipe: the Unit output type signals that it exists to perform effects rather than to emit data downstream.
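A sink for any other effectful action can be written the same way. For instance, here’s a small sketch of a sink that prints each element to the console, using evalMap to run an IO for every element:

val printSink: Pipe[IO, Int, Unit] = _.evalMap(i => IO(println(i)))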
3.4. Connecting a Stream
Now that we’ve defined stream elements, let’s talk about how these elements are connected to form a whole stream.
For our word count example, we defined a Source that emits Bytes, a pipe that performs a series of transformations to those bytes, and a sink that takes those bytes and writes them to a file.
One of the major ways we connect these elements is via a method on FS2 streams called through, which transforms a stream using a given pipe:
val stream: Stream[IO, Unit] =
  source
    .through(pipe)
    .through(sink)
This is where we connect the elements to form our runnable stream that will perform our word count.
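Note that building the stream doesn’t execute anything: an FS2 stream is only a description of the work. To actually run it, we compile it down to a single effect and execute that effect, for example at the edge of an IOApp. Here’s a minimal sketch, assuming hypothetical input.txt and output.txt paths:

import cats.effect.{IO, IOApp}

object WordCountApp extends IOApp.Simple {
  // compile.drain folds the stream into one IO[Unit] that runs it to completion
  def run: IO[Unit] =
    readAndWriteFile("input.txt", "output.txt").compile.drain
}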
4. Advanced Features
The FS2 streaming library has a lot of other features that make it exciting, such as built-in support for asynchronous computation and batching. In this section, we’ll look into some of the more advanced features this streaming library offers.
4.1. Batching
Batching is the process of grouping elements and emitting them downstream for processing. In FS2, these groups of elements are called Chunks.
A Chunk can be thought of as a finite sequence of values that is used by FS2 streams internally:
Stream((1 to 100): _*)
  .chunkN(10) // group 10 elements together
  .map(println)
  .compile
  .drain
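Under the hood, every FS2 stream is already chunked, and the chunks method exposes that structure directly. A small sketch:

// Each appended segment keeps its own chunk boundary, giving two chunks here:
val chunks: List[Chunk[Int]] = (Stream(1, 2) ++ Stream(3, 4)).chunks.toList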
4.2. Asynchronicity
We can use FS2 streams to write asynchronous code that leverages the power of concurrency and parallelism.
Here’s a contrived example where we concurrently write a range of numbers to a socket:
def writeToSocket[F[_]: Async](chunk: Chunk[String]): F[Unit] =
  Async[F].async_ { callback =>
    println(s"[thread: ${Thread.currentThread().getName}] :: Writing $chunk to socket")
    callback(Right(()))
  }
Stream((1 to 100).map(_.toString): _*)
  .chunkN(10)
  .covary[IO]
  .parEvalMapUnordered(10)(writeToSocket[IO])
  .compile
  .drain
parEvalMapUnordered evaluates effects in parallel, emitting the results downstream in whatever order they complete. If we run this code, we’ll see the chunks being written from multiple threads. The number of concurrent effects is limited by the maxConcurrent parameter supplied, 10 in this case.
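If ordering matters, parEvalMap is the order-preserving counterpart: it still runs up to maxConcurrent effects at once but emits results in the original order. A sketch reusing the writeToSocket helper from above:

Stream((1 to 100).map(_.toString): _*)
  .chunkN(10)
  .covary[IO]
  .parEvalMap(10)(writeToSocket[IO])
  .compile
  .drain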
The library also provides other features such as error handling and throttling.
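As a small taste of those, here’s a sketch (separate from our word count example) that recovers from a failure with handleErrorWith and throttles emission with metered:

import scala.concurrent.duration._

Stream
  .raiseError[IO](new RuntimeException("boom"))
  .handleErrorWith(t => Stream.emit(s"recovered: ${t.getMessage}"))
  .metered(1.second) // emit at most one element per second
  .compile
  .toList // IO[List[String]]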
5. Conclusion
In this article, we’ve seen how streams can help us handle large or infinite data sets. We also looked into the FS2 library and saw how to functionally write streaming applications, and we learned about some extra features that the library provides.
As usual, the source code can be found over on GitHub.