[关闭]
@Rays 2017-03-03T06:37:43.000000Z 字数 14100 阅读 1721

测试RxJava2

RxJava


摘要: 你已经探索了如何在代码中使用响应式编程,但是还需要考虑如何在代码库中对响应式编程进行测试。在本文中,Java冠军程序员Andres Almiray介绍了RxJava2的测试技术和工具。本文是“测试RxJava”一文的修订,根据RxJava2规范做了全面更新。

作者: Andres Almiray

正文:

译者注:关注InfoQ资讯的读者可能已经留意到,我们前面给出了一篇很好的RxJava文章“测试RxJava”。本文是上一篇文章的修订,用相同的例子循序渐进地介绍了如何测试RxJava2。译者将两篇文章中的不同之处用粗体标识出来,并使用添加注释的形式说明示例代码中的差异之处,以供读过前篇的读者快速浏览本文。

关键要点:

  • RxJava含有内建的、测试友好的解决方案。
  • 使用TestSubscriber去验证Observable。
  • 使用TestScheduler可实现对时间的严格控制。
  • Awaitility库提供了对测试环境进一步的控制。

本文是“测试RxJava”一文的修订,根据RxJava2规范做了全面更新。

你已经阅读过RxJava的相关内容,也已经在互联网上体验过像“RxJava实例解析”中的那些示例,现在打算在自己的代码中探索一下响应式编程了。但是,现在却一直困扰着如何测试那些可能会在代码库中发现的新功能呢?

使用响应式编程,就必须转变对给定问题的推理方式,因为我们要聚焦于作为事件流的流动数据,而非个别数据项。事件通常是被不同的线程所产生和消费,因此在编写测试时必须要对并发问题有着清晰的认识。幸运的是,RxJava提供了测试Observable和Subscription的内建支持,并且是直接构建于RxJava的核心依赖中。

第一步

让我们回顾一下在“RxJava by Example”一文中所给出的那个词汇的例子,看一下如何对该例子作测试。让我们从基础测试工具的设置开始。在我们的测试架构中,使用了JUnit作为测试工具。

  1. import io.reactivex.Observable;
  2. import io.reactivex.observers.TestObserver;
  3. import io.reactivex.plugins.RxJavaPlugins;
  4. import io.reactivex.schedulers.Schedulers;
  5. //RxJava2中,包名由rx.xxx改为io.reactivex.xxx。
  6. import org.junit.Test;
  7. import java.util.*;
  8. import static java.util.concurrent.TimeUnit.SECONDS;
  9. import static org.awaitility.Awaitility.await;
  10. import static org.junit.Assert.assertThat;
  11. import static org.hamcrest.Matchers.*;
  12. public class RxJavaTest {
  13. private static final List<String> WORDS = Arrays.asList(
  14. "the",
  15. "quick",
  16. "brown",
  17. "fox",
  18. "jumped",
  19. "over",
  20. "the",
  21. "lazy",
  22. "dog"
  23. );
  24. }

事实上在没有给定调度器(Scheduler)的情况下,Subscription将默认运行于调用线程上。因此我们将在首个测试中使用原生的方法。这意味着我们可实现一个Subscription接口的对象,在Subscription发生后就立刻对其状态做断言(assert)。

  1. @Test
  2. public void testInSameThread() {
  3. // given:
  4. List<String> results = new ArrayList<>();
  5. //Observable的from方法改为fromIterable。
  6. Observable<String> observable = Observable.fromIterable(WORDS)
  7. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  8. (string, index) -> String.format("%2d. %s", index, string));
  9. // when:
  10. observable.subscribe(results::add);
  11. // then:
  12. assertThat(results, notNullValue());
  13. assertThat(results, hasSize(9));
  14. assertThat(results, hasItem(" 4. fox"));
  15. }

注意这里使用了显式的List<String>容器,与实际订阅者一起累计结果。由于给定的测试很简单,所以可能会使你认为这种显式累加器的方法已经足够好了。但是切记产品级的Observable中可能封装了错误或可能产生意外的事件。例子中的Subscriber与累加器的简单组合并不足以覆盖这种情况。但不用为此烦恼,RxJava提供的TestSubscriber类型就是用于处理这种情况的。下面我们使用TestSubscriber类型重构上面的测试。

  1. @Test
  2. //例子中所有的Subscriber改为Observer。
  3. public void testUsingTestObserver() {
  4. // given:
  5. TestObserver<String> observer = new TestObserver<>();
  6. //Observable的from方法改为fromIterable。
  7. Observable<String> observable = Observable.fromIterable(WORDS)
  8. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  9. (string, index) -> String.format("%2d. %s", index, string));
  10. // when:
  11. observable.subscribe(observer);
  12. // then:
  13. observer.assertComplete();
  14. observer.assertNoErrors();
  15. observer.assertValueCount(9);
  16. //getOnNextEvents方法该为values方法。
  17. assertThat(observer.values(), hasItem(" 4. fox"));
  18. }

TestObserver不仅可替代用户累加器,还另给出了一些行为。例如它能够给出接收到的消息和每个事件相关数据的规模,它也可对Subscription被完成且在Observable消费期间没有错误出现的状态做断言。虽然当前测试中的Observable并未生成任何的错误,但是回到“RxJava by Example”一文,我们从中得知了Observable将例外与数据事件等同对待。我们可通过如下的方式通过连接例外事件而模拟错误:

  1. @Test
  2. //例子中所有的Subscriber改为Observer。
  3. public void testFailure() {
  4. // given:
  5. TestObserver<String> observer = new TestObserver<>();
  6. Exception exception = new RuntimeException("boom!");
  7. //Observable的from方法改为fromIterable。
  8. Observable<String> observable = Observable.fromIterable(WORDS)
  9. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  10. (string, index) -> String.format("%2d. %s", index, string))
  11. .concatWith(Observable.error(exception));
  12. // when:
  13. observable.subscribe(observer);
  14. // then:
  15. observer.assertError(exception);
  16. observer.assertNotComplete();
  17. }

在我们所给出的有限用例中,所有的机制运行良好。但是实际的产品代码可能会完全不同于例子。因此在下文中,我们将考虑一些更加复杂的产品实例。

定制调度器(Scheduler)

在产品代码中,很多用例中的Observable都是在特定的线程上执行,这种线程在响应式编程环境中被称为“调度器(Scheduler)”。很多Observable操作将可选的调度器参数作为附加参数使用。RxJava定义了一系列任何时候都可用的命名调度器,包括IO调度器(io)、计算调度器(computation,为共享线程)和新线程调度器(newThread)。开发人员也可去实现个人定制的调度器。让我们通过指定计算调度器来修改Observable的代码吧。

  1. @Test
  2. //例子中所有的Subscriber改为Observer。
  3. public void testUsingComputationScheduler() {
  4. // given:
  5. TestObserver<String> observer = new TestObserver<>();
  6. //Observable的from方法改为fromIterable。
  7. Observable<String> observable = Observable.fromIterable(WORDS)
  8. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  9. (string, index) -> String.format("%2d. %s", index, string));
  10. // when:
  11. observable.subscribeOn(Schedulers.computation())
  12. .subscribe(observer);
  13. //修订版中新添加语句。
  14. await().timeout(2, SECONDS)
  15. .until(observer::valueCount, equalTo(9));
  16. // then:
  17. observer.assertComplete();
  18. observer.assertNoErrors();
  19. assertThat(observer.values(), hasItem(" 4. fox"));
  20. }

当运行时就会立刻发现该代码是存在问题的。Subscriber在测试线程上执行其断言,但是Observable在后台线程(计算线程)上生成值。这意味着执行Subscriber断言可能先于Observable生成所有相关事件,因而导致测试的失败。

为使测试顺利执行,有如下的一些策略可选:

我们将对每个策略做展开介绍,但将从“将Observable转化为阻塞式”开始,因为实现该策略所需做的技术工作最少,这些工作与所使用的调度器无关。我们假设数据在后台线程中生成,这将导致Subscriber从同一后台线程得到通知。

我们要做的是强制生成所有的事件,并在下一个声明被执行前就在测试中完成Observable。这是通过在Observable自身上调用blockingIterable()方法实现的。

  1. @Test
  2. public void testUsingBlockingCall() {
  3. // given:
  4. //Observable的from方法改为fromIterable。
  5. Observable<String> observable = Observable.fromIterable(WORDS)
  6. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  7. (string, index) -> String.format("%2d. %s", index, string));
  8. // when:
  9. //RxJava2中,toBlocking()和toIterable()方法改为blockingIterable()
  10. Iterable<String> results = observable
  11. .subscribeOn(Schedulers.computation())
  12. .blockingIterable();
  13. // then:
  14. assertThat(results, notNullValue());
  15. assertThat(results, iterableWithSize(9));
  16. assertThat(results, hasItem(" 4. fox"));
  17. }

该方法虽然适用于我们所给出的简单代码,但可能并不适用于实际的产品代码。如果生产者生成所有的数据需要很长的时间,那将会产生什么后果?这将使测试变得非常慢,并增加了编译时间,还可能会有其它的性能问题。幸运的是,TestObserver提供了一系列方法,强制测试等待事件的结束。下面给出了一种实现方法:

  1. //修订版中新添加的代码段。
  2. @Test
  3. public void testUsingComputationScheduler() {
  4. // given:
  5. TestObserver<String> observer = new TestObserver<>();
  6. Observable<String> observable = Observable.fromIterable(WORDS)
  7. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  8. (string, index) -> String.format("%2d. %s", index, string));
  9. // when:
  10. observable.subscribeOn(Schedulers.computation())
  11. .subscribe(observer);
  12. observer.awaitTerminalEvent(2, SECONDS);
  13. // then:
  14. observer.assertComplete();
  15. observer.assertNoErrors();
  16. assertThat(observer.values(), hasItem(" 4. fox"));
  17. }

如果这些方法还不足以满足需求,这里我推荐一个便利的程序库,就是Awaitility。简单地说,Awaitility是一个以精确、简单易读的方式对异步系统相关期望进行表述的DSL。在项目中可以用Maven添加Awaitility的依赖关系。

  1. <dependency>
  2. <groupId>org.awaitility</groupId>
  3. <artifactId>awaitility</artifactId>
  4. <version>2.0.0</version>
  5. <scope>test</scope>
  6. </dependency>

或是使用Gradle:

  1. testCompile 'org.awaitility:awaitility:2.0.0'

Awaitility DSL的接入点是org.awaitility.Awaitility.await()方法(参见下面例子中的第13和14行代码)。可以使用Awaitility定义使测试继续所必须达成的条件,也可在条件中加入超时或其它的时序约束,例如最小、最大或持续范围。对于上面的例子,下面的代码给出了如何在结果中使用Awaitility:

  1. @Test
  2. //例子中所有的Subscriber改为Observer。
  3. public void testUsingComputationScheduler_awaitility() {
  4. // given:
  5. TestObserver<String> observer = new TestObserver<>();
  6. //Observable的from方法改为fromIterable。
  7. Observable<String> observable = Observable.fromIterable(WORDS)
  8. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  9. (string, index) -> String.format("%2d. %s", index, string));
  10. // when:
  11. observable.subscribeOn(Schedulers.computation())
  12. .subscribe(observer);
  13. await().timeout(2, SECONDS)
  14. .until(observer::valueCount, equalTo(9));
  15. // then:
  16. observer.assertComplete();
  17. observer.assertNoErrors();
  18. //getOnNextEvents()方法改为values()方法。
  19. assertThat(observer.values(), hasItem(" 4. fox"));
  20. }

此版本测试并未以任何方式改变Observable的本质,这使得你做测试时不必对产品代码做任何改动。该版本测试使用最多2秒的等待时间通过检查Subscriber状态使Observable执行其作业。如果一切进行顺利,在2秒内就可将Subscriber的状态释放给所有的9个事件。

Awaitility具有和Hamcrest的匹配符、Java 8的lambda表达式和方法引用等的良好协作,从而给出精确的和可读的测试条件。Awaitility还提供了预制扩展,用于那些被广泛使用的JVM语言,其中包括Groovy和Scala。

我们要给出最后一个策略中使用了RxJava的扩展机制,该扩展是以RxJava API的组成部分发布的。RxJava中定义了一系列的扩展点,允许对几乎任何默认的RxJava行为进行微调。这种扩展机制使我们可以针对特定的RxJava特性提供修改过的值。利用该机制,在无需关心生成代码中所指定的调度器的情况下,我们可在测试中注入选定的调度器。这正是我们所寻找的方法,该方法被封装在RxJavaHooks类中。假设产品代码依赖于计算调度器,我们将覆盖它的默认值,返回一个调度器,它作为被调用的代码使事件处理发生,这是入队调度器(Schedulers.trampoline())。下面给出测试的代码:

  1. @Test
  2. //名称中的Hook改为Plugins,代码中所有subscriber改为observer。
  3. public void testUsingRxJavaPluginsWithImmediateScheduler() {
  4. // given:
  5. //调度器由immediate改为trampoline。
  6. RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
  7. Schedulers.trampoline());
  8. TestObserver<String> observer = new TestObserver<>();
  9. //Observable的from方法改为fromIterable。
  10. Observable<String> observable = Observable.fromIterable(WORDS)
  11. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  12. (string, index) -> String.format("%2d. %s", index, string));
  13. try {
  14. // when:
  15. observable.subscribeOn(Schedulers.computation())
  16. .subscribe(observer);
  17. // then:
  18. observer.assertComplete();
  19. observer.assertNoErrors();
  20. observer.assertValueCount(9);
  21. assertThat(observer.values(), hasItem(" 4. fox"));
  22. } finally {
  23. RxJavaPlugins.reset();
  24. }
  25. }

在测试中,产品代码察觉不到计算调度器是即刻的。请注意钩子函数必须被重置,否则即刻调度器的设置可能会发生泄漏,导致在各处的测试被破坏。使用try/finall代码块会在一定程度上模糊了测试的目的,但是幸运的是我们可以使用JUnit规则重构该行为,使测试更加精炼,结果更可读。下面给出使用上述规则的一种可能的实现代码:

  1. //由Public改为private static
  2. private static class ImmediateSchedulersRule implements TestRule {
  3. @Override
  4. public Statement apply(final Statement base, Description description) {
  5. return new Statement() {
  6. @Override
  7. //所有的调度器由immediate改为trampline。
  8. public void evaluate() throws Throwable {
  9. RxJavaPlugins.setIoSchedulerHandler(scheduler ->
  10. Schedulers.trampoline());
  11. RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
  12. Schedulers.trampoline());
  13. RxJavaPlugins.setNewThreadSchedulerHandler(scheduler ->
  14. Schedulers.trampoline());
  15. try {
  16. base.evaluate();
  17. } finally {
  18. RxJavaPlugins.reset();
  19. }
  20. }
  21. };
  22. }
  23. }

此外,我们还对另外两个调度器的生成方法做了重写。该规则对此后其它的测试目标更为通用。在新的测试用例类中,该规则的使用方法很直接,只需简单地定义一个域,并将其中新类型标注为@Rule即可。示例代码如下:

  1. @Rule
  2. public final ImmediateSchedulersRule schedulers =
  3. new ImmediateSchedulersRule();
  4. @Test
  5. //例子中所有的Subscriber改为Observer。
  6. public void testUsingImmediateSchedulersRule() {
  7. // given:
  8. TestObserver<String> observer = new TestObserver<>();
  9. //Observable的from方法改为fromIterable。
  10. Observable<String> observable = Observable.fromIterable(WORDS)
  11. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  12. (string, index) -> String.format("%2d. %s", index, string));
  13. // when:
  14. observable.subscribeOn(Schedulers.computation())
  15. .subscribe(observer);
  16. // then:
  17. observer.assertComplete();
  18. observer.assertNoErrors();
  19. observer.assertValueCount(9);
  20. assertThat(observer.values(), hasItem(" 4. fox"));
  21. }

最终我们可得到与前面测试一样的行为,却没有像前面测试那样的杂乱。下面用一些篇幅来回顾一下我们目前已经做到的事情:

上述的每个技术都作用于不同的场景中,但是所有技术都是通过“共同的线程”(译者注:作者在原文中指出common thread是作为双关语使用的,其另一个意思是“类似的思路”)相关联:在对Subscriber状态做断言之前,测试代码需等待Observable完成。考虑到Observable的行为会生成数据,是否有方法对该行为进行检查呢?换句话说,是否可以用编程的方式做Observable的现场调试?我们将在后文中给出这样的技术。

操控时间

到目前为止我们已用黑箱方式测试了Observable和Subscription。下面我们将考虑另外一种操控时间的技术,该技术使我们可以在Observable依然处于活动状态时,打开引擎盖去查看Subscriber状态。换句话说,我们将使用采用了RxJava的TestScheduler类白箱测试技术,这可以说是RxJava再一次来救场。这种特定的调度器可精确地设定时间的内部使用方式,例如可将时间提前半秒,或是使时间跳跃5秒。我们将首先给出这种新调度器实例的创建方法,然后再讨论代码的测试。

  1. @Test
  2. //例子中所有的Subscriber改为Observer。
  3. public void testUsingTestScheduler() {
  4. // given:
  5. TestScheduler scheduler = new TestScheduler();
  6. TestObserver<String> observer = new TestObserver<>();
  7. Observable<Long> tick = Observable.interval(1, SECONDS, scheduler);
  8. Observable<String> observable = Observable.fromIterable(WORDS)
  9. .zipWith(tick,
  10. (string, index) -> String.format("%2d. %s", index, string));
  11. observable.subscribeOn(scheduler)
  12. .subscribe(observer);
  13. // expect:
  14. observer.assertNoValues();
  15. observer.assertNotComplete();
  16. // when:
  17. scheduler.advanceTimeBy(1, SECONDS);
  18. // then:
  19. observer.assertNoErrors();
  20. observer.assertValueCount(1);
  21. observer.assertValues(" 0. the");
  22. // when:
  23. scheduler.advanceTimeTo(9, SECONDS);
  24. observer.assertComplete();
  25. observer.assertNoErrors();
  26. observer.assertValueCount(9);
  27. }

该“产品”代码有了略微的改变,这是由于我们使用了绑定到调度器时隙(interval())的方法生成计数(第6行),而非生成一个计数的范围。但这样做具有一个副作用,就是计数是从零开始生成的,而非从1开始。一旦配置了Observable和测试调度器,我们立刻做出这样的断言,即假定Subscriber不具有值(第15行)且没有被完成或生成任何的错误(第16行)。这是一个完整性测试,因为此时调度器并没有被移动,因而没有任何值被Observable产生或是被Subscriber接收到。

下面将时间向前调1整秒(第21行),该操作将会导致Observable生成第一个值,这正是随后的断言集所要检查的(第24到26行)。

下面将时间从当前时间调到9秒。需要注意的是,这意味着将时间准确地调整为调度器启动后的第9秒(并非是向前调1秒后再向前调9秒,即调度器检查启动后的第10秒)。换句话说,advanceTimeBy()方法将调度器的时间调整为相对于当前位置的时间,而advanceTimeTo()以绝对的方式调整时间。此后我们做出下一轮的断言(第30到32行),用于确保所有的数据由Observable生成且被Subscriber消费。另一件需要说明的事情就是使用TestScheduler时,真实的时间是立刻发生调整的,这着意味着测试并不用实际等待9秒才去完成。

正如你所看到的,该调度器的使用是非常便利的,仅需将该调度器提供给正在测试的Observable即可。但是对使用了指定类型调度器的Observable,该调度器并不能很好地适用。但是稍等一下,之前我们看到的是如何使用RxJavaPlugins切换一个不影响生产代码的调度器,而这一次是提供一个代替即刻调度器的TestScheduler。我们甚至可以apply定制JUnit规则同样的技术,使之前的代码可以用更重用的方式予以重写。首先该新规则为:

  1. //由public改为private static,所有Hooks改为Plugins。
  2. private static class TestSchedulerRule implements TestRule {
  3. private final TestScheduler testScheduler = new TestScheduler();
  4. public TestScheduler getTestScheduler() {
  5. return testScheduler;
  6. }
  7. @Override
  8. public Statement apply(final Statement base, Description description) {
  9. return new Statement() {
  10. @Override
  11. public void evaluate() throws Throwable {
  12. RxJavaPlugins.setIoSchedulerHandler(scheduler ->
  13. testScheduler);
  14. RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
  15. testScheduler);
  16. RxJavaPlugins.setNewThreadSchedulerHandler(scheduler ->
  17. testScheduler);
  18. try {
  19. base.evaluate();
  20. } finally {
  21. RxJavaPlugins.reset();
  22. }
  23. }
  24. };
  25. }
  26. }

紧接着是实际的测试代码(在一个新的测试用例类中),去使用我们的测试规则:

  1. @Rule
  2. public final TestSchedulerRule testSchedulerRule = new TestSchedulerRule();
  3. @Test
  4. //例子中所有的Subscriber改为Observer。
  5. public void testUsingTestSchedulersRule() {
  6. // given:
  7. TestObserver<String> observer = new TestObserver<>();
  8. Observable<String> observable = Observable.fromIterable(WORDS)
  9. .zipWith(Observable.interval(1, SECONDS),
  10. (string, index) -> String.format("%2d. %s", index, string));
  11. observable.subscribeOn(Schedulers.computation())
  12. .subscribe(observer);
  13. // expect
  14. observer.assertNoValues();
  15. observer.assertNotComplete();
  16. // when:
  17. testSchedulerRule.getTestScheduler().advanceTimeBy(1, SECONDS);
  18. // then:
  19. observer.assertNoErrors();
  20. observer.assertValueCount(1);
  21. observer.assertValues(" 0. the");
  22. // when:
  23. testSchedulerRule.getTestScheduler().advanceTimeTo(9, SECONDS);
  24. observer.assertComplete();
  25. observer.assertNoErrors();
  26. observer.assertValueCount(9);
  27. }

这样你就成功地实现了它。使用经由RxJavaHooks注入TestScheduler的方法,可在无需更改原始Observable组合的情况下编写测试代码,此外它给出了一种在observable自身执行期间改变时间、并在特定点上做断言的方法。在本文中给出的所有这些技术,应该足够你选择用来测试RxJava的代码了。

未来

RxJava是最先为Java提供响应式编程能力的程序库之一。为了使RxJava API更好地符合Reactive Streams规范,即将推出的2.0版将会是重新设计的。Reactive Streams规范以Java和JavaScript运行时为目标,提供了使用非阻塞背压机制(back pressure)的异步流处理标准。这意味着下一版的RxJava中将会出现一些API改进。对这些改进的详细描述参见RxJava wiki

对于测试而言,这些核心类型(Observable、Maybe和Single)现在都给出了便利易用的test()方法,实现现场创建TestSubscriber实例。也可在TestSubscriber上链接方法调用,对这类用法也有一些新的断言方法。

本文是“测试RxJava”一文的修订。

关于作者

Andres Almiray是一位Java/Groovy开发者和Java冠军程序员,具有超过17年的软件设计和开发经验。他在Java早期推出的时代就参与了Web和桌面应用的开发。他是开源软件的忠实信徒,参与了Groovy、JMatter、Asciiidoctor等广为人知的项目。他是Griffon架构的创始者和现任项目领导者,还是JSR 377规范的牵头者。

查看英文原文: Testing RxJava2

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注