1. Overview

Axon Framework helps us build event-driven microservice systems. In A Guide to the Axon Framework, we learned about Axon by walking through a simple Axon Spring Boot application that includes building an example Order model for us to update and query. That article uses a simple point-to-point query.

In this tutorial, we’ll build on the above example to examine all of the ways we can dispatch queries in Axon. In addition to looking more closely at the point-to-point query, we’ll also learn about the streaming query, the scatter-gather query and the subscription query.

2. Query Dispatching

When we submit a query to Axon, the framework will issue that query to all registered query handlers capable of answering our query. In a distributed system, it’s possible that multiple nodes can support the same kind of query, and it’s also possible for a single node to have multiple query handlers that can support the query.

How, then, does Axon decide which results to include in its response? The answer depends on how we dispatch the query. Axon gives us four options:

  • A point-to-point query obtains a complete answer from any node that supports our query
  • A streaming query obtains a stream of answers from any node that supports our query
  • A scatter-gather query obtains a complete answer from all nodes that support our query
  • A subscription query obtains the answer so far and then continues to listen for any updates
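
To make the contrast between the first and third options concrete, here is a minimal plain-Java sketch. It is only a toy model: the DispatchModes class and its methods are hypothetical names invented for illustration and are not part of Axon's API. Each handler is modeled as a simple function.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Toy model only: these names are hypothetical and are not part of Axon's API.
final class DispatchModes {

    // Point-to-point: a single capable handler answers, and its answer is treated as complete.
    static <Q, R> R pointToPoint(Q query, List<Function<Q, R>> handlers) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("No handler for " + query);
        }
        return handlers.get(0).apply(query);
    }

    // Scatter-gather: every capable handler answers, and the caller combines the answers.
    static <Q, R> List<R> scatterGather(Q query, List<Function<Q, R>> handlers) {
        return handlers.stream()
          .map(handler -> handler.apply(query))
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<String, Integer>> handlers = List.of(id -> 3, id -> 234);
        System.out.println(pointToPoint("Deluxe Chair", handlers));  // prints 3
        System.out.println(scatterGather("Deluxe Chair", handlers)); // prints [3, 234]
    }
}
```

In this model, the point-to-point caller trusts one answer, while the scatter-gather caller receives every answer and decides how to merge them.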

In the following sections, we’ll learn how to support and dispatch each kind of query.

3. Point-to-Point Queries

With a point-to-point query, Axon issues the query to every node that supports the query. Axon assumes any node is capable of giving a complete answer to a point-to-point query, and it will simply return the results it gets from the first node that responds.

In this section, we’ll use a point-to-point query to get all current Orders in our system.

3.1. Defining the Query

Axon uses strongly typed classes to represent a query type and encapsulate the query parameters. In this case, since we’re querying all orders, we don’t require any query parameters. Thus, we can represent our query with an empty class:

public class FindAllOrderedProductsQuery {}

3.2. Defining the Query Handler

We can register query handlers using the @QueryHandler annotation.

Let’s create a class for our query handlers and add a handler that can support FindAllOrderedProductsQuery queries:

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    private final Map<String, Order> orders = new HashMap<>();

    @QueryHandler
    public List<Order> handle(FindAllOrderedProductsQuery query) {
        return new ArrayList<>(orders.values());
    }
}

In the above example, we’re registering handle() as an Axon query handler that:

  1. is capable of responding to FindAllOrderedProductsQuery queries
  2. returns a List of Orders. As we’ll see later, Axon takes the return type into account when deciding which query handler can respond to a given query. This can make it easier to gradually migrate to a new API.
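
The role of the return type can be pictured with a small plain-Java registry. This is a sketch under stated assumptions, not Axon's actual matching logic: ToyQueryRegistry and its methods are hypothetical, and real response-type matching in Axon is richer than a single assignability check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Toy registry, not Axon's implementation: all names here are hypothetical.
final class ToyQueryRegistry {

    private static final class Registration {
        final Class<?> queryType;
        final Class<?> responseType;
        final Function<Object, Object> handler;

        Registration(Class<?> queryType, Class<?> responseType, Function<Object, Object> handler) {
            this.queryType = queryType;
            this.responseType = responseType;
            this.handler = handler;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    // Remember both the query type and the declared response type of each handler.
    <Q, R> void register(Class<Q> queryType, Class<R> responseType, Function<Q, R> handler) {
        registrations.add(new Registration(queryType, responseType,
          query -> handler.apply(queryType.cast(query))));
    }

    // Pick the first handler that accepts the query AND produces the expected response type.
    <R> Optional<R> query(Object query, Class<R> expectedResponseType) {
        return registrations.stream()
          .filter(r -> r.queryType.isInstance(query))
          .filter(r -> expectedResponseType.isAssignableFrom(r.responseType))
          .findFirst()
          .map(r -> expectedResponseType.cast(r.handler.apply(query)));
    }
}
```

With two handlers registered for the same query type but different response types, old clients can keep asking for the old response type while new clients ask for the new one, which is the kind of gradual migration mentioned above.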

We use the OrdersEventHandler interface above so that we can later swap in an implementation that uses a persistent data store, such as MongoDB. For this tutorial, we’ll keep things simple by storing Order objects in an in-memory Map. Thus, our query handler just needs to return the Order objects as a List.

3.3. Dispatching a Point-to-Point Query

Now that we’ve defined a query type and a query handler, we’re ready to dispatch a FindAllOrderedProductsQuery to Axon. Let’s create a service class with a method that issues a point-to-point FindAllOrderedProductsQuery:

@Service
public class OrderQueryService {
    private final QueryGateway queryGateway;

    public OrderQueryService(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    public CompletableFuture<List<OrderResponse>> findAllOrders() {
        return queryGateway.query(new FindAllOrderedProductsQuery(),
            ResponseTypes.multipleInstancesOf(Order.class))
          .thenApply(r -> r.stream()
            .map(OrderResponse::new)
            .collect(Collectors.toList()));
    }
}

In the above example, we use Axon’s QueryGateway to dispatch an instance of FindAllOrderedProductsQuery. We use the gateway’s query() method to issue a point-to-point query. Because we are specifying ResponseTypes.multipleInstancesOf(Order.class), Axon knows we only want to talk to query handlers whose return type is a collection of Order objects.

Finally, to add a layer of indirection between our Order model class and our external clients, we wrap our results in OrderResponse objects.

3.4. Testing Our Point-to-Point Query

We’ll use @SpringBootTest to test our query using the Axon integration. Let’s start by adding the spring-test dependency to our pom.xml file:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <scope>test</scope>
</dependency>

Next, let’s add a test that invokes our service method to retrieve an Order:

@SpringBootTest(classes = OrderApplication.class)
class OrderQueryServiceIntegrationTest {

    @Autowired
    OrderQueryService queryService;

    @Autowired
    OrdersEventHandler handler;

    private String orderId;

    @BeforeEach
    void setUp() {
        orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId);
        handler.reset(Collections.singletonList(order));
    }

    @Test
    void givenOrderCreatedEventSend_whenCallingAllOrders_thenOneCreatedOrderIsReturned()
            throws ExecutionException, InterruptedException {
        List<OrderResponse> result = queryService.findAllOrders().get();
        assertEquals(1, result.size());
        OrderResponse response = result.get(0);
        assertEquals(orderId, response.getOrderId());
        assertEquals(OrderStatusResponse.CREATED, response.getOrderStatus());
        assertTrue(response.getProducts().isEmpty());
    }
}

In our @BeforeEach method above, we invoke reset(), a convenience method in OrdersEventHandler intended for pre-loading Order objects from a legacy system or for facilitating a migration. Here, we use it to pre-load an Order into our in-memory store for our test.

We then invoke our service method and verify that it has retrieved our test order after dispatching our query to the query handler we set up earlier.

4. Streaming Queries

With streaming queries, we can send large collections as a stream.

Instead of waiting until the whole result is complete at the query handler side, like with point-to-point queries, the result can be returned in pieces. Unlike subscription queries, streaming queries are expected to be complete at some point.

By depending on Project Reactor, the streaming query benefits from features such as backpressure to handle large collections of results. If we don’t already have the reactor-core dependency, we need to add it to be able to use streaming queries.

4.1. Defining the Query

We’ll reuse the query from the point-to-point query.

4.2. Defining the Query Handler

A streaming query should return a Publisher. We can use Reactor to create a Mono from the values of the in-memory map:

@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
    return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
}

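We use flatMapMany() to turn the Mono, which holds the whole collection of orders, into a Flux that emits each Order individually. Since Flux implements Publisher, we can return it directly from the handler.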

4.3. Dispatching a Streaming Query

The method to add to the OrderQueryService is very similar to the point-to-point query. We do give it a different method name, so the distinction’s clear:

public Flux<OrderResponse> allOrdersStreaming() {
    Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
    return Flux.from(publisher).map(OrderResponse::new);
}

4.4. Testing Our Streaming Query

Let’s add a test for this to our OrderQueryServiceIntegrationTest:

@Test
void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
    Flux<OrderResponse> result = queryService.allOrdersStreaming();
    StepVerifier.create(result)
      .assertNext(order -> assertEquals(orderId, order.getOrderId()))
      .expectComplete()
      .verify();
}

We should note that with expectComplete(), we verify that the stream was indeed completed.

5. Scatter-Gather Queries

A scatter-gather query is dispatched to all query handlers in all nodes that support the query. For these queries, the results from each query handler are combined into a single response. If two nodes have the same Spring application name, Axon considers them equivalent and will only use the results from the first node that responds.

In this section, we’ll create a query to retrieve the total number of shipped products that match a given product ID. We will simulate querying both a live system and a legacy system in order to show that Axon will combine the responses from both systems.

5.1. Defining the Query

Unlike our point-to-point query, we need to supply a parameter this time: the product ID. Instead of an empty class, we’ll create a POJO with our product ID parameter:

public class TotalProductsShippedQuery {
    private final String productId;

    public TotalProductsShippedQuery(String productId) {
        this.productId = productId;
    }

    // getter
}

5.2. Defining the Query Handlers

First, we’ll query the event-based system, which, as we’ll recall, uses an in-memory data store. Let’s add a query handler to our existing InMemoryOrdersEventHandler to get the total number of shipped products:

@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
    return orders.values().stream()
      .filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
      .map(o -> Optional.ofNullable(o.getProducts().get(query.getProductId())).orElse(0))
      .reduce(0, Integer::sum);
}

Above, we retrieve all of our in-memory Order objects and remove any that haven’t been shipped. We then invoke getProducts() on each Order to get the number of products shipped whose product ID matches our query parameter. We then sum those numbers to get our total number of shipped products.
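
The filter, map, and reduce steps can be tried in isolation with a simplified stand-in for our model. In this sketch, ToyOrder and ShippedTotals are hypothetical classes created only for illustration; they mimic just the two Order properties that the handler reads.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for illustration only; these are not the tutorial's Order classes.
final class ShippedTotals {

    enum Status { CREATED, CONFIRMED, SHIPPED }

    static final class ToyOrder {
        final Status status;
        final Map<String, Integer> products; // product ID -> quantity

        ToyOrder(Status status, Map<String, Integer> products) {
            this.status = status;
            this.products = products;
        }
    }

    static int totalShipped(List<ToyOrder> orders, String productId) {
        return orders.stream()
          // keep only shipped orders
          .filter(o -> o.status == Status.SHIPPED)
          // quantity of the requested product, or 0 if the order doesn't contain it
          .map(o -> Optional.ofNullable(o.products.get(productId)).orElse(0))
          // add up the quantities
          .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<ToyOrder> orders = List.of(
          new ToyOrder(Status.SHIPPED, Map.of("Deluxe Chair", 3)),
          new ToyOrder(Status.CREATED, Map.of("Deluxe Chair", 5)),
          new ToyOrder(Status.SHIPPED, Map.of("Stool", 2)));
        System.out.println(totalShipped(orders, "Deluxe Chair")); // prints 3
    }
}
```

Only the first order counts here: the second hasn't shipped, and the third doesn't contain the requested product.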

Since we want to combine those results with the numbers in our hypothetical legacy system, let’s simulate the legacy data with a separate class and query handler:

@Service
public class LegacyQueryHandler {
    @QueryHandler
    public Integer handle(TotalProductsShippedQuery query) {
        switch (query.getProductId()) {
        case "Deluxe Chair":
            return 234;
        case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
            return 10;
        default:
            return 0;
        }
    }
}

For the sake of this tutorial, this query handler exists in the same Spring application as our InMemoryOrdersEventHandler handler. In a real-life scenario, we would likely not have multiple query handlers for the same query type within the same application. A scatter-gather query typically combines results from multiple Spring applications that each have a single handler.

5.3. Dispatching a Scatter-Gather Query

Let’s add a new method to our OrderQueryService for dispatching a scatter-gather query:

public Integer totalShipped(String productId) {
    return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
        ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
      .reduce(0, Integer::sum);
}

This time, we construct our query object with the productId parameter. We also set a 10-second timeout on our scatterGather() call. Axon will only respond with results it can retrieve within that time window. If one or more handlers do not respond within that window, their results will not be included in queryGateway’s response.
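
The effect of the time window can be imitated with nothing but the JDK's executor API. The following is a sketch of the idea only, not of how Axon implements scatterGather(): GatherWithinWindow is a hypothetical class, and each handler is modeled as a Callable.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "only answers that arrive in time are combined".
final class GatherWithinWindow {

    static int sumWithin(List<Callable<Integer>> handlers, long timeout, TimeUnit unit) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, handlers.size()));
        try {
            int total = 0;
            // invokeAll() returns when all tasks are done or the timeout expires;
            // tasks that have not finished by then are cancelled.
            for (Future<Integer> future : pool.invokeAll(handlers, timeout, unit)) {
                if (future.isCancelled()) {
                    continue; // too slow: this handler's answer is left out
                }
                try {
                    total += future.get();
                } catch (ExecutionException e) {
                    // in this sketch, a failed handler simply contributes nothing
                }
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while gathering results", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

If we pass two fast handlers returning 3 and 234 plus a third that sleeps for longer than the window, the method returns 237 and the slow handler's answer is ignored.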

5.4. Testing Our Scatter-Gather Query

Let’s add a test for this to our OrderQueryServiceIntegrationTest:

@Test
void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusThreeIsReturned() {
    Order order = new Order(orderId);
    order.getProducts().put("Deluxe Chair", 3);
    order.setOrderShipped();
    handler.reset(Collections.singletonList(order));

    assertEquals(237, queryService.totalShipped("Deluxe Chair"));
}

Above, we use our reset() method to simulate a shipped order containing three deluxe chairs in our event-driven system. Previously, in our LegacyQueryHandler, we hard-coded 234 shipped deluxe chairs in our legacy system. Thus, our test should yield a combined total of 237 deluxe chairs shipped.

6. Subscription Queries

With subscription queries, we get an initial result followed by a stream of updates. In this section, we’ll query our system for an Order in its current state, but then remain connected to Axon in order to get any new updates to that Order as they occur.

6.1. Defining the Query

Since we want to retrieve a specific order, let’s create a query class that includes an order ID as its only parameter:

public class OrderUpdatesQuery {
    private final String orderId;

    public OrderUpdatesQuery(String orderId) {
        this.orderId = orderId;
    }

    // getter
}

6.2. Defining the Query Handler

The query handler for retrieving an Order from our in-memory map is very simple. Let’s add it to our InMemoryOrdersEventHandler class:

@QueryHandler
public Order handle(OrderUpdatesQuery query) {
    return orders.get(query.getOrderId());
}

6.3. Emitting Query Updates

A subscription query is only interesting when there are updates. Axon Framework provides a QueryUpdateEmitter interface we can use to inform Axon how and when a subscription should be updated. Let’s inject that emitter into our InMemoryOrdersEventHandler class and use it in a convenience method:

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {

    private final QueryUpdateEmitter emitter;

    public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
        this.emitter = emitter;
    }

    private void emitUpdate(Order order) {
        emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
          .equals(q.getOrderId()), order);
    }

    // our event and query handlers
}

Our emitter.emit() invocation tells Axon that any clients subscribed to the OrderUpdatesQuery may need to receive an update. The second argument is a filter telling Axon that only subscriptions matching the supplied order ID should get the update.
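
To see what the filter argument does, we can model the idea in plain Java. This is a toy sketch, not Axon's QueryUpdateEmitter: ToyUpdateEmitter, subscribe(), and the use of plain strings as queries are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model of filtered update emission; all names are hypothetical.
final class ToyUpdateEmitter {

    private static final class Subscription {
        final Object query;          // the query the subscriber originally sent
        final Consumer<Object> sink; // where updates for that subscriber are delivered

        Subscription(Object query, Consumer<Object> sink) {
            this.query = query;
            this.sink = sink;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    void subscribe(Object query, Consumer<Object> sink) {
        subscriptions.add(new Subscription(query, sink));
    }

    // Deliver the update only to subscriptions whose query has the given type
    // and passes the filter.
    <Q> void emit(Class<Q> queryType, Predicate<? super Q> filter, Object update) {
        for (Subscription subscription : subscriptions) {
            if (queryType.isInstance(subscription.query)
                    && filter.test(queryType.cast(subscription.query))) {
                subscription.sink.accept(update);
            }
        }
    }
}
```

If two subscribers register with the queries "order-1" and "order-2", an emit() call whose filter matches only "order-1" reaches the first subscriber and leaves the second untouched.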

We can now use our emitUpdate() method inside any event handler that modifies an order. For example, if an order is shipped, any active subscription to updates to that order should be notified. Let’s create an event handler for the OrderShippedEvent that was covered in the previous article and have it emit updates to the shipped order:

@Service
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
    @EventHandler
    public void on(OrderShippedEvent event) {
        orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
            order.setOrderShipped();
            emitUpdate(order);
            return order;
        });
    }

    // fields, query handlers, other event handlers, and our emitUpdate() method
}

We can do the same for our ProductAddedEvent, ProductCountIncrementedEvent, ProductCountDecrementedEvent, and OrderConfirmedEvent events.

6.4. Subscribing to a Query

Next, we’ll build a service method for subscribing to a query. We’ll use Flux from Reactor Core in order to stream updates to the client code.

Let’s add that dependency to our pom.xml file:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

Now, let’s add our service method implementation to OrderQueryService:

public class OrderQueryService {
    public Flux<OrderResponse> orderUpdates(String orderId) {
        return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class))
                .map(OrderResponse::new);
    }

    private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
        SubscriptionQueryResult<R, R> result = queryGateway.subscriptionQuery(query,
          resultType, resultType);
        return result.initialResult()
          .concatWith(result.updates())
          .doFinally(signal -> result.close());
    }

    // our other service methods
}

The public orderUpdates() method above delegates most of its work to our private convenience method, subscriptionQuery(), though we again package our response as OrderResponse objects so we’re not exposing our internal Order object.

Our generalized subscriptionQuery() convenience method is where we combine the initial result we get from Axon with any future updates.

First, we invoke Axon’s queryGateway.subscriptionQuery() to get a SubscriptionQueryResult object. We supply resultType to queryGateway.subscriptionQuery() twice because we are always expecting an Order object, but we could use a different type for updates if we wanted to.

Next, we use result.initialResult() and result.updates() to get all the information we need to fulfill the subscription, and we concatenate them so that the initial result arrives first, followed by each update.

Finally, we use doFinally() to close the SubscriptionQueryResult once the Flux completes, fails, or is cancelled.

While we don’t use it here, there’s also a Reactive extension for Axon Framework that offers an alternative query gateway that can make it easier to work with subscription queries.

6.5. Testing Our Subscription Query

To help us test our service method that returns a Flux, we’ll use the StepVerifier class that we get from the reactor-test dependency:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Let’s add our test:

class OrderQueryServiceIntegrationTest {
    @Test
    void givenOrdersAreUpdated_whenCallingOrderUpdates_thenUpdatesReturned() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.schedule(this::addIncrementDecrementConfirmAndShip, 100L, TimeUnit.MILLISECONDS);
        try {
            StepVerifier.create(queryService.orderUpdates(orderId))
              .assertNext(order -> assertTrue(order.getProducts().isEmpty()))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(2, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(1, order.getProducts().get(productId)))
              .assertNext(order -> assertEquals(OrderStatusResponse.CONFIRMED, order.getOrderStatus()))
              .assertNext(order -> assertEquals(OrderStatusResponse.SHIPPED, order.getOrderStatus()))
              .thenCancel()
              .verify();
        } finally {
            executor.shutdown();
        }
    }

    private void addIncrementDecrementConfirmAndShip() {
        sendProductAddedEvent();
        sendProductCountIncrementEvent();
        sendProductCountDecrement();
        sendOrderConfirmedEvent();
        sendOrderShippedEvent();
    }

    private void sendProductAddedEvent() {
        ProductAddedEvent event = new ProductAddedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountIncrementEvent() {
        ProductCountIncrementedEvent event = new ProductCountIncrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendProductCountDecrement() {
        ProductCountDecrementedEvent event = new ProductCountDecrementedEvent(orderId, productId);
        eventGateway.publish(event);
    }

    private void sendOrderConfirmedEvent() {
        OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
        eventGateway.publish(event);
    }

    private void sendOrderShippedEvent() {
        OrderShippedEvent event = new OrderShippedEvent(orderId);
        eventGateway.publish(event);
    }

    // our productId and eventGateway fields, and our other tests
}

Above, we have a private addIncrementDecrementConfirmAndShip() method that publishes five Order-related events to Axon. We invoke this in a separate thread via ScheduledExecutorService 100ms after the test begins in order to simulate events that come in after we’ve begun our OrderUpdatesQuery subscription.

In our primary thread, we invoke the orderUpdates() query we’re testing, using StepVerifier to allow us to make assertions on every discrete update we receive from the subscription.

7. Conclusion

In this article, we explored four approaches to dispatching queries within the Axon Framework: point-to-point queries, streaming queries, scatter-gather queries, and subscription queries.

As always, the complete code examples for this article are available over on GitHub.