1. Overview
Event sourcing is a useful architecture for representing the state of a system as a series of ordered events. Its main difference from traditional storing patterns is that the events are stored in an append-only journal. This means that in case of an application crash, we can replay our data from the start and end up in the same state.
Moreover, we can replay events up to a specific point in time and get the state then, which may help us in determining the cause of an issue at that time.
In this tutorial, we’ll explore how to use Akka Persistence and implement stateful actors.
2. Commands and Events
To use the Akka Persistence module, we add its dependency to our build.sbt file:
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % "2.8.0"
Now, we’re ready to implement our application.
We’ll implement a simple bank account. An application in this domain requires security in every aspect, but this is out of scope in our example. Our application will implement three basic actions: withdrawal, deposit and information about the balance.
2.1. The Commands
Each actor of our application will receive commands sequentially to determine which action it’s going to serve next. So, let’s implement a trait that we’ll use to define our commands:
sealed trait Command
final case class WithdrawCommand(
amount: Int,
replyTo: ActorRef[StatusReply[Response]]
) extends Command
final case class DepositCommand(
amount: Int,
replyTo: ActorRef[StatusReply[Response]]
) extends Command
final case class BalanceCheckCommand(replyTo: ActorRef[StatusReply[Response]])
extends Command
Each command contains information about the specific action that needs to take place. An ActorRef argument is also passed, so we can respond to the original sender. We wrap the response as a StatusReply, a class that holds information if the command was successfully executed or not.
Response is a custom object that we need to implement:
sealed trait Response
final case class ActionResponse(message: String) extends Response
final case class BalanceCheckResponse(amount: Int) extends Response
ActionResponse contains a simple message that a command was successfully served, while BalanceCheckResponse contains the amount the account holds.
2.2. The Events
Our actor will use the commands to trigger some functionality and generate the events that will persist. So, the next step is to implement our events in the same way we did the commands:
sealed trait Event
final case class WithdrawalEvent(amount: Int) extends Event
final case class DepositEvent(amount: Int) extends Event
In our context, we only need the amount withdrawn or deposited, but in a more complex scenario, we may add a lot more information.
3. The Persistent Actor
We implemented our commands and events and now we’re ready to implement our persistent actor. This actor is responsible for receiving the commands, constructing the events, persisting them and responding to the sender of the command:
object BankAccount {
final case class State(amount: Int)
def apply(id: Long): Behavior[Command] =
EventSourcedBehavior
.withEnforcedReplies[Command, Event, State](
persistenceId = PersistenceId(id.toString, "bank-account"),
emptyState = State(0),
commandHandler = commandHandler,
eventHandler = eventHandler
)
}
We actually create our persistent actor using the apply() method of our object. It returns a Behavior object holding a Command. This means that the actor returns a function that handles instances of Command. Since our application always returns a response to the sender, we enforce our Behavior to check upon compilation that a response is actually returned.
In a real application, we’ll need to implement one actor for each bank account we serve. For this, each bank account needs a unique id to know which events to load and replay from our persistence store. We do this using the PersistenceId class that receives a Long and concatenates it with the “bank-account” string to differentiate it from other actors that implement different functionalities.
After that, we determine our initial state, which in our case is a bank account holding no amount, and then we have to implement our basic functionality, the command and event handlers.
3.1. Snapshots
In a production application that has processed and stored a large number of events, replaying all of them for recovery takes a significant amount of time. To improve performance, our actor can persist a snapshot of its state and replay the events after that.
There are many policies regarding snapshotting. For example, we can persist a snapshot when a certain event is received. We’d of course persist the event as well.
For our application, we’ll take a snapshot every five events and keep the three latest snapshots. It’s quite easy to do this, as the only thing needed is to enhance our Behavior:
EventSourcedBehavior
.withEnforcedReplies[Command, Event, State](
persistenceId = PersistenceId(id.toString, "bank-account"),
emptyState = State(0),
commandHandler = commandHandler,
eventHandler = eventHandler
)
.withRetention(
RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 3)
)
In case of failure, there’s no need to implement code to handle the recovery of snapshots and events. The actor does this automatically, by identifying the snapshots and events with the specific persistence id and will start replaying them. The actor only begins to accept new events once recovery is complete.
3.2. The Command Handler
The command handler is the most important part of our actor, as it handles our logic and event creation. It’s actually a PartialFunction that receives the current actor’s state and the received command to generate an Effect. In our case, since we always return a reply to the sender, it will be a ReplyEffect object:
val commandHandler: (State, Command) => ReplyEffect[Event, State] = {
(state, command) =>
command match {
case WithdrawCommand(amount, replyTo) =>
if (state.amount >= amount) {
Effect
.persist(WithdrawalEvent(amount))
.thenReply(replyTo)(_ =>
StatusReply.success(
ActionResponse(s"Amount $amount was withdrawn")
)
)
} else {
Effect.reply(replyTo)(
StatusReply.error(
s"Account has insufficient funds. Available balance ${state.amount}"
)
)
}
case DepositCommand(amount, replyTo) =>
Effect
.persist(DepositEvent(amount))
.thenReply(replyTo)(_ =>
StatusReply.success(
ActionResponse(s"Amount $amount was deposited")
)
)
case BalanceCheckCommand(replyTo) =>
Effect.reply(replyTo)(
StatusReply.Success(BalanceCheckResponse(state.amount))
)
}
}
In case of WithdrawCommand, we first check if the account actually holds the requested amount. If it does, we create WithdrawalEvent and persist it and then we return a successful response with an informative message. If it doesn’t, we return an error message.
DepositCommand is simpler, as we don’t need to make any checks. We just persist DepositEvent and reply with a successful response.
Finally, BalanceCheck**Command is the simplest of all. We just get the amount of the account from the actor’s state and return it to the sender.
3.3. The Event Handler
The event handler is also a PartialFunction responsible for handling the events that the command handler persisted in and updating the actor’s state. It receives the current state and the event and returns the updated state:
val eventHandler: (State, Event) => State = { (state, event) =>
event match {
case WithdrawalEvent(amount) => state.copy(state.amount - amount)
case DepositEvent(amount) => state.copy(state.amount + amount)
}
}
We only have two events to handle, WithdrawalEvent and DepositEvent. For each one, we only need to subtract or add the event’s amount to the current state and then return it.
4. Persistent Stores
We defined our events and our persistent actor, but we didn’t define the store in which our events and snapshots will be persisted.
There are various stores we can use. The Akka organization maintains plugins for Apache Cassandra, SQL databases and Google Spanner, but there are a lot more community plugins we can use.
In order to use a specific store, we need to override the akka.persistence.journal.plugin and akka.persistence.snapshot-store.plugin values in the application.conf file. We can choose different stores for the events and the snapshots.
For our application, we’ll persist the events in memory and the snapshots in the local filesystem under the target/snapshot folder:
akka.persistence {
journal {
plugin = "akka.persistence.journal.inmem"
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.local"
local.dir = "target/snapshot"
}
}
Different stores will need extra configuration, like a datasource for an SQL database.
5. Conclusion
In this article, we built a simple bank account application that receives commands and generates and persists events using the Akka Persistence module.
We also discussed about snapshots and different persistent stores we can use in our application.
As usual, the code is available over on GitHub.