[关闭]
@cxm-2016 2016-12-05T11:06:39.000000Z 字数 5731 阅读 3077

RxJava学习指南(七)——结合操作符

RxJava学习指南

版本:2
整理:陈小默


八、结合操作符

8.1 And/Then/When

使用Pattern和Plan作为中介,将两个或多个Observable发射的数据集合并到一起

and/then/when

And/Then/When操作符组合的行为类似于zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern对象,然后操作那个Pattern对象,变换为一个Plan。随后将这些Plan变换为Observable的发射物。

and/then/when

它们属于rxjava-joins模块,不是核心RxJava包的一部分。

8.2 CombineLatest

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

combineLatest

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

combineLatest

RxJava将这个操作符实现为combineLatest,它接受二到九个Observable作为参数,或者单个Observables列表作为参数。它默认不在任何特定的调度器上执行。

  1. Observable.combineLatest(Observable.interval(200L, TimeUnit.MILLISECONDS, Schedulers.newThread()),
  2. Observable.interval(100L, TimeUnit.MILLISECONDS, Schedulers.immediate()).limit(10),
  3. { a, b -> "$a-$b" })
  4. .subscribe(
  5. { println("Item:$it") },
  6. { println("Error:${it.message}") },
  7. { println("Sequence complete") })
Item:0-1
Item:0-2
Item:1-2
Item:1-3
Item:1-4
Item:2-4
Item:2-5
Item:2-6
Item:3-6
Item:3-7
Item:3-8
Item:3-9
Item:4-9

8.2.1 withLatestFrom

withLatestFrom

withLatestFrom操作符还在开发中,不是1.0版本的一部分。类似于combineLatest,但是只在单个原始Observable发射了一条数据时才发射数据。

8.3 Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

join

Join操作符的作用是结合两个Observable的数据,结合方式需要调用者指定。我们需要为左右两个Observable发射的数据设计一个窗口,这个窗口使用Observable实现。每当数据发射时,创建一个相应的Observable表示窗口打开,当这个Observable发射任意数据(包括普通数据或者结束标记)都表示该窗口的关闭。当两个数据的窗口重叠时,结合并发射此数据。

  1. Observable.interval(100L, TimeUnit.MILLISECONDS)//左侧数据
  2. .join(Observable.interval(120L, TimeUnit.MILLISECONDS, Schedulers.immediate()), //右侧数据
  3. { l -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //左侧数据的计时器(窗口)
  4. { r -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //右侧数据的计时器(窗口)
  5. { l, r -> "$l-$r" })//左右数据的结合方式
  6. .limit(10)
  7. .subscribe(
  8. { println("Item:$it") },
  9. { println("Error:${it.message}") },
  10. { println("Sequence complete") })

groupJoin

很多ReactiveX的实现都有一个Join操作符,同时也存在一个功能相似的GroupJoin操作符。GroupJoin会将与左侧窗口期相交的右侧数据重新打包成一个新的Observable,然后再左侧数据的窗口期结束时发射这个Observable。

  1. Observable.interval(100L, TimeUnit.MILLISECONDS)//左侧数据
  2. .groupJoin(Observable.interval(120L, TimeUnit.MILLISECONDS, Schedulers.immediate()), //右侧数据
  3. { l -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //左侧数据的计时器(窗口)
  4. { r -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //右侧数据的计时器(窗口)
  5. { l, obs_r -> mapOf(l to obs_r) })//左右数据的结合方式
  6. .limit(10)
  7. .subscribe(
  8. { println("Item:$it") },
  9. { println("Error:${it.message}") },
  10. { println("Sequence complete") })

join

join 操作符的四个参数

  1. 用于和数据源(左侧数据)结合的第二Observable(右侧数据)
  2. 数据源(左侧数据)的窗口函数
  3. 右侧数据的窗口函数
  4. 当两个窗口重叠时发生的动作

join默认不在任何特定的调度器上执行。

groupJoin

groupJoin参数:

  1. 用于和数据源结合的第二Observable
  2. 数据源(左侧数据)的窗口函数
  3. 右侧数据的窗口函数
  4. 当两个窗口重叠时发生的动作,和join不一样的是,这里会得到一个将相交右侧数据打包的Observable

groupJoin默认不在任何特定的调度器上执行。

st.join

可选的StringObservable类中也有一个join操作符。它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,join操作符使用指定的定界符将全部单独的字符串连接起来。

8.4 Merge

合并多个Observables的发射物

merge

Merge操作符会按照时间顺序将所有数据项交错发射。

  1. Observable.merge(Observable.interval(100L, TimeUnit.MILLISECONDS),
  2. Observable.interval(20L, 100L, TimeUnit.MILLISECONDS),
  3. Observable.interval(50L, 100L, TimeUnit.MILLISECONDS, Schedulers.immediate()))
  4. .limit(10)
  5. .subscribe(
  6. { println("Item:$it") },
  7. { println("Error:${it.message}") },
  8. { println("Sequence complete") })

mergeDelayError

在很多ReactiveX实现中还有一个叫MergeDelayError的操作符,它会保留onError通知直到合并后的Observable所有的数据发射完成。

RxJava将它实现为merge, mergeWithmergeDelayError

merge

除了传递多个Observable给merge,你还可以传递一个Observable列表List,数组,甚至是一个发射Observable序列的Observable,merge将合并它们的输出作为单个Observable的输出:

merge

如果你传递一个发射Observables序列的Observable,你可以指定merge应该同时订阅的Observable'的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted通知。

merge是静态方法,mergeWith是对象方法,举个例子,Observable.merge(odds,evens)等价于odds.mergeWith(evens)

如果传递给merge的任何一个的Observable发射了onError通知终止了,merge操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError

merge

8.5 StartWith

在数据序列的开头插入一条指定的项

startWith

如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat操作符。)

startWith

可接受一个Iterable或者多个Observable作为函数的参数。

startWith

你也可以传递一个Observable给startWith,它会将那个Observable的发射物插在原始Observable发射的数据序列之前,然后把这个当做自己的发射物集合。这可以看作是Concat的反转。

8.6 Switch

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项

switch

Switch订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃(就像图例上的那个黄色圆圈一样)。

Java将这个操作符实现为switchOnNext。它默认不在任何特定的调度器上执行。

8.7 Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

zip

Zip严格按照顺序发射多个Observable的结合

  1. Observable.zip(Observable.interval(100L, TimeUnit.MILLISECONDS),
  2. Observable.interval(100L, TimeUnit.MILLISECONDS, Schedulers.immediate()),
  3. { f, s -> "$f-$s" })
  4. .limit(10)
  5. .subscribe(
  6. { println("Item:$it") },
  7. { println("Error:${it.message}") },
  8. { println("Sequence complete") })

RxJava将这个操作符实现为zipzipWith

zip

zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。

8.7.1 zipWith

zip

zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable。

zipzipWith默认不在任何特定的操作符上执行。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注