[关闭]
@delight 2014-05-02T20:34:17.000000Z 字数 13390 阅读 2061

Dolphin Push 快速上手

job push java deploy

Maven

maven是java工程常用的部署工具,一般用来解决依赖问题,当然还有别的功能。

installation

  1. 从source或官方安装,sudo aptitude install maven即可;
  2. 安装eclipse插件,从Install New Software-Available Software中搜索Maven,选择Maven Integration for Eclipse安装即可;也可以使用Eclipse MarketPlace(4.0+)安装;

usage

cometd

项目代码依照的是cometd框架,官方文档见此,请认真学习。下面的是个人笔记。

Bayeux协议

该协议用于客户端和服务器的双向异步通信(架构于HTTP或WebSocket协议之上),这种通信是通过名为隧道(channels)的模型上进行发布/订阅的。

从服务端向客户端push被称为推送,这种技术配合AJAX就组成了CometD项目。该项目的目的就是提供Bayeux协议的多种实现。

CometD架构

整体采用“发布-订阅”模型,这是客户端之间相互通信的方式;

客户端(Client Sessions)这边的会话使用javascript的org.cometd.Cometd对象,或者java中的org.cometd.bayeux.client.ClientSession[或其BayeuxClient子类]。

服务端(Server Sessions)这边使用org.cometd.bayeux.server.ServerSession类或其BayeuxServer子类,二者的联系在/meta/connect/发送后建立。服务端有一个消息队列,用来暂存推送到订阅端的消息。

本地会话(Local Sessions)是web服务器上的Bayeux client,通过org.cometd.bayeux.server.LocalSession实现。想要创建服务端必须有一个对应的客户端,所以这个Local session主要是为了方便某些行为而实现的。

消息处理

Message Processing Model

  1. 客户端通过ClientSession.getChannel(String)取得Channel对象,向里面写消息;消息首先通过客户端的extensions,进行第一次处理;然后消息被送往客户端的transport,该模块负责将消息转化为JSON格式(JSONContext.Client),与server端的transport通信,将消息发送出去;服务端的transport将消息转回普通格式(JSONContext.Server),将消息交给BayeuxServer处理;后者先使用BayeuxServerextensions进行第一次过滤,然后使用ServerSessionextensions进行第二次过滤,随后,调用认证模块检测安全测试和认证子进行第三次过滤。最后,如果是广播或者服务消息,服务器将会使用BayeuxServer进行发送。Server方的listeners拥有最后一次修改消息的权利,随后消息内容被冻结。如果消息是广播消息,服务器负责推送到订阅的客户端。
  2. 服务器收到Bayeux消息后,新建一个线程处理这些消息,server端的listener在这个新线程中被绑定。这意味着server端的listener是按着队列处理消息的,如果处理耗时较长,应该新建一个线程。
  3. 认证策略。继承SecurityPolicy,重载canHandshake方法,最后注册该模块。
  4. 如果client想同某个特定的客户端通信,可以通过/service/** Channel,服务端这边的listener可以从信息里面取出userId等标识字段,然后使用对应的ServerSession.deliver方法进行转发;
  5. Server端可能需要对外部事件在某个特殊channel上做广播作为应答。这时候就需要LocalSession帮忙了,新建一个与之握手,在收到外部事件后,在该server上需要的channel上publish即可;

客户端(JS部分)

客户端 (Java部分)

Java部分的客户端和服务端都是用Jetty框架搭建的。

客户端,如前所述,主要是用org.cometd.bayeux.client.ClientSession接口的实现类BayeuxClient。典型使用如下:

  1. //新建Jetty的http客户端
  2. HttpClient httpClient=new HttpClient();
  3. //启动
  4. httpClient.start();
  5. //选项的字典
  6. Map<String, Object> options=new HashMap<String,Object>();
  7. //长轮询接口
  8. ClientTransport transport=new LongPollingTransport(options,httpClient);
  9. //新建session,绑定该接口;必须知道服务器对应的绝对路径
  10. ClientSession client=new BayeuxClient("http://localhost:8080/cometd",transport);
  11. //握手,异步回调
  12. client.handshake(new ClientSessionChannel.MessageListener()
  13. {
  14. public void onMessage(ClientSessionChannel channel,Message message)
  15. {
  16. if(message.isSuccessful())
  17. {...}
  18. }
  19. });
  20. //也可以通过在调用`handshake()`之前使用
  21. //client.getChannel(Channel.META_HANDSHAKE).addListener
  22. //添加一个新的监听器来达到该目的。
  23. //或者干脆使用同步的方法:
  24. //boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
  25. //订阅channel,必须在握手成功后才能执行
  26. //FooListener实现`ClientSessionChannel.MessageListener`接口
  27. //可以在`subscribe`函数的第二个参数再传入一个回调接口,检测是否订阅成功
  28. static final String CHANNEL="/foo";
  29. ClientSessionChannel.MessageListener fooListener=new FooListener();
  30. client.getChannel(CHANNEL).subscribe(fooListener);
  31. //如果在客户端监听meta通道,可以处理服务器的握手响应
  32. client.getChannel(Channel.META_HANDSHAKE).addListener(new ...)
  33. {...}
  34. //发布信息,填充消息字典data,使用下面的语句在指定channel上发布
  35. client.getChannel("/game/table/1").publish(data);
  36. //你也可以在`publish`的第二个参数上使用一个回调接口来确定publish是否成功
  37. //批量发送,即batch
  38. client.batch( new Runnable()
  39. {
  40. public void run()
  41. {
  42. Map<String, Object> data1 = new HashMap<string,Object>();
  43. client.getChannel("/game/table/1").publish(data1);
  44. Map<String,Object> data2 = new HashMap<String,Object>();
  45. client.getChannel("/game/chat/1").publish(data2);
  46. }
  47. });
  48. //或者你可以使用`StartBatch()`和`endBatch`方法,后者最好在`finally`中调用
  49. //断开连接
  50. client.disconnect(new ClientSessionChannel.MessageListener()
  51. {...}
  52. //也可以使用阻塞形式的waitFor方法

如果使用websocket协议,必须在绑定long-pulling之前绑定websocket,因为后者可能不被支持。

  1. WebSocketClientFactory wsFactory=new WebSocketClientFactory();
  2. wsFactory.start();
  3. ClientTransport wsTransport=new WebSocketTransport(null, wsFactory, null);
  4. //http transport
  5. ...
  6. BayeuxClient client=new BayeuxClient("http://localhost:8080/cometd",wsTransport,httpTransport);
  7. client.handshake();

在options中需要填入各个协议相关参数,用以优化配置。

如果使用maven,客户端的依赖包有:org.cometd.java:cometd-java-client和一个SLF4J的日志库,比如org.slf4j-log4jl2,这样就可以使用long-polling接口了;如果需要使用websocket,则必须再添加org.cometd.java:cometd-websocket-jetty模块。

服务端

  1. 配置BayeuxServer和接口的参数一般在web.xml中进行,作为org.cometd.server.CometdServlet,如果使用maven构建,这里会自动生成的。当然你也可以自己填入相关参数。
  2. 如果需要应用servlet3的相关特性,需要添加async-supported元素并进行配置;
  3. Channel是按需生成的,但是为了防止临时channel过多造成的膨胀,CometD默认每秒扫描一次所有channel,移除不在使用中的。为了可以原子地动态创建channel,可以使用以下方法:
  1. BayeuxServer bayeuxServer=...;
  2. boolean created = bayeuxServer.createIfAbsent("/my/channel",new ServerChannel.Initializer()
  3. {
  4. public void configureChannel(ConfigurableServerChannel channel)
  5. {
  6. channel.setPersistent(true);
  7. }
  8. });

而通过channel.setPersistent(true)可以将channel设为永久存在。

4.服务端也是通过bayeuxServer.getChannel(name)来获取channel对象,其使用方法类似客户端。

5.服务端一般通过继承org.cometd.server.AbstractService,来实现服务。其一般格式如下:

  1. public class EchoService extends AbstractService
  2. {
  3. public EchoService(BayeuxServer bayeuxServer)
  4. {
  5. //构造函数,给父类传入服务名
  6. super(bayeuxServer,"echo")
  7. //映射channel和处理方法
  8. addService("/echo","processEcho")
  9. }
  10. //处理函数(委托),必须满足4中形式中的一种,且第一个参数都是ServerSession
  11. //JSON数据默认被解串行到HashMap中
  12. //如果在`addService`中使用了通配符,第二个参数应该是具体的channelName
  13. //`addService`和`removeService`都是可以动态调用的
  14. //每次add一个新service,CometD都会新建一个LocalSession,可以使用getLocalSession()取得
  15. public void processEcho(ServerSession remote, Map<String,Object> data)
  16. {
  17. remote.deliver(getServerSession(),"/echo",data,null);
  18. }
  19. }

除了使用继承方法外,也可以使用注解注入的方法。同Spring等泛用型框架不同,CometD只有部分支持。常用的方式如下:

  1. @Service("echoService")
  2. public class EchoService
  3. {
  4. //general
  5. @Inject
  6. private BayeuxServer bayeux;
  7. //equal getLocalSession()
  8. @org.cometd.annotation.Session
  9. private LocalSession localSession;
  10. //equal getServerSession()
  11. @Session
  12. private ServerSession serverSession;
  13. @Configure("/echo")
  14. public void configure(ConfigurableServerChannel channel)
  15. {
  16. channel.setLazy(true);
  17. channel.addAuthorizer(GrantAuthorizer.GRANT_PUBLISH);
  18. }
  19. @Listener("/echo")
  20. public void echo(ServerSession remote, ServerMessage.Mutable message)
  21. {
  22. String channel = message.getChannel();
  23. Object data=message.getData();
  24. remote.deliver(serverSession.channel,data,null);
  25. }
  26. @Subscription("/echo")
  27. public void echo(Message message)
  28. {
  29. System.out.println("Echo service published"+message);
  30. }
  31. }
  32. //注入的解析依赖`ServerAnnotationProcesser`类
  33. BayeuxServer bayeux=...;
  34. serverAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
  35. Echoservice service=new EchoService();
  36. processor.process(service);
  37. //processor.deprocess(service);

客户端也可以使用这种注入解析的形式,这里从略。

服务集成

方式不止一种:

  1. <servlet>
  2. <servlet-name>cometd</servlet-name>
  3. <servlet-class>org.cometd.server.CometdServlet</servlet-class>
  4. <load-on-startup>1</load-on-startup>
  5. </servlet>
  6. <servlet>
  7. <servlet-name>configuration</servlet-name>
  8. <servlet-class>com.acme.cometd.ConfigurationServlet</servlet-class>
  9. <load-on-startup>2</load-on-startup>
  10. </servlet>

然后在代码中写入如下类:

  1. public class ConfigurationServlet extends GenericServlet
  2. {
  3. public void init() throws ServletException
  4. {
  5. BayeuxServer bayeux=(BayeuxServer)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
  6. new EchoService(bayeux);
  7. }
  8. public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException
  9. {
  10. throw new ServletException();
  11. }
  12. }
  1. <listener>
  2. <listener-class>com.acme.cometd.BayeuxInitializer</listener-class>
  3. </listener>

然后实现ServletContextAttributeListener接口,代码如下:

  1. public class BayeuxInitializer implements ServletContextAttributeListener
  2. {
  3. public void attributeAdded(ServletContextAttributeEvent event)
  4. {
  5. if (Bayeux.ATTRIBUTE.equals(event.getName()))
  6. {
  7. // Grab the Bayeux object
  8. BayeuxServer bayeux = (BayeuxServer)event.getValue();
  9. new EchoService(bayeux);
  10. // Create other services here
  11. // This is also the place where you can configure the Bayeux object
  12. // by adding extensions or specifying a SecurityPolicy
  13. }
  14. }
  15. public void attributeRemoved(ServletContextAttributeEvent event)
  16. {
  17. }
  18. public void attributeReplaced(ServletContextAttributeEvent event)
  19. {
  20. }
  21. }

注解注入的集成方法与上面类似,从略。

认证

为了对Bayeux协议执行过程进行控制,你可以继承一个org.cometd.bayeux.server.DefalutSecurityPolicy,并重载某些动作(canHandshake, canCreate, canSubscribe, canPublish)对应的函数。

服务端在创建时可以传入Channel的Initializer(),使用channel.addAuthorizer(..),添加验证子。后者必须实现public Result authorize(Operation operation, ChannelId channel, ServerSession session, ServerMessage message);这个方法。
该接口应该返回Result.grant(), Result.ignore()或者Result.deny(xxx)

验证子与Channel对应,可以对ServerChannel对象直接调用addAuthorizer方法。

Server端接口

Server端如果使用websocket,需要在web.xml中加入以下选项:

  1. <init-param>
  2. <param-name>transports</param-name>
  3. <param-value>org.cometd.websocket.server.WebSocketTransport</param-value>
  4. </init-param>

如果使用maven,记得在pom.xml中加入依赖org.cometd.java:cometd-websocket-jetty,并在WEB-INF/lib中放入需要的cometd-websocket-jetty-<version>.jar文件。

上下文信息

可以通过bayeuxServer.getContext()得到上下文对象,该对象可以用来获取客户端链接信息等。注意不要缓存该对象,每次使用时重新调用较为妥当。

惰性信道

如果消息不需要立刻发送,可以使用message.setLazy(true)将消息标记为惰性的;也可以使用channel.setLazy(true)将整个信道设置为惰性的。也可以使用setLazyTimeout(xxx)设置超时时间。

多重会话

从略

JMX集成

web.xml中添加

  1. <context-param>
  2. <param-name>org.eclipse.jetty.server.context.ManagedAttributes</param-name>
  3. <param-value>org.cometd.bayeux,org.cometd.oort.Oort</param-value>
  4. </context-param>

Oort集群

Oort是CometD自带的服务器集群解决方案,如果服务器down掉,前端均衡服务器会自动将所有连接与其他节点重新握手。

节点必须知道一个已经在集群中的节点才能动态加入集群,初始节点可以通过静态配置。

web.xml中添加如下配置:

  1. <servlet>
  2. <servlet-name>oort</servlet-name>
  3. <servlet-class>org.cometd.oort.OortMulticastConfigServlet</servlet-class>
  4. <load-on-startup>2</load-on-startup>
  5. <init-param>
  6. <param-name>oort.url</param-name>
  7. <param-value>http://host:port/context/cometd</param-value>
  8. </init-param>
  9. </servlet>

Oort依赖BayeuxServer,所以加载必须在后者之后。除了oort.url之外,还有其他的一系列参数可供配置,比如oort.cloud是一个逗号分割的一系列其他Orrt的URL,供本Oort在启动时加载。

也可以在代码中进行初始化:

  1. public class OortConfigurationServlet extends GenericServlet
  2. {
  3. public void init() throws ServletException
  4. {
  5. // Grab the Oort object
  6. Oort oort = (Oort)getServletContext().getAttribute(Oort.OORT_ATTRIBUTE);
  7. // Figure out the URLs to connect to, using other discovery means
  8. List<String> urls = ...;
  9. // Connect to the other Oort comets
  10. for (String url : urls)
  11. {
  12. OortComet oortComet = oort.observeComet(url);
  13. if (!oortComet.waitFor(1000, BayeuxClient.State.CONNECTED))
  14. throw new ServletException("Cannot connect to Oort comet " + url);
  15. }
  16. }
  17. public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException
  18. {
  19. throw new ServletException();
  20. }
  21. }

静态发现的策略比较死板,不利于集群的动态扩展。可以通过配置达到动态增减oort node的目的。框架通过“多播”来达到这个目的,在web.xml中配置oort.multicast.*来配置多播参数,并保证操作系统、路由器等正确配置了多播支持。这样,所有的结点每隔一段时间会进行一次多播,收到该信息的其他结点自动尚未连接的新的结点。

如果同一user的不同client连接到了不同的server,其publish或者send到指定client的信息显然必须经过转发。server通过指定oort.channels来指定要求转发的channel,相同于一种“订阅”。

Seti

Seti是Oort的一个组件,用来追踪所有连接到集群上的客户端,其主要作用是透明化向指定useid client发送消息的过程。

使用seti.associate(userId,session)来将客户端的某种标识与ServerSession进行关联,也可以使用BayeuxServer.Extension完成该任务。推荐在handshake后完成关联。

当某个seti第一次与userId关联时,会在/seti/all信道上进行广播。这样,所有的cometd都知道userId与哪个seti关联了,以后同一个userId与同一个seti进行关联时就不会做任何广播了。同一个userId可以关联不同的seti任意多次,因此seti可以完成broadcast和deliver的功能。

取消关联时,Seti会计算连接数,如果userId的关联数降为0,就会广播该信息(类似引用计数)。

可以通过在Seti中订阅PresenceAdded事件来监视上述变化。当userIdServerSession关联后,可以使用seti.sendMessage(String userId,String channel, Object data)来向一个特定用户发送信息。

数据共享

node之间可能需要共享某些数据,比如连接到各自node的client数目,这可以通过OortObject来实现。

创建oort后,可以通过以下代码创建一个OortObject:

  1. Oort oort = ...;
  2. OortObject.Factory<List<String>> factory = OortObjectFactories.forList();
  3. OortObject<List<String>> users = new OortObject<List<String>>(oort, "users", factory);
  4. users.start();

每个node会复制其他结点OortObejct的数据,存放于本地。为了避免效率太低,cometD提供了OortMap等数据结构。可以通过以下代码分享数据:

  1. OortObject.Factory<List<String>> factory=users.getFactory();
  2. List<String> names=factory.newObject(null);
  3. names.add("B1");
  4. users.setAndShare(names);

  1. OortStringMap<UserInfo> userInfos = ..;
  2. userInfos.putAndShare("B1", new UserInfo("B1", ...));
  3. ...
  4. userInfos.removeAndShare("B1");

同理,OortList也有类似的addAndShareremoveAndShare方法。为了保证每次修改都能被其他node感知,必须使用这些指定的方法,而不是直接操作容器。

默认情况下,Oort将接受到的通过toString()方法将非原生类型串行为JSON字符串,如果需要持久化自定义类型,必须继承一个JSON.Convertor类,实现与JSON的相互转换。然后在web.xml中的oort和cometd的初始化参数中键入jsonContxt指定转换器。

每个node都本地存放有其他所有node的share object,可以通过org.cometd.oort.OortObjectMergers来一次性取出所有node的指定字段。

可以对OortObject添加监听器,以对数值变化作出响应,不同的动作(remove, add, update)对应不同的监听器(委托|回调函数)。

OortService

由于能修改OortObject的只有其所属的node,其他node如果想要修改某个node,就只能利用该node提供的OortService

其实现方式是继承一个OortService,并重载某些指定的method,具体可以参照官方文档。

扩展

该部分主要用来修改协议的ext字段。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注