概述
在我们的使用Couchbase在Spring应用中教程后续篇中,我们将探讨Couchbase SDK的异步特性以及如何利用它进行批量持久化操作,从而让应用程序更有效地利用Couchbase资源。
1.1. CrudService
接口增强
首先,我们扩展通用的CrudService
接口,加入批量操作功能:
public interface CrudService<T> {
...
List<T> readBulk(Iterable<String> ids);
void createBulk(Iterable<T> items);
void updateBulk(Iterable<T> items);
void deleteBulk(Iterable<String> ids);
boolean exists(String id);
}
1.2. CouchbaseEntity
接口
我们定义一个接口,用于表示我们想要持久化的实体:
public interface CouchbaseEntity {
String getId();
void setId(String id);
}
1.3. AbstractCrudService
类
然后,我们将实现这些方法在基类中。这个类继承自我们在上一教程中使用的PersonCrudService
,并开始如下:
public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
private BucketService bucketService;
private Bucket bucket;
private JsonDocumentConverter<T> converter;
public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
this.bucketService = bucketService;
this.converter = converter;
}
protected void loadBucket() {
bucket = bucketService.getBucket();
}
...
}
2. 异步存储桶接口
Couchbase SDK提供了AsyncBucket
接口,用于执行异步操作。通过Bucket
实例,你可以通过async()
方法获取其异步版本:
AsyncBucket asyncBucket = bucket.async();
3. 批量操作
使用AsyncBucket
接口进行批量操作时,我们将借助RxJava
库。
3.1. 批量读取
这里我们实现readBulk
方法。首先,我们使用AsyncBucket
和RxJava
的flatMap
机制异步获取文档到Observable<JsonDocument>
,然后使用RxJava
的toBlocking
机制将这些转换为实体列表:
@Override
public List<T> readBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable<JsonDocument> asyncOperation = Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
return asyncBucket.get(key);
}
});
List<T> items = new ArrayList<T>();
try {
asyncOperation.toBlocking()
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument doc) {
T item = converter.fromDocument(doc);
items.add(item);
}
});
} catch (Exception e) {
logger.error("Error during bulk get", e);
}
return items;
}
3.2. 批量插入
我们再次使用RxJava
的flatMap
结构来实现createBulk
方法。
由于批量修改请求的生成速度可能超过响应的生成速度,导致过载情况,当遇到BackpressureException
时,我们会实施带有指数延迟的重试:
@Override
public void createBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
return asyncBucket.insert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
3.3. 批量更新
我们在updateBulk
方法中使用类似机制:
@Override
public void updateBulk(Iterable<T> items) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
JsonDocument doc = converter.toDocument(t);
return asyncBucket.upsert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
3.4. 批量删除
我们编写deleteBulk
方法如下:
@Override
public void deleteBulk(Iterable<String> ids) {
AsyncBucket asyncBucket = bucket.async();
Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(String key) {
return asyncBucket.remove(key)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
4. PersonCrudService
最后,我们编写一个Spring服务PersonCrudService
,它继承自我们的AbstractCrudService
,针对Person
实体。
由于所有的Couchbase交互都在抽象类中实现,实体类的实现非常简单,只需确保所有依赖项已注入并加载存储桶:
@Service
public class PersonCrudService extends AbstractCrudService<Person> {
@Autowired
public PersonCrudService(
@Qualifier("TutorialBucketService") BucketService bucketService,
PersonDocumentConverter converter) {
super(bucketService, converter);
}
@PostConstruct
private void init() {
loadBucket();
}
}
5. 总结
本教程中的源代码可以在GitHub项目中找到。
有关Couchbase Java SDK的更多信息,请访问官方开发者文档网站。