@cxm-2016
2016-12-05T11:06:39.000000Z
字数 5731
阅读 3089
RxJava学习指南
版本:2
整理:陈小默
使用Pattern和Plan作为中介,将两个或多个Observable发射的数据集合并到一起
And/Then/When操作符组合的行为类似于zip
,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern
对象,然后操作那个Pattern
对象,变换为一个Plan
。随后将这些Plan
变换为Observable的发射物。
它们属于rxjava-joins
模块,不是核心RxJava包的一部分。
当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。
CombineLatest
操作符行为类似于zip
,但是只有当原始的Observable中的每一个都发射了一条数据时zip
才发射数据。CombineLatest
则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest
使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
RxJava将这个操作符实现为combineLatest
,它接受二到九个Observable作为参数,或者单个Observables列表作为参数。它默认不在任何特定的调度器上执行。
Observable.combineLatest(Observable.interval(200L, TimeUnit.MILLISECONDS, Schedulers.newThread()),
Observable.interval(100L, TimeUnit.MILLISECONDS, Schedulers.immediate()).limit(10),
{ a, b -> "$a-$b" })
.subscribe(
{ println("Item:$it") },
{ println("Error:${it.message}") },
{ println("Sequence complete") })
Item:0-1
Item:0-2
Item:1-2
Item:1-3
Item:1-4
Item:2-4
Item:2-5
Item:2-6
Item:3-6
Item:3-7
Item:3-8
Item:3-9
Item:4-9
withLatestFrom
操作符还在开发中,不是1.0版本的一部分。类似于combineLatest
,但是只在单个原始Observable发射了一条数据时才发射数据。
任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
Join
操作符的作用是结合两个Observable的数据,结合方式需要调用者指定。我们需要为左右两个Observable发射的数据设计一个窗口,这个窗口使用Observable实现。每当数据发射时,创建一个相应的Observable表示窗口打开,当这个Observable发射任意数据(包括普通数据或者结束标记)都表示该窗口的关闭。当两个数据的窗口重叠时,结合并发射此数据。
Observable.interval(100L, TimeUnit.MILLISECONDS)//左侧数据
.join(Observable.interval(120L, TimeUnit.MILLISECONDS, Schedulers.immediate()), //右侧数据
{ l -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //左侧数据的计时器(窗口)
{ r -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //右侧数据的计时器(窗口)
{ l, r -> "$l-$r" })//左右数据的结合方式
.limit(10)
.subscribe(
{ println("Item:$it") },
{ println("Error:${it.message}") },
{ println("Sequence complete") })
很多ReactiveX的实现都有一个Join
操作符,同时也存在一个功能相似的GroupJoin
操作符。GroupJoin会将与左侧窗口期相交的右侧数据重新打包成一个新的Observable,然后再左侧数据的窗口期结束时发射这个Observable。
Observable.interval(100L, TimeUnit.MILLISECONDS)//左侧数据
.groupJoin(Observable.interval(120L, TimeUnit.MILLISECONDS, Schedulers.immediate()), //右侧数据
{ l -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //左侧数据的计时器(窗口)
{ r -> Observable.timer(50L, TimeUnit.MILLISECONDS) }, //右侧数据的计时器(窗口)
{ l, obs_r -> mapOf(l to obs_r) })//左右数据的结合方式
.limit(10)
.subscribe(
{ println("Item:$it") },
{ println("Error:${it.message}") },
{ println("Sequence complete") })
join
操作符的四个参数
join
默认不在任何特定的调度器上执行。
groupJoin
参数:
join
不一样的是,这里会得到一个将相交右侧数据打包的ObservablegroupJoin
默认不在任何特定的调度器上执行。
可选的StringObservable
类中也有一个join
操作符。它将一个发射字符串序列的Observable转换为一个发射单个字符串的Observable,join
操作符使用指定的定界符将全部单独的字符串连接起来。
合并多个Observables的发射物
Merge
操作符会按照时间顺序将所有数据项交错发射。
Observable.merge(Observable.interval(100L, TimeUnit.MILLISECONDS),
Observable.interval(20L, 100L, TimeUnit.MILLISECONDS),
Observable.interval(50L, 100L, TimeUnit.MILLISECONDS, Schedulers.immediate()))
.limit(10)
.subscribe(
{ println("Item:$it") },
{ println("Error:${it.message}") },
{ println("Sequence complete") })
在很多ReactiveX实现中还有一个叫MergeDelayError
的操作符,它会保留onError
通知直到合并后的Observable所有的数据发射完成。
RxJava将它实现为merge
, mergeWith
和mergeDelayError
。
除了传递多个Observable给merge
,你还可以传递一个Observable列表List
,数组,甚至是一个发射Observable序列的Observable,merge
将合并它们的输出作为单个Observable的输出:
如果你传递一个发射Observables序列的Observable,你可以指定merge
应该同时订阅的Observable'的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted
通知。
merge
是静态方法,mergeWith
是对象方法,举个例子,Observable.merge(odds,evens)
等价于odds.mergeWith(evens)
。
如果传递给merge
的任何一个的Observable发射了onError
通知终止了,merge
操作符生成的Observable也会立即以onError
通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError
。
在数据序列的开头插入一条指定的项
如果你想要一个Observable在发射数据之前先发射一个指定的数据序列,可以使用StartWith
操作符。(如果你想一个Observable发射的数据末尾追加一个数据序列可以使用Concat
操作符。)
可接受一个Iterable或者多个Observable作为函数的参数。
你也可以传递一个Observable给startWith
,它会将那个Observable的发射物插在原始Observable发射的数据序列之前,然后把这个当做自己的发射物集合。这可以看作是Concat
的反转。
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项
Switch
订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch
返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃(就像图例上的那个黄色圆圈一样)。
Java将这个操作符实现为switchOnNext
。它默认不在任何特定的调度器上执行。
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
Zip
严格按照顺序发射多个Observable的结合
Observable.zip(Observable.interval(100L, TimeUnit.MILLISECONDS),
Observable.interval(100L, TimeUnit.MILLISECONDS, Schedulers.immediate()),
{ f, s -> "$f-$s" })
.limit(10)
.subscribe(
{ println("Item:$it") },
{ println("Error:${it.message}") },
{ println("Sequence complete") })
RxJava将这个操作符实现为zip
和zipWith
。
zip
的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。
zipWith
操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable。
zip
和zipWith
默认不在任何特定的操作符上执行。