[关闭]
@cxm-2016 2017-01-19T19:15:54.000000Z 字数 16731 阅读 3664

RxJava学习指南(五)——变换操作符

RxJava学习指南

版本:7
整理:陈小默


六、变换操作符

6.1 Buffer

将发射的数据收集起来放进一个数据结构中,然后将这些数据结构当作元素发射出去。

buffer

注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知。

Window操作符与Buffer类似,但是它在发射之前把收集到的数据放进单独的Observable,而不是放进一个数据结构。

6.1.1 buffer(count)

将一定数量的数据打包起来放进List中,然后发射这些List

buffer3

  1. Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8, 9)).buffer(3)
  2. .subscribe(
  3. { println("Item: $it") },
  4. { println("Error: ${it.message}") },
  5. { println("Sequence complete") })

运行结果:

Item: [1, 2, 3]
Item: [4, 5, 6]
Item: [7, 8, 9]
Sequence complete

6.1.2 buffer(count, skip)

我们可以使用该操作符按照一定的数量规则截取数据,参数count为数据包中最大数据数量,skip为间隔多少个元素进行一次缓存操作。

buffer4

  1. Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8, 9)).buffer(2, 3)
  2. .subscribe(
  3. { println("Item: $it") },
  4. { println("Error: ${it.message}") },
  5. { println("Sequence complete") })
Item: [1, 2]
Item: [4, 5]
Item: [7, 8]
Sequence complete

通过这个例子可以看出,参数的含义是每个3个元素进行一次缓存操作,缓存操作从间隔位置其向后取2个元素存放到List中。

6.1.3 buffer(bufferClosingSelector)

如果我们需要通过特定规则的时间去缓存数据,可以使用次操作符。该操作符需要一个Observable发射间隔标记,每当收到一个间隔标记,就将之前收到的数据打包成一个数据包并发射出去。

buffer1

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 40) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.buffer {
  14. Observable.interval(1L, 1L, TimeUnit.SECONDS, Schedulers.newThread())
  15. }
  16. .subscribe(
  17. { println("Item: $it") },
  18. { println("Error: ${it.message}") },
  19. { println("Sequence complete") })

以上程序示例为在主线程中每250毫秒发射一个数字,在另一个线程中每1秒钟发射一个结束标记(任意值),每当发现结束标记,就将收集到的数据一次性输出。程序运行结果如下(每次运行结果不一定相同)

Item: [0, 1, 2]
Item: [3, 4, 5, 6]
Item: [7, 8, 9, 10]
Item: [11, 12, 13, 14]
Item: [15, 16, 17, 18]
Item: [19, 20, 21, 22]
Item: [23, 24, 25, 26]
Item: [27, 28, 29, 30]
Item: [31, 32, 33, 34]
Item: [35, 36, 37, 38]
Item: [39]
Sequence complete

6.1.4 buffer(boundary)

buffer8

buffer(boundary)监视一个名叫boundary的Observable,每当这个Observable发射了一个值,它就创建一个新的List开始收集来自原始Observable的数据并发射原来的List

6.1.5 buffer(bufferOpenings, bufferClosingSelector)

该操作符需要一个用来发射Open标记的Observable。每当发射一个Open标记,就创建一个用来发射Close标记的Observable。当发射Close标记的Observable发射第一个数据时表示该窗口关闭。

buffer2

  1. Observable.interval(250L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  2. .limit(20)
  3. .buffer(Observable.interval(800L, TimeUnit.MILLISECONDS, Schedulers.newThread()), {
  4. Observable.timer(500L, TimeUnit.MILLISECONDS, Schedulers.newThread())
  5. })
  6. .subscribe(
  7. { println("Item: $it") },
  8. { println("Error: ${it.message}") },
  9. { println("Sequence complete") })
Item: [3, 4]
Item: [6, 7]
Item: [9, 10]
Item: [12, 13]
Item: [15, 16]
Item: [19]
Sequence complete

6.1.6 buffer(timespan, unit[, scheduler])

buffer5

该操作符每个固定的时间收集一次数据存放的缓存包中(List),然后发射这些缓存包。默认情况下会使用computation调度器。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 40) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.buffer(1L, TimeUnit.SECONDS, Schedulers.newThread())
  14. .subscribe(
  15. { println("Item: $it") },
  16. { println("Error: ${it.message}") },
  17. { println("Sequence complete") })

6.1.7 buffer(timespan, unit, count[, scheduler])

该操作符有两个保存数据的关键结点,第一个是如果当前已经缓存的数据项达到了count项,那么此缓存包不再接受新数据而是直接发射,第二个结点是每当计时器到达timespane间隔,就会将当前缓存包发射出去,而不管里面有多少数据。默认情况下会使用computation调度器。

buffer6

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 40) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.buffer(1L, TimeUnit.SECONDS, 3, Schedulers.newThread())
  14. .subscribe(
  15. { println("Item: $it") },
  16. { println("Error: ${it.message}") },
  17. { println("Sequence complete") })

结果:

Item: [0, 1, 2]
Item: []
Item: [3, 4, 5]
Item: [6]
Item: [7, 8, 9]
Item: [10]
Item: [11, 12, 13]
Item: [14]
Item: [15, 16, 17]
Item: [18]
Item: [19, 20, 21]
Item: [22]
Item: [23, 24, 25]
Item: [26]
Item: [27, 28, 29]
Item: [30]
Item: [31, 32, 33]
Item: [34]
Item: [35, 36, 37]
Item: [38]
Item: [39]
Sequence complete

6.1.8 buffer(timespan, timeshift, unit[, scheduler])

buffer7

buffer(timespan, timeshift, unit)在每一个timeshift时期内都创建一个新的List,然后用原始Observable发射的每一项数据填充这个列表(在把这个List当做自己的数据发射前,从创建时开始,直到过了timespan这么长的时间)。如果timespan长于timeshift,它发射的数据包将会重叠,因此可能包含重复的数据项。

还有另一个版本的buffer接受一个Scheduler参数,默认情况下会使用computation调度器。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 40) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.buffer(1L, 2L, TimeUnit.SECONDS, Schedulers.newThread())
  14. .subscribe(
  15. { println("Item: $it") },
  16. { println("Error: ${it.message}") },
  17. { println("Sequence complete") })

上述代码表示,以2秒为一个时间单位输出,但输出前1秒发射的数据集合。

Item: [0, 1, 2]
Item: [7, 8, 9, 10]
Item: [15, 16, 17, 18]
Item: [23, 24, 25, 26]
Item: [31, 32, 33, 34]
Item: [39]
Sequence complete

6.2 FlatMap

FlatMap将一个发射数据的Observable变换为多个Observables。

flatMap

FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

在许多语言特定的实现中,还有一个操作符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操作符通常被叫作ConcatMap或者类似的名字。

mergeMap

RxJava将这个操作符实现为flatMap函数。

注意:如果任何一个通过这个flatMap操作产生的单独的Observable调用onError异常终止了,这个Observable自身会立即调用onError并终止。

这个操作符有一个接受额外的int参数的一个变体。这个参数设置flatMap从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止然后再订阅另一个。

  1. Observable.just("hello", "world").flatMap {
  2. Observable.from(it.toCharArray().asIterable())
  3. }.subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: h
Item: e
Item: l
Item: l
Item: o
Item: w
Item: o
Item: r
Item: l
Item: d
Sequence complete

mergeMap.nce

还有一个版本的flatMap为原始Observable的每一项数据和每一个通知创建一个新的Observable(并对数据平坦化)。

它也有一个接受额外int参数的变体。

  1. Observable.just("hello", "world").flatMap(
  2. { Observable.from(it.asIterable()) },
  3. { Observable.just(it) },
  4. { Observable.just("over") })
  5. .subscribe(
  6. { println("Item: $it") },
  7. { println("Error: ${it.message}") },
  8. { println("Sequence complete") })
Item: h
Item: e
Item: l
Item: l
Item: o
Item: w
Item: o
Item: r
Item: l
Item: d
Item: over
Sequence complete

mergeMap.r

还有一个版本的flatMap会使用原始Observable的数据触发的Observable组合这些数据,然后发射这些数据组合。它也有一个接受额外int参数的版本。

  1. Observable.just("hello", "world").flatMap({ Observable.from(it.asIterable()) },
  2. { t, u -> u.toInt() })
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: 104
Item: 101
Item: 108
Item: 108
Item: 111
Item: 119
Item: 111
Item: 114
Item: 108
Item: 100
Sequence complete

6.2.1 flatMapIterable

mergeMapIterable

flatMapIterable这个变体成对的打包数据,然后生成Iterable而不是原始数据和生成的Observables,但是处理方式是相同的。

  1. Observable.just("hello", "world").flatMapIterable { it.asIterable() }
  2. .subscribe(
  3. { println("Item: $it") },
  4. { println("Error: ${it.message}") },
  5. { println("Sequence complete") })

6.2.2 concatMap

concatMap

还有一个concatMap操作符,它类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。

  1. Observable.just("hello", "world").concatMap { Observable.from(it.asIterable()) }
  2. .subscribe(
  3. { println("Item: $it") },
  4. { println("Error: ${it.message}") },
  5. { println("Sequence complete") })

6.2.3 switchMap

switchMap

RxJava还实现了switchMap操作符。它和flatMap很像,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个。

6.2.4 split

St.split

在特殊的StringObservable类(默认没有包含在RxJava中)中还有一个split操作符。它将一个发射字符串的Observable转换为另一个发射字符串的Observable,只不过,后者将原始的数据序列当做一个数据流,使用一个正则表达式边界分割它们,然后合并发射分割的结果。

6.3 GroupBy

将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列

groupBy

GroupBy操作符将原始Observable分拆为一些Observables集合,它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。

RxJava实现了groupBy操作符。它返回Observable的一个特殊子类GroupedObservable,实现了GroupedObservable接口的对象有一个额外的方法getKey,这个Key用于将数据分组到指定的Observable。

有一个版本的groupBy允许你传递一个变换函数,这样它可以在发射结果GroupedObservable之前改变数据项。

注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。

如果你取消订阅一个GroupedObservable,那个Observable将会终止。如果之后原始的Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个Key创建一个新的GroupedObservable

groupBy默认不在任何特定的调度器上执行。

  1. Observable.just(1, 2.0, 3, 4, 5.0).groupBy { if (it is Int) "int" else "double" }
  2. .filter { it.key == "int" }
  3. .flatMap { it.asObservable() }
  4. .subscribe(
  5. { println("Item: $it") },
  6. { println("Error: ${it.message}") },
  7. { println("Sequence complete") })
Item: 1
Item: 3
Item: 4
Sequence complete

6.4 Map

对Observable发射的每一项数据应用一个函数,执行变换操作

map

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

RxJava将这个操作符实现为map函数。这个操作符默认不在任何特定的调度器上执行。

  1. Observable.just(1, 2.0, 3, 4, 5.0)
  2. .map { "str:$it" }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: str:1
Item: str:2.0
Item: str:3
Item: str:4
Item: str:5.0
Sequence complete

6.4.1 cast

cast

cast操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map的一个特殊版本。

  1. Observable.just(1, 2, 3, 4, 5)
  2. .cast(Any::class.java)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
Sequence complete

6.4.2 encode

encode

encodeStringObservable类中,不是标准RxJava的一部分,它也是一个特殊的map操作符。encode将一个发射字符串的Observable变换为一个发射字节数组(这个字节数组按照原始字符串中的多字节字符边界划分)的Observable。

6.4.3 byLine

byLine

byLine同样在StringObservable类中,也不是标准RxJava的一部分,它也是一个特殊的map操作符。byLine将一个发射字符串的Observable变换为一个按行发射来自原始Observable的字符串的Observable。

6.5 Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果

scan

Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator

scan

RxJava实现了scan操作符。

示例代码:

  1. Observable.just(1, 2, 3, 4, 5)
  2. .scan { sum: Int, item: Int -> sum + item }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

输出

Item: 1
Item: 3
Item: 6
Item: 10
Item: 15
Sequence complete

scanSeed

有一个scan操作符的变体,你可以传递一个种子值给累加器函数的第一次调用(Observable发射的第一项数据)。如果你使用这个版本,scan将发射种子值作为自己的第一项数据。注意:传递null作为种子值与不传递是不同的,null种子值是合法的。

  1. Observable.just(1, 2, 3, 4, 5)
  2. .scan(10, { sum: Int, item: Int -> sum + item })
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: 10
Item: 11
Item: 13
Item: 16
Item: 20
Item: 25
Sequence complete

这个操作符默认不在任何特定的调度器上执行。

6.6 Window

定期将来自原始Observable的数据分解为一个Observable窗口。

window

WindowBuffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。

Buffer一样,Window有很多变体,每一种都以自己的方式将原始Observable分解为多个作为结果的Observable,每一个都包含一个映射原始数据的window。用Window操作符的术语描述就是,当一个窗口打开(when a window "opens")意味着一个新的Observable已经发射(产生)了,而且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭(when a window "closes")意味着发射(产生)的Observable停止发射原始Observable的数据,并且发射终止通知onCompleted给它的观察者们。

在RxJava中有许多种Window操作符的变体。

6.6.1 window(closingSelector)

lambda:返回一个定期发射结束标记的Observable<TClosing>

window1

window的这个变体会立即打开它的第一个窗口。每当它观察到closingSelector返回的Observable发射了一个对象时,它就关闭当前打开的窗口并立即打开一个新窗口。用这个方法,这种window变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window { Observable.interval(1L, TimeUnit.SECONDS, Schedulers.newThread()) }
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })
Window Item: 0
Window Item: 1
Window Item: 2
Window Sequence complete
Window Item: 3
Window Item: 4
Window Item: 5
Window Item: 6
Window Sequence complete
Window Item: 7
Window Item: 8
Window Item: 9
Window Sequence complete
Sequence complete

6.6.2 window(windowOpenings, closingSelector)

window2

跟Buffer一样。

  1. Observable.interval(250L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  2. .limit(20)
  3. .window(Observable.interval(800L, TimeUnit.MILLISECONDS, Schedulers.newThread()), {
  4. Observable.timer(500L, TimeUnit.MILLISECONDS, Schedulers.newThread())
  5. })
  6. .subscribe(
  7. { it.toList().subscribe { println(it) } },
  8. { println("Error: ${it.message}") },
  9. { println("Sequence complete") })
[3, 4]
[6, 7]
[9, 10]
[12, 13]
[15, 16, 17]
[19]
Sequence complete

第一个参数返回发射开始标记的Observable,第二个参数使用Lambda返回一个发射结束标记的Observable

6.6.3 window(count)

window3

这个window的变体立即打开它的第一个窗口。每当当前窗口发射了count项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onErroronCompleted通知它也会关闭当前窗口。这种window变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window(2)
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })
Window Item: 0
Window Item: 1
Window Sequence complete
Window Item: 2
Window Item: 3
Window Sequence complete
Window Item: 4
Window Item: 5
Window Sequence complete
Window Item: 6
Window Item: 7
Window Sequence complete
Window Item: 8
Window Item: 9
Window Sequence complete
Sequence complete

6.6.4 window(count, skip)

window4

这个window的变体立即打开它的第一个窗口。原始Observable每发射skip项数据它就打开一个新窗口(例如,如果skip等于3,每到第三项数据,它会打开一耳光新窗口)。每当当前窗口发射了count项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onErroronCompleted通知它也会关闭当前窗口。如果skip=count,它的行为与window(source, count)相同;如果skip < count,窗口可会有count - skip 个重叠的数据;如果skip > count,在两个窗口之间会有skip - count项数据被丢弃。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window(2, 4)
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })
Window Item: 0
Window Item: 1
Window Sequence complete
Window Item: 4
Window Item: 5
Window Sequence complete
Window Item: 8
Window Item: 9
Window Sequence complete
Sequence complete

6.6.5 window(timespan, unit[, scheduler])

window5

这个window的变体立即打开它的第一个窗口。每当过了timespan这么长的时间它就关闭当前窗口并打开一个新窗口(时间单位是unit,可选在调度器scheduler上执行)。如果从原始Observable收到了onErroronCompleted通知它也会关闭当前窗口。这种window变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window(1L, TimeUnit.SECONDS)
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })

6.6.6 window(timespan, unit, count[, scheduler])

window6

这个window的变体立即打开它的第一个窗口。这个变体是window(count)window(timespan, unit[, scheduler])的结合,每当过了timespan的时长或者当前窗口收到了count项数据,它就关闭当前窗口并打开另一个。如果从原始Observable收到了onErroronCompleted通知它也会关闭当前窗口。这种window变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window(1L, TimeUnit.SECONDS, 2)
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })

6.6.7 window(timespan, timeshift, unit[, scheduler])

timespan是截取长度,timeshift是区间长度

window7

buffer(timespan, timeshift, unit)在每一个timeshift时期内都创建一个新的List,然后用原始Observable发射的每一项数据填充这个列表(在把这个List当做自己的数据发射前,从创建时开始,直到过了timespan这么长的时间)。如果timespan长于timeshift,它发射的数据包将会重叠,因此可能包含重复的数据项。

这个window的变体立即打开它的第一个窗口。随后每当过了timeshift的时长就打开一个新窗口(时间单位是unit,可选在调度器scheduler上执行),当窗口打开的时长达到timespan,它就关闭当前打开的窗口。如果从原始Observable收到了onErroronCompleted通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的timeshifttimespan的值。

这个变体的window默认在computation调度器上执行它的定时器。

  1. Observable.create<Int> {
  2. with(it) {
  3. for (i in 0 until 10) {
  4. try {
  5. Thread.sleep(250L)
  6. onNext(i)
  7. } catch (e: Exception) {
  8. onError(e)
  9. }
  10. }
  11. onCompleted()
  12. }
  13. }.window(1L, 2L, TimeUnit.SECONDS)
  14. .subscribe(
  15. {
  16. it.subscribe(
  17. { println("Window Item: $it") },
  18. { println("Window Error: ${it.message}") },
  19. { println("Window Sequence complete") })
  20. },
  21. { println("Error: ${it.message}") },
  22. { println("Sequence complete") })

6.6.8 window-backpressure

你可以使用Window操作符实现反压backpressure(意思是,处理这样一个Observable:它产生数据的数据可能比它的观察者消费数据的数据快)。

bp.window1

Window操作符可以将大量的数据序列缩减为较少的数据窗口序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据窗口。

示例代码

  1. Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);

bp.window2

你还可以选择每当收到爆发性Observable的N项数据时发射一个新的数据窗口。

示例代码

  1. Observable<Observable<Integer>> burstyWindowed = bursty.window(5);
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注