1. Overview

The Axon Framework helps us build event-driven microservice systems. In A Guide to the Axon Framework, we learned about Axon by walking through a simple Axon Spring Boot application that includes an example Order model for us to update and query. In Dispatching Queries in the Axon Framework, we added all the supported queries.

In this article, we’ll look at persisting the Axon Framework’s query model. We’ll cover storing the projections in MongoDB, along with the challenges of testing and how to keep the event stream in sync with the query model.

2. Persistence Considerations

To create a handler that uses a database to persist the query model, we implement the OrdersEventHandler interface. In a production environment, we don’t want to rebuild the query model from scratch each time. With the Axon Framework, we can choose how to persist the model, and the best choice depends on the data involved. If we want free-text search, we might use Elasticsearch. For unstructured data, we might use MongoDB. When there are many relations between entities, we might use a graph database like Neo4j.

2.1. Token Store

When building the query model by going through the events, Axon uses a TokenStore to keep track of which events have already been processed. Ideally, the token store is persisted in the same database as the query model to ensure consistency. Using a persistent token store also allows us to run multiple instances, where each instance only needs to handle part of the events. Splitting the work across several instances relies on segments, where an instance can claim all or part of the segments for processing. If we use JPA or JDBC for persistence, we can use the JpaTokenStore or JdbcTokenStore. Both token store implementations are available in the Axon Framework without needing an extension.
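To make the idea of segments more concrete, here is a plain-Java sketch of how events could be divided over a fixed number of segments based on a hash of the order id. This is a simplification for illustration only: SegmentSketch, segmentFor(), and handledBy() are hypothetical names, and Axon’s own segment matching uses its own sequencing and masking logic rather than this modulo approach.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of segment-based work splitting; not part of the Axon API.
class SegmentSketch {

    // Maps an order id to one of segmentCount segments, so that all events
    // of the same order always end up in the same segment.
    static int segmentFor(String orderId, int segmentCount) {
        return Math.floorMod(orderId.hashCode(), segmentCount);
    }

    // Returns the order ids an instance would handle, given the segments it has claimed.
    static List<String> handledBy(List<Integer> claimedSegments, List<String> orderIds, int segmentCount) {
        List<String> handled = new ArrayList<>();
        for (String orderId : orderIds) {
            if (claimedSegments.contains(segmentFor(orderId, segmentCount))) {
                handled.add(orderId);
            }
        }
        return handled;
    }
}
```

With two segments, an instance that claims both handles every order, while two instances that each claim one segment split the orders between them without overlap.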

2.2. Building the Query Model

At startup, a streaming event processor will start reading events from an event store. With a persistent TokenStore, the processor resumes where it previously left off. Otherwise, it will, by default, start at the beginning. For each event, the processor will call the matching method annotated with @EventHandler.
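The resume behavior can be illustrated with a small plain-Java sketch. Here the event stream is a list, and the token is simply the index of the next event to read. TokenSketch, process(), and the map-based store are hypothetical stand-ins chosen for this example; they are not Axon’s TokenStore API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of token tracking; not Axon's TokenStore API.
class TokenSketch {

    // The tokenStore map stands in for a persistent store: processor name -> index of the next event.
    // Returns the events handled in this run.
    static List<String> process(Map<String, Integer> tokenStore, String processorName, List<String> eventStream) {
        // Without a stored token, processing starts at the beginning of the stream.
        int position = tokenStore.getOrDefault(processorName, 0);
        List<String> handled = new ArrayList<>();
        while (position < eventStream.size()) {
            handled.add(eventStream.get(position));
            position++;
            // Store the token after each event so that a restart resumes from here.
            tokenStore.put(processorName, position);
        }
        return handled;
    }
}
```

Calling process() twice with the same store only handles newly appended events the second time. Calling it with an empty store replays the whole stream, which corresponds to rebuilding the query model from scratch.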

Let’s build further on the order application and allow orders to be created and updated in several ways. A ProductAddedEvent is handled in the InMemoryOrdersEventHandler by updating the in-memory model:

@EventHandler
public void on(ProductAddedEvent event) {
    orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
      order.addProduct(event.getProductId());
      return order;
    });
}

Here the order in the in-memory map will be updated using the addProduct function. Instead of an in-memory model, we can store the data in a database.
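One detail of the in-memory handler above is worth noting: computeIfPresent() only runs the update when the order already exists, so an event for an unknown order id leaves the model unchanged. The following sketch shows this with a plain map, where the product count per order is a hypothetical stand-in for the Order model.

```java
import java.util.Map;

// Hypothetical stand-in for the in-memory query model: order id -> number of products.
class InMemoryModelSketch {

    static void onProductAdded(Map<String, Integer> orders, String orderId) {
        // Only existing entries are updated; unknown order ids leave the map unchanged.
        orders.computeIfPresent(orderId, (id, count) -> count + 1);
    }
}
```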

3. Mongo Extension

Let’s use MongoDB to persist our query model. We’ll use the Axon Framework Mongo extension to persist the token store in MongoDB as well. Since we already added the axon-bom, we don’t need to specify the version when adding the extension to our pom.xml:

<dependency>
    <groupId>org.axonframework.extensions.mongo</groupId>
    <artifactId>axon-mongo</artifactId>
</dependency>

3.1. Token Store

With the dependency in place, we can configure Axon to use the MongoTokenStore:

@Bean
public TokenStore getTokenStore(MongoClient client, Serializer serializer){
    return MongoTokenStore.builder()
      .mongoTemplate(
        DefaultMongoTemplate.builder()
          .mongoDatabase(client)
          .build()
      )
      .serializer(serializer)
      .build();
}

3.2. Event Handler Class

A Spring profile named mongo lets us switch between implementations of the event handler. When the mongo profile is active, the MongoOrdersEventHandler is used, together with the token store configuration. Together, this gives us the event handler class:

@Service
@ProcessingGroup("orders")
@Profile("mongo")
public class MongoOrdersEventHandler implements OrdersEventHandler {
    // all methods for updating and querying the projection
}

At the same time, we added @Profile("!mongo") to the InMemoryOrdersEventHandler, so the two handlers are never active simultaneously. Spring profiles are an excellent way to enable components conditionally.

We’ll use dependency injection in the constructor to get the MongoClient and the QueryUpdateEmitter. We use a MongoClient to create the MongoCollection and indexes. We inject the QueryUpdateEmitter to enable subscription queries:

public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
    orders = client
      .getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
      .getCollection(ORDER_COLLECTION_NAME);
    orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME),
      new IndexOptions().unique(true));
    this.emitter = emitter;
}

Note that we set the order id as unique. This way, we can be sure there can’t be two documents with the same order id.

The MongoOrdersEventHandler uses the orders MongoDB collection to handle queries. We need to map the MongoDB documents to orders using the documentToOrder() method:

@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
    List<Order> orderList = new ArrayList<>();
    orders
      .find()
      .forEach(d -> orderList.add(documentToOrder(d)));
    return orderList;
}
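The documentToOrder() method itself isn’t shown here. As a rough idea of what such a mapping involves, here is a minimal sketch that treats a document as a plain Map<String, Object>, which is the interface the driver’s Document class implements. OrderSketch is a hypothetical, simplified order class, and the property names "orderId", "orderStatus", and "products" are assumptions; the application’s actual constants and Order class may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified order class; the real Order in the application may differ.
class OrderSketch {
    final String orderId;
    final String status;
    final Map<String, Integer> products;

    OrderSketch(String orderId, String status, Map<String, Integer> products) {
        this.orderId = orderId;
        this.status = status;
        this.products = products;
    }

    // Maps a document, represented as a plain map, to an order.
    // The property names used here are assumptions made for this sketch.
    @SuppressWarnings("unchecked")
    static OrderSketch documentToOrder(Map<String, Object> document) {
        String orderId = (String) document.get("orderId");
        String status = (String) document.get("orderStatus");
        Map<String, Integer> products = new HashMap<>();
        Object stored = document.get("products");
        // A missing products property results in an order without products.
        if (stored instanceof Map) {
            products.putAll((Map<String, Integer>) stored);
        }
        return new OrderSketch(orderId, status, products);
    }
}
```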

3.3. Complex Queries

To be able to handle the TotalProductsShippedQuery, we added a shippedProductFilter that selects only the orders that are shipped and contain the product:

private Bson shippedProductFilter(String productId){
    return and(
      eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()),
      exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId))
    );
}

This filter is then used in the query handler, which extracts and sums the counts of the product:

@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
    AtomicInteger result = new AtomicInteger();
    orders
      .find(shippedProductFilter(query.getProductId()))
      .map(d -> d.get(PRODUCTS_PROPERTY_NAME, Document.class))
      .map(d -> d.getInteger(query.getProductId(), 0))
      .forEach(result::addAndGet);
    return result.get();
}

This query will get all the shipped orders that contain the product and retrieve the products of those documents. It will then add up the counts of the specific product queried for and return the total number.
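To make the filter-and-sum logic easier to follow without a running database, here is a plain-Java sketch of the same computation over documents represented as maps. ShippedCountSketch and totalShipped() are hypothetical names, and the property names "orderStatus" and "products" as well as the "SHIPPED" value are assumptions for this example.

```java
import java.util.List;
import java.util.Map;

// Plain-Java equivalent of the filter-and-sum logic; documents are represented as maps.
class ShippedCountSketch {

    @SuppressWarnings("unchecked")
    static int totalShipped(List<Map<String, Object>> documents, String productId) {
        int total = 0;
        for (Map<String, Object> document : documents) {
            Map<String, Integer> products = (Map<String, Integer>) document.get("products");
            boolean shipped = "SHIPPED".equals(document.get("orderStatus"));
            // Same condition as the filter: the order is shipped and contains the product.
            if (shipped && products != null && products.containsKey(productId)) {
                total += products.get(productId);
            }
        }
        return total;
    }
}
```

Orders that aren’t shipped, or that don’t contain the product, contribute nothing to the total, so a product that was never shipped yields zero.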

4. Testing the Persisted Query Model

Testing with the persisted model can make things harder since we want an isolated environment for each test.

4.1. Unit Tests

For the MongoOrdersEventHandler, we need to make sure to drop the collection so that we don’t keep the state from a previous test. We do this by implementing the getHandler() method:

@Override
protected OrdersEventHandler getHandler() {
    mongoClient.getDatabase("axonframework").drop();
    return new MongoOrdersEventHandler(mongoClient, emitter);
}

With the @BeforeEach annotated method, we can be sure each test starts fresh. In this case, we use an embedded MongoDB for the tests. An alternative would be to use Testcontainers. In this regard, testing the query model is no different from other application tests that require a database.

4.2. Integration Tests

For the integration test, we use a similar approach. However, since the integration test uses the OrdersEventHandler interface, we rely on the implemented reset() method.

The reset() method implementation is:

@Override
public void reset(List<Order> orderList) {
    orders.deleteMany(new Document());
    orderList.forEach(o -> orders.insertOne(orderToDocument(o)));
}

The reset() method ensures only the orders in the list are part of the collection. The method executes before each test in OrderQueryServiceIntegrationTest:

@BeforeEach
void setUp() {
    orderId = UUID.randomUUID().toString();
    Order order = new Order(orderId);
    handler.reset(Collections.singletonList(order));
}

To test the queries, we need at least one order. Storing one order up front makes the tests themselves simpler.

5. Conclusion

In this article, we showed how to persist the query model. We learned how to query and test the model using MongoDB.

As always, the complete code used in this article is available over on GitHub.