1. 简介

RxJava入门文章后,我们来看看聚合和数学操作符。

这些操作符必须等待源Observable发射所有项目后才能工作。⚠️ 因此,在可能代表超长或无限序列的Observable上使用它们存在风险。

其次,所有示例都使用TestSubscriber实例——这是Subscriber的特殊变体,专用于单元测试,可执行断言、检查接收事件或包装模拟的Subscriber

现在,让我们开始探索数学操作符。

2. 环境配置

要使用额外操作符,需要在pom.xml添加依赖

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava-math</artifactId>
    <version>1.0.0</version>
</dependency>

或Gradle项目:

compile 'io.reactivex:rxjava-math:1.0.0'

3. 数学操作符

MathObservable专门用于执行数学运算,其操作符作用于另一个Observable,该Observable发射可被评估为数字的项目。

3.1. 平均值操作符

average操作符发射单个值——源Observable所有发射值的平均值。

看个实际例子:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.averageInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(10);

处理原始值有四种类似操作符:averageIntegeraverageLongaverageFloataverageDouble

3.2. 最大值操作符

max操作符发射遇到的最大数值。

实际演示:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.max(sourceObservable).subscribe(subscriber);

subscriber.assertValue(9);

注意:max操作符有重载方法,可接受比较函数。由于数学操作符也能处理可被当作数字的对象,max重载操作符允许比较自定义类型或对标准类型进行自定义排序。

定义Item类:

class Item {
    private Integer id;

    // 标准构造函数、getter和setter
}

现在定义itemObservable,用max操作符发射id最大的Item

Item five = new Item(5);
List<Item> list = Arrays.asList(
  new Item(1), 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  five);
Observable<Item> itemObservable = Observable.from(list);

TestSubscriber<Item> subscriber = TestSubscriber.create();

MathObservable.from(itemObservable)
  .max(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(five);

3.3. 最小值操作符

min操作符发射单个项目,包含源Observable中的最小元素:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.min(sourceObservable).subscribe(subscriber);

subscriber.assertValue(1);

min操作符有重载方法,可接受比较器实例:

Item one = new Item(1);
List<Item> list = Arrays.asList(
  one, 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  new Item(5));
TestSubscriber<Item> subscriber = TestSubscriber.create();
Observable<Item> itemObservable = Observable.from(list);

MathObservable.from(itemObservable)
  .min(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(one);

3.4. 求和操作符

sum操作符发射单个值,表示源Observable发射的所有数字的总和:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.sumInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(210);

还有原始类型专用操作符:sumIntegersumLongsumFloatsumDouble

4. 聚合操作符

4.1. 连接操作符

concat操作符将源Observable发射的项目连接在一起。

定义两个Observable并连接它们:

List<Integer> listOne = Arrays.asList(1, 2, 3, 4);
Observable<Integer> observableOne = Observable.from(listOne);

List<Integer> listTwo = Arrays.asList(5, 6, 7, 8);
Observable<Integer> observableTwo = Observable.from(listTwo);

TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> concatObservable = observableOne
  .concatWith(observableTwo);

concatObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8);

深入细节:concat操作符会等待前一个Observable完成后,才订阅下一个Observable。❌ 因此,连接"热"Observable(立即开始发射项目)会导致在所有前序Observable完成前发射的项目丢失。

4.2. 计数操作符

count操作符发射源Observable发射的项目总数:

计算Observable发射的项目数量:

List<String> lettersList = Arrays.asList(
  "A", "B", "C", "D", "E", "F", "G");
TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> sourceObservable = Observable
  .from(lettersList).count();
sourceObservable.subscribe(subscriber);

subscriber.assertValue(7);

如果源Observable以错误终止,count会传递错误通知而不发射项目。但如果它永不终止,count既不会发射项目也不会终止。

对于计数操作,还有countLong操作符,最终发射Long值,适用于可能超过Integer容量的序列。

4.3. 归约操作符

reduce操作符通过应用累加器函数,将所有发射元素归约为单个元素。此过程持续到所有项目发射完毕,然后reduce生成的Observable发射函数返回的最终值。

看如何对String列表执行归约,按反向顺序连接:

List<String> list = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
TestSubscriber<String> subscriber = TestSubscriber.create();

Observable<String> reduceObservable = Observable.from(list)
  .reduce((letter1, letter2) -> letter2 + letter1);
reduceObservable.subscribe(subscriber);

subscriber.assertValue("GFEDCBA");

4.4. 收集操作符

collect操作符类似于reduce,但专用于将元素收集到单个可变数据结构中。它需要两个参数:

  • 返回空可变数据结构的函数
  • 当给定数据结构和发射项目时,适当修改数据结构的函数

看如何从Observable返回项目的set

List<String> list = Arrays.asList("A", "B", "C", "B", "B", "A", "D");
TestSubscriber<HashSet> subscriber = TestSubscriber.create();

Observable<HashSet<String>> reduceListObservable = Observable
  .from(list)
  .collect(HashSet::new, HashSet::add);
reduceListObservable.subscribe(subscriber);

subscriber.assertValues(new HashSet(list));

4.5. 转列表操作符

4.6. 转排序列表操作符

与上例类似,但发射的列表是排序的:

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable = sourceObservable
  .toSortedList();
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(10, 11, 12, 13, 14));

toSortedList使用默认比较,但可提供自定义比较器函数。看如何用自定义排序函数对整数进行反向排序:

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable 
  = sourceObservable.toSortedList((int1, int2) -> int2 - int1);
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(14, 13, 12, 11, 10));

4.7. 转映射操作符

toMap操作符将Observable发射的项目序列转换为映射,键由指定的键函数生成。

toMap操作符有不同重载方法,需要以下参数中的一个、两个或三个:

  1. keySelector:从项目生成键
  2. valueSelector:从发射项目生成将存储在映射中的实际值
  3. mapFactory:创建将保存项目的集合

先定义简单类Book

class Book {
    private String title;
    private Integer year;

    // 标准构造函数、getter和setter
}

看如何将发射的Book项目序列转换为Map,以书名为键,年份为值:

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map> subscriber = TestSubscriber.create();

Observable<Map<String, Integer>> mapObservable = bookObservable
  .toMap(Book::getTitle, Book::getYear, HashMap::new);
mapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap() {{
  put("The North Water", 2016);
  put("Origin", 2017);
  put("Sleeping Beauties", 2017);
}});

4.8. 转多值映射操作符

映射时,多个值常共享同一键。将一个键映射到多个值的数据结构称为多值映射。

这可通过toMultiMap操作符实现,它将Observable发射的项目序列转换为List,该列表也是由指定键函数生成键的映射。

此操作符在toMap操作符参数基础上增加collectionFactory参数,允许指定值存储的集合类型。看具体实现:

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map> subscriber = TestSubscriber.create();

Observable multiMapObservable = bookObservable.toMultimap(
  Book::getYear, 
  Book::getTitle, 
  () -> new HashMap<>(), 
  (key) -> new ArrayList<>()
);
multiMapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap() {{
    put(2016, Arrays.asList("The North Water"));
    put(2017, Arrays.asList("Origin", "Sleeping Beauties"));
}});

5. 总结

本文探讨了RxJava中的数学与聚合操作符,并提供了每个操作符的简单使用示例。

✅ 所有代码示例可在GitHub上找到。


原始标题:Mathematical and Aggregate Operators in RxJava