1. 概述
简单来说,rxjava-jdbc 是一个用于与关系型数据库交互的 API,支持流式风格的方法调用。在这篇快速教程中,我们将了解这个库的基本用法和常见功能。
如果你想学习 RxJava 基础,可以参考这篇文章。
2. Maven 依赖
首先在 pom.xml
中添加依赖:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-jdbc</artifactId>
<version>0.7.11</version>
</dependency>
最新版本可在 Maven Central 查找。
3. 核心组件
Database
类是执行所有数据库操作的主要入口点。通过实现 ConnectionProvider
接口并传入 from()
静态方法创建 Database
对象:
public static ConnectionProvider connectionProvider
= new ConnectionProviderFromUrl(
DB_CONNECTION, DB_USER, DB_PASSWORD);
Database db = Database.from(connectionProvider);
ConnectionProvider
有多种实现值得注意:
ConnectionProviderFromContext
ConnectionProviderFromDataSource
ConnectionProviderFromUrl
ConnectionProviderPooled
基础操作主要使用以下 API:
select()
– 用于 SQL 查询update()
– 用于 DDL(如创建/删除表)和 DML(增删改)
4. 快速上手
下面演示如何执行基本数据库操作:
public class BasicQueryTypesTest {
Observable<Integer> create,
insert1,
insert2,
insert3,
update,
delete = null;
@Test
public void whenCreateTableAndInsertRecords_thenCorrect() {
create = db.update(
"CREATE TABLE IF NOT EXISTS EMPLOYEE("
+ "id int primary key, name varchar(255))")
.count();
insert1 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
.dependsOn(create)
.count();
update = db.update(
"UPDATE EMPLOYEE SET name = 'Alan' WHERE id = 1")
.dependsOn(create)
.count();
insert2 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(2, 'Sarah')")
.dependsOn(create)
.count();
insert3 = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(3, 'Mike')")
.dependsOn(create)
.count();
delete = db.update(
"DELETE FROM EMPLOYEE WHERE id = 2")
.dependsOn(create)
.count();
List<String> names = db.select(
"select name from EMPLOYEE where id < ?")
.parameter(3)
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.dependsOn(insert3)
.dependsOn(update)
.dependsOn(delete)
.getAs(String.class)
.toList()
.toBlocking()
.single();
assertEquals(Arrays.asList("Alan"), names);
}
}
⚠️ 注意:必须通过 dependsOn()
明确声明操作顺序,否则可能产生不可预测的结果或直接报错。
5. 自动映射
自动映射功能可将数据库记录直接映射为对象,支持两种方式:
5.1. 基于接口的映射
通过注解接口实现映射。先定义接口:
public interface Employee {
@Column("id")
int id();
@Column("name")
String name();
}
使用示例:
@Test
public void whenSelectFromTableAndAutomap_thenCorrect() {
List<Employee> employees = db.select("select id, name from EMPLOYEE")
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.autoMap(Employee.class)
.toList()
.toBlocking()
.single();
assertThat(
employees.get(0).id()).isEqualTo(1);
assertThat(
employees.get(0).name()).isEqualTo("Alan");
assertThat(
employees.get(1).id()).isEqualTo(2);
assertThat(
employees.get(1).name()).isEqualTo("Sarah");
}
5.2. 基于类的映射
使用具体类实现映射。定义类:
public class Manager {
private int id;
private String name;
// 标准构造函数、getter/setter
}
使用示例:
@Test
public void whenSelectManagersAndAutomap_thenCorrect() {
List<Manager> managers = db.select("select id, name from MANAGER")
.dependsOn(create)
.dependsOn(insert1)
.dependsOn(insert2)
.autoMap(Manager.class)
.toList()
.toBlocking()
.single();
assertThat(
managers.get(0).getId()).isEqualTo(1);
assertThat(
managers.get(0).getName()).isEqualTo("Alan");
assertThat(
managers.get(1).getId()).isEqualTo(2);
assertThat(
managers.get(1).getName()).isEqualTo("Sarah");
}
✅ 关键点:
create
/insert1
/insert2
是创建表和插入数据的操作引用- 查询字段数量必须与类构造函数参数数量一致
- 字段类型需支持自动映射(如
int
↔INTEGER
)
更多映射细节参考 rxjava-jdbc 仓库
6. 处理大对象
支持 CLOB 和 BLOB 类型操作,以下是具体用法:
6.1. CLOB 操作
插入和查询 CLOB 示例:
@Before
public void setup() throws IOException {
create = db.update(
"CREATE TABLE IF NOT EXISTS " +
"SERVERLOG (id int primary key, document CLOB)")
.count();
InputStream actualInputStream
= new FileInputStream("src/test/resources/actual_clob");
actualDocument = getStringFromInputStream(actualInputStream);
InputStream expectedInputStream = new FileInputStream(
"src/test/resources/expected_clob");
expectedDocument = getStringFromInputStream(expectedInputStream);
insert = db.update(
"insert into SERVERLOG(id,document) values(?,?)")
.parameter(1)
.parameter(Database.toSentinelIfNull(actualDocument))
.dependsOn(create)
.count();
}
@Test
public void whenSelectCLOB_thenCorrect() throws IOException {
db.select("select document from SERVERLOG where id = 1")
.dependsOn(create)
.dependsOn(insert)
.getAs(String.class)
.toList()
.toBlocking()
.single();
assertEquals(expectedDocument, actualDocument);
}
getStringFromInputStream()
是将InputStream
转为String
的工具方法。
6.2. BLOB 操作
与 CLOB 类似,但需传入字节数组:
@Before
public void setup() throws IOException {
create = db.update(
"CREATE TABLE IF NOT EXISTS "
+ "SERVERLOG (id int primary key, document BLOB)")
.count();
InputStream actualInputStream
= new FileInputStream("src/test/resources/actual_clob");
actualDocument = getStringFromInputStream(actualInputStream);
byte[] bytes = this.actualDocument.getBytes(StandardCharsets.UTF_8);
InputStream expectedInputStream = new FileInputStream(
"src/test/resources/expected_clob");
expectedDocument = getStringFromInputStream(expectedInputStream);
insert = db.update(
"insert into SERVERLOG(id,document) values(?,?)")
.parameter(1)
.parameter(Database.toSentinelIfNull(bytes))
.dependsOn(create)
.count();
}
测试代码可复用上例的查询逻辑。
7. 事务管理
事务支持将多个操作捆绑执行,实现统一提交或回滚:
@Test
public void whenCommitTransaction_thenRecordUpdated() {
Observable<Boolean> begin = db.beginTransaction();
Observable<Integer> createStatement = db.update(
"CREATE TABLE IF NOT EXISTS EMPLOYEE(id int primary key, name varchar(255))")
.dependsOn(begin)
.count();
Observable<Integer> insertStatement = db.update(
"INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
.dependsOn(createStatement)
.count();
Observable<Integer> updateStatement = db.update(
"UPDATE EMPLOYEE SET name = 'Tom' WHERE id = 1")
.dependsOn(insertStatement)
.count();
Observable<Boolean> commit = db.commit(updateStatement);
String name = db.select("select name from EMPLOYEE WHERE id = 1")
.dependsOn(commit)
.getAs(String.class)
.toBlocking()
.single();
assertEquals("Tom", name);
}
事务操作流程:
beginTransaction()
开启事务- 后续操作自动加入同一事务
commit()
提交或rollback()
回滚
❌ 踩坑提示:务必在异常处理中调用 rollback()
,避免数据不一致。
8. 获取自增主键
当表使用 auto_increment
字段时,可通过 returnGeneratedKeys()
获取生成值:
@Test
public void whenInsertAndReturnGeneratedKey_thenCorrect() {
Integer key = db.update("INSERT INTO EMPLOYEE(name) VALUES('John')")
.dependsOn(createStatement)
.returnGeneratedKeys()
.getAs(Integer.class)
.count()
.toBlocking()
.single();
assertThat(key).isEqualTo(1);
}
9. 总结
本文介绍了 rxjava-jdbc 的流式 API 用法,包括:
- ✅ 基础增删改查操作
- ✅ 自动映射(接口/类两种方式)
- ✅ 大对象(CLOB/BLOB)处理
- ✅ 事务管理
- ✅ 自增主键获取
完整代码示例见 GitHub 仓库。