@delight
2014-05-02T12:34:17.000000Z
字数 13390
阅读 2322
job push java deploy
maven是java工程常用的部署工具,一般用来解决依赖问题,当然还有别的功能。
sudo aptitude install maven即可;Install New Software-Available Software中搜索Maven,选择Maven Integration for Eclipse安装即可;也可以使用Eclipse MarketPlace(4.0+)安装;File-new-maven project,导入工程选择File-Import-maven project from existing files;jetty应用,可以在run as-Main-goals里面填入jetty:run和对应的jetty:stop,具体可以参照这里.pom.xml中对整个项目进行配置,本项目使用了maven-war-plugin插件,如果需要编译,在命令行下使用mvn compile war:war(在src的上一层目录)就会在target目录下生成war文件,将该文件丢到jetty的webapps文件夹下,启动jetty服务即可运行服务程序。运行部分可以参照这里和这里.项目代码依照的是cometd框架,官方文档见此,请认真学习。下面的是个人笔记。
该协议用于客户端和服务器的双向异步通信(架构于HTTP或WebSocket协议之上),这种通信是通过名为隧道(channels)的模型上进行发布/订阅的。
从服务端向客户端push被称为推送,这种技术配合AJAX就组成了CometD项目。该项目的目的就是提供Bayeux协议的多种实现。
客户端(client)
注意客户端并不一定只运行于web客户端,也可以运行于web服务器中。客户端与服务器之间保持两个连接,保证异步双全工特性。
消息格式
使用JSON格式。
消息模型
必须支持long-polling,
连接过程
可以握手,也可以不握手。
细节
*匹配单个字段;**匹配多个字段;/meta/ channel是供协议自身使用的,Server端的Bayeux client可以订阅该隧道,Server端的处理句柄可以且只能把响应回送给发送原始请求信息的客户端;如果该信息里面有个id字段,响应必须在该字段含有相同的值;/service/是设计用来辅助处理请求-响应类型消息的。Server不应该记录客户端对该隧道的订阅;如果消息里有id字段,响应应该在该字段含有相同或相关联的值;clientId是标记客户端的字段,服务器保证该字段的唯一性;/meta/handshake)必须包含version字段;普通消息则必须包含clientId字段;advice字段是最优选项字段,两边都可以发送。其内容包括reconnect(服务器端,重连方式建议),timeout(客户端,超时值),interval(时间间隔,/meta/connect),multiple-clients(多重客户端,bool,服务器检测到同一个web客户端运行了多个Bayeux客户端),hosts(服务器告知客户端可选的替代服务器,数组),connectionType(连接类型,long-polling, callback-polling, iframe和flash的一种);/meta/**和/service/**的消息应该有id字段;timestamp是可选字段;data包含了真正的信息;successful字段,用来确认操作是否成功;/meta/subscribe和/meta/unsubscribe隧道的消息必须包含subscription字段;error字段作为响应信息的可选字段,标记错误原因;ext是自定义字段;connectionId和json-comment-filtered字段均已废弃;/meta/handshake隧道上握手,成功后在/meta/connect隧道上连接。握手会返回clientId字段;客户端和服务器之间应该维持有一条connect信息,用来保持心跳;/meta/disconnect隧道上推送断开连接的信息;服务端如果收到该消息,必须在统一隧道上进行响应。/meta/subscribe上订阅,服务器响应是否订阅成功;退订类似。channel进行了订阅,服务器必须负责将此类消息送到。long-polling),其请求形式应该是HTTP POST的application/json字段(UTF-8编码);也可以是application/x-www-form-urlencoded字段的编码消息;其响应应该是一个普通的HTTP响应,其内容被封装在application/json字段;ext字段进行。整体采用“发布-订阅”模型,这是客户端之间相互通信的方式;
客户端(Client Sessions)这边的会话使用javascript的org.cometd.Cometd对象,或者java中的org.cometd.bayeux.client.ClientSession[或其BayeuxClient子类]。
服务端(Server Sessions)这边使用org.cometd.bayeux.server.ServerSession类或其BayeuxServer子类,二者的联系在/meta/connect/发送后建立。服务端有一个消息队列,用来暂存推送到订阅端的消息。
本地会话(Local Sessions)是web服务器上的Bayeux client,通过org.cometd.bayeux.server.LocalSession实现。想要创建服务端必须有一个对应的客户端,所以这个Local session主要是为了方便某些行为而实现的。

ClientSession.getChannel(String)取得Channel对象,向里面写消息;消息首先通过客户端的extensions,进行第一次处理;然后消息被送往客户端的transport,该模块负责将消息转化为JSON格式(JSONContext.Client),与server端的transport通信,将消息发送出去;服务端的transport将消息转回普通格式(JSONContext.Server),将消息交给BayeuxServer处理;后者先使用BayeuxServer的extensions进行第一次过滤,然后使用ServerSession的extensions进行第二次过滤,随后,调用认证模块检测安全测试和认证子进行第三次过滤。最后,如果是广播或者服务消息,服务器将会使用BayeuxServer进行发送。Server方的listeners拥有最后一次修改消息的权利,随后消息内容被冻结。如果消息是广播消息,服务器负责推送到订阅的客户端。SecurityPolicy,重载canHandshake方法,最后注册该模块。/service/** Channel,服务端这边的listener可以从信息里面取出userId等标识字段,然后使用对应的ServerSession.deliver方法进行转发;LocalSession帮忙了,新建一个与之握手,在收到外部事件后,在该server上需要的channel上publish即可;略
Java部分的客户端和服务端都是用Jetty框架搭建的。
客户端,如前所述,主要是用org.cometd.bayeux.client.ClientSession接口的实现类BayeuxClient。典型使用如下:
//新建Jetty的http客户端HttpClient httpClient=new HttpClient();//启动httpClient.start();//选项的字典Map<String, Object> options=new HashMap<String,Object>();//长轮询接口ClientTransport transport=new LongPollingTransport(options,httpClient);//新建session,绑定该接口;必须知道服务器对应的绝对路径ClientSession client=new BayeuxClient("http://localhost:8080/cometd",transport);//握手,异步回调client.handshake(new ClientSessionChannel.MessageListener(){public void onMessage(ClientSessionChannel channel,Message message){if(message.isSuccessful()){...}}});//也可以通过在调用`handshake()`之前使用//client.getChannel(Channel.META_HANDSHAKE).addListener//添加一个新的监听器来达到该目的。//或者干脆使用同步的方法://boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);//订阅channel,必须在握手成功后才能执行//FooListener实现`ClientSessionChannel.MessageListener`接口//可以在`subscribe`函数的第二个参数再传入一个回调接口,检测是否订阅成功static final String CHANNEL="/foo";ClientSessionChannel.MessageListener fooListener=new FooListener();client.getChannel(CHANNEL).subscribe(fooListener);//如果在客户端监听meta通道,可以处理服务器的握手响应client.getChannel(Channel.META_HANDSHAKE).addListener(new ...){...}//发布信息,填充消息字典data,使用下面的语句在指定channel上发布client.getChannel("/game/table/1").publish(data);//你也可以在`publish`的第二个参数上使用一个回调接口来确定publish是否成功//批量发送,即batchclient.batch( new Runnable(){public void run(){Map<String, Object> data1 = new HashMap<string,Object>();client.getChannel("/game/table/1").publish(data1);Map<String,Object> data2 = new HashMap<String,Object>();client.getChannel("/game/chat/1").publish(data2);}});//或者你可以使用`StartBatch()`和`endBatch`方法,后者最好在`finally`中调用//断开连接client.disconnect(new ClientSessionChannel.MessageListener(){...}//也可以使用阻塞形式的waitFor方法
如果使用websocket协议,必须在绑定long-pulling之前绑定websocket,因为后者可能不被支持。
WebSocketClientFactory wsFactory=new WebSocketClientFactory();wsFactory.start();ClientTransport wsTransport=new WebSocketTransport(null, wsFactory, null);//http transport...BayeuxClient client=new BayeuxClient("http://localhost:8080/cometd",wsTransport,httpTransport);client.handshake();
在options中需要填入各个协议相关参数,用以优化配置。
如果使用maven,客户端的依赖包有:org.cometd.java:cometd-java-client和一个SLF4J的日志库,比如org.slf4j-log4jl2,这样就可以使用long-polling接口了;如果需要使用websocket,则必须再添加org.cometd.java:cometd-websocket-jetty模块。
BayeuxServer和接口的参数一般在web.xml中进行,作为org.cometd.server.CometdServlet,如果使用maven构建,这里会自动生成的。当然你也可以自己填入相关参数。async-supported元素并进行配置;
BayeuxServer bayeuxServer=...;boolean created = bayeuxServer.createIfAbsent("/my/channel",new ServerChannel.Initializer(){public void configureChannel(ConfigurableServerChannel channel){channel.setPersistent(true);}});
而通过channel.setPersistent(true)可以将channel设为永久存在。
4.服务端也是通过bayeuxServer.getChannel(name)来获取channel对象,其使用方法类似客户端。
5.服务端一般通过继承org.cometd.server.AbstractService,来实现服务。其一般格式如下:
public class EchoService extends AbstractService{public EchoService(BayeuxServer bayeuxServer){//构造函数,给父类传入服务名super(bayeuxServer,"echo")//映射channel和处理方法addService("/echo","processEcho")}//处理函数(委托),必须满足4中形式中的一种,且第一个参数都是ServerSession//JSON数据默认被解串行到HashMap中//如果在`addService`中使用了通配符,第二个参数应该是具体的channelName//`addService`和`removeService`都是可以动态调用的//每次add一个新service,CometD都会新建一个LocalSession,可以使用getLocalSession()取得public void processEcho(ServerSession remote, Map<String,Object> data){remote.deliver(getServerSession(),"/echo",data,null);}}
除了使用继承方法外,也可以使用注解注入的方法。同Spring等泛用型框架不同,CometD只有部分支持。常用的方式如下:
@Service("echoService")public class EchoService{//general@Injectprivate BayeuxServer bayeux;//equal getLocalSession()@org.cometd.annotation.Sessionprivate LocalSession localSession;//equal getServerSession()@Sessionprivate ServerSession serverSession;@Configure("/echo")public void configure(ConfigurableServerChannel channel){channel.setLazy(true);channel.addAuthorizer(GrantAuthorizer.GRANT_PUBLISH);}@Listener("/echo")public void echo(ServerSession remote, ServerMessage.Mutable message){String channel = message.getChannel();Object data=message.getData();remote.deliver(serverSession.channel,data,null);}@Subscription("/echo")public void echo(Message message){System.out.println("Echo service published"+message);}}//注入的解析依赖`ServerAnnotationProcesser`类BayeuxServer bayeux=...;serverAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);Echoservice service=new EchoService();processor.process(service);//processor.deprocess(service);
客户端也可以使用这种注入解析的形式,这里从略。
方式不止一种:
web.xml,
<servlet><servlet-name>cometd</servlet-name><servlet-class>org.cometd.server.CometdServlet</servlet-class><load-on-startup>1</load-on-startup></servlet><servlet><servlet-name>configuration</servlet-name><servlet-class>com.acme.cometd.ConfigurationServlet</servlet-class><load-on-startup>2</load-on-startup></servlet>
然后在代码中写入如下类:
public class ConfigurationServlet extends GenericServlet{public void init() throws ServletException{BayeuxServer bayeux=(BayeuxServer)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);new EchoService(bayeux);}public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException{throw new ServletException();}}
web.xml中添加listener,如下:
<listener><listener-class>com.acme.cometd.BayeuxInitializer</listener-class></listener>
然后实现ServletContextAttributeListener接口,代码如下:
public class BayeuxInitializer implements ServletContextAttributeListener{public void attributeAdded(ServletContextAttributeEvent event){if (Bayeux.ATTRIBUTE.equals(event.getName())){// Grab the Bayeux objectBayeuxServer bayeux = (BayeuxServer)event.getValue();new EchoService(bayeux);// Create other services here// This is also the place where you can configure the Bayeux object// by adding extensions or specifying a SecurityPolicy}}public void attributeRemoved(ServletContextAttributeEvent event){}public void attributeReplaced(ServletContextAttributeEvent event){}}
注解注入的集成方法与上面类似,从略。
为了对Bayeux协议执行过程进行控制,你可以继承一个org.cometd.bayeux.server.DefalutSecurityPolicy,并重载某些动作(canHandshake, canCreate, canSubscribe, canPublish)对应的函数。
服务端在创建时可以传入Channel的Initializer(),使用channel.addAuthorizer(..),添加验证子。后者必须实现public Result authorize(Operation operation, ChannelId channel, ServerSession session, ServerMessage message);这个方法。
该接口应该返回Result.grant(), Result.ignore()或者Result.deny(xxx)。
验证子与Channel对应,可以对ServerChannel对象直接调用addAuthorizer方法。
Server端如果使用websocket,需要在web.xml中加入以下选项:
<init-param><param-name>transports</param-name><param-value>org.cometd.websocket.server.WebSocketTransport</param-value></init-param>
如果使用maven,记得在pom.xml中加入依赖org.cometd.java:cometd-websocket-jetty,并在WEB-INF/lib中放入需要的cometd-websocket-jetty-<version>.jar文件。
可以通过bayeuxServer.getContext()得到上下文对象,该对象可以用来获取客户端链接信息等。注意不要缓存该对象,每次使用时重新调用较为妥当。
如果消息不需要立刻发送,可以使用message.setLazy(true)将消息标记为惰性的;也可以使用channel.setLazy(true)将整个信道设置为惰性的。也可以使用setLazyTimeout(xxx)设置超时时间。
从略
在web.xml中添加
<context-param><param-name>org.eclipse.jetty.server.context.ManagedAttributes</param-name><param-value>org.cometd.bayeux,org.cometd.oort.Oort</param-value></context-param>
Oort是CometD自带的服务器集群解决方案,如果服务器down掉,前端均衡服务器会自动将所有连接与其他节点重新握手。
节点必须知道一个已经在集群中的节点才能动态加入集群,初始节点可以通过静态配置。
在web.xml中添加如下配置:
<servlet><servlet-name>oort</servlet-name><servlet-class>org.cometd.oort.OortMulticastConfigServlet</servlet-class><load-on-startup>2</load-on-startup><init-param><param-name>oort.url</param-name><param-value>http://host:port/context/cometd</param-value></init-param></servlet>
Oort依赖BayeuxServer,所以加载必须在后者之后。除了oort.url之外,还有其他的一系列参数可供配置,比如oort.cloud是一个逗号分割的一系列其他Orrt的URL,供本Oort在启动时加载。
也可以在代码中进行初始化:
public class OortConfigurationServlet extends GenericServlet{public void init() throws ServletException{// Grab the Oort objectOort oort = (Oort)getServletContext().getAttribute(Oort.OORT_ATTRIBUTE);// Figure out the URLs to connect to, using other discovery meansList<String> urls = ...;// Connect to the other Oort cometsfor (String url : urls){OortComet oortComet = oort.observeComet(url);if (!oortComet.waitFor(1000, BayeuxClient.State.CONNECTED))throw new ServletException("Cannot connect to Oort comet " + url);}}public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException{throw new ServletException();}}
静态发现的策略比较死板,不利于集群的动态扩展。可以通过配置达到动态增减oort node的目的。框架通过“多播”来达到这个目的,在web.xml中配置oort.multicast.*来配置多播参数,并保证操作系统、路由器等正确配置了多播支持。这样,所有的结点每隔一段时间会进行一次多播,收到该信息的其他结点自动尚未连接的新的结点。
如果同一user的不同client连接到了不同的server,其publish或者send到指定client的信息显然必须经过转发。server通过指定oort.channels来指定要求转发的channel,相同于一种“订阅”。
Seti是Oort的一个组件,用来追踪所有连接到集群上的客户端,其主要作用是透明化向指定useid client发送消息的过程。
使用seti.associate(userId,session)来将客户端的某种标识与ServerSession进行关联,也可以使用BayeuxServer.Extension完成该任务。推荐在handshake后完成关联。
当某个seti第一次与userId关联时,会在/seti/all信道上进行广播。这样,所有的cometd都知道userId与哪个seti关联了,以后同一个userId与同一个seti进行关联时就不会做任何广播了。同一个userId可以关联不同的seti任意多次,因此seti可以完成broadcast和deliver的功能。
取消关联时,Seti会计算连接数,如果userId的关联数降为0,就会广播该信息(类似引用计数)。
可以通过在Seti中订阅PresenceAdded事件来监视上述变化。当userId与ServerSession关联后,可以使用seti.sendMessage(String userId,String channel, Object data)来向一个特定用户发送信息。
node之间可能需要共享某些数据,比如连接到各自node的client数目,这可以通过OortObject来实现。
创建oort后,可以通过以下代码创建一个OortObject:
Oort oort = ...;OortObject.Factory<List<String>> factory = OortObjectFactories.forList();OortObject<List<String>> users = new OortObject<List<String>>(oort, "users", factory);users.start();
每个node会复制其他结点OortObejct的数据,存放于本地。为了避免效率太低,cometD提供了OortMap等数据结构。可以通过以下代码分享数据:
OortObject.Factory<List<String>> factory=users.getFactory();List<String> names=factory.newObject(null);names.add("B1");users.setAndShare(names);
或
OortStringMap<UserInfo> userInfos = ..;userInfos.putAndShare("B1", new UserInfo("B1", ...));...userInfos.removeAndShare("B1");
同理,OortList也有类似的addAndShare和removeAndShare方法。为了保证每次修改都能被其他node感知,必须使用这些指定的方法,而不是直接操作容器。
默认情况下,Oort将接受到的通过toString()方法将非原生类型串行为JSON字符串,如果需要持久化自定义类型,必须继承一个JSON.Convertor类,实现与JSON的相互转换。然后在web.xml中的oort和cometd的初始化参数中键入jsonContxt指定转换器。
每个node都本地存放有其他所有node的share object,可以通过org.cometd.oort.OortObjectMergers来一次性取出所有node的指定字段。
可以对OortObject添加监听器,以对数值变化作出响应,不同的动作(remove, add, update)对应不同的监听器(委托|回调函数)。
由于能修改OortObject的只有其所属的node,其他node如果想要修改某个node,就只能利用该node提供的OortService。
其实现方式是继承一个OortService,并重载某些指定的method,具体可以参照官方文档。
该部分主要用来修改协议的ext字段。