@cxm-2016
2016-11-23T15:50:57.000000Z
字数 3302
阅读 2232
RxJava学习指南
版本:3
整理:陈小默
Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。
由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。
针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在RxScala中Subject被称作PublishSubject)。
一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
示例代码:
fun main(args: Array<String>) {
val asyncSubject = AsyncSubject<Int>()
asyncSubject.onNext(1)
asyncSubject.subscribe(
{ println("A see $it") },
{ println("A get error") })
asyncSubject.onNext(2)
asyncSubject.subscribe(
{ println("B see $it") },
{ println("B get error") })
asyncSubject.onNext(3)
asyncSubject.onCompleted()
}
运行结果
A see 3
B see 3
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
代码示例:
fun main(args: Array<String>) {
val behaviorSubject = BehaviorSubject(0)
behaviorSubject.subscribe(
{ println("A see $it") },
{ println("A get error") })
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.onNext(3)
behaviorSubject.subscribe(
{ println("B see $it") },
{ println("B get error") })
behaviorSubject.onNext(4)
behaviorSubject.onError(RuntimeException())
behaviorSubject.subscribe(
{ println("C see $it") },
{ println("C get error") })
}
运行结果:
A see 0
A see 1
A see 2
A see 3
B see 3
A see 4
B see 4
A get error
B get error
C get error
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入"冷"Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。
示例:
fun main(args: Array<String>) {
val stringPublishSubject = PublishSubject<String>()
stringPublishSubject.subscribe {
println("observer 1 get $it")
}
stringPublishSubject.onNext("1")
stringPublishSubject.subscribe {
println("observer 2 get $it")
}
stringPublishSubject.onNext("2")
}
输出结果:
observer 1 get 1
observer 1 get 2
observer 2 get 2
如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
示例代码
fun main(args: Array<String>) {
val replaySubject = ReplaySubject<Int>()
replaySubject.onNext(1)
replaySubject.subscribe(
{ println("A see $it") },
{ println("A get error") })
replaySubject.onNext(2)
replaySubject.subscribe(
{ println("B see $it") },
{ println("B get error") })
replaySubject.onNext(3)
replaySubject.onError(RuntimeException())
}
运行结果:
A see 1
A see 2
B see 1
B see 2
A see 3
B see 3
A get error
B get error
如果你把 Subject
当作一个 Subscriber
使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
要避免此类问题,你可以将 Subject
转换为一个 SerializedSubject
,类似于这样:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
[1]ReactiveX文档