1. Overview

The Axon Framework helps us build event-driven microservice systems. In A Guide to the Axon Framework, we learned about Axon by walking through a simple Axon Spring Boot application. The application can create and update orders and can also confirm and ship those orders.

In Dispatching Queries in Axon Framework, we added some more queries to the OrderQueryService.

Queries are typically used for UIs, which usually call REST endpoints.

In this tutorial, we’ll create REST endpoints for all the queries. We’ll also use those endpoints from an integration test.

2. Using a Query in the REST Endpoint

We can add REST endpoints by adding functions to a class annotated with @RestController. We’ll use the class OrderRestEndpoint for this. Previously, we used the QueryGateway directly in the controller. We’ll replace the injected QueryGateway with the OrderQueryService, which we implemented in Dispatching Queries in Axon Framework. This way, the only concern of the controller functions is to bind behavior to the REST paths.

All the endpoints are listed in the order-api.http file in the project. Thanks to this file, the endpoints are callable when using IntelliJ as our IDE.

2.1. Point-to-Point Queries

Point-to-Point Queries only have a single response and are therefore easy to implement:

@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
    return orderQueryService.findAllOrders();
}

Spring waits for the CompletableFuture to be resolved and responds with the JSON-formatted payload. We can test this with a call to localhost:8080/all-orders to get all the orders in one array.
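
To make the idea of a value that resolves later more concrete, here’s a minimal, JDK-only sketch. The class FutureResolutionSketch and its methods are hypothetical stand-ins and aren’t part of the project. The sketch only shows how a result can be rendered once an asynchronously produced value becomes available:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, JDK-only stand-in for illustration; not the project's code.
final class FutureResolutionSketch {

    // Simulates a query that is answered asynchronously on another thread.
    static CompletableFuture<List<String>> findAllOrderIds() {
        return CompletableFuture.supplyAsync(() -> List.of("order-1", "order-2"));
    }

    // Registers a callback that renders the result once the future completes.
    static CompletableFuture<String> renderWhenResolved() {
        return findAllOrderIds()
          .thenApply(ids -> "[" + String.join(",", ids) + "]");
    }

    public static void main(String[] args) {
        // join() blocks the calling thread; it's used here only to print the demo result.
        System.out.println(renderWhenResolved().join());
    }
}
```

The controller method itself never blocks in this model. It hands back the future, and the rendering step runs when the value arrives.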

From a clean setup, if we first add two orders using POST requests to http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768 and http://localhost:8080/ship-order, we should see the following when we send a GET request to http://localhost:8080/all-orders:

[
  {
    "orderId": "72d67527-a27c-416e-a904-396ebf222344",
    "products": {
      "Deluxe Chair": 1
    },
    "orderStatus": "SHIPPED"
  },
  {
    "orderId": "666a1661-474d-4046-8b12-8b5896312768",
    "products": {},
    "orderStatus": "CREATED"
  }
]

2.2. Streaming Queries

A streaming query will return a stream of events and eventually close. We could wait for the stream to close and send the response once it’s completed. However, it’s more efficient to stream it directly. We do this by making use of server-sent events:

@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
    return orderQueryService.allOrdersStreaming();
}

By adding the media type, Spring understands we want the response as server-sent events. This means each order is sent individually. If the client supports server-sent events, localhost:8080/all-orders-streaming will return all the orders one by one.

Having the same items in the database as with the point-to-point query will give a result like:

data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

These are both individual server-sent events.
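
To show how a client could pick the individual events out of such a response, here’s a small JDK-only sketch. SseDataParser is a hypothetical helper, not something from the project or from Spring. It assumes we already have the raw response body as a string, and it only looks at the data field:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it handles only the "data:" field.
final class SseDataParser {

    // Splits a raw text/event-stream body into the data payload of each event.
    // An event that isn't terminated by a blank line is not returned.
    static List<String> parseDataPayloads(String body) {
        List<String> payloads = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (current.length() > 0) {
                    payloads.add(current.toString());
                    current.setLength(0);
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // One optional space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several non-empty data lines in one event are joined with newlines.
                if (current.length() > 0) {
                    current.append('\n');
                }
                current.append(value);
            }
            // Other fields (event, id, retry) and comments are ignored in this sketch.
        }
        return payloads;
    }

    public static void main(String[] args) {
        String body = "data:{\"orderStatus\":\"SHIPPED\"}\n\ndata:{\"orderStatus\":\"CREATED\"}\n\n";
        parseDataPayloads(body).forEach(System.out::println);
    }
}
```

In practice, a reactive client or a browser’s EventSource does this parsing for us. The sketch is only meant to show that each data line followed by a blank line is one event.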

2.3. Scatter-Gather Queries

The logic of combining the responses returned to the Axon query is already present in the OrderQueryService. This makes the implementation very similar to the point-to-point query, as there’s just a single response. For example, to add an endpoint using the scatter-gather query:

@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
    return orderQueryService.totalShipped(productId);
}

Calling http://localhost:8080/total-shipped/Deluxe Chair returns the total number of shipped chairs, including the 234 from the LegacyQueryHandler. If the one from the ship-order call is still in the database, it should return 235.
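
The combining step can be pictured with a short JDK-only sketch. ScatterGatherSketch and the two lambda handlers are hypothetical and only mimic the idea; the real combination happens in the OrderQueryService on top of Axon’s query gateway. We assume here that every handler answers with an integer and that the answers are simply summed:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of scatter-gather; not Axon's API.
final class ScatterGatherSketch {

    // Sends the same query to every handler and sums the individual answers.
    static int totalShipped(String productId, List<Function<String, Integer>> handlers) {
        return handlers.stream()
          .mapToInt(handler -> handler.apply(productId))
          .sum();
    }

    public static void main(String[] args) {
        // Stand-in for the LegacyQueryHandler, which reports 234 shipped chairs.
        Function<String, Integer> legacy = id -> "Deluxe Chair".equals(id) ? 234 : 0;
        // Stand-in for the handler backed by the current orders: one shipped chair.
        Function<String, Integer> current = id -> "Deluxe Chair".equals(id) ? 1 : 0;
        System.out.println(totalShipped("Deluxe Chair", List.of(legacy, current))); // prints 235
    }
}
```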

2.4. Subscription Queries

Contrary to streaming queries, subscription queries might never end. So for subscription queries, waiting until the stream completes is not an option. We’ll again leverage server-sent events to add the endpoint:

@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
    return orderQueryService.orderUpdates(orderId);
}

Doing a call to http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768 will give us a stream of updates on that order. With a post call to http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3, we trigger an update. That update is sent as a server-sent event.

We’ll see both the initial state and the state after the update. The connection stays open to receive further updates.

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}

As we can see, the update contains the product we added.
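
The shape of this output, one initial result followed by one item per change, can be mimicked with a tiny JDK-only sketch. SubscriptionSketch is hypothetical and synchronous; it isn’t how Axon or Reactor implement subscription queries, and it isn’t thread-safe. It only illustrates the order in which a subscriber sees the state:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, single-threaded illustration of "initial result plus updates".
final class SubscriptionSketch {

    private final Map<String, Integer> products = new LinkedHashMap<>();
    private final List<Consumer<Map<String, Integer>>> subscribers = new ArrayList<>();

    // A new subscriber first receives the current state, then every later change.
    void subscribe(Consumer<Map<String, Integer>> subscriber) {
        subscriber.accept(Map.copyOf(products));
        subscribers.add(subscriber);
    }

    // Changes the state and pushes an immutable snapshot to all subscribers.
    void addProduct(String productId) {
        products.merge(productId, 1, Integer::sum);
        Map<String, Integer> snapshot = Map.copyOf(products);
        subscribers.forEach(subscriber -> subscriber.accept(snapshot));
    }

    public static void main(String[] args) {
        SubscriptionSketch order = new SubscriptionSketch();
        order.subscribe(state -> System.out.println("data:" + state));
        order.addProduct("a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3");
    }
}
```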

3. Integration Tests

For the integration tests, let’s use WebClient.

For these tests, we’ll run the whole application using @SpringBootTest, first changing the state using the other REST endpoints. These other REST endpoints trigger one or more commands, which in turn create one or more events. To create orders, we’ll use the endpoints that were added in A Guide to the Axon Framework. We use the @DirtiesContext annotation on each test, so events created in one test don’t affect another.

Instead of running Axon Server during the integration tests, we set axon.axonserver.enabled=false in application.properties in our src/test/resources. This way, we’ll use the non-distributed gateways, which run faster and don’t require Axon Server. The gateways are the instances handling the three different kinds of messages.

We can create some helper methods to make our tests more readable. These helper functions provide the correct types and set HTTP headers when needed. For example:

private void verifyVoidPost(WebClient client, String uri) {
    StepVerifier.create(retrieveResponse(client.post()
      .uri(uri)))
      .verifyComplete();
}

This is useful for calling post endpoints with a void return type. It uses the retrieveResponse() helper function to do the call and verifies that it completes. Such calls are used often, and each takes a couple of lines of code. We make the tests more readable and maintainable by putting them in helper functions.

3.1. Testing the Point-to-Point Query

To test the /all-orders REST endpoint, let’s create one order and then validate whether we can retrieve the created order. To be able to do this, we first need to create a WebClient. The web client is a reactive instance we can use to make HTTP calls. After the call to create an order, we fetch all orders and verify the result:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders")))
  .expectNextMatches(list -> 1 == list.size() && list.get(0)
    .getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

Since it’s reactive, we can use a StepVerifier from reactor-test to validate the response.

We expect just one Order in the list, the one we just created. Furthermore, we expect the Order to have the CREATED order status.

3.2. Testing the Streaming Query

The streaming query might return multiple orders. We also want to test if the stream completes. For the test, we’ll create three new random orders and then test the streaming query response:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
for (int i = 0; i < 3; i++) {
    createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders-streaming")))
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

With the verifyComplete() at the end, we ensure the stream is closed. We should note that it’s possible to implement a streaming query in such a way that it doesn’t complete. In this case, it does, and it’s important to verify it.

3.3. Testing the Scatter-Gather Query

To test the scatter-gather query, we need to ensure the result from multiple handlers is combined. We ship one chair using an endpoint. We then retrieve all the shipped chairs. As the LegacyQueryHandler returns 234 for the chair, the result should be 235.

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
    .uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
  .assertNext(r -> assertEquals(235, r))
  .verifyComplete();

The retrieveIntegerResponse() helper function returns an integer from the response body.
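
The URI in this test contains a space. Whether and how that space gets percent-encoded depends on the HTTP client, so if we ever build such a URI by hand, it may help to encode the path explicitly. Here’s a JDK-only sketch using java.net.URI; the class PathEncodingSketch and its method are hypothetical and aren’t part of the project:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration; not part of the project.
final class PathEncodingSketch {

    // Builds the total-shipped URI and percent-encodes illegal path characters.
    // Note: a '/' inside productId is kept as a path separator, not encoded.
    static String totalShippedUri(int port, String productId) {
        try {
            return new URI("http", null, "localhost", port,
              "/total-shipped/" + productId, null, null).toASCIIString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build URI for " + productId, e);
        }
    }

    public static void main(String[] args) {
        // prints http://localhost:8080/total-shipped/Deluxe%20Chair
        System.out.println(totalShippedUri(8080, "Deluxe Chair"));
    }
}
```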

3.4. Testing the Subscription Query

The subscription query will stay active as long as we don’t close the connection. We’d like to test both the initial result and the updates. Therefore, we use a ScheduledExecutorService so we can use multiple threads in the test. The service allows updating an order from one thread while validating the returned orders in another. To make it a bit more readable, we use a separate method to do the updates:

private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
    WebClient client = WebClient.builder()
      .clientConnector(httpConnector())
      .build();
    String base = "http://localhost:" + port + "/order/" + orderId;
    verifyVoidPost(client, base + "/product/" + productId);
    verifyVoidPost(client, base + "/product/" + productId + "/increment");
    // and some more
}

The method creates and uses its own instance of a web client to not interfere with the one used to verify the response.

The actual test will call this from the executor and validate the updates:

// Create two web clients, generate the IDs for the test, and create an order.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
    StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
      .uri("http://localhost:" + port + "/order-updates/" + orderId)))
      .assertNext(p -> assertTrue(p.getProducts().isEmpty()))
      //Some more assertions.
      .assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
      .thenCancel()
      .verify();
} finally {
    executor.shutdown();
}

We should note that we wait one second before sending the updates, so the subscription is in place and we don’t miss the first update. We use a random UUID as the productId, which is used both for updating and verifying the results. Each change should trigger an update.

Depending on the expected state after the update, we add an assertion. We need to call thenCancel() to end the test, as the subscription would stay open without it. A finally block is used to ensure we always close the executor.
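
The threading pattern used here can be tried out in isolation with a JDK-only sketch. DelayedUpdateSketch is hypothetical: a BlockingQueue stands in for the server-sent event stream, and the delay and timeout values are arbitrary choices for this illustration. One thread waits for items while the executor adds an update a little later, and the finally block shuts the executor down:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; the queue stands in for the event stream.
final class DelayedUpdateSketch {

    static List<String> collectInitialAndUpdate() {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        stream.add("initial"); // the initial result is available right away

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // The update is produced from another thread after a short delay.
        executor.schedule(() -> { stream.add("updated"); }, 200L, TimeUnit.MILLISECONDS);
        try {
            List<String> seen = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // Wait for the next item, but never indefinitely.
                String next = stream.poll(5, TimeUnit.SECONDS);
                if (next == null) {
                    throw new IllegalStateException("Timed out waiting for item " + i);
                }
                seen.add(next);
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            // Always release the executor's thread, even if verification fails.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectInitialAndUpdate());
    }
}
```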

4. Conclusion

In this article, we learned how to add REST endpoints for queries. These can be used to build a UI.

We also learned how to test those endpoints using WebClient.

The implementation of all these examples and code snippets can be found over on GitHub.

For any additional questions on this topic, also check out Discuss AxonIQ.