1. Introduction

In this tutorial, we’ll learn how to use the reactive data access features of Spring Data Cassandra.

This is the third article in the Spring Data Cassandra series. In this one, we’ll expose a Cassandra database through a REST API.

We can read more about Spring Data Cassandra in the first and second articles of the series.

2. Maven Dependencies

Spring Data Cassandra supports both Project Reactor and RxJava reactive types. In this tutorial, we’ll use Project Reactor’s reactive types, Flux and Mono.

To start with, let’s add the dependencies needed for our tutorial:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-cassandra</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

The latest version of spring-data-cassandra can be found on Maven Central.

Next, we’re going to expose SELECT operations from the database via a REST API. So, let’s also add the dependency that provides @RestController:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3. Implementing Our App

Since we’ll be persisting data, let’s first define our entity object:

@Table
public class Employee {
    @PrimaryKey
    private int id;
    private String name;
    private String address;
    private String email;
    private int age;

    // all-args constructor (used by the controller and tests), getters and setters
}

Next, it’s time to create an EmployeeRepository that extends ReactiveCassandraRepository. It’s important to note that this interface enables support for reactive types:

public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
    @AllowFiltering
    Flux<Employee> findByAgeGreaterThan(int age);
}
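Since age isn’t part of the primary key, Cassandra would normally reject a query that restricts it. The @AllowFiltering annotation makes Spring Data add ALLOW FILTERING to the derived query. To make the semantics of findByAgeGreaterThan concrete, here’s a rough in-memory sketch of the predicate it applies. This is plain Java, not the Cassandra driver, and AgeFilterSketch with its simplified Employee is a hypothetical stand-in used only for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;

class AgeFilterSketch {

    // Simplified stand-in for the Employee entity (only the fields used here).
    static final class Employee {
        final int id;
        final int age;

        Employee(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    // Mirrors findByAgeGreaterThan: keeps rows whose age is strictly greater than the bound.
    static List<Integer> idsOlderThan(List<Employee> rows, int age) {
        return rows.stream()
          .filter(e -> e.age > age)
          .map(e -> e.id)
          .collect(Collectors.toList());
    }
}
```

The comparison is strict, so an employee whose age equals the bound isn’t returned. Keep in mind that ALLOW FILTERING can be expensive on large tables, since Cassandra may have to scan many rows to evaluate the condition.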

3.1. Rest Controller for CRUD Operations

For the purpose of illustration, we’ll expose some basic SELECT operations using a simple REST controller:

@RestController
@RequestMapping("employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostConstruct
    public void saveEmployees() {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee(123, "John Doe", "Delaware", "[email protected]", 31));
        employees.add(new Employee(324, "Adam Smith", "North Carolina", "[email protected]", 43));
        employees.add(new Employee(355, "Kevin Dunner", "Virginia", "[email protected]", 24));
        employees.add(new Employee(643, "Mike Lauren", "New York", "[email protected]", 41));
        employeeService.initializeEmployees(employees);
    }

    @GetMapping("/list")
    public Flux<Employee> getAllEmployees() {
        Flux<Employee> employees = employeeService.getAllEmployees();
        return employees;
    }

    @GetMapping("/{id}")
    public Mono<Employee> getEmployeeById(@PathVariable int id) {
        return employeeService.getEmployeeById(id);
    }

    @GetMapping("/filterByAge/{age}")
    public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
        return employeeService.getEmployeesFilterByAge(age);
    }
}

Finally, let’s add a simple EmployeeService:

@Service
public class EmployeeService {

    @Autowired
    EmployeeRepository employeeRepository;

    public void initializeEmployees(List<Employee> employees) {
        Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
        savedEmployees.subscribe();
    }

    public Flux<Employee> getAllEmployees() {
        Flux<Employee> employees =  employeeRepository.findAll();
        return employees;
    }

    public Flux<Employee> getEmployeesFilterByAge(int age) {
        return employeeRepository.findByAgeGreaterThan(age);
    }

    public Mono<Employee> getEmployeeById(int id) {
        return employeeRepository.findById(id);
    }
}
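Note that initializeEmployees() calls subscribe() explicitly. Reactive publishers are lazy, so the saveAll() call by itself wouldn’t write anything until something subscribes to it. To illustrate the idea without a running Cassandra instance, here’s a minimal sketch built on the JDK’s java.util.concurrent.Flow interfaces instead of Reactor. The LazySaveSketch class, its written list, and both helper methods are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class LazySaveSketch {

    // Plays the role of the database table; not a real Cassandra client.
    static final List<String> written = new ArrayList<>();

    // Returns a cold publisher that "saves" each name only when a subscriber requests data.
    static Flow.Publisher<String> saveAll(List<String> names) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                while (n-- > 0 && next < names.size() && !done) {
                    String name = names.get(next++);
                    written.add(name); // the side effect happens here, not in saveAll()
                    subscriber.onNext(name);
                }
                if (next == names.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Minimal subscriber that requests everything and ignores the results.
    static <T> void subscribe(Flow.Publisher<T> publisher) {
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { }
        });
    }
}
```

Here, written stays empty after calling saveAll() and is only filled once subscribe() is invoked. In our service, the no-argument subscribe() is a fire-and-forget call, so the @PostConstruct method may return before the rows are actually stored.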

3.2. Database Configuration

Then, let’s specify the keyspace, port, and local data center to use when connecting to Cassandra in application.properties:

spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1

Note: datacenter1 is the default data center name.

4. Testing the Endpoints

Finally, it’s time to test our API endpoints.

4.1. Manual Testing

To begin with, let’s fetch the employee records from the database:

curl localhost:8080/employee/list

As a result, we get all the employees:

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "[email protected]",
        "age": 43
    },
    {
        "id": 123,
        "name": "John Doe",
        "address": "Delaware",
        "email": "[email protected]",
        "age": 31
    },
    {
        "id": 355,
        "name": "Kevin Dunner",
        "address": "Virginia",
        "email": "[email protected]",
        "age": 24
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "[email protected]",
        "age": 41
    }
]

Moving on, let’s try to find a specific employee by ID:

curl localhost:8080/employee/643

As a result, we get Mr. Mike Lauren back:

{
    "id": 643,
    "name": "Mike Lauren",
    "address": "New York",
    "email": "[email protected]",
    "age": 41
}

Finally, let’s see if our age filter works:

curl localhost:8080/employee/filterByAge/35

And as expected, we get all the employees whose age is greater than 35:

[
    {
        "id": 324,
        "name": "Adam Smith",
        "address": "North Carolina",
        "email": "[email protected]",
        "age": 43
    },
    {
        "id": 643,
        "name": "Mike Lauren",
        "address": "New York",
        "email": "[email protected]",
        "age": 41
    }
]
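The same requests can also be issued from Java code. As a minimal sketch, assuming the application is running on localhost:8080 as in the curl commands above, we could use the JDK’s built-in java.net.http.HttpClient. The EmployeeApiClientSketch class and its method names are hypothetical and not part of the tutorial’s source code:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class EmployeeApiClientSketch {

    // Assumed base URL; matches the curl commands used in this section.
    static final String BASE_URL = "http://localhost:8080/employee";

    // Builds a GET request for the age filter endpoint without sending it.
    static HttpRequest filterByAgeRequest(int age) {
        return HttpRequest.newBuilder()
          .uri(URI.create(BASE_URL + "/filterByAge/" + age))
          .header("Accept", "application/json")
          .GET()
          .build();
    }

    // Sends the request and returns the raw JSON body; requires the application to be running.
    static String fetchOlderThan(int age) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
          client.send(filterByAgeRequest(age), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling fetchOlderThan(35) against the running application should return the same JSON array that curl printed for the age filter.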

4.2. Integration Testing

Additionally, let’s test the same functionality by writing a test case:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {

    @Autowired
    EmployeeRepository repository;

    @Before
    public void setUp() {
        Flux<Employee> deleteAndInsert = repository.deleteAll()
          .thenMany(repository.saveAll(Flux.just(
            new Employee(111, "John Doe", "Delaware", "[email protected]", 31),
            new Employee(222, "Adam Smith", "North Carolina", "[email protected]", 43),
            new Employee(333, "Kevin Dunner", "Virginia", "[email protected]", 24),
            new Employee(444, "Mike Lauren", "New York", "[email protected]", 41))));

        StepVerifier
          .create(deleteAndInsert)
          .expectNextCount(4)
          .verifyComplete();
    }

    @Test
    public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
        Mono<Long> saveAndCount = repository.count()
          .doOnNext(System.out::println)
          .thenMany(repository
            .saveAll(Flux.just(
            new Employee(325, "Kim Jones", "Florida", "[email protected]", 42),
            new Employee(654, "Tom Moody", "New Hampshire", "[email protected]", 44))))
          .last()
          .flatMap(v -> repository.count())
          .doOnNext(System.out::println);

        StepVerifier
          .create(saveAndCount)
          .expectNext(6L)
          .verifyComplete();
    }

    @Test
    public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
        StepVerifier
          .create(repository.findByAgeGreaterThan(35))
          .expectNextCount(2)
          .verifyComplete();
    }
}

5. Conclusion

In summary, we learned how to use reactive types with Spring Data Cassandra to build a non-blocking application.

As always, check out the source code for this tutorial over on GitHub.