@feuyeux
2015-10-24T23:31:43.000000Z
字数 21909
阅读 8384
有时,异步可以为REST服务带来更强大的功能、提高服务的性能,及提升用户体验。本章将详述异步机制和异步通信,以及基于JAX-RS2标准的实践。首先我们将一起思考,异步机制到底有什么用处,随后详述JAX-RS2定义的异步请求处理规范。在异步机制的理论和实践之后,我们分别讲述基于HTTP1.1协议和HTML5的异步通信。
在涉及性能的话题中,我们通常会考虑启用异步机制。那么异步在何时可以提高性能呢?启用异步机制的前提是同步运行时资源存在空置。
接下来,我们从服务器和客户端两个角度来思考引入异步要解决的问题,以及如何来解决。
服务器端使用异步机制的主要目的是将“处理连接”与“处理请求”解耦。
对于服务器而言,如果处理连接的线程被一个需要较长时间才能处理完毕的任务阻塞,那么服务器处理连接的能力就会下降,而此时服务器的资源很有可能是空闲的。此时,我们考虑将处理连接和处理请求任务解偶,处理连接的线程接收请求后,将其分派给处理请求任务的线程。这样以来,即使任务需要较长时间才能完成,处理连接的线程也无需阻塞等待了,服务器因此可以重用连接线程,从而提供更高的吞吐率。处理请求的线程相对于处理连接的线程,是异步执行的,当任务结束后,服务器会从上下文中找到当前连接,并将处理结果返回,作为该连接请求的响应。
举个例子。比如一个小邮局有5个窗口,如果每个窗口的工作人员都要全程处理顾客的邮寄业务,那么小邮局的最大吞吐能力为5个,一旦有一个窗口受理的业务非常耗时,吞吐量就减少一个。
如果每个窗口的接待人员只负责理解顾客业务并将其分类分派给邮局的其他工作人员具体处理,那么邮局中处理业务的人员与顾客之间的沟通不会阻碍窗口接待人员,窗口的吞吐能力会一直保持在5个。
我们进一步来看这个场景。处理任务的线程在任务处理过程中是同步阻塞的,如果任务是可分解的,那么我们可以考虑使用多线程来同时分别处理分解后的任务,最后将其合并。这样以来,处理任务的线程在任务处理过程中,就处于异步阻塞状态,其所用时间由分解后任务中最长的子任务处理时间决定。
当受理的业务是要邮寄由多个小包装组成的包裹时,假设会过来好几个工作人员帮忙,一个确认小包装的重量,一个找来了大包装,一个准备好胶带,还有一个帮助确认邮寄地址,很快一个标有邮寄地址、重量,并确定邮寄费用的大包装就搞定了。
从这个角度上看,异步设计的确可以提高服务器的性能。但是,并非异步就一定能提高性能。因为,异步可以提高性能的前提:
如果小邮局的工作人员只有6个人,开5个窗口就很难再分派受理业务了,因为窗口里面只剩余一个人,他无法一次处理5个业务,顾客只能默默地等待了。
对于客户端而言,无论服务器是同步还是异步,如果客户端需要等待请求响应后,才处理其他事情(比如浏览器的行为是渲染HTML页面),那么客户端的行为都是同步且阻塞的。
我们还举上面这个例子。小邮局的顾客排起了长队,一个接一个地办理业务,虽然每个窗口都采用了分派的方式来受理邮寄业务,但对于每个队伍而言,大家还得等前一个被受理才能走近窗口办理业务。
为了让客户端的流程不受服务器端处理过程的阻塞,可以在客户端启用异步机制。在请求发出前,先注册事件通知(使用观察者模式实现的回调机制),请求发出后,流程继续执行而不等待。当响应到达后,客户端处理响应信息,更新状态。
于是,小邮局启用了叫号机制,顾客到了小邮局的第一件事就是按业务拿号,然后无需排队,可以忙一会儿自己手头的事情,待叫号的时候,走近窗口办理业务就好了。
综上所述,我们可以看到异步机制为服务器和客户端带来的好处。那么,我们在REST中如何实现异步呢?接下来,我们将详述在JAX-RS2中如何实现异步服务和异步请求。
基于4.1节的理论,我们来看看JAX-RS2是如何支持异步的。
在Java领域,Java语言的并发处理,Java SE中5.0是一个里程碑;而Java EE的并发支持,Java EE 7.0是一个重要的版本。容器级别上有了对并发的支持,客户端等待服务器的响应就可以由一个Future实现,感觉上就像Java SE开发中,等待同一个JVM的另一个线程一样。在JAX-RS2的异步实现过程中,线程是由容器管理的,这是Java EE 7中JSR236规范定义的功能,读者可以参见《Java EE 7 精粹》一书的第10章。
了解了JAX-RS2异步处理的流程,接下来我们进入实践。
阅读指导
4.2节的示例源代码地址为:
https://github.com/feuyeux/jax-rs2-guide-II/tree/master/4.2.asynchronized
对于JAX-RS2的服务器端,实现异步主要包括两个技术点。一个是资源方法中对AsyncResponse的使用,另一个是对异步机制中CompletionCallback和TimeoutHandler接口的实现。本节我们将通过一个示例来讲述服务器端的异步实现。
假设图书资源要支持全量查询,而这个查询的过程是很耗时的。我们将利用JAX-RS2提供的AsyncResponse,通过一个异步线程来执行查询,在查询完成后,由这个异步线程完成对请求的响应。
我们首先来看这个支持异步的图书资源类AsyncResource,其资源地址定义为books。示例代码如下。
@Path("books")
public class AsyncResource {
private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
@GET
/** 关注点1 **/
public void getAll(@Suspended final AsyncResponse asyncResponse) {
configResponse(asyncResponse);
/** 关注点2 **/
threadPool.submit(new BatchRunner(asyncResponse));
}
class BatchRunner implements Runnable {
private final AsyncResponse response;
public BatchRunner(AsyncResponse asyncResponse) {
this.response = asyncResponse;
}
@Override
public void run() {
try {
Books books = queryAll();
/** 关注点3 **/
response.resume(books);
} catch (InterruptedException e) {
log.error(e);
}
}
private Books queryAll() throws InterruptedException {
Books books = new Books();
for (int i = 0; i < ThreadLocalRandom.current().nextInt(5, 10); i++) {
Thread.sleep(500);
Book book = new Book(i + 10000l, "Java RESTful Web Services", "华章");
log.debug(book);
books.getBookList().add(book);
}
return books;
}
}
}
在这段代码中,GET方法getAll()用于处理全量查询(通过queryAll()方法来模拟一个耗时的处理场景)。该方法的参数是异步响应类AsyncResponse的实例(习惯上,位于第一位的参数是上下文环境变量参数,如果有业务参数,应当置于其后),使用注解@Suspended来标识,见关注点1。我们将耗时的查询交由一个异步线程执行,见关注点2。当查询执行结束,异步响应实例的resume()方法被调用,请求处理线程被唤醒,返回值将作为resume()方法的参数响应给客户端,见关注点3。
除了以上3个关注点,我们会发现getAll包含了configResponse这样一个方法。该方法是用于定义回调的。接下来我们来看看回调的具体实现。
CompletionCallback是JAX-RS2定义的用于处理异步完成的接口。当请求处理完成时,CompletionCallback实例的onComplete()方法会被回调。实现onComplete方法,可以监听请求处理完成事件并实现相关业务流程。CompletionCallback的实现作为AsyncResource的register()方法的参数来配置,这样配置后,AsyncResource实例会在resume()被调用后执行回调方法onComplete。示例代码如下。
asyncResponse.register(new CompletionCallback() {
@Override
public void onComplete(Throwablethrowable) {
if (throwable == null) {
LOGGER.info("CompletionCallback-onComplete: OK");
} else {
LOGGER.info("CompletionCallback-onComplete: ERROR: " + throwable.getMessage());
}
}
});
相应的Java8 Lamda形式的实现如下,余下的两个回调我们直接使用Lamda形式。
asyncResponse.register((CompletionCallback) throwable -> {
if (throwable == null) {
log.info("CompletionCallback-onComplete: OK");
} else {
log.info("CompletionCallback-onComplete: ERROR: " + throwable.getMessage());
}
});
ConnectionCallback是JAX-RS2定义的连接断开的接口,当请求-响应模型的连接断开时,ConnectionCallback实例的onDisconnect()方法会被回调。实现onDisconnect方法可以监听连接断开事件并实现相关业务,比如主动唤醒AsyncResource实例并设置HTTP状态码为410、客户端请求资源不可用(Response.Status.GONE)来完成响应。ConnectionCallback的实现可以作为AsyncResource的register()方法的参数来配置。示例代码如下。
asyncResponse.register((ConnectionCallback) disconnected -> {
log.info("ConnectionCallback-onDisconnect");
//Status.GONE=410
disconnected.resume(Response.status(Response.Status.GONE).entity("disconnect!").build());
});
TimeoutHandler是JAX-RS2定义的超时处理器接口,用于处理异步响应类超时事件。当预期的超时时间到达后,TimeoutHandler实例的handleTimeout()方法会被调用。实现handleTimeout方法可以监听超时事件并处理相关业务,比如主动唤醒AsyncResource实例,并设置HTTP状态码为503、服务器端服务不可用(Response.Status.SERVICE_UNAVAILABLE)来完成响应。TimeoutHandler的实现可以作为AsyncResource的setTimeoutHandler()方法的参数来配置。AsyncResource的setTimeout()方法用于设置超时时间,默认情况下AsyncResource永不超时。示例代码如下。
asyncResponse.setTimeoutHandler(asyncResponse0 -> {
log.info("TIMEOUT");
//Status.SERVICE_UNAVAILABLE=503
asyncResponse0.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).entity("Operation time out.").build());
});
asyncResponse.setTimeout(TIMEOUT, TimeUnit.SECONDS);
相应地,JAX-RX为客户端提供了用于执行异步请求的API。开发者使用这套API可以轻松地实现对服务器端的异步请求。
首先我们来实现基本的异步请求,示例代码如下。
@Test
public void testAsync() throws InterruptedException, ExecutionException {
final Invocation.Builder request = target("http://localhost:" + this.port + "/books").request();
/** 关注点1 **/
final AsyncInvoker async = request.async();
final Future<Books> responseFuture = async.get(Books.class);
long beginTime = System.currentTimeMillis();
try {
/** 关注点2 **/
Books result = responseFuture.get(AsyncResource.TIMEOUT + 1, TimeUnit.SECONDS);
log.debug("Testing result size = {}", result.getBookList().size());
} catch (TimeoutException e) {
/** 关注点3 **/
log.debug("Fail to request asynchronously", e);
} finally {
log.debug("Elapsed time = {}", System.currentTimeMillis() - beginTime);
}
}
在这段代码中,客户端使用AsyncInvoker接口的get()方法提交异步请求,见关注点1。该方法返回Future接口的实例,客户端线程可以以非阻塞的方式处理其他业务流程,然后调用Future的get()方法获取服务器处理结果,见关注点2。如果在指定的时间内,服务器没有响应,将报TimeoutException异常,我们可以在异常捕获中实现超时处理,见关注点3。
客户端亦可以实现异步调用的回调。在AsyncInvoker接口的get()方法中,定义InvocationCallback接口的实例,即可实现对REST请求的回调处理。示例代码如下。
@Test
public void testAsyncCallBack() throws InterruptedException, ExecutionException {
final AsyncInvoker async = target("http://localhost:" + this.port + "/books").request().async();
final Future<Books> responseFuture = async.get(new InvocationCallback<Books>() {
@Override
/** 关注点1 **/
public void completed(Books result) {
log.debug("On Completed: " + result.getBookList().size());
}
@Override
/** 关注点2 **/
public void failed(Throwable throwable) {
log.debug("On Failed: " + throwable.getMessage());
throwable.printStackTrace();
}
});
log.debug("First response time::" + System.currentTimeMillis());
try {
responseFuture.get(AsyncResource.TIMEOUT + 1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.debug("", e);
} finally {
log.debug("Second response time::" + System.currentTimeMillis());
}
}
在这段代码中,completed()方法用于监听并处理REST调用成功事件,见关注点1。failed()方法用于监听并处理REST调用失败事件,见关注点2。
最后,我们启动示例服务,然后在终端/控制台录入如下cURL命令来测试REST异步处理。示例中的这个资源方法模拟实现了一个耗时的全量图书查询过程,预期的返回值为全部图书资源的详情。
在示例项目根目录执行run.sh,启动REST服务,示例如下。
pwd
/Users/erichan/sourcecode/jax-rs2-guide-II/samples/5.synchronized/asynchronized
./run.sh
使用cURL命令请求服务,示例如下。
curl :8080/books
{
"book": [
{
"bookId": 10000,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
},
{
"bookId": 10001,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
},
{
"bookId": 10002,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
},
{
"bookId": 10003,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
},
{
"bookId": 10004,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
},
{
"bookId": 10005,
"bookName": "Java RESTful Web Services",
"publisher": "华章"
}
]
}
本章的前面两节从异步机制,讲述了启用异步的意义和在REST中对异步的实现。本节我们从服务器和客户端的通信角度,全面介绍基于HTTP1.1协议的异步通信方案。
服务器-浏览器通信技术的第一种解决方案是客户端轮询技术,即Polling。
客户端轮询技术(Polling)相对其他方案,最原始、易行。即浏览器周期性地主动访问服务器的特定地址,以获取服务器端数据状态的变化。通常,在浏览器端使用JavaScript脚本启动一个定时任务,该任务向服务器发送请求并获取资源状态。如果服务器端特定数据发生变化,会将变化信息响应给客户端,客户端使用响应的数据渲染界面,为用户做出及时的反馈。
Polling异步通信方案可以结合HATEOAS和Web Link技术,将服务器端的轮询状态地址返回给客户端。如图4-1所示,服务器会在接收请求后立即(以HATEOAS或者Web Link技术)返回给客户端一个查询处理结果的资源地址,并结束这一次的请求-响应流程,HTTP连接关闭,HTTP状态码为202(注意:不是HTTP状态200 OK,202代表服务器已接受请求但尚未处理)。客户端通过轮询机制,向新的REST地址发起请求并获得该处理的进度状态(完成状态为HTTP状态码100,如果请求过期或者资源地址错误则HTTP状态码为404 找不到),并最终在获取处理完毕信息后结束轮询。
图4-1 客户端轮询实现异步通信示意图
优点:这种解决方案比起同步处理的优点是客户端可以即时得到服务器的反馈,并在获得最终结果之前,有机会处理后续业务。另外,Polling技术不需要对服务器和客户端使用额外的第三方支持包,开发者容易理解、使用现有技术和工具就可以实现。客户端轮询技术对设计没有注入性污染。选择技术架构和设计、实现业务逻辑时这种方式可以即插即拔,不会污染业务平台中结构性的代码。
缺点:客户端轮询技术的缺点是显而易见的,轮询中的每次请求-响应的过程都需要建立新的HTTP连接并在结束时关闭该连接。这就造成两大问题。
第一,如果服务器端的业务数据在两次定时任务发起的请求过程中没有变化,后一次请求的做功实际为负数—浪费了服务器端的带宽、没有获得有效负载。
第二,也是最让开发者纠结的痛点,即浏览器端的定时器间隔时间参数的设置。由于需要及时获取服务器端的业务数据的状态,这个定时间隔参数设置不宜过长;但是过短又会频发第一个问题。因此,间隔时间的设置是个尴尬的坑,因为在编码和调试阶段定义并运行完好的参数,很难和生产环境吻合,甚至开发阶段有可能疏漏或无法覆盖到全部生产环境中的业务场景。还有一个让人难受的地方是,这种请求的代码很难抽象出来,因为不同业务的定时间隔都是一个独立的经验值。
Comet是反向Ajax的技术集,包括长轮询(Long Polling)和流(Streaming)两种技术实现。
什么是反向Ajax呢?要了解反向Ajax,我们先从Ajax说起。
Ajax技术是指从浏览器端向服务器端发起的异步请求,已经烂大街,5.1.1节所述的Polling就是Ajax的实践。概括地说就是,浏览器发起的请求是通过脚本实现的,页面并没有提交或者跳转,请求由服务器处理并返回响应后,浏览器处理响应数据并将这一变化渲染到HTML中的DOM树,HTML页面的标签值得到了更新,实现了页面的局部刷新。
反向Ajax(Reverse Ajax)技术从请求方向看并没有做到反向,因为基于请求-响应模式下的HTTP请求本质上无法做到反向。这个“反向”是从实现结果上看的,即从服务器端(通过保持连接的HTTP通道)向客户端发送数据,以实现低延迟地通知客户端的技术。反向Ajax技术是服务器-浏览器通信技术的第二种解决方案,其底层实现依赖于HTTP连接不能断开这一前提条件。长轮询(Long Polling)和流(Streaming)技术是反向Ajax的两种技术手段,通信原理相同,如图4-2所示。
图4-2 长轮询示意图
如图4-2所示,长连接通过keepAlive使HTTP连接得以保持。为什么要保持连接呢?因为在请求发出后的一定时间内,服务器一直没有做出响应,该连接会因连接超时而断开。Comet利用HTTP1.1的keepAlive持久性连接技术,在浏览器发出请求后,通过keepAlive保持服务器向浏览器做出响应的通信。这样一来,就解决了连接超时断开的问题。那么,连接的关闭就只有两种情况,一种是浏览器主动断开,一种是服务器端特定数据发生变化,并将这一信息响应给浏览器,主动断开连接完成请求-响应模式的一次请求。
实现Comet比起Polling要困难得多,服务器端和浏览器端都需要第三方的库来支持这一技术。Atmosphere库和CometD库是实现Comet技术的第三方工具包,Jersey自身并没有提供支持Comet实现的包,而是将其交由Servlet容器来支持。本书不再对Comet的实现做进一步讨论。但读者要清楚Comet技术是这个领域的一个选择。
**优点:**Comet解决方案使异步通信可以在一次请求-响应模型中完成。反向Ajax的技术解决了Polling低效地消耗服务器的网络带宽和系统负载的缺点。同时,由于服务器主动向浏览器发送数据,因此有很好的低延迟性。
**缺点:**Comet需要服务器端额外的技术实现来支持,同时需要在服务器和浏览器两端引入第三方工具包。实现相对复杂。
Web Hook解决方案是指在客户端发送请求时,将一个回调地址同时发送给服务器,服务器接收响应后,异步处理请求并对本次请求即刻做出响应,客户端随即处理其他业务并监听回调。服务器端在响应客户端后,继续以异步的方式处理方才的请求,在处理完毕后通过回调地址通知客户端处理结果,如图4-3所示。
图4-3 Web Hook实现异步通信示意图
**优点:**Web Hook解决方案具备Polling方案的优点,易于实现,无需引入第三方技术,并且没有Polling方案中无效的轮询负载。
**缺点:**Web Hook这种方案无法在浏览器作为客户端的场景中实施,因为浏览器无从提供一个回调地址给服务器。因此,该方案适用于另外一个服务器做客户端的场景。另外,与随后介绍的SSE相比,这种解决方案还是多出了一次服务器回调客户端的HTTP连接,并且客户端回调线程和请求线程之间,存在多线程问题,并且需要具备相应的状态监控机制,需要开发者留意。
SSE(Server-Sent Events)是HTML5技术集的一部分,定义了服务器推送技术的标准规范。
SSE规范的地址是http://dev.w3.org/html5/eventsource。其核心是基于EventSource接口的事件监听机制,包括onopen、onmessage和onerror三个事件监听器。SSE服务器端响应数据的媒体类型(Content-Type)是text/event-stream。Jersey的媒体库提供了对SSE的支持,详述参见4.4节。
**优点:**SSE是标准规范--HTML5标准之一,具备编程语言的无关性。首先SSE支持跨语言开发,无论具体使用什么语言和框架,只要按照以EventSource接口为中心,完成事件监听机制即可实现SSE。其次SSE支持跨语言的调用,这点很好理解,基于标准接口和标准事件监听,第七层协议上的HTTP包很容易被各系统彼此阅读。SSE的代码实现和交互逻辑相对简单,在Java EE生态环境中,得到了Jersey提供的支持。
缺点:由于请求-响应模型的限制,SSE和Comet一样,是一种从服务器端到浏览器端的单向通信,浏览器无法在同一条连接上做出二次请求或者对服务器的响应做出“响应”,这一缺点无法支持复杂的交互需求。另外,SSE标准和Jersey-sse支持包都还在不断完善中(Jersey提供持续升级的SSE支持包,并不意味着当前版本的不稳定)。
从4.3节中介绍的4种服务器端推送技术中,我们清楚了服务器-浏览器异步通信是什么以及基本实现原理。我们研究异步通信的主旨是提高REST服务的性能,而HTML5技术栈的服务器端推送事件(SSE)技术,正是其中最佳的方案。本节将详细讲述如何在REST服务中,实现SSE技术。
阅读指导
4.4节的示例源代码地址为:
https://github.com/feuyeux/jax-rs2-guide-II/tree/master/4.4.sse
Jersey的SSE支持包jersey-media-sse,基于HTML5的SSE规范,提供了一套支持SSE规范的完整的API。Maven坐标定义如下。
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
</dependency>
Jersey的SSE支持包提供两种通信模式,发布-订阅模式和广播模式。前者是一种端到端的通信,后一种是多播通信。接下来我们分别讲述这两种模式。
发布-订阅模式的API和典型的工作流程见图4-4。
图4-4 Jersey实现SSE流程示意图
根据SSE标准规范定义的EventSource接口,Jersey SSE定义了EventListener接口及其实现类EventSource。SSE的实现流程描述如下:
第一步,在客户端创建EventSource实例并覆盖onEvent()方法。该方法用于处理服务器端推送的事件,输入参数为InboundEvent(进站事件)。EventSource的构造函数包含一个输入参数,是WebTarget端点,指向服务器端的资源路径为sse的GET方法,服务器端的这个资源方法会使用注解@Produces(SseFeature.SERVER_SENT_EVENTS)。如图8-4中1所示,在EventSource实例化过程中,客户端会从这个端点向服务器发出请求,并在请求头中指定接收数据的媒体类型为SseFeature.SERVER_SENT_EVENTS,即Accept头信息声明为text/event-stream类型。
第二步,请求被服务器接收后,开启SSE事件通信通道,方向是从服务器端向客户端,并返回响应给客户端。此时,HTTP连接被保持着,并没有关闭,如图8-4中2所示。
此时,在SSE事件通信通道的客户端一端,会建立EventInput信道,用于读取InboundEvent(进站事件)。EventInput类继承自ChunkedInput,ChunkedInput允许将数据在一条信道中分次传输;EventInput类的泛型类型为InboundEvent,即上述的onEvent方法待处理的类型。当进站事件到达时,MessageBodyReader的SSE实现类InboundEventReader会解析数据,并反序列化为InboundEvent类型的数据。
同时,在SSE事件通信通道的服务器一端,会建立EventOutput信道,用于写入OutboundEvent(出站事件)。EventOutput类继承自ChunkedOutput,ChunkedOutput允许在发送出站事件后,HTTP连接通过HTTP1.1的Keep-Alive保持连接,出站事件被写入HTTP响应头后,EventOutput信道将等待更多的服务器推送事件。出站事件的序列化写入,由MessageBodyWriter的SSE实现类OutboundEventWriter完成。
接下来的三个步骤就是在这个信道上完成的。
第三步,客户端向服务器发送POST请求,如图8-4中3所示。
第四步,服务器端接收后,会向EventOutput信道写入数据,如图8-4中4所示。
最后一步,客户端监听到信道中有数据到达,将读取并处理推送事件,如图8-4中5所示。
到此,服务器推送事件的流程走完一遍。信道的连接可以由服务器主动关闭,或者由客户端请求关闭。
通过上述的发布-订阅模式,我们对Jersey的SSE支持包和通信机制有了几乎全面的了解。广播模式与发布-订阅模式相比,客户端的实现相同,服务器端推送事件的写入,由端到端的EventInput信道,换成了多点广播类SseBroadcaster。SseBroadcaster类继承自Broadcaster类,泛型类型为OutboundEvent(出站事件)。SseBroadcaster类提供了一次关闭多点信道的方法closeAll(),可以根据业务需要,在完成广播事件后执行。
到此,Jersey的SSE支持包(如图8-5所示)中的成员类都已经一一介绍。接下来将讲述如何使用Jersey的SSE支持包来实践上述两种模式。
图8-5 Jersey的SSE实现包示意图
我们的REST应用实践,可以概括为2个环节。第一个环节是基于本章上述理论开发REST入口类Application、资源类以及单元测试类;第二个环节是集成测试,以验证资源方法的功能。
首先,我们介绍支持SSE的REST应用入口类AirResourceConfig,该类是Application类的子类。示例代码如下。
/** 关注点1:为SSE定义资源路径 **/
@ApplicationPath("/event/*")
public class AirResourceConfig extends ResourceConfig {
public AirResourceConfig() {
/** 关注点2:注册Feture和资源类 **/
super(
SseFeature.class,
AirSsePubSubResource.class,
AirSseBroadcastResource.class
);
}
}
在这段代码中,定义了支持SSE的REST服务根资源路径:/event/*,见关注点1。在构造方法中,我们手动注册了SseFeature类(在jersey2.8版本之后可以自动探测),用以标识我们的REST服务支持SSE特征。同时注册了发布-订阅资源类AirSsePubSubResource和广播资源类AirSseBroadcastResource,见关注点2。
我们分别讲述这两个资源类,先来看用于支持发布-订阅模式的资源类AirSsePubSubResource。该类实现了上述的发布-订阅模式流程的服务器端代码,示例代码如下。
/** 关注点1:为发布-订阅模式定义资源路径 **/
@Path("pubsub")
public class AirSsePubSubResource {
private static EventOutputeventOutput = new EventOutput();
@GET
/** 关注点2:提供SSE事件输出通道的资源方法 **/
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutputpublishMessage() throws IOException {
return eventOutput;
}
@POST
public void saveMessage(String message) throws IOException {
/** 关注点3:执行业务逻辑 **/
log.info("What the client post: {}", message);
/** 关注点4:写入SSE通道的资源方法 **/
eventOutput.write(new OutboundEvent.Builder()
.id(System.nanoTime() + "")
.name("post-message")
.data(String.class, message).build());
}
}
在这段代码中,资源地址定义了发布-订阅模式的资源地址为“pubsub”,见关注点1。GET方法用于公布SSE通信通道,POST方法用于处理业务和写入SSE事件,两者逻辑上是先后被调用的关系。publishMessage方法公布了推送事件输出通道的接口,返回值是前述的EventOutput类型的推送事件输出信道,见关注点2。POST方法saveMessage()用于接收并处理客户端提交的信息,见关注点3,最后并响应信息写入推送事件输出信道,根据HTML5的SSE规范,出站事件OutboundEvent的数据结构包含三个主要信息:id、name和data,关注点4。
服务器端的代码已经完成,我们再通过一个单元测试类SsePubSubTest,实现客户端的代码。SsePubSubTest类继承了Jersey的测试框架类JerseyTest(参见第6章),省去了样板测试代码。示例代码如下。
@Test
public void testEventSource() throws InterruptedException, URISyntaxException {
final CountDownLatch latch = new CountDownLatch(testCount);
/** 关注点1:构建EventSource实例 **/
final EventSource eventSource = new EventSource(target().path(ROOT_PATH)) {
private int i;
@Override
public void onEvent(InboundEvent inboundEvent) {
try {
/** 关注点2:监听事件并处理 **/
String data = inboundEvent.readData(String.class);
log.info("What the server response: {}:{}:{}",
inboundEvent.getId(),
inboundEvent.getName(),
data);
Assert.assertEquals(messagePrefix + i++, data);
latch.countDown();
} catch (ProcessingException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < testCount; i++) {
target().path(ROOT_PATH).request().post(Entity.text(messagePrefix + i));
}
try {
latch.await();
} finally {
eventSource.close();
}
}
在这段代码中,集成测试方法testEventSource()模拟了客户端监听并处理SSE事件,并检验了SSE发布-订阅模式的功能和流程。
首先,客户端创建了EventSource实例,并通过该实例请求服务器端的GET方法,以获得SSE事件输出信道,见关注点1。在EventSource接口的实现中,onEvent方法中做了三件事:第一是打印输出服务器推送事件的内容,内容包括前面讲述的SSE规范定义的数据结构:id、name和data。第二是使用相等断言测试服务器端返回数据是否符合预期。第三是调用同步器CountDownLatch实例的countDown方法(原因见下文),见关注点2。
测试代码中使用CountDownLatch的原因是在测试流程中,通过GET订阅服务器SSE事件通知和通过POST发送数据是两个异步操作,而我们期待的顺序是在执行完POST代码块后,主线程不退出,以便前面代码块中定义的监听和处理推送事件的onEvent()方法可以被执行。为此,我们引入CountDownLatch实例并调用了其await()方法,使得测试主线程被阻塞而不是直接走完退出,以便等待回调被执行。相应地,onEvent()方法的第三步是调用CountDownLatch实例的countDown方法来递减其内部计数器,确保我们期待的消息都被接收后,主线程最终被唤醒以结束整个测试流程。
最后,我们来看一下完整的单元测试输出。
18:26:24.183 AirSsePubSubResource - What the client post: pubsub-0
18:26:24.196 SsePubSubTest - What the server response: 30693005931717:post-message:pubsub-0
18:26:24.199 AirSsePubSubResource - What the client post: pubsub-1
18:26:24.201 SsePubSubTest - What the server response: 30693020207612:post-message:pubsub-1
18:26:24.205 AirSsePubSubResource - What the client post: pubsub-2
18:26:24.207 SsePubSubTest - What the server response: 30693026189883:post-message:pubsub-2
18:26:24.211 AirSsePubSubResource - What the client post: pubsub-3
18:26:24.213 SsePubSubTest - What the server response: 30693032364560:post-message:pubsub-3
18:26:24.218 AirSsePubSubResource - What the client post: pubsub-4
18:26:24.219 SsePubSubTest - What the server response: 30693039008185:post-message:pubsub-4
18:26:24.223 AirSsePubSubResource - What the client post: pubsub-5
18:26:24.224 SsePubSubTest - What the server response: 30693043716912:post-message:pubsub-5
18:26:24.228 AirSsePubSubResource - What the client post: pubsub-6
18:26:24.229 SsePubSubTest - What the server response: 30693048605712:post-message:pubsub-6
18:26:24.233 AirSsePubSubResource - What the client post: pubsub-7
18:26:24.235 SsePubSubTest - What the server response: 30693054086320:post-message:pubsub-7
18:26:24.240 AirSsePubSubResource - What the client post: pubsub-8
18:26:24.241 SsePubSubTest - What the server response: 30693060768748:post-message:pubsub-8
18:26:24.246 AirSsePubSubResource - What the client post: pubsub-9
18:26:24.247 SsePubSubTest - What the server response: 30693066859047:post-message:pubsub-9
集成测试更为普遍的方式是使用cURL命令直接对资源地址进行访问,即时检测资源方法的功能,或者使用shell脚本编写cURL请求。本例将使用cURL命令分别对上述的两种模式的实现进行集成测试。
启动示例服务,然后使用两个终端/控制台测试发布-订阅流程。在第一个终端中录入如下cURL命令,然后等待其输出。这一步请求了GET方法并建立了HTTP连接。
curl -H "Accept: text/event-stream" --url http://localhost:8080/sse/event/pubsub
在第二个终端中依次录入如下cURL命令,以模拟多次提交POST请求的流程。
curl -X POST --data "Java1.5" --url http://localhost:8080/sse/event/pubsub
curl -X POST --data "Java1.6" --url http://localhost:8080/sse/event/pubsub
curl -X POST --data "Java1.7" --url http://localhost:8080/sse/event/pubsub
curl -X POST --data "Java1.8" --url http://localhost:8080/sse/event/pubsub
第一个终端应输出如下信息。展示了服务器端在处理每一次POST请求时,向事件输出通道写入出站事件。
event: post message
id: 22127543063654
data: Java1.5
event: post message
id: 22134553708520
data: Java1.6
event: post message
id: 22141048454287
data: Java1.7
event: post message
id: 22149255904866
data: Java1.8
有了发布-订阅模式的实践,广播模式就很好理解了。广播模式依然按资源类、测试类和集成测试顺序讲述,并省去与发布-订阅相同的信息。
资源类AirSseBroadcastResource用于支持广播模式的SSE。示例代码如下。
/** 关注点1:为广播模式的SSE定义资源路径 **/
@Path("broadcast")
public class AirSseBroadcastResource {
private static final BlockingQueue<BroadcastProcess>processQueue =
new LinkedBlockingQueue<>(1);
@Path("book")
@POST
public Boolean postBook(@DefaultValue("0") @QueryParam("total")int total,
String bookName) {
/** 关注点2:调用BroadcastProcess实例实现广播 **/
final BroadcastProcessbroadcastProcess = new BroadcastProcess(total, bookName);
processQueue.add(broadcastProcess);
Executors.newSingleThreadExecutor().execute(broadcastProcess);
return true;
}
/** 关注点 3:广播处理线程类 **/
static class BroadcastProcess implements Runnable {
……
public void run() {
……
OutboundEvent.BuildereventBuilder = new OutboundEvent.Builder()
.mediaType(MediaType.TEXT_PLAIN_TYPE);
OutboundEvent event = eventBuilder.id(processId + "")
.name("New Book Name")
.data(String.class, bookName).build();
broadcaster.broadcast(event);
在这段代码中,资源地址定义了广播模式的资源地址为“broadcast”,见关注点1。实现广播的核心代码块位于内部线程类BroadcastProcess中,该线程类使用广播类SseBroadcaster实例将SSE事件广播出去,见关注点3。
POST方法postBook()资源地址是broadcast/book用于接收客户端提交的收听广播的客户端数量和最新图书信息。一旦接收数据,该方法会动态生成一个线程类BroadcastProcess的实例,并尝试将其缓存到BlockingQueue实例中。BlockingQueue的add()方法会在尝试入队失败时,直接抛出异常,意味着广播类只缓存最新的一条图书资源,直接失败的处理可以避免客户端请求线程阻塞/挂起,见关注点2。缓存队列会被GET方法消费,篇幅所限没有展示全部代码,读者可以从源代码中查阅。
广播测试类SseBroadcaseTest的示例代码如下。与前一测试非常类似不冗述相同部分,需要额外说明的是,在每一个客户端的GET请求中,生成EventSource类实例的过程是通过EventSource类的内部Builder完成的,因此并没有自动打开SSE信道,因此在注册监听、覆盖onEvent()方法后,需要显式调用open()方法,见关注点1。
readerEventSources[i] = EventSource.target(endpoint).build();
readerEventSources[i].register(new EventListener() {
@Override
public void onEvent(InboundEvent inboundEvent) {
try {
String data = inboundEvent.readData(String.class);
log.info("What the server response: {}:{}:{}",
inboundEvent.getId(),
inboundEvent.getName(),
data);
Assert.assertEquals(newBookName, data);
doneLatch.countDown();
} catch (ProcessingException e) {
log.error("", e);
}
}
});
/** 关注点 1:显式调用open()方法 **/
readerEventSources[i].open();
启动服务,然后根据上述测试场景的逻辑,按照图4-5所示,使用三个终端/控制台测试广播流程。
图4-5 SSE广播测试示意图
在图4-5所示的第一终端ClientA中录入如下cURL命令,其中,数据为jax-rs2-guide;参数total=2,当两个客户端连接到服务器后,向全部客户端发起广播。
curl -X POST --data "jax-rs2-guide" http://localhost:8080/sse/event/broadcast/book?total=2
接下来,在第二个终端ClientB录入如下cURL命令,终端会出现阻塞,不去管它,继续第三个终端ClientC的命令录入。
curl -H "Accept: text/event-stream" http://localhost:8080/sse/event/broadcast/book?clientId=1
在第三个录入如下cURL命令,第二、三个会同时收到服务器的广播消息。
curl -H "Accept: text/event-stream" http://localhost:8080/sse/event/broadcast/book?clientId=2
事件消息中的data信息既是POST提交的内容。
event: New Book Name
id: 33035275182692
data: jax-rs2-guide
HTML5包含了SSE和Web Socket。SSE用于服务器推送事件,但并不仅限于这样使用,本节前述已经展示了其处理异步通信的能力。Web Socket无疑非常适用于服务器推送的场景。WebSocket(RFC 6455)是HTML5技术集的一部分,它提供了一个双向的、在一条TCP信道中的客户端和服务器之间全双工的通信。
WebSocket消除了所有与 HTTP 连接的无状态特性相关的限制。Java EE 7已经支持WebSocket,其参考实现是Glashfish项目的Tyrus子项目(项目地址:https://tyrus.java.net)。本书不再对WebSocket的实现做进一步讲述,推荐读者阅读来自Oracle的布道者Arun Gupta所著的《Java EE 7精粹》一书的第7章WebSocket。
优点:优点是标准规范--HTML5标准之一,逐渐成为流行趋势。功能强大、性能突出:双向、双工通信。
缺点:相对于SSE的实现,WebSocket较为复杂。
最后,笔者在此对互联网通信技术的发展做下简短的憧憬。与WebSocket非常类似的标准是HTTP2(前身是SPDY),两者的共同点是解决HTTP1.1无法双向通信的问题。所不同的是,前者借助HTTP1.1握手,而后者是对HTTP1.1的全面升级。技术日新月异,我们当下讨论的异步通信、服务器推送技术都是基于HTTP1.1协议的,REST服务技术必将会随着新技术的发展而产生新的解决方案。
本章主要的知识点如下: