1. Introduction

Cats Effect is one of the most popular libraries in the Scala ecosystem. It provides the IO monad for describing, composing, and running side-effecting computations in a controlled way. In this tutorial, we’ll look at a powerful feature of Cats Effect: concurrent programming using Fibers.

2. Concurrency vs. Parallelism

Parallelism is when two or more tasks are executed at the same instant. This generally requires multiple processor cores, so the number of cores bounds the degree of parallelism.

Concurrency is the concept of making progress on multiple tasks by interleaving them, for example by context switching on a single core. It creates an illusion of simultaneous execution, whereas, in reality, the system switches between the different tasks and executes a piece of each in turn. Concurrency is therefore possible even in single-core or single-threaded environments. In other words, parallel tasks are executed on different threads running on different cores, whereas concurrent tasks might be executed on the same thread.

An application can fall into one of four groups:

  • Concurrent and NOT parallel
  • Parallel and NOT Concurrent
  • Both Parallel and Concurrent
  • Neither Parallel nor Concurrent (Sequential Execution)

3. Fibers

By default, the Cats Effect runtime executes the IOs sequentially. If multiple IOs are chained together and executed, they’ll be processed in the order of chaining:

val sequentialOps = IO("Step1") >> IO("Step2") >> IO("Step3")

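However, we can also write concurrent operations in Cats Effect using Fibers. Fibers are extremely lightweight, thread-like units of execution that are managed by the Cats Effect runtime rather than by the operating system. Because a fiber costs far less memory than a native thread, an application can typically create many thousands of them without exhausting system resources.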

The Cats Effect runtime will schedule these fibers on the available threads automatically and in the best possible way. Fibers also implement semantic blocking, which means that the CE runtime will not block the underlying threads. Additionally, the fiber is not locked to a particular thread. Hence, the runtime can execute the fibers across multiple threads based on the thread availability.
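To make the semantic-blocking claim more concrete, here is a minimal sketch, assuming Cats Effect 3 is on the classpath. The object name ManyFibersDemo, the helper sleepInParallel, and the fiber count are illustrative assumptions, not part of the original example code. It starts many fibers that each sleep, then joins them all. Since IO.sleep suspends the fiber instead of blocking a thread, the batch should finish in roughly the time of a single sleep rather than the sum of all of them:

```scala
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._

object ManyFibersDemo extends IOApp.Simple {

  // Hypothetical helper: start `n` fibers that each sleep for `nap`,
  // wait for all of them, and count how many succeeded.
  def sleepInParallel(n: Int, nap: FiniteDuration): IO[Int] =
    for {
      fibers   <- List.fill(n)(IO.sleep(nap)).traverse(_.start)
      outcomes <- fibers.traverse(_.join)
    } yield outcomes.count(_.isSuccess)

  // Time the whole batch and report it.
  val run: IO[Unit] =
    sleepInParallel(10000, 200.millis).timed.flatMap {
      case (elapsed, succeeded) =>
        IO.println(s"$succeeded fibers finished in ${elapsed.toMillis} ms")
    }
}
```

The exact elapsed time depends on the machine, so the sketch prints it rather than asserting a value.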

Cats Effect uses Fibers as the building blocks to implement advanced concurrency features.

4. Setup

Let’s first set up the necessary dependencies required to work with Cats Effect. We can add the library dependency to the build.sbt:

libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.0"

5. Managing Fiber Lifecycle

Fibers follow a lifecycle very similar to that of a thread: they are started, they run, and they finish or get canceled. Before looking at that, let’s create an extension method on IO that prints the name of the thread on which a value is produced. This is useful for identifying the thread on which an IO or fiber is scheduled and executed:

implicit class Xtensions[A](io: IO[A]) {
  def printIO: IO[A] =
    for {
      a <- io
      _ = println(s"[${Thread.currentThread().getName}] " + a)
    } yield a
}

5.1. Creating and Executing a Fiber

Now that we’ve added the dependency, let’s create and execute fibers. We can invoke the start method on an IO to create a fiber. Creating a fiber is itself an effect, so start returns the fiber wrapped in an IO. This means we can use a for-comprehension to chain and process fibers, like any other IOs. Now, let’s create a fiber:

import scala.concurrent.duration._ // provides the 400.millis syntax

val io: IO[String] = IO("Starting a task").printIO >> IO.sleep(400.millis) >> IO("Task completed").printIO
val fibExec = for {
  fib <- io.start // run io on a new fiber
  _ <- fib.join   // wait for that fiber to finish
} yield ()

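The start method runs the IO on a different fiber, while the calling fiber continues immediately. To wait for the started fiber and obtain its result, we invoke the join method on it. When we execute the fiber with the printIO method applied, the printed thread names show where each step ran. The two steps may appear on different threads, since a fiber is not pinned to one thread.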
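An IO value is only a description, so nothing is printed until it is run. The following is a minimal sketch of a complete program, assuming Cats Effect 3 and an IOApp.Simple entry point. The names FiberStartDemo, DebugOps, task, and startAndJoin are hypothetical, and the helper is a flatTap-based variant of printIO:

```scala
import cats.effect.{IO, IOApp, Outcome}
import scala.concurrent.duration._

object FiberStartDemo extends IOApp.Simple {

  // Variant of the printIO helper: print the value tagged with the current thread name.
  implicit class DebugOps[A](io: IO[A]) {
    def printIO: IO[A] =
      io.flatTap(a => IO(println(s"[${Thread.currentThread().getName}] $a")))
  }

  val task: IO[String] =
    IO("Starting a task").printIO >> IO.sleep(400.millis) >> IO("Task completed").printIO

  // Start the task on its own fiber, keep working, then join and unwrap the outcome.
  val startAndJoin: IO[String] =
    for {
      fib     <- task.start
      _       <- IO("Main fiber keeps going").printIO
      outcome <- fib.join
      result  <- outcome match {
                   case Outcome.Succeeded(fa) => fa
                   case Outcome.Errored(e)    => IO.raiseError(e)
                   case Outcome.Canceled()    => IO.raiseError(new RuntimeException("fiber was canceled"))
                 }
    } yield result

  val run: IO[Unit] = startAndJoin.flatMap(r => IO.println(s"Joined with: $r"))
}
```

The thread names in the output vary from run to run, which is expected: the runtime is free to move a fiber between threads.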

5.2. Cancelling a Fiber

One of the most important advantages of using a fiber is the ability to cancel it anytime during its execution. This ensures that the resources used for the fiber will be de-allocated and hence both memory and processing cycles are saved. To cancel a fiber execution, we can invoke cancel on the fiber:

val fibCancel = for {
  fib <- io.start
  _ <- IO.sleep(100.millis) >> fib.cancel >> IO("Fiber cancelled").printIO
  _ <- fib.join
} yield ()

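In the above example, the fiber is canceled 100 milliseconds into its execution, while it is still inside the 400-millisecond sleep. As a result, "Task completed" is never printed, and the subsequent join reports that the fiber was canceled.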

5.3. Outcomes of Fiber Execution

After a fiber finishes, the result is one of three cases: Succeeded, Errored, or Canceled. Joining a fiber yields a value of type Outcome[IO, Throwable, A]. If the fiber completed successfully, the outcome is Succeeded and carries the result. If the fiber failed, it is Errored and carries the exception. If the fiber was canceled, it is Canceled. Note that fibCancel above discards the joined value and yields Unit, so to pattern-match on the outcome we need to yield the value returned by join:

val outcome: IO[String] = (for {
  fib <- io.start
  _ <- IO.sleep(100.millis) >> fib.cancel
  oc <- fib.join // Outcome[IO, Throwable, String]
} yield oc).flatMap {
  case Outcome.Succeeded(fa) => IO("fiber executed successfully").printIO
  case Outcome.Errored(e)    => IO("error occurred during fiber execution").printIO
  case Outcome.Canceled()    => IO("fiber was canceled!").printIO
}
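To see all three outcomes side by side, here is a small sketch, assuming Cats Effect 3. The object OutcomeDemo and the helper describe are hypothetical names introduced only for illustration. The helper runs any IO on a fiber, optionally cancels it after a delay, and turns the joined outcome into a message:

```scala
import cats.effect.{IO, Outcome}
import scala.concurrent.duration._

object OutcomeDemo {

  // Hypothetical helper: run `task` on a fiber, optionally cancel it, describe the outcome.
  def describe[A](task: IO[A], cancelAfter: Option[FiniteDuration] = None): IO[String] =
    for {
      fib <- task.start
      _   <- cancelAfter.fold(IO.unit)(d => IO.sleep(d) >> fib.cancel)
      oc  <- fib.join
      msg <- oc match {
               case Outcome.Succeeded(fa) => fa.map(a => s"succeeded with $a")
               case Outcome.Errored(e)    => IO.pure(s"errored with ${e.getMessage}")
               case Outcome.Canceled()    => IO.pure("canceled")
             }
    } yield msg

  val succeeded: IO[String] = describe(IO.pure(42))
  val errored: IO[String]   = describe(IO.raiseError[Int](new RuntimeException("boom")))
  val canceled: IO[String]  = describe(IO.sleep(1.second), cancelAfter = Some(100.millis))
}
```

Succeeded wraps the result in an IO rather than a plain value, which is why the first case maps over fa instead of reading it directly.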

5.4. Applying Finalizer on Cancelation

We can apply specific finalizers on any fibers to execute on cancelation. This way, it’s easier to apply some operations only in case of cancelation, such as releasing used resources. We can apply the finalizer using the onCancel() method on the IO from which the fiber is created:

val ioWithCancelationHook = io.onCancel(IO("Applying cancelation finalizer").printIO.void)
val finaliserAction = for {
  fib <- ioWithCancelationHook.start
  _ <- IO.sleep(100.millis) >> fib.cancel >> IO("fiber cancelled").printIO
  _ <- fib.join
} yield ()
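One way to check that an onCancel finalizer runs only on cancelation is to record its execution in a Ref. The following is a sketch under that assumption, using Cats Effect 3. The names FinalizerDemo and finalizerRan are hypothetical:

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._

object FinalizerDemo {

  // Returns true if the onCancel finalizer ran, false otherwise.
  def finalizerRan(cancelIt: Boolean): IO[Boolean] =
    for {
      flag <- Ref.of[IO, Boolean](false)
      // The finalizer flips the flag; it is only registered for cancelation.
      task  = IO.sleep(200.millis).onCancel(flag.set(true))
      fib  <- task.start
      _    <- if (cancelIt) IO.sleep(50.millis) >> fib.cancel else IO.unit
      _    <- fib.join
      ran  <- flag.get
    } yield ran
}
```

Here finalizerRan(true) should yield true and finalizerRan(false) should yield false, because the uncanceled task completes normally and its onCancel hook is never triggered.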

6. Racing IOs

Racing means executing two IOs on different fibers and returning the result of whichever completes first, whether it is a success or an error. With race, the runtime cancels the losing fiber. We can use the method race with two IOs as:

import scala.util.Random // each participant sleeps for a random duration below one second

val participant1 = IO("Start Task1").printIO >> IO.sleep(Random.nextInt(1000).millis) >> IO("Task 1 completed").printIO
val participant2 = IO("Start Task2").printIO >> IO.sleep(Random.nextInt(1000).millis) >> IO("Task 2 completed").printIO
val raceResult: IO[Either[String, String]] = IO.race(participant1, participant2)

The result of race is an Either: a Left holding the first IO’s result if the first participant wins, or a Right holding the second IO’s result if the second one wins. Since the loser is canceled, only one of the completion messages is printed when we run the race. We can verify the cancelation by attaching an onCancel finalizer to each participant before starting the race:

val participant1WithFinaliser = participant1.onCancel(IO("Task 1 got canceled").printIO.void)
val participant2WithFinaliser = participant2.onCancel(IO("Task 2 got canceled").printIO.void)
val raceWithFinaliser = IO.race(participant1WithFinaliser, participant2WithFinaliser)
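Because the participants above sleep for random durations, the winner changes between runs. To illustrate how the Either is interpreted, here is a deterministic sketch, assuming Cats Effect 3, with fixed delays chosen so that the first task always wins. The names RaceDemo, fast, slow, and winner are hypothetical:

```scala
import cats.effect.IO
import scala.concurrent.duration._

object RaceDemo {
  val fast: IO[Int]    = IO.sleep(50.millis).as(1)
  val slow: IO[String] = IO.sleep(500.millis).as("slow")

  // Left means the first argument won, Right means the second one did.
  val winner: IO[String] =
    IO.race(fast, slow).map {
      case Left(n)  => s"first task won with $n"
      case Right(s) => s"second task won with $s"
    }
}
```

The two sides of a race may have different result types, which is why the result is an Either rather than a single value.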

Now, this will print the completion message for the winner and the cancelation message for the loser.

Cats Effect also provides a variation of race() called racePair(). Instead of canceling the losing fiber, racePair returns the winner’s outcome together with a handle to the losing fiber, which is still running. The caller can then decide whether to cancel the losing fiber or let it complete. We can apply racePair to the previous example as:

val racePairResult: IO[Either[
  (OutcomeIO[String], FiberIO[String]),
  (FiberIO[String], OutcomeIO[String])
  ]] = IO.racePair(participant1, participant2)
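What to do with the losing fiber is up to the caller. As one possible sketch, assuming Cats Effect 3, the code below lets the loser finish and collects both results. The names RacePairDemo, wasCanceled, and both are hypothetical, and the fixed delays are chosen only to make the winner predictable:

```scala
import cats.effect.IO
import scala.concurrent.duration._

object RacePairDemo {
  val fast: IO[String] = IO.sleep(50.millis).as("fast")
  val slow: IO[String] = IO.sleep(300.millis).as("slow")

  // Fallback used if either side turns out to have been canceled.
  private val wasCanceled: IO[String] =
    IO.raiseError(new RuntimeException("task was canceled"))

  // Returns (result of first task, result of second task), whoever wins.
  val both: IO[(String, String)] =
    IO.racePair(fast, slow).flatMap {
      case Left((firstOutcome, secondFiber)) =>
        // First task won: unwrap its outcome, then wait for the second one.
        for {
          a <- firstOutcome.embed(wasCanceled)
          b <- secondFiber.joinWith(wasCanceled)
        } yield (a, b)
      case Right((firstFiber, secondOutcome)) =>
        // Second task won: unwrap its outcome, then wait for the first one.
        for {
          b <- secondOutcome.embed(wasCanceled)
          a <- firstFiber.joinWith(wasCanceled)
        } yield (a, b)
    }
}
```

Replacing the joinWith call with cancel on the losing fiber would instead reproduce the behavior of race.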

7. Timeout

Cats Effect provides a method to execute an IO with a timeout value. This way, we can ensure that a task either completes within a given duration or is canceled. Cats Effect implements this feature by racing the task against a timer internally. We can use the timeout method on an IO as:

val ioWithTimeout: IO[String] = participant1.timeout(400.millis)

If the task takes more than 400 milliseconds, it is canceled and the resulting IO fails with a TimeoutException. We can use the method timeoutTo to run a fallback IO instead of failing when the task exceeds the allowed time:

val ioWithFallback = participant1.timeoutTo(400.millis, IO("Fallback IO executed after timeout").printIO)
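Since participant1 sleeps for a random duration, it may or may not hit the timeout. The following sketch, assuming Cats Effect 3, uses a task that is always too slow to show both behaviors deterministically. The names TimeoutDemo, slowTask, recovered, and withFallback are hypothetical:

```scala
import cats.effect.IO
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

object TimeoutDemo {
  // Always slower than the 100 ms limit used below.
  val slowTask: IO[String] = IO.sleep(1.second).as("done")

  // timeout fails with a TimeoutException, which can be handled like any other error.
  val recovered: IO[String] =
    slowTask.timeout(100.millis).handleErrorWith {
      case _: TimeoutException => IO.pure("timed out")
      case other               => IO.raiseError(other)
    }

  // timeoutTo switches to the fallback IO instead of failing.
  val withFallback: IO[String] =
    slowTask.timeoutTo(100.millis, IO.pure("fallback"))
}
```

Using timeoutTo avoids the error-handling step entirely when a sensible default value exists.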

8. Conclusion

In this article, we looked at concurrent programming in Cats Effect using Fibers. As always, all the sample code used in this tutorial is available over on GitHub.
