@cxm-2016
2016-12-17T11:42:36.000000Z
字数 10254
阅读 5148
RxJava学习指南
版本:1
整理:陈小默
延迟一段指定的时间再发射来自Observable的发射物
Delay
操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。
RxJava的实现是 delay
和delaySubscription
。
第一种delay
接受一个定义时长的参数(包括数量和单位)。每当原始Observable发射一项数据,delay
就启动一个定时器,当定时器过了给定的时间段时,delay
返回的Observable发射相同的数据项。
注意:delay
不会平移onError
通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext
通知。然而它会平移一个onCompleted
通知。
delay
默认在computation
调度器上执行,你可以通过参数指定使用其它的调度器。
另一种delay
不实用常数延时参数,它使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的Observable终止时,delay
返回的Observable就发射关联的那项数据。
这种delay
默认不在任何特定的调度器上执行。
这个版本的delay
对每一项数据使用一个Observable作为原始Observable的延时定时器。
这种delay
默认不在任何特定的调度器上执行。
还有一个操作符delaySubscription
让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。
delaySubscription
默认在computation
调度器上执行,你可以通过参数指定使用其它的调度器。
还有一个版本的delaySubscription
使用一个Obseable而不是一个固定的时长来设置订阅延时。
这种delaySubscription
默认不在任何特定的调度器上执行。
注册一个动作作为原始Observable生命周期事件的一种占位符
你可以注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操作符用于达到这个目的。
RxJava实现了很多Do
操作符的变体。
doOnEach
操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。你可以以Action
的形式传递参数给它,这个Action接受一个onNext
的变体Notification
作为它的唯一参数,你也可以传递一个Observable给doOnEach
,这个Observable的onNext
会被调用,就好像它订阅了原始的Observable一样。
doOnNext
操作符类似于doOnEach(Action1)
,但是它的Action不是接受一个Notification
参数,而是接受发射的数据项。
示例代码
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer item) {
if( item > 1 ) {
throw new RuntimeException( "Item exceeds maximum value" );
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
输出
Next: 1
Error: Item exceeds maximum value
doOnSubscribe
操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。
doOnUnsubscribe
操作符注册一个动作,当观察者取消订阅它生成的Observable它就会被调用。
doOnCompleted
操作符注册一个动作,当它产生的Observable正常终止调用onCompleted
时会被调用。
doOnError
操作符注册一个动作,当它产生的Observable异常终止调用onError
时会被调用。
doOnTerminate
操作符注册一个动作,当它产生的Observable终止之前会被调用,无论是正常还是异常终止。
finallyDo
操作符注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还是异常终止。
Materialize
将数据项和事件通知都当做数据项发射,Dematerialize
刚好相反。
一个合法的有限的Obversable将调用它的观察者的onNext
方法零次或多次,然后调用观察者的onCompleted
或onError
正好一次。Materialize
操作符将这一系列调用,包括原来的onNext
通知和终止通知onCompleted
或onError
都转换为一个Observable发射的数据序列。
RxJava的materialize
将来自原始Observable的通知转换为Notification
对象,然后它返回的Observable会发射这些数据。
materialize
默认不在任何特定的调度器 (Scheduler
) 上执行。
Dematerialize
操作符是Materialize
的逆向过程,它将Materialize
转换的结果还原成它原本的形式。
dematerialize
反转这个过程,将原始Observable发射的Notification
对象还原成Observable的通知。
dematerialize
默认不在任何特定的调度器 (Scheduler
) 上执行。
指定一个观察者在哪个调度器上观察这个Observable
很多ReactiveX实现都使用调度器 "Scheduler
"来管理多线程环境中Observable的转场。你可以使用ObserveOn
操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext
, onCompleted
, onError
方法)。
注意:当遇到一个异常时ObserveOn
会立即向前传递这个onError
终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError
通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。
SubscribeOn
操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。
RxJava中,要指定Observable应该在哪个调度器上调用观察者的onNext
, onCompleted
, onError
方法,你需要使用observeOn
操作符,传递给它一个合适的Scheduler
。
强制一个Observable连续调用并保证行为正确
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext
调用之前尝试调用onCompleted
或onError
方法,或者从两个不同的线程同时调用onNext
方法。使用Serialize
操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava中的实现是serialize
,它默认不在任何特定的调度器上执行。
操作来自Observable的发射物和通知
Subscribe
操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。
Subscribe
操作符的一般实现可能会接受一到三个方法(然后由观察者组合它们),或者接受一个实现了包含这三个方法的接口的对象(有时叫做Observer
或Subscriber
):
onNext
每当Observable发射了一项数据它就会调用这个方法。这个方法的参数是这个Observable发射的数据项。
onError
Observable调用这个方法表示它无法生成期待的数据或者遇到了其它错误。这将停止Observable,它在这之后不会再调用onNext
或onCompleted
。onError
方法的参数是导致这个错误的原因的一个表示(有时可能是一个Exception或Throwable对象,其它时候也可能是一个简单的字符串,取决于具体的实现)。
onCompleted
如果没有遇到任何错误,Observable在最后一次调用onCompleted
之后会调用这个方法。
如果一个Observable直到有一个观察者订阅它才开始发射数据项,就称之为"冷"的Observable;如果一个Observable可能在任何时刻开始发射数据,就称之为"热"的Observable,一个订阅者可能从开始之后的某个时刻开始观察它发射的数据序列,它可能会错过在订阅之前发射的数据。
RxJava中的实现是subscribe
方法。
如果你使用无参数的版本,它将触发对Observable的一个订阅,但是将忽略它的发射物和通知。这个操作会激活一个"冷"的Observable。
你也可以传递一到三个函数给它,它们会按下面的方法解释:
onNext
onNext
和onError
onNext
, onError
和onCompleted
最后,你还可以传递一个Observer
或Subscriber
接口给它,Observer
接口包含这三个以on
开头的方法。Subscriber
接口也实现了这三个方法,而且还添加了几个额外的方法,用于支持使用反压操作(reactive pull backpressure
),这让Subscriber
可以在Observable完成前取消订阅。
subscribe
方法返回一个实现了Subscription
接口的对象。这个接口包含unsubscribe
方法,任何时刻你都可以调用它来断开subscribe
方法建立的Observable和观察者之间的订阅关系。
forEach
方法是简化版的subscribe
,你同样可以传递一到三个函数给它,解释和传递给subscribe
时一样。
不同的是,你无法使用forEach
返回的对象取消订阅。也没办法传递一个可以用于取消订阅的参数。因此,只有当你明确地需要操作Observable的所有发射物和通知时,你才应该使用这个操作符。
BlockingObservable
类中也有一个类似的叫作forEach
的方法。详细的说明见 BlockingObservable
指定Observable自身在哪个调度器上执行
很多ReactiveX实现都使用调度器 "Scheduler
"来管理多线程环境中Observable的转场。你可以使用SubscribeOn
操作符指定Observable在一个特定的调度器上运转。
ObserveOn
操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。
在某些实现中还有一个UnsubscribeOn
操作符。
将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable
TimeInterval
操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。
RxJava中的实现为timeInterval
,这个操作符将原始Observable转换为另一个Obserervable,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。新的Observable的第一个发射物表示的是在观察者订阅原始Observable到原始Observable发射它的第一项数据之间流逝的时间长度。不存在与原始Observable发射最后一项数据和发射onCompleted
通知之间时长对应的发射物。
timeInterval
默认在immediate
调度器上执行,你可以通过传参数修改。
对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知
如果原始Observable过了指定的一段时长没有发射任何数据,Timeout
操作符会以一个onError
通知终止这个Observable。
RxJava中的实现为timeout
,但是有好几个变体。
第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout
就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout
就抛出TimeoutException
,以一个错误通知终止Observable。
这个timeout
默认在computation
调度器上执行,你可以通过参数指定其它的调度器。
这个版本的timeout
在超时时会切换到使用一个你指定的备用的Observable,而不是发错误通知。它也默认在computation
调度器上执行。
这个版本的timeout
使用一个函数针对原始Observable的每一项返回一个Observable,如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,timeout
就抛出TimeoutException
,以一个错误通知终止Observable。
这个timeout
默认在immediate
调度器上执行。
这个版本的timeout
同时指定超时时长和备用的Observable。它默认在immediate
调度器上执行。
这个版本的time
除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate
调度器上执行。
同上,但是同时可以指定一个备用的Observable。它默认在immediate
调度器上执行。
给Observable发射的数据项附加一个时间戳
RxJava中的实现为timestamp
,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped<T>
的数据的Observable,每一项都包含数据的原始发射时间。
timestamp
默认在immediate
调度器上执行,但是可以通过参数指定其它的调度器。
创建一个只在Observable生命周期内存在的一次性资源
Using
操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。
using
操作符接受三个参数:
当一个观察者订阅using
返回的Observable时,using
将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using
使用第三个函数释放它创建的资源。
using
默认不在任何特定的调度器上执行。
将Observable转换为另一个对象或数据结构
ReactiveX的很多语言特定实现都有一种操作符让你可以将Observable或者Observable发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到Observable终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的Observable。
在某些ReactiveX实现中,还有一个操作符用于将Observable转换成阻塞式的。一个阻塞式的Ogbservable在普通的Observable的基础上增加了几个方法,用于操作Observable发射的数据项。
getIterator
操作符只能用于BlockingObservable
的子类,要使用它,你首先必须把原始的Observable转换为一个BlockingObservable
。可以使用这两个操作符:BlockingObservable.from
或the Observable.toBlocking
。
这个操作符将Observable转换为一个Iterator
,你可以通过它迭代原始Observable发射的数据集。
toFuture
操作符也是只能用于BlockingObservable
。这个操作符将Observable转换为一个返回单个数据项的Future
,如果原始Observable发射多个数据项,Future
会收到一个IllegalArgumentException
;如果原始Observable没有发射任何数据,Future
会收到一个NoSuchElementException
。
如果你想将发射多个数据项的Observable转换为Future
,可以这样用:myObservable.toList().toBlocking().toFuture()
。
toFuture
操作符也是只能用于BlockingObservable
。这个操作符将Observable转换为一个Iterable
,你可以通过它迭代原始Observable发射的数据集。
通常,发射多项数据的Observable会为每一项数据调用onNext
方法。你可以用toList
操作符改变这个行为,让Observable将多项数据组合成一个List
,然后调用一次onNext
方法传递整个列表。
如果原始Observable没有发射任何数据就调用了onCompleted
,toList
返回的Observable会在调用onCompleted
之前发射一个空列表。如果原始Observable调用了onError
,toList
返回的Observable会立即调用它的观察者的onError
方法。
toList
默认不在任何特定的调度器上执行。
toMap
收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。
toMap
默认不在任何特定的调度器上执行。
toMultiMap
类似于toMap
,不同的是,它生成的这个Map同时还是一个ArrayList
(默认是这样,你可以传递一个可选的工厂方法修改这个行为)。
toMultiMap
默认不在任何特定的调度器上执行。
toSortedList
类似于toList
,不同的是,它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable
接口,会抛出一个异常。然而,你也可以传递一个函数作为用于比较两个数据项,这是toSortedList
不会使用Comparable
接口。
toSortedList
默认不在任何特定的调度器上执行。
nest
操作符有一个特殊的用途:将一个Observable转换为一个发射这个Observable的Observable。