1. 简介
在本教程中,我们将学习如何使用 Spring Data Cassandra 的响应式数据访问功能。
这是 Spring Data Cassandra 系列文章的第三篇。在这篇文章中,我们将通过 REST API 暴露一个 Cassandra 数据库接口。
如果你对 Spring Data Cassandra 还不熟悉,可以先阅读系列中的第一篇和第二篇文章。
2. Maven 依赖
✅ Spring Data Cassandra 支持 Project Reactor 和 RxJava 的响应式类型。
在本教程中,我们主要使用 Project Reactor 提供的 Flux
和 Mono
类型来演示。
首先,添加以下依赖:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-cassandra</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
你可以在这里查看最新的 spring-data-cassandra 版本。
接下来,我们还需要添加 Spring Web 模块的支持,用于暴露 REST 接口:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3. 实现我们的应用
由于我们需要持久化数据,首先定义实体类:
@Table
public class Employee {
@PrimaryKey
private int id;
private String name;
private String address;
private String email;
private int age;
// 构造方法、getter/setter 省略
}
然后创建一个继承自 ReactiveCassandraRepository
的接口:
public interface EmployeeRepository extends ReactiveCassandraRepository<Employee, Integer> {
@AllowFiltering
Flux<Employee> findByAgeGreaterThan(int age);
}
⚠️ 注意:这个接口是开启响应式支持的关键。
3.1. REST 控制器实现 CRUD 操作
为了演示,我们编写一个简单的 REST 控制器,暴露一些基本的查询操作:
@RestController
@RequestMapping("employee")
public class EmployeeController {
@Autowired
EmployeeService employeeService;
@PostConstruct
public void saveEmployees() {
List<Employee> employees = new ArrayList<>();
employees.add(new Employee(123, "John Doe", "Delaware", "john.doe@example.com", 31));
employees.add(new Employee(324, "Adam Smith", "North Carolina", "adam.smith@example.com", 43));
employees.add(new Employee(355, "Kevin Dunner", "Virginia", "kevin.dunner@example.com", 24));
employees.add(new Employee(643, "Mike Lauren", "New York", "mike.lauren@example.com", 41));
employeeService.initializeEmployees(employees);
}
@GetMapping("/list")
public Flux<Employee> getAllEmployees() {
return employeeService.getAllEmployees();
}
@GetMapping("/{id}")
public Mono<Employee> getEmployeeById(@PathVariable int id) {
return employeeService.getEmployeeById(id);
}
@GetMapping("/filterByAge/{age}")
public Flux<Employee> getEmployeesFilterByAge(@PathVariable int age) {
return employeeService.getEmployeesFilterByAge(age);
}
}
对应的 Service 层也很简单粗暴:
@Service
public class EmployeeService {
@Autowired
EmployeeRepository employeeRepository;
public void initializeEmployees(List<Employee> employees) {
Flux<Employee> savedEmployees = employeeRepository.saveAll(employees);
savedEmployees.subscribe();
}
public Flux<Employee> getAllEmployees() {
return employeeRepository.findAll();
}
public Flux<Employee> getEmployeesFilterByAge(int age) {
return employeeRepository.findByAgeGreaterThan(age);
}
public Mono<Employee> getEmployeeById(int id) {
return employeeRepository.findById(id);
}
}
3.2. 数据库配置
接着,在 application.properties
中指定连接 Cassandra 所需的信息:
spring.data.cassandra.keyspace-name=practice
spring.data.cassandra.port=9042
spring.data.cassandra.local-datacenter=datacenter1
📌 注意:datacenter1
是默认的数据中心名称。
4. 测试接口
4.1. 手动测试
先获取所有员工信息:
curl localhost:8080/employee/list
响应结果如下:
[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "adam.smith@example.com",
"age": 43
},
{
"id": 123,
"name": "John Doe",
"address": "Delaware",
"email": "john.doe@example.com",
"age": 31
},
{
"id": 355,
"name": "Kevin Dunner",
"address": "Virginia",
"email": "kevin.dunner@example.com",
"age": 24
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mike.lauren@example.com",
"age": 41
}
]
再根据 ID 查询特定员工:
curl localhost:8080/employee/643
返回结果:
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mike.lauren@example.com",
"age": 41
}
最后测试按年龄过滤:
curl localhost:8080/employee/filterByAge/35
返回年龄大于 35 的员工:
[
{
"id": 324,
"name": "Adam Smith",
"address": "North Carolina",
"email": "adam.smith@example.com",
"age": 43
},
{
"id": 643,
"name": "Mike Lauren",
"address": "New York",
"email": "mike.lauren@example.com",
"age": 41
}
]
4.2. 集成测试
此外,我们也可以写一个集成测试验证功能是否正常:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ReactiveEmployeeRepositoryIntegrationTest {
@Autowired
EmployeeRepository repository;
@Before
public void setUp() {
Flux<Employee> deleteAndInsert = repository.deleteAll()
.thenMany(repository.saveAll(Flux.just(
new Employee(111, "John Doe", "Delaware", "john.doe@example.com", 31),
new Employee(222, "Adam Smith", "North Carolina", "adam.smith@example.com", 43),
new Employee(333, "Kevin Dunner", "Virginia", "kevin.dunner@example.com", 24),
new Employee(444, "Mike Lauren", "New York", "mike.lauren@example.com", 41))));
StepVerifier
.create(deleteAndInsert)
.expectNextCount(4)
.verifyComplete();
}
@Test
public void givenRecordsAreInserted_whenDbIsQueried_thenShouldIncludeNewRecords() {
Mono<Long> saveAndCount = repository.count()
.doOnNext(System.out::println)
.thenMany(repository
.saveAll(Flux.just(
new Employee(325, "Kim Jones", "Florida", "kim.jones@example.com", 42),
new Employee(654, "Tom Moody", "New Hampshire", "tom.moody@example.com", 44))))
.last()
.flatMap(v -> repository.count())
.doOnNext(System.out::println);
StepVerifier
.create(saveAndCount)
.expectNext(6L)
.verifyComplete();
}
@Test
public void givenAgeForFilter_whenDbIsQueried_thenShouldReturnFilteredRecords() {
StepVerifier
.create(repository.findByAgeGreaterThan(35))
.expectNextCount(2)
.verifyComplete();
}
}
5. 总结
总结一下,我们学习了如何使用 Spring Data Cassandra 构建响应式(非阻塞)应用,并通过 REST 接口对外提供服务。
一如既往,本文示例代码可以在 GitHub 上找到:源码地址。