1. Overview

Kotlin Coroutines can often add readability to reactive, callback-heavy code.

In this tutorial, we’ll learn how to leverage these coroutines to build non-blocking Spring Boot applications. We’ll also compare the reactive and coroutine approaches.

2. Coroutine Motivation

These days, it’s common for systems to serve thousands or even millions of requests. Consequently, the development world is moving towards non-blocking computation and request handling. Utilizing system resources efficiently by offloading I/O operations from core threads makes it possible to handle many more requests than the traditional thread-per-request approach.

Asynchronous processing is not a trivial task and can be error-prone. Fortunately, we have tools for tackling this complexity, like Java CompletableFutures or reactive libraries like RxJava. Indeed, the Spring framework already supports reactive approaches with the Reactor and WebFlux frameworks.

Asynchronous code can be hard to read, but the Kotlin language provides the concept of Coroutines to allow writing concurrent and asynchronous code in a sequential style.

Coroutines are also flexible: Jobs and Scopes give us fine-grained control over how tasks are executed and cancelled. Besides that, Kotlin coroutines interoperate well with existing Java non-blocking frameworks.

Spring has supported Kotlin coroutines since version 5.2 of the framework.

3. Project Setup

Let’s start by adding the dependencies we’ll need.

We’ll use the Netty framework, an asynchronous client-server event-driven framework. We’ll use NettyWebServer as an embedded implementation of a reactive web server.

Additionally, since version 3.1, the Servlet specification has supported non-blocking I/O. So, we could also use a servlet container like Jetty or Tomcat.

Let’s use these versions, including Spring 6 via Spring Boot:

<properties>
    <kotlin.version>1.7.0</kotlin.version>
    <r2dbc.version>1.0.0.RELEASE</r2dbc.version>
    <r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
    <h2-r2dbc.version>1.0.0.RELEASE</h2-r2dbc.version>
    <spring-boot.version>3.1.4</spring-boot.version>
</properties>

Next, as we’re relying on WebFlux for asynchronous processing, it’s very important to use spring-boot-starter-webflux instead of spring-boot-starter-web. So, we need to include this dependency in our pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

Next, we’ll add R2DBC dependencies to support reactive database access:

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>${r2dbc-spi.version}</version>
</dependency>

Finally, we’ll add the Kotlin core and coroutines dependencies:

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>

4. Spring Data R2DBC with Coroutines

In this section, we’ll focus on accessing databases both in reactive and coroutines styles.

4.1. Reactive R2DBC

Let’s start with the reactive relational database client. Simply put, R2DBC is an API specification that declares a reactive API to be implemented by database vendors.

Our data store will be powered by the in-memory H2 database. Reactive relational drivers are also available for other databases, including PostgreSQL and Microsoft SQL Server.

First, let’s implement a simple repository using the reactive approach:

@Repository
class ProductRepository(private val client: DatabaseClient) {

    fun getProductById(id: Int): Mono<Product> {
        return client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
    }

    fun addNewProduct(name: String, price: Float): Mono<Void> {
        return client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
    }

    fun getAllProducts(): Flux<Product> {
        return client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
    }
}

Here, we’re using the non-blocking DatabaseClient to execute queries against the database. Let’s rewrite our repository class using suspending functions and corresponding Kotlin types.

4.2. R2DBC with Coroutines

In order to transform functions from reactive to the Coroutines API, we add the suspend modifier before the function definition:

fun noResultFunc(): Mono<Void>
suspend fun noResultFunc()

Furthermore, we can omit the Void return type. In case of a non-void result, we just return a result of the defined type without wrapping it in the Mono class:

fun singleItemResultFunc(): Mono<T>
suspend fun singleItemResultFunc(): T?

Next, if a source may emit more than a single item, we just change Flux to Flow as follows:

fun multiItemsResultFunc(): Flux<T>
fun multiItemsResultFunc(): Flow<T>
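
As a rough, self-contained sketch of these rules, the snippet below converts three hypothetical reactive functions (touchReactive, findNameReactive, and findAllNamesReactive are stand-ins invented for illustration, not part of our project). It assumes the awaitSingleOrNull() extension from kotlinx-coroutines-reactor and the asFlow() extension from kotlinx-coroutines-reactive:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical reactive sources standing in for real queries.
fun touchReactive(): Mono<Void> = Mono.empty()

fun findNameReactive(id: Int): Mono<String> =
    if (id == 1) Mono.just("Keyboard") else Mono.empty()

fun findAllNamesReactive(): Flux<String> = Flux.just("Keyboard", "Mouse")

// Mono<Void> becomes a suspending function with no return type.
suspend fun touch() {
    touchReactive().awaitSingleOrNull()
}

// Mono<T> becomes a suspending function returning T?; an empty Mono maps to null.
suspend fun findName(id: Int): String? = findNameReactive(id).awaitSingleOrNull()

// Flux<T> becomes Flow<T>; the function itself doesn't need to suspend.
fun findAllNames(): Flow<String> = findAllNamesReactive().asFlow()

fun main() = runBlocking {
    touch()
    println(findName(1))              // Keyboard
    println(findName(2))              // null
    println(findAllNames().toList())  // [Keyboard, Mouse]
}
```

The nullable return type is what lets a suspending function represent an empty Mono without a wrapper.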

Let’s apply these rules and refactor our repository:

@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {

    suspend fun getProductById(id: Int): Product? =
        client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
          .awaitFirstOrNull()

    suspend fun addNewProduct(name: String, price: Float) =
        client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
          .awaitFirstOrNull()

    @FlowPreview
    fun getAllProducts(): Flow<Product> =
        client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
          .asFlow()
}

In the snippet above, a few points deserve our attention. Where do these await* functions come from? They’re defined as Kotlin extension functions in the kotlinx-coroutines-reactive library, which the kotlinx-coroutines-reactor dependency brings in.

Furthermore, there are more extensions available in the spring-data-r2dbc library.
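
To give an intuition for what an await-style extension does, here’s a minimal sketch built only on the Kotlin standard library. It is not how the libraries above are implemented: awaitValue and awaitDemo are hypothetical names, and a CompletableFuture stands in for a reactive publisher. The extension suspends the caller and resumes it from the future’s completion callback:

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical extension: suspends until the future completes, then returns its value.
suspend fun <T> CompletableFuture<T>.awaitValue(): T = suspendCoroutine { continuation ->
    whenComplete { value, error ->
        if (error != null) continuation.resumeWithException(error)
        else continuation.resume(value)
    }
}

// Returns what the coroutine had received before and after the future completed.
fun awaitDemo(): List<String?> {
    val future = CompletableFuture<String>()
    var received: String? = null

    // Start a coroutine with the bare standard-library API; no dispatcher is involved.
    val block: suspend () -> String = { future.awaitValue() }
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        received = result.getOrThrow()
    })

    val beforeCompletion = received  // still null: the coroutine is suspended
    future.complete("Keyboard")      // the callback resumes the coroutine
    return listOf(beforeCompletion, received)
}

fun main() {
    println(awaitDemo())  // [null, Keyboard]
}
```

No thread is parked while the coroutine waits; the work simply continues when the callback fires.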

5. Spring WebFlux Controllers

So far, we’ve seen how to implement repositories but haven’t made any actual queries to the datastore yet. So, in this section, we’ll figure out how to apply coroutines with the Spring WebFlux framework by creating non-blocking controllers.

5.1. Reactive Controllers

Let’s define two simple endpoints that, in turn, query the database through our repository.

Let’s start with the more familiar reactive style:

@RestController
class ProductController {
    @Autowired
    lateinit var productRepository: ProductRepository

    @GetMapping("/{id}")
    fun findOne(@PathVariable id: Int): Mono<Product> {
        return productRepository.getProductById(id)
    }

    @GetMapping("/")
    fun findAll(): Flux<Product> {
        return productRepository.getAllProducts()
    }
}

This raises the question: which thread is responsible for executing actual I/O operations? By default, each query’s operations run on a separate reactor NIO thread that’s chosen by an underlying scheduler implementation.

5.2. Controllers with Coroutines

Let’s refactor the controller by leveraging suspending functions and using the corresponding repository class:

@RestController
class ProductControllerCoroutines {
    @Autowired
    lateinit var productRepository: ProductRepositoryCoroutines

    @GetMapping("/{id}")
    suspend fun findOne(@PathVariable id: Int): Product? {
        return productRepository.getProductById(id)
    }

    @FlowPreview
    @GetMapping("/")
    fun findAll(): Flow<Product> {
        return productRepository.getAllProducts()
    }
}

First, note that findAll() isn’t a suspending function. It doesn’t need to be: it only returns a Flow, and the suspending calls happen internally when the framework collects that Flow.
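
The sketch below illustrates why a function returning a Flow doesn’t need the suspend modifier. The productNames and collectNames functions are hypothetical, and the flow {} builder stands in for a real query. Building the Flow runs nothing; the suspending body only executes once the Flow is collected:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A regular, non-suspending function: creating the Flow does no work yet.
fun productNames(events: MutableList<String>): Flow<String> = flow {
    events += "query started"
    delay(10)  // a suspending call inside the flow body
    emit("Keyboard")
    emit("Mouse")
}

suspend fun collectNames(events: MutableList<String>): List<String> {
    val names = productNames(events)  // nothing has run so far
    events += "flow created"
    return names.toList()             // collecting triggers the suspending body
}

fun main() = runBlocking {
    val events = mutableListOf<String>()
    println(collectNames(events))  // [Keyboard, Mouse]
    println(events)                // [flow created, query started]
}
```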

For this version, database queries will run on the same reactor thread as in the reactive example.

6. Spring WebFlux WebClient

Next, imagine we have microservices in our system.

In order to complete a request, we need to query another service to get additional data. So, in our case, a good example would be getting a product stock quantity. We’ll use WebClient from the WebFlux framework to call another service via the API.

6.1. Reactive WebClient

To start with, let’s see how to make a simple request:

val htmlResponse = webClient.get()
  .uri("https://www.baeldung.com/")
  .retrieve().bodyToMono<String>()

The next move is to call the external stock service to get stock quantity and then return a combined result to a client. At first, we’ll get a product from the repository and then query the stock service:

@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
    val product = productRepository.getProductById(id)

    val stockQuantity = webClient.get()
      .uri("/stock-service/product/$id/quantity")
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .bodyToMono<Int>()
    return product.zipWith(stockQuantity) { productInStock, stockQty ->
        ProductStockView(productInStock, stockQty)
    }
}

Notice that the repository returns a Mono, and so does the WebClient call. The zipWith() method then combines the two into a new Mono without running anything yet; the actual subscription happens when the framework subscribes to the Mono we return. At that point, both requests are issued, and once both complete, their results are combined into a new object.
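
The following sketch shows this composition in isolation, using plain Reactor. The Product and ProductStockView classes here are simplified stand-ins for the ones in our project, combine is a hypothetical helper, and block() is called only to trigger the subscription in a demo; a WebFlux controller would simply return the Mono:

```kotlin
import reactor.core.publisher.Mono

// Simplified stand-ins for the article's Product and ProductStockView classes.
data class Product(val id: Int, val name: String)
data class ProductStockView(val product: Product, val stockQuantity: Int)

// Builds the combined Mono and records which sources get subscribed to.
fun combine(subscribed: MutableList<String>): Mono<ProductStockView> {
    val product = Mono.fromSupplier { Product(1, "Keyboard") }
        .doOnSubscribe { subscribed += "product" }
    val quantity = Mono.fromSupplier { 7 }
        .doOnSubscribe { subscribed += "quantity" }
    return product.zipWith(quantity) { p, q -> ProductStockView(p, q) }
}

fun main() {
    val subscribed = mutableListOf<String>()
    val combined = combine(subscribed)
    println(subscribed)        // []: composing the pipeline subscribes to nothing
    println(combined.block())  // subscribing here runs both sources
    println(subscribed)        // both sources have now been subscribed to
}
```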

6.2. WebClient with Coroutines

Now, let’s see how to use the WebClient with coroutines.

To perform a GET request, we apply the awaitBody() suspending extension function:

val htmlResponse = webClient.get()
  .uri("https://www.baeldung.com/")
  .retrieve()
  .awaitBody<String>()

With retrieve(), a 4xx or 5xx response results in an exception (a WebClientResponseException by default) instead of a body. In order to customize the response handling for different response statuses, we can use the awaitExchange() suspending extension function, which hands us the ClientResponse:

val response: ResponseEntity<String> = webClient.get()
  .uri("https://www.baeldung.com/")
  .awaitExchange { clientResponse -> clientResponse.awaitEntity<String>() }

Since we have access to the resulting ResponseEntity, we can check the status code and then act accordingly.

Let’s get back to our microservice example. We can execute a request against the stock service:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
    val product = productRepository.getProductById(id)
    val quantity = webClient.get()
      .uri("/stock-service/product/$id/quantity")
      .accept(MediaType.APPLICATION_JSON)
      .retrieve()
      .awaitBody<Int>()
    return ProductStockView(product!!, quantity)
}

Note that this looks like blocking code, yet it never blocks a thread. One of the main benefits of using coroutines is the ability to write asynchronous code fluently and readably.

In the example above, the database query and the web request will be executed one after another. This is because coroutines are sequential by default.

Can we run suspending functions in parallel? Absolutely! Let’s modify our endpoint method to run the queries in parallel:

@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView = coroutineScope {
    val product: Deferred<Product?> = async(start = CoroutineStart.LAZY) {
        productRepository.getProductById(id)
    }
    val quantity: Deferred<Int> = async(start = CoroutineStart.LAZY) {
        webClient.get()
          .uri("/stock-service/product/$id/quantity")
          .accept(MediaType.APPLICATION_JSON)
          .retrieve().awaitBody<Int>()
    }
    product.start()
    quantity.start()
    ProductStockView(product.await()!!, quantity.await())
}

Here, by wrapping a suspending function in an async {} block, we get an object of the Deferred<> type. By default, a coroutine created with async is scheduled for execution immediately, so two plain async {} blocks would already run concurrently. Passing CoroutineStart.LAZY as the optional start parameter defers execution until we explicitly start the coroutine or await its result, which gives us control over when the work begins.

Finally, to start executing the functions, we call the start() method on each Deferred. This way, the two functions execute in parallel. Had we relied on await() alone, the second lazy coroutine would start only after the first one had finished. This technique is also known as parallel decomposition.
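
To make the effect of the start mode visible, here’s a small sketch that logs when each call begins and ends. The call function is a hypothetical stand-in for a query or an HTTP request, with delay() simulating the I/O; the three variants differ only in how the coroutines are started:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending call standing in for a query or an HTTP request.
suspend fun call(name: String, millis: Long, log: MutableList<String>): String {
    log += "$name start"
    delay(millis)
    log += "$name end"
    return name
}

// Lazy coroutines with await() only: each starts when its result is first requested.
suspend fun lazyAwaitOnly(log: MutableList<String>): String = coroutineScope {
    val db = async(start = CoroutineStart.LAZY) { call("db", 30, log) }
    val http = async(start = CoroutineStart.LAZY) { call("http", 60, log) }
    db.await() + http.await()
}

// Lazy coroutines started explicitly: both are in flight before we await either.
suspend fun lazyWithStart(log: MutableList<String>): String = coroutineScope {
    val db = async(start = CoroutineStart.LAZY) { call("db", 30, log) }
    val http = async(start = CoroutineStart.LAZY) { call("http", 60, log) }
    db.start()
    http.start()
    db.await() + http.await()
}

// Default start mode: both coroutines are scheduled as soon as they are created.
suspend fun eager(log: MutableList<String>): String = coroutineScope {
    val db = async { call("db", 30, log) }
    val http = async { call("http", 60, log) }
    db.await() + http.await()
}

fun main() = runBlocking {
    val a = mutableListOf<String>()
    lazyAwaitOnly(a)
    println(a)  // [db start, db end, http start, http end]

    val b = mutableListOf<String>()
    lazyWithStart(b)
    println(b)  // [db start, http start, db end, http end]

    val c = mutableListOf<String>()
    eager(c)
    println(c)  // [db start, http start, db end, http end]
}
```

Only the first variant runs the calls one after another; the other two overlap them.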

It’s interesting to note that functions in async blocks are dispatched to separate worker threads. After that, actual I/O operations happen on threads from the Reactor NIO pool.

To enforce structured concurrency, we’ve used the coroutineScope {} scoping function to create our own scope. It waits for all the coroutines inside the block to complete before completing itself. However, unlike runBlocking, coroutineScope {} suspends instead of blocking the current thread.
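
A minimal sketch of this waiting behavior follows. The runChildren function is hypothetical, and runBlocking appears only in main() to bridge into the coroutine world for the demo:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// coroutineScope suspends until every child launched inside it has finished.
suspend fun runChildren(): List<String> {
    val log = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(50)
            log += "child finished"
        }
        log += "scope body finished"
    }
    // Reached only after the child above has completed.
    log += "after scope"
    return log
}

fun main() = runBlocking {
    println(runChildren())  // [scope body finished, child finished, after scope]
}
```

Even though the body of the scope finishes first, the code after the scope runs only once the child is done.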

7. WebFlux.fn DSL Routes

Finally, let’s see how to use coroutines with DSL Routes definitions.

The WebFlux Functional Framework, powered by Kotlin, provides a concise and fluent way to define endpoints. The coRouter {} DSL supports Kotlin Coroutines for defining router functions.

First, let’s define router endpoints in the DSL fashion:

@Configuration
class RouterConfiguration {
    @FlowPreview
    @Bean
    fun productRoutes(productsHandler: ProductsHandler) = coRouter {
        GET("/", productsHandler::findAll)
        GET("/{id}", productsHandler::findOne)
        GET("/{id}/stock", productsHandler::findOneInStock)
    }
}

Now that we have our route definitions, let’s implement the ProductsHandler with the same functionality as the ProductController:

@Component
class ProductsHandler(
  @Autowired var webClient: WebClient, 
  @Autowired var productRepository: ProductRepositoryCoroutines) {
    
    @FlowPreview
    suspend fun findAll(request: ServerRequest): ServerResponse =
        ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())

    suspend fun findOneInStock(request: ServerRequest): ServerResponse = coroutineScope {
        val id = request.pathVariable("id").toInt()
        // Use a child scope rather than GlobalScope, so both calls
        // are cancelled together if the request is cancelled.
        val product: Deferred<Product?> = async {
            productRepository.getProductById(id)
        }
        val quantity: Deferred<Int> = async {
            webClient.get()
              .uri("/stock-service/product/$id/quantity")
              .accept(MediaType.APPLICATION_JSON)
              .retrieve().awaitBody<Int>()
        }
        ServerResponse.ok()
          .json()
          .bodyValueAndAwait(ProductStockView(product.await()!!, quantity.await()))
    }

    suspend fun findOne(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        return productRepository.getProductById(id)
          ?.let { ServerResponse.ok().json().bodyValueAndAwait(it) }
          ?: ServerResponse.notFound().buildAndAwait()
    }
}

Note that every handler method in the ProductsHandler class is a suspending function. Not much has changed compared to the controller except for the request and response types.

This is all we need to set up a simple REST controller. Consequently, thanks to using the Routes DSL along with Kotlin coroutines, we have fluent and concise endpoint definitions.

8. Conclusion

In this article, we’ve explored Kotlin coroutines and found out how to integrate them with Spring frameworks, R2DBC, and WebFlux in particular.

Applying non-blocking approaches in a project may improve application performance and scalability. In addition, we’ve seen how using Kotlin coroutines can make asynchronous code more readable.

We should note that the mid-development versions of the above libraries may change a lot before they reach stable releases and that minor version differences may be incompatible with each other.

The code of the examples is available, as always, over on GitHub.

