[关闭]
@cxm-2016 2016-12-12T09:09:54.000000Z 字数 12175 阅读 3666

RxJava学习指南(六)——过滤操作符

RxJava学习指南

版本:4
整理:陈小默


七、过滤操作符

7.1 Debounce

限流操作符的作用是过滤发射频率过快的数据,该操作符会记录每一个数据发射的时间,如果在安全时间限度内没有其他数据发射,则发射此数据,否则忽略此数据并监听最近发射的数据。

debounce

RxJava将这个操作符实现为throttleWithTimeoutdebounce

注意:onCompleted通知不会触发限流。

7.1.1 throttleWithTimeout

debounce

throtleWithTimeout/debounce的一个变体根据你指定的时间间隔进行限流,时间单位通过TimeUnit参数指定。

这种操作符默认在computation调度器上执行,但是你可以通过第三个参数指定。

  1. Observable.create<Int> {
  2. with(it) {
  3. try {
  4. onNext(0)
  5. Thread.sleep(50L)
  6. for (i in 0 until 10) onNext(i)
  7. Thread.sleep(60L)
  8. onNext(10)
  9. } catch (e: Exception) {
  10. onError(e)
  11. }
  12. onCompleted()
  13. }
  14. }.debounce(50L, TimeUnit.MILLISECONDS)
  15. .subscribe(
  16. { println("Item: $it") },
  17. { println("Error: ${it.message}") },
  18. { println("Sequence complete") })
Item: 0
Item: 9
Item: 10
Sequence complete

7.1.2 debounce

这里提供一个新的限流计时方式,既使用Observable充当计时器。我们可以在每个数据发射时创建一个Observable,如果在这个Observable完成之前,有新的数据发射出来,那么就直接丢弃当前的数据项,否则在Observable完成之时发射数据。

debounce

debounce的这个变体默认不在任何特定的调度器上执行。

  1. Observable.create<Int> {
  2. with(it) {
  3. try {
  4. onNext(0)
  5. Thread.sleep(50L)
  6. for (i in 0 until 10) onNext(i)
  7. Thread.sleep(60L)
  8. onNext(10)
  9. } catch (e: Exception) {
  10. onError(e)
  11. }
  12. onCompleted()
  13. }
  14. }.debounce { Observable.timer(50L, TimeUnit.MILLISECONDS) }
  15. .subscribe(
  16. { println("Item: $it") },
  17. { println("Error: ${it.message}") },
  18. { println("Sequence complete") })

7.2 Distinct

抑制(过滤掉)重复的数据项

distinct

Distinct的过滤规则是:只允许还没有发射过的数据项通过。

在某些实现中,有一些变体允许你调整判定两个数据不同(distinct)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。

7.2.1 distinct()

distinct

RxJava将这个操作符实现为distinct函数。

示例代码

  1. Observable.just(1, 2, 3, 2, 1, 4, 5).distinct()
  2. .subscribe(
  3. { println("Item: $it") },
  4. { println("Error: ${it.message}") },
  5. { println("Sequence complete") })

输出

Item: 1
Item: 2
Item: 3
Item: 4
Item: 5
Sequence complete

7.2.2 distinct(Func1)

distinct

这个操作符有一个变体接受一个函数。这个函数根据原始Observable发射的数据项产生一个Key,然后,比较这些Key。如果两个数据的key相同,则只保留最先到达的数据。

  1. Observable.just(1, 2, 3.0F, 4.0F, 5.0, 6.0, true, false)
  2. .distinct {
  3. when (it) {
  4. is Int -> "I"
  5. is Float -> "F"
  6. is Double -> "D"
  7. is Boolean -> "B"
  8. else -> "A"
  9. }
  10. }
  11. .subscribe(
  12. { println("Item: $it") },
  13. { println("Error: ${it.message}") },
  14. { println("Sequence complete") })

Lambda:给接收到的数据创建一个key

7.2.3 distinctUntilChanged

distinctUntilChanged

RxJava还是实现了一个distinctUntilChanged操作符。它只判定一个数据和它的直接前驱是否是不同的。

  1. Observable.just(1, 2, 2, 1, 2, 2)
  2. .distinctUntilChanged()
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.3.4 distinctUntilChanged(Func1)

distinctUntilChanged.key

distinct(Func1)一样,根据一个函数产生的Key判定两个相邻的数据项是不是不同的。

  1. Observable.just(1, 2.0F, 3.0F, 4, 5.0F, 6.0F)
  2. .distinctUntilChanged {
  3. when (it) {
  4. is Int -> "I"
  5. is Float -> "F"
  6. else -> "A"
  7. }
  8. }
  9. .subscribe(
  10. { println("Item: $it") },
  11. { println("Error: ${it.message}") },
  12. { println("Sequence complete") })

Lambda:给接收到的数据创建一个key

Item: 1
Item: 2.0
Item: 4
Item: 5.0
Sequence complete

distinctdistinctUntilChanged默认不在任何特定的调度器上执行。

7.4 ElementAt

只发射第N项数据

elementAt

ElementAt操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

elementAt

RxJava将这个操作符实现为elementAt,给它传递一个基于0的索引值,它会发射原始Observable数据序列对应索引位置的值,如果你传递给elementAt的值为5,那么它会发射第六项的数据。

如果你传递的是一个负数,或者原始Observable的数据项数小于index+1,将会抛出一个IndexOutOfBoundsException异常。

  1. Observable.just(0, 1, 2, 3, 4, 5)
  2. .elementAt(2)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.4.1 elementAtOrDefault

elementAtOrDefault

RxJava还实现了elementAtOrDefault操作符。与elementAt的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个IndexOutOfBoundsException异常。

  1. Observable.just(0, 1, 2, 3)
  2. .elementAtOrDefault(6, -1)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

elementAtelementAtOrDefault默认不在任何特定的调度器上执行。

7.5 Filter

过滤操作符,只发射满足条件的数据。

filter

filter

RxJava将这个操作符实现为filter函数。

示例代码

  1. Observable.just(0, 1, 2, 3, 4, 5, 6)
  2. .filter { 0 == it % 2 }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

Lambda:测试结果

Item: 0
Item: 2
Item: 4
Item: 6
Sequence complete

filter默认不在任何特定的调度器上执行。

7.5.1 ofType

ofType

ofTypefilter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

  1. Observable.just(0, 1, "hello", 3, "world", 5, 6)
  2. .ofType(String::class.java)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

ofType默认不在任何特定的调度器上指定。

7.6 First

只发射第一项(或者满足某个条件的第一项)数据

first

在某些实现中,First没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用Take(1)或者ElementAt(0)

在一些实现中还有一个Single操作符。它的行为与First类似,但为了确保只发射单个值,它会等待原始Observable终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始Observable获取第一项数据,而且也确保只发射一项数据。

在RxJava中,这个操作符被实现为firstfirstOrDefaulttakeFirst

可能容易混淆,BlockingObservable也有名叫firstfirstOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个Observable。

还有几个其它的操作符执行类似的功能。

7.5.1 过滤操作符

first

只发射第一个数据,使用没有参数的first操作符。

示例代码

  1. Observable.just(1, 2, 3)
  2. .first()
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.5.2 first(Func1)

发射第一个满足条件的数据。

first

  1. Observable.just(1, 2, 3)
  2. .first { 0 == it % 2 }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.5.3 firstOrDefault

firstOrDefault

firstOrDefaultfirst类似,如果没有满足条件的数据时就发射一个默认值。

  1. Observable.empty<Int>()
  2. .firstOrDefault(-1)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.5.4 firstOrDefault(Func1)

firstOrDefault

发射这个函数判定为true的第一项数据,如果没有满足条件的数据时就发射一个默认值。

  1. Observable.just(1, 3, 5, 7, 9)
  2. .firstOrDefault(-1, { 0 == it % 2 })
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

7.5.5 takeFirst

takeFirst

takeFirstfirst类似,除了这一点:如果原始Observable没有发射任何满足条件的数据,first会抛出一个NoSuchElementExceptiontakeFist会返回一个空的Observable(不调用onNext()但是会调用onCompleted)。

  1. Observable.just(1, 3, 5, 7, 9)
  2. .takeFirst { 0 == it % 2 }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Sequence complete

7.5.6 single

该操作符用来判断当前Observable是否只发射一个数据,如果是,则正常发射数据,否则抛出一个NoSuchElementException

single

  1. Observable.from(arr)
  2. .single()
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

val arr = arrayOf(1)

Item: 1
Sequence complete

val arr = emptyArray<Int>()

Error: Sequence contains no elements

val arr = arrayOf(1,2,3)

Error: Sequence contains too many elements

7.5.7 single(Func1)

single

single的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。

  1. Observable.from(arr)
  2. .single { 0 == it % 2 }
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

val arr = arrayOf(1, 3, 5)时,打印

Error: Sequence contains no elements

val arr = arrayOf(1, 2, 3, 5),打印

Item: 2
Sequence complete

val arr = arrayOf(1, 2, 3, 4, 5),打印

Error: Sequence contains too many elements

7.5.8 singleOrDefault

single

firstOrDefault类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。

  1. Observable.from(arr)
  2. .singleOrDefault(-1)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

val arr = emptyArray<Int>(),打印

Item: -1
Sequence complete

val arr = arrayOf(1, 2, 3, 4, 5),打印

Error: Sequence contains too many elements

7.5.9 singleOrDefault(T,Func1)

single

firstOrDefault(T, Func1)类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。

  1. Observable.from(arr)
  2. .singleOrDefault(-1, { it % 2 == 0 })
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })

first系列的这几个操作符默认不在任何特定的调度器上执行。

7.6 IgnoreElements

不发射任何数据,只发射Observable的终止通知(正常终止,或者错误终止)。

ignoreElements

ignoreElements默认不在任何特定的调度器上执行。

7.7 Last

只发射最后一项(或者满足某个条件的最后一项)数据

last

在某些实现中,Last没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)

在RxJava中的实现是lastlastOrDefault

可能容易混淆,BlockingObservable也有名叫lastlastOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个Observable。

7.7.1 过滤操作符

last

只发射最后一项数据,使用没有参数的last操作符。

示例代码

  1. Observable.just(1, 2, 3, 4, 5)
  2. .last()
  3. .subscribe(
  4. { println("Item:$it") },
  5. { println("Error:${it.message}") },
  6. { println("Sequence complete") })

last

这个版本的last返回一个发射原始Observable中满足条件的最后一项数据的Observable。

  1. Observable.just(1, 2, 3, 4, 5)
  2. .last { it < 3 }
  3. .subscribe(
  4. { println("Item:$it") },
  5. { println("Error:${it.message}") },
  6. { println("Sequence complete") })

lastOrDefaultlast类似,不同的是,如果原始Observable没有发射任何值,它发射你指定的默认值。

last

这个版本的lastOrDefault可以接受一个谓词函数,如果有数据满足条件,返回的Observable就发射原始Observable满足条件的最后一项数据,否则发射默认值。

lastlastOrDefault默认不在任何特定的调度器上执行。

7.8 Sample

通过另一个Observable充当定时器,筛选距离定时器最近的数据进行发射。

sample

Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。

在某些实现中,有一个ThrottleFirst操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。

RxJava将这个操作符实现为samplethrottleLast

注意:如果自上次采样以来,原始Observable没有发射任何数据,这个操作返回的Observable在那段时间内也不会发射任何数据。

sample

sample(别名throttleLast)的一个变体按照你参数中指定的时间间隔定时采样(TimeUnit指定时间单位)。

sample的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

  1. Observable.interval(50L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  2. .limit(10)
  3. .sample(100L, TimeUnit.MILLISECONDS, Schedulers.newThread())
  4. .subscribe(
  5. { println("Item:$it") },
  6. { println("Error:${it.message}") },
  7. { println("Sequence complete") })

sample

sample的这个变体每当第二个Observable发射一个数据(或者当它终止)时就对原始Observable进行采样。第二个Observable通过参数传递给sample

sample的这个变体默认不在任何特定的调度器上执行。

  1. Observable.interval(50L, TimeUnit.MILLISECONDS, Schedulers.immediate())
  2. .limit(10)
  3. .sample(Observable.interval(100L, TimeUnit.MILLISECONDS, Schedulers.newThread()))
  4. .subscribe(
  5. { println("Item:$it") },
  6. { println("Error:${it.message}") },
  7. { println("Sequence complete") })
Item:1
Item:2
Item:4
Item:6
Item:8
Item:10
Sequence complete

throttleFirst

throttleFirstthrottleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。

throttleFirst操作符默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

7.9 Skip

抑制Observable发射的前N项数据

skip

使用Skip操作符,你可以忽略Observable'发射的前N项数据,只保留之后的数据。

skip

RxJava中这个操作符叫skipskip的这个变体默认不在任何特定的调度器上执行。

skip

skip的这个变体接受一个时长而不是数量参数。它会丢弃原始Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。

skip的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

7.10 SkipLast

抑制Observable发射的后N项数据

skipLast

使用SkipLast操作符修改原始Observable,你可以忽略Observable'发射的后N项数据,只保留前面的数据。

skipLast

使用SkipLast操作符,你可以忽略原始Observable发射的后N项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始Observable发射的任何数据项,直到它发射了N项数据。

skipLast的这个变体默认不在任何特定的调度器上执行。

skipLast

还有一个skipLast变体接受一个时长而不是数量参数。它会丢弃在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意:这个机制是这样实现的:延迟原始Observable发射的任何数据项,直到自这次发射之后过了给定的时长。

skipLast的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

7.11 Take

只发射前面的N项数据

take

使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

take

RxJava将这个操作符实现为take函数。

如果你对一个Observable使用take(n)(或它的同义词limit(n))操作符,而那个Observable发射的数据少于N项,那么take操作生成的Observable不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。

示例代码

  1. Observable.just(1, 2, 3, 4, 5, 6)
  2. .take(3)
  3. .subscribe(
  4. { println("Item: $it") },
  5. { println("Error: ${it.message}") },
  6. { println("Sequence complete") })
Item: 1
Item: 2
Item: 3
Sequence complete

take(int)默认不任何特定的调度器上执行。

take

take的这个变体接受一个时长而不是数量参数。它会丢发射Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。

take的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

7.12 TakeLast

发射Observable发射的最后N项数据

takeLast

使用TakeLast操作符修改原始Observable,你可以只发射Observable'发射的后N项数据,忽略前面的数据。

7.12.1 taskLast.n

takeLast

使用takeLast操作符,你可以只发射原始Observable发射的后N项数据,忽略之前的数据。注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

takeLast的这个变体默认不在任何特定的调度器上执行。

7.12.2 takeLast.t

takeLast

还有一个takeLast变体接受一个时长而不是数量参数。它会发射在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

takeLast的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

7.12.3 takeLastBuffer

takeLast

还有一个操作符叫takeLastBuffer,它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注