[关闭]
@Rays 2017-02-21T08:49:21.000000Z 字数 20568 阅读 2940

通过例子学RXJava2 by Example

RxJava


摘要: In the ongoing evolution of paradigms for simplifying concurrency under load, the most promising addition is reactive programming, a specification that provides tools for handling asynchronous streams of data and for managing flow-control, making it easier to reason about overall program design. In this article we overcome the learning curve with a gentle progression of examples.

作者: Victor Grazi

正文:

本文要点

  • 响应式编程是一种处理异步数据流的规范。Reactive programming is a specification for dealing with asynchronous streams of data
  • 响应式提供了用于转化并组合数据流的工具,也是管理流控制的工具。Reactive provides tools for transforming and combining streams and for managing flow-control
  • 弹子图(Marble diagrams)提供了可视化响应式结构的交互画布(Canvas)。Marble diagrams provide an interactive canvas for visualizing reactive constructs
  • 仿Java Stream API,但是这种模仿完全是表面上的。Resembles Java Streams API but the resemblance is purely superficial
  • 附属到活动(hot)流,衰减并处理异步数据传送。Attach to hot streams to attenuate and process asynchronous data feeds

正在进行的对负载条件下简化并发的编程范例变革中,我们已经看到采用的技术包括:java.util.concurrent、Akka streams、CompletableFuture和Netty这样的框架。最近,响应式编程由于自身的能力和稳定的工具集,已经成为突发性地受到广泛关注。
In the ongoing evolution of programming paradigms for simplifying concurrency under load, we have seen the adoption of java.util.concurrent, Akka streams, CompletableFuture, and frameworks like Netty. Most recently, reactive programming has been enjoying a burst of popularity thanks to its power and its robust tool set.

响应式编程是处理异步数据流的一种规范,提供了用于转换和组合流的工具,并可用做管理流控制的工具,这使得响应式编程易于推出你的整体程序设计。
Reactive programming is a specification for dealing with asynchronous streams of data, providing tools for transforming and combining streams and for managing flow-control, making it easier to reason about your overall program design.

但是响应式编程入门不易,的确存在学习曲线。对于我们之中的数学家,可以回忆学习从标准代数的标量到线性代数的向量、矩阵和张量,本质上数据流被作为基本单元对待。不同于使用对象的传统编程,响应式编程的基本单元是事件流。事件可以以对象、数据传送、鼠标移动甚至是异常等形式提供。“异常”一词表达了对异常处理的传统概念,正如在下面的句子中,这被认为是要发生的,这里是异常。在响应式编程中,异常是一等公民,象这样处理每个位。鉴于流通常是异步的,抛出异常是没有意义的,因此异常在流中以事件形式传递。
But easy it is not, and there is definitely a learning curve. For the mathematicians among us it is reminiscent of the leap from learning standard algebra with its scalar quantities, to linear algebra with its vectors, matrices, and tensors, essentially streams of data that are treated as a unit. Unlike traditional programming that considers objects, the fundamental unit of reactive reasoning is the stream of events. Events can come in the form of objects, data feeds, mouse movements, or even exceptions. The word “exception” expresses the traditional notion of an exceptional handling, as in - this is what is supposed to happen and here are the exceptions. In reactive, exceptions are first class citizens, treated every bit as such. Since streams are generally asynchronous, it doesn’t make sense to throw an exception, so instead any exception is passed as an event in the stream.

本文中我们将考虑响应式编程的基础,对内在的重要概念采用教学视角。
In this article we will consider the fundamentals of reactive programming, with a pedagogical eye on internalizing the important concepts.

首先要记住的是在响应式总一切皆是流(Stream)。Observable是包裹流的基本单元。流可以包括零到多个事件,这些事件可以是已完成的,也可能是尚未完成的;可能会产生错误,也可能没有错误。一旦流完成了或产生了错误,流本质上就完成了,虽然有工具可以在发送异常时重新尝试或替换不同流。
First thing to keep in mind is that in reactive everything is a stream. Observable is the fundamental unit that wraps a stream. Streams can contain zero or more events, and may or may not complete, and may or may not issue an error. Once a stream completes or issues an error, it is essentially done, although there are tools for retrying or substituting different streams when an exception occurs.

在你尝试我们的例子前,将RxJava的依赖关系包括到你的代码库中。你可以使用下面的依赖从Maven加载:
Before you try out our examples, include the RxJava dependencies in your code base. You can load it from Maven using the dependency:

  1. <dependency>
  2. <groupId>io.reactivex.rxjava2</groupId>
  3. <artifactId>rxjava</artifactId>
  4. <version>2.0.5</version>
  5. </dependency>

这里Observable类具有很多的静态工厂方法和操作符,每个适合不同Observable的生成,或者将他们附加到感兴趣的处理。Observable是不可变的,因此操作符总是生成一个新的Observable。要理解我们的代码例子,让我们重新看一下我们在本文后面的代码例子中将要使用的基本Observable操作符。
The Observable class has dozens of static factory methods and operators, each in a wide variety of flavors for generating new Observables, or for attaching them to processes of interest. Observables are immutable, so operators always produce a new Observable. To understand our code examples, let’s review the basic Observable operators that we'll be using in the code samples later in this article.

Observable仅是随一个完成之后生成一个发出单一通用实例的Observable。例如:
Observable.just produces an Observable that emits a single generic instance, followed by a complete. For example:

  1. Observable.just("Howdy!")

在完成之前创建一个发出单一事件(字符串“Howdy!”)的新Observable。
Creates a new Observable that emits a single event before completing, the String “Howdy!”

你可以将这个Observable赋值给一个Observable变量。
You can assign that Observable to an Observable variable

  1. Observable<String> hello = Observable.just("Howdy!");

但是这本身并不会让你走得很远,因为正如总所周知的森林中的一棵树,如果周围没有人听到它,那么他就不会发生。一个Observable必须具有一个订阅者,对其发出的事件做一些事情。幸好Java现在具有Lambdas,它允许我们以准确的描述风格表达我们的observable:
But that by itself won’t get you very far, because just like the proverbial tree that falls in a forest, if nobody is around to hear it, it does not make a sound. An Observable must have a subscriber to do anything with the events it emits. Thankfully Java now has Lambdas, which allow us to express our observables in a concise declarative style:

  1. Observable<String> howdy = Observable.just("Howdy!");
  2. howdy.subscribe(System.out::println);

发出一个群集的“Howdy!”。
which emits a gregarious "Howdy!"

类似于所有的Observable方法,just关键字被重载,因此你也可以说:
Like all Observable methods, the just keyword is overloaded and so you can also say

  1. Observable.just("Hello", "World")
  2. .subscribe(System.out::println);

给出输出:
Which outputs

  1. Hello
  2. World

对多达十个输入参数进行了重载。注意输出是在两个独立行商,意味着两个独立的输出事件。
just is overloaded for up to 10 input parameters. Notice the output is on two separate lines, indicating two separate output events.

让我们尝试提供一个列表,查看输出情况:
Let’s try supplying a list and see what happens:

  1. List<String> words = Arrays.asList(
  2. "the",
  3. "quick",
  4. "brown",
  5. "fox",
  6. "jumped",
  7. "over",
  8. "the",
  9. "lazy",
  10. "dog"
  11. );
  12. Observable.just(words)
  13. .subscribe(System.out::println);

输出是非常突兀的:
This outputs an abrupt

  1. [the, quick, brown, fox, jumped, over, the, lazy, dog]

我们很高兴每个单词是单独生成的,但是我们得到了单一生成组成了整个列表。为改正这个,我们调用更为适当的fromIterable方法:
We were expecting each word as a separate emission, but we got a single emission consisting of the whole list. To correct that, we invoke the more appropriate fromIterable method:

  1. Observable.fromIterable(words)
  2. .subscribe(System.out::println);
  3. which converts an array or iterable to a series of events, one per element.

(注意在rxjava1中,在方法中有一个重载。这已被多种方式替代,包括添加fromIterable和fromArray。)
(Note that in rxjava1 there was a single overloaded from method. This has been replaced with several flavors of from including fromIterable and fromArray.)

执行这样的代码,给出了更想要的多行输出:
Executing that provides the more desirable multiline output:

  1. the
  2. quick
  3. brown
  4. fox
  5. jumped
  6. over
  7. the
  8. lazy
  9. dog

最好在输出中每行编号。再一次,这是observable的工作。在我们对此编码前,让我们研究一下两个操作符:range和zip。range(i.n)创建了一个从i开始的n个数字的流。
It would be nice to get some numbering on that. Again, a job for observables.
Before we code that let’s investigate two operators, range and zip. range(i, n) creates a stream of n numbers starting with i.

  1. Observable.range(1, 5).subscribe(System.out::println);

输出:
outputs:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5

我们添加编号的问题将得以解决,如果我们具有一种合并范围流和我们的字流的方法。
Our problem of adding numbering would be solved if we had a way to combine the range stream with our word stream.

对于平滑以各种语言进行响应式编程的学习曲线,RX Marbles 是一个很好的网站。该网站特点是提供了交互式JavaScript渲染,用于很多响应式操作。每个通常“弹珠”交互式习语用于表示有操作符生成的一个或更多的源流和结果流。时间自左向右流动,事件用弹珠表示。你可以点击并拖拽源弹珠以查看他们是如何影响结果的。
RX Marbles is a great site for smoothing the reactive learning curve, in any language. The site features interactive JavaScript renderings for many of the reactive operations. Each uses the common “marbles” reactive idiom to depict one or more source streams and the result stream produced by the operator. Time passes from left to right, and events are represented by marbles. You can click and drag the source marbles to see how they affect the result.

一个快速精度揭示了zip操作,即医生定的。让我们看一下弹珠范例以更好的理解:
A quick perusal reveals the zip operation, just what the doctor ordered. Let’s look at the marble diagram to understand it better:

zip combines the elements of the source stream with the elements of a supplied stream, using a pairwise “zip” transformation mapping that you can supply in the form of a Lambda. When either of those streams completes, the zipped stream completes, so any remaining events from the other stream would be lost. zip accepts up to nine source streams and zip operations. There is a corresponding zipWith operator that zips a provided stream with the existing stream.
自拍组合源流的元素和提供流的元素,使用一对你可以用Lambda形式提供的“zip”转换映射。当两个流都完成之后,压缩的流完成,因此来自其他流的任何剩余事件将会丢失。zip接受多至9个源流和zip操作。有相应的zipWith操作符,使用现有流压缩一个提供的流。

回到我们的例子。我们可以使用range和zipWith去预先考虑我们的行数字,使用String.format作为我们的zip转换:
Coming back to our example. We can use range and zipWith to prepend our line numbers, using String.format as our zip transformation:

  1. Observable.fromIterable(words)
  2. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  3. (string, count)->String.format("%2d. %s", count, string))
  4. .subscribe(System.out::println);

输出:
Which outputs:

  1. 1. the
  2. 2. quick
  3. 3. brown
  4. 4. fox
  5. 5. jumped
  6. 6. over
  7. 7. the
  8. 8. lazy
  9. 9. dog

看上去很好!注意zip和zipWith操作符一旦任何一个流完成就停止从所有流拉动。这就是为什么我们不受Integer.MAX_VALUE上限的恐吓的原因。
Looking good! Notice that the zip and zipWith operators stop pulling from all of the streams once any of the streams has completed. That is why we were not intimidated by the Integer.MAX_VALUE upper limit.

现在看一下我们如何不列出单词,而是列出组成这些单词的字母。这时对flatMap的一个任务,采用从一个Observable的发射(对象、集合或数组),并将这些元素映射到独自的Observable,然后将发射拉平为单一Observable。
Now let’s say we want to list not the words but the letters comprising those words. This is a job for flatMap, which takes the emissions (objects, collections, or arrays) from an Observable, and maps those elements to individual Observables, then flattens the emissions from all of those into a single Observable.

对于我们的例子,我们将使用split去将每个单词转换为一个包含单词成员字母的数组。继而flatMap这些,以创建一个有所有单词字母组成的新Observable:
For our example we will use split to transform each word into an array of its component characters. We will then flatMap those to create a new Observable consisting of all of the characters of all of the words:

  1. Observable.fromIterable(words)
  2. .flatMap(word -> Observable.fromArray(word.split("")))
  3. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  4. (string, count) -> String.format("%2d. %s", count, string))
  5. .subscribe(System.out::println);

这个输出:
That outputs

  1. 1. t
  2. 2. h
  3. 3. e
  4. 4. q
  5. 5. u
  6. 6. i
  7. 7. c
  8. 8. k
  9. ...
  10. 30. l
  11. 31. a
  12. 32. z
  13. 33. y
  14. 34. d
  15. 35. o
  16. 36. g

给出了所有单词并统计。但是这里给出了太多数据,我们只要要不同的字母:
All words present and accounted for. But there’s too much data, we only want the distinct letters:

  1. Observable.fromIterable(words)
  2. .flatMap(word -> Observable.fromArray(word.split("")))
  3. .distinct()
  4. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  5. (string, count) -> String.format("%2d. %s", count, string))
  6. .subscribe(System.out::println);

生成:
producing:

  1. 1. t
  2. 2. h
  3. 3. e
  4. 4. q
  5. 5. u
  6. 6. i
  7. 7. c
  8. 8. k
  9. 9. b
  10. 10. r
  11. 11. o
  12. 12. w
  13. 13. n
  14. 14. f
  15. 15. x
  16. 16. j
  17. 17. m
  18. 18. p
  19. 19. d
  20. 20. v
  21. 21. l
  22. 22. a
  23. 23. z
  24. 24. y
  25. 25. g

在孩时我就被教育了我们的“quick brwon fox”短语包含所有英语字母,但是我们这里只看到了25个字母,而非全部26个。对他们进行排序以找到丢失的那一个:
As a child I was taught that our “quick brown fox” phrase contained every letter in the English alphabet, but we see there are only 25 not 26. Let’s sort them to help locate the missing one:

  1. .flatMap(word -> Observable.fromIterable(word.split("")))
  2. .distinct()
  3. .sorted()
  4. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  5. (string, count) -> String.format("%2d. %s", count, string))
  6. .subscribe(System.out::println);

产生输出:
That produces:

  1. 1. a
  2. 2. b
  3. 3. c
  4. ...
  5. 17. q
  6. 18. r
  7. 19. t
  8. 20. u
  9. 21. v
  10. 22. w
  11. 23. x
  12. 24. y
  13. 25. z

看上去应该是第19个字母“s”遗漏了。修正并生成期望的输出:
Looks like letter 19 “s” is missing. Correcting that produces the expected output

  1. List<String> words = Arrays.asList(
  2. "the",
  3. "quick",
  4. "brown",
  5. "fox",
  6. "jumped",
  7. "over",
  8. "the",
  9. "lazy",
  10. "dogs"
  11. );
  12. Observable.fromIterable(words)
  13. .flatMap(word -> Observable.fromArray(word.split("")))
  14. .distinct()
  15. .sorted()
  16. .zipWith(Observable.range(1, Integer.MAX_VALUE),
  17. (string, count) -> String.format("%2d. %s", count, string))
  18. .subscribe(System.out::println);

这个生成:
which yields:

  1. 1. a
  2. 2. b
  3. 3. c
  4. 4. d
  5. 5. e
  6. 6. f
  7. 7. g
  8. 8. h
  9. 9. i
  10. 10. j
  11. 11. k
  12. 12. l
  13. 13. m
  14. 14. n
  15. 15. o
  16. 16. p
  17. 17. q
  18. 18. r
  19. 19. s
  20. 20. t
  21. 21. u
  22. 22. v
  23. 23. w
  24. 24. x
  25. 25. y
  26. 26. z

这就好多了!
That’s a lot better!

目前为止,这所有看上去和Java 8中引入的Java Stream API十分相似。但是这种类似完全是巧合,因为响应式添加了更多内容。
So far, this all looks very similar to Java Streams API introduced in Java 8. But the resemblance is strictly coincidental, because reactive adds so much more.

Java Steam和Lambda表达式是有价值的语言添加,但是本质上,他们终究只是一种迭代集合并生成新集合的方法。他们是有限的、静态的,并且不提供于重用。甚至即使由Stream的并行操作符生成,他们离开并做他们自己的生存和连接,并只有在完成是返回,对程序有很小的控制。响应式与执行逮捕,引入了timing、throttling和流控制的概念,他们可以附加到“无线的”过程中,令人信服地永不终止。输出并非集合,而是可供你使用,无论你如何需求。
Java Streams and Lambda expressions were a valuable language addition, but in essence, they are, after all, nothing more tha a way to iterate collections and produce new collections. They are finite, static, and do not provide for reuse. Even when forked by the Stream parallel operator, they go off and do their own fork and join, and only return when done, leaving the program with little control. Reactive in contrast introduce the concepts of timing, throttling, and flow control, and they can attach to “infinite” processes that conceivably never end. The output is not a collection, but is available for you to deal with, however you require.

为更好地理解,让我们看一下更多的弹珠范例。
Let’s take a look at some more marble diagrams to get a better picture.

操作符merge合并9个源流为一个最终输出,保留次序。无需担心竞争条件,所有时间是“平整”为一个线程,包括所有异常和完成事件。
The merge operator merges up to nine source streams into the final output, preserving order. There is no need to worry about race conditions, all events are “flattened” onto a single thread, including any exception and completion events.

操作符debounce将所有指定时间延迟范围内的事件当成单一事件对待,只发射每个这样系列的最后一个事件:
The debounce operator treats all events within a specified time delay as a single event, emitting only the last in each such series:

随时间的延迟,你可以看到头部“1”和尾部“1”之间在时间上的差异。在“2,3,4,5”这个组中,每个元素都比前一个更少的延迟进入,因此他们被认为是一个,并且被debounce过滤掉了。如果我们将“5”稍微向右移出延迟窗口,就启动了一个新的debound窗口:
You can see the difference in time between the top “1” and the bottom “1” as the time delay. In the group 2, 3, 4, 5, each element is coming within less than that time delay from the previous, so they are considered one and debounced away. If we move the “5” a little bit to the right out of the delay window, it starts a new debounce window:

一个有意思的操作符是以可疑地定义的模棱两可的操作符amb,它的数组形式是ambArray
One interesting operator is the dubiously named ambiguous operator amb, and its array incarnation ambArray.

作为一个条件操作符,amb从它的所有输入流中选取首个流发射,并继续操作该流,忽略其他的流。在下面,第二个流是首个被推出的,因此结果选择该流并继续操作它。
amb is a conditional operator that selects the first stream to emit, from among all of its input streams, and sticks with that stream, ignoring all of the others. In the following, the second stream is the first to pump, so the result selects that stream and stays with it.

在第一个流中向左滑动“20”,使得顶部的流成为首个生产者,这样产生了一个改变的输出:
Sliding the “20” in the first stream over to the left makes the top stream the first producer, thereby producing an altered output:

这个是十分有用的例子,如果你有一个需要附加到提供者的过程,可能访问多个消息主题,或许是Bloomberg和Reuters,并且你不关心你要停留并在哪个上面继续处理。
This is useful for example if you have a process that needs to attach to a feed, perhaps reaching to several message topics or say Bloomberg and Reuters, and you don’t care which, you just need to get the first and stay with it.

Tick Tock

Now we have the tools to combine timed streams to produce a meaningful hybrid. In the next example we consider a feed that pumps every second during the week, but to save CPU only pumps every three seconds during the weekend. We can use that hybrid “metronome” to produce market data ticks at the desired rate.

First let’s create a boolean method that checks the current time and returns true for weekend and false for weekday:

  1. private static boolean isSlowTickTime() {
  2. return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
  3. LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
  4. }

For the purposes of those readers following along in an IDE, who may not want to wait until next weekend to see it working, you may substitute the following implementation, which ticks fast for 15 seconds and then slow for 15 seconds:

  1. private static long start = System.currentTimeMillis();
  2. public static Boolean isSlowTickTime() {
  3. return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
  4. }

Let’s create two Observables, fast and slow, then apply filtering to schedule and merge them.

We will use the Observable.interval operation, which generates a tick every specified number of time units (counting sequential Longs beginning with 0.)

  1. Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
  2. Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);

fast will emit an event every second, slow will emit every three seconds. (We will ignore the Long value of the event, we are only interested in the timings.)

Now we can produce our syncopated clock by merging those two observables, applying a filter to each that tells the fast stream to tick on the weekdays (or for 15 seconds), and the slow one to tick on the weekends (or alternate 15 seconds).

  1. Observable<Long> clock = Observable.merge(
  2. slow.filter(tick-> isSlowTickTime()),
  3. fast.filter(tick-> !isSlowTickTime())
  4. );

Finally, let’s add a subscription to print the time. Launching this will print the system date and time according to our required schedule.

clock.subscribe(tick-> System.out.println(new Date()));
You will also need a keep alive to prevent this from exiting, so add a

  1. Thread.sleep(60_000)

to the end of the method (and handle the InterruptedException).
Running that produces

  1. Fri Sep 16 03:08:18 BST 2016
  2. Fri Sep 16 03:08:19 BST 2016
  3. Fri Sep 16 03:08:20 BST 2016
  4. Fri Sep 16 03:08:21 BST 2016
  5. Fri Sep 16 03:08:22 BST 2016
  6. Fri Sep 16 03:08:23 BST 2016
  7. Fri Sep 16 03:08:24 BST 2016
  8. Fri Sep 16 03:08:25 BST 2016
  9. Fri Sep 16 03:08:26 BST 2016
  10. Fri Sep 16 03:08:27 BST 2016
  11. Fri Sep 16 03:08:28 BST 2016
  12. Fri Sep 16 03:08:29 BST 2016
  13. Fri Sep 16 03:08:30 BST 2016
  14. Fri Sep 16 03:08:31 BST 2016
  15. Fri Sep 16 03:08:32 BST 2016
  16. Fri Sep 16 03:08:35 BST 2016
  17. Fri Sep 16 03:08:38 BST 2016
  18. Fri Sep 16 03:08:41 BST 2016
  19. Fri Sep 16 03:08:44 BST 2016
  20. . . .

You can see that the first 15 ticks are a second apart, followed by 15 seconds of ticks that are three seconds apart, in alternation as required.

Attaching to an existing feed

This is all very useful for creating Observables from scratch to pump static data. But how do you attach an Observable to an existing feed, so that you can leverage the reactive flow control and stream manipulation strategies?

RxJava2 introduced some new classes we should become acquainted with before proceeding.

Cold and Hot Observables and Flowables

In previous RxJava versions Observable was equipped with flow control methods, even for small streams where it would be irrelevant. To conform with the reactive specification RxJava2 removes flow control from the Observable class and introduces the Flowable class, essentially an Observable that provides flow control.

Cold observables are what we have been discussing until now. They provide static data, although timing may still be regulated. The distinguishing qualities of cold observables is that they only pump when there is a subscriber, and all subscribers receive the exact same set of historical data, regardless of when they subscribe. Hot observables, in contrast, pump regardless of the number of subscribers, if any, and generally pump just the latest data to all subscribers (unless some caching strategy is applied.) Cold observables can be converted to hot by performing both of the following steps:

  1. Call the Observable’s publish method to produce a new ConnectableObservable
  2. Call the ConnectableObservable's connect method to start pumping.

This works but does not support any flow control. In general we will prefer to connect to existing long running feeds using a Flowable, using a parallel syntax to Observables, except providing backpressure controls.

1a. Call the Flowable’s publish method to produce a new ConnectableFlowable
2a. Call the ConnectableFlowable's connect method to start pumping.

To attach to an existing feed, you could (if you felt so inclined) add a listener to your feed that propagates ticks to subscribers by calling their onNext method on each tick. Your implementation would need to take care to ensure that each subscriber is still subscribed, or stop pumping to it, and would need to respect backpressure semantics. Thankfully all of that work is performed automatically by Flowable’s create method. For our example, let’s assume we have a SomeFeed market data service that issues price ticks, and a SomeListener method that listens for those price ticks as well as lifecycle events. There is an implementation of these on GitHub if you’d like to try it at home.

Our feed accepts a listener, which supports the following API:

  1. public void priceTick(PriceTick event);
  2. public void error(Throwable throwable);

Our PriceTick has accessors for date, instrument, and price, and a method for signalling the last tick:

Let’s look at an example that connects an Observable to a live feed using a Flowable:

  1. 1 SomeFeed<PriceTick> feed = new SomeFeed<>();
  2. 2 Flowable<PriceTick> flowable = Flowable.create(emitter -> {
  3. 3 SomeListener listener = new SomeListener() {
  4. 4 @Override
  5. 5 public void priceTick(PriceTick event) {
  6. 6 emitter.onNext(event);
  7. 7 if (event.isLast()) {
  8. 8 emitter.onComplete();
  9. 9 }
  10. 10 }
  11. 11
  12. 12 @Override
  13. 13 public void error(Throwable e) {
  14. 14 emitter.onError(e);
  15. 15 }
  16. 16 };
  17. 17 feed.register(listener);
  18. 18 }, BackpressureStrategy.BUFFER);
  19. 19 flowable.subscribe(System.out::println);
  20. 20

This is taken almost verbatim from the Flowable Javadoc; here is how it works - the Flowable wraps the steps of creating a listener (line 3) and registering to the service (line 17). Subscribers are automatically attached by the Flowable. The events generated by the service are delegated to the listener (line 6). Line 18 tells the Observer to buffer all notifications until they are consumed by a subscriber. Other backpressure choices are:

BackpressureMode.MISSING to apply no backpressure. If the stream can’t keep up, may throw a MissingBackpressureException or IllegalStateException.

BackpressureStrategy.ERROR emits a MissingBackpressureException if the downstream can't keep up.

BackpressureStrategy.DROP Drops the incoming onNext value if the downstream can't keep up.

BackpressureStrategy.LATEST Keeps the latest onNext value and overwrites it with newer ones until the downstream can consume it.

All of this produces a cold Flowable. As with any cold observable, no ticks would be forthcoming until the first observer subscribes, and all subscribers would receive the same set of historical feeds, which is probably not what we want.

To convert this to a hot observable so that all subscribers receive all notifications as they occur in real time, we must call publish and connect, as described earlier:

  1. 21 ConnectableFlowable<PriceTick> hotObservable = flowable.publish();
  2. 22 hotObservable.connect();

Finally, we can subscribe and display our price ticks:

  1. 23 hotObservable.subscribe((priceTick) ->
  2. 24 System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
  3. 25 priceTick.getInstrument(), priceTick.getPrice()));

关于本文作者

Victor Grazi是InfoQ的Java版块主编。在2012年成为Java冠军程序员(Oracle Java Champion)之后,Victor就在Nomura Securities从事核心平台工具相关的工作,同时也担任技术咨询和Java布道师。他还经常在技术会议上做演讲。Victor还负责着名为“Java Concurrent Animated”和“Bytecode Explorer”开源项目。

查看英文原文:RXJava2 by Example

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注