概述

在我们的使用Couchbase在Spring应用中教程后续篇中,我们将探讨Couchbase SDK的异步特性以及如何利用它进行批量持久化操作,从而让应用程序更有效地利用Couchbase资源。

1.1. CrudService 接口增强

首先,我们扩展通用的CrudService接口,加入批量操作功能:

public interface CrudService<T> {
    ...
    
    List<T> readBulk(Iterable<String> ids);

    void createBulk(Iterable<T> items);

    void updateBulk(Iterable<T> items);

    void deleteBulk(Iterable<String> ids);

    boolean exists(String id);
}

1.2. CouchbaseEntity 接口

我们定义一个接口,用于表示我们想要持久化的实体:

public interface CouchbaseEntity {

    String getId();
    
    void setId(String id);
    
}

1.3. AbstractCrudService

然后,我们将实现这些方法在基类中。这个类继承自我们在上一教程中使用的PersonCrudService,并开始如下:

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter<T> converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }
    
    ...
}

2. 异步存储桶接口

Couchbase SDK提供了AsyncBucket接口,用于执行异步操作。通过Bucket实例,你可以通过async()方法获取其异步版本:

AsyncBucket asyncBucket = bucket.async();

3. 批量操作

使用AsyncBucket接口进行批量操作时,我们将借助RxJava库。

3.1. 批量读取

这里我们实现readBulk方法。首先,我们使用AsyncBucketRxJavaflatMap机制异步获取文档到Observable<JsonDocument>,然后使用RxJavatoBlocking机制将这些转换为实体列表:

@Override
public List<T> readBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable<JsonDocument> asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.get(key);
          }
    });

    List<T> items = new ArrayList<T>();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1<JsonDocument>() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}

3.2. 批量插入

我们再次使用RxJavaflatMap结构来实现createBulk方法。

由于批量修改请求的生成速度可能超过响应的生成速度,导致过载情况,当遇到BackpressureException时,我们会实施带有指数延迟的重试:

@Override
public void createBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.3. 批量更新

我们在updateBulk方法中使用类似机制:

@Override
public void updateBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.4. 批量删除

我们编写deleteBulk方法如下:

@Override
public void deleteBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

4. PersonCrudService

最后,我们编写一个Spring服务PersonCrudService,它继承自我们的AbstractCrudService,针对Person实体。

由于所有的Couchbase交互都在抽象类中实现,实体类的实现非常简单,只需确保所有依赖项已注入并加载存储桶:

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}

5. 总结

本教程中的源代码可以在GitHub项目中找到。

有关Couchbase Java SDK的更多信息,请访问官方开发者文档网站


« 上一篇: Orika 映射