@delight
2014-05-02T20:34:17.000000Z
字数 13390
阅读 2061
job
push
java
deploy
maven是java工程常用的部署工具,一般用来解决依赖问题,当然还有别的功能。
sudo aptitude install maven
即可;Install New Software
-Available Software
中搜索Maven
,选择Maven Integration for Eclipse
安装即可;也可以使用Eclipse MarketPlace
(4.0+)安装;File
-new
-maven project
,导入工程选择File
-Import
-maven project from existing files
;jetty
应用,可以在run as
-Main
-goals
里面填入jetty:run
和对应的jetty:stop
,具体可以参照这里.pom.xml
中对整个项目进行配置,本项目使用了maven-war-plugin
插件,如果需要编译,在命令行下使用mvn compile war:war
(在src
的上一层目录)就会在target
目录下生成war
文件,将该文件丢到jetty的webapps
文件夹下,启动jetty服务即可运行服务程序。运行部分可以参照这里和这里.项目代码依照的是cometd框架,官方文档见此,请认真学习。下面的是个人笔记。
该协议用于客户端和服务器的双向异步通信(架构于HTTP或WebSocket协议之上),这种通信是通过名为隧道(channels
)的模型上进行发布/订阅的。
从服务端向客户端push
被称为推送,这种技术配合AJAX
就组成了CometD项目。该项目的目的就是提供Bayeux协议的多种实现。
客户端(client
)
注意客户端并不一定只运行于web客户端,也可以运行于web服务器中。客户端与服务器之间保持两个连接,保证异步双全工特性。
消息格式
使用JSON
格式。
消息模型
必须支持long-polling
,
连接过程
可以握手,也可以不握手。
细节
*
匹配单个字段;**
匹配多个字段;/meta/
channel是供协议自身使用的,Server端的Bayeux client
可以订阅该隧道,Server端的处理句柄可以且只能把响应回送给发送原始请求信息的客户端;如果该信息里面有个id
字段,响应必须在该字段含有相同的值;/service/
是设计用来辅助处理请求-响应类型消息的。Server不应该记录客户端对该隧道的订阅;如果消息里有id
字段,响应应该在该字段含有相同或相关联的值;clientId
是标记客户端的字段,服务器保证该字段的唯一性;/meta/handshake
)必须包含version
字段;普通消息则必须包含clientId
字段;advice
字段是最优选项字段,两边都可以发送。其内容包括reconnect
(服务器端,重连方式建议),timeout
(客户端,超时值),interval
(时间间隔,/meta/connect
),multiple-clients
(多重客户端,bool,服务器检测到同一个web客户端运行了多个Bayeux客户端),hosts
(服务器告知客户端可选的替代服务器,数组),connectionType
(连接类型,long-polling
, callback-polling
, iframe
和flash
的一种);/meta/**
和/service/**
的消息应该有id
字段;timestamp
是可选字段;data
包含了真正的信息;successful
字段,用来确认操作是否成功;/meta/subscribe
和/meta/unsubscribe
隧道的消息必须包含subscription
字段;error
字段作为响应信息的可选字段,标记错误原因;ext
是自定义字段;connectionId
和json-comment-filtered
字段均已废弃;/meta/handshake
隧道上握手,成功后在/meta/connect
隧道上连接。握手会返回clientId
字段;客户端和服务器之间应该维持有一条connect信息,用来保持心跳;/meta/disconnect
隧道上推送断开连接的信息;服务端如果收到该消息,必须在统一隧道上进行响应。/meta/subscribe
上订阅,服务器响应是否订阅成功;退订类似。channel
进行了订阅,服务器必须负责将此类消息送到。long-polling
),其请求形式应该是HTTP POST的application/json
字段(UTF-8编码);也可以是application/x-www-form-urlencoded
字段的编码消息;其响应应该是一个普通的HTTP响应,其内容被封装在application/json
字段;ext
字段进行。整体采用“发布-订阅”模型,这是客户端之间相互通信的方式;
客户端(Client Sessions)这边的会话使用javascript的org.cometd.Cometd
对象,或者java中的org.cometd.bayeux.client.ClientSession
[或其BayeuxClient
子类]。
服务端(Server Sessions)这边使用org.cometd.bayeux.server.ServerSession
类或其BayeuxServer
子类,二者的联系在/meta/connect/
发送后建立。服务端有一个消息队列,用来暂存推送到订阅端的消息。
本地会话(Local Sessions)是web服务器上的Bayeux client,通过org.cometd.bayeux.server.LocalSession
实现。想要创建服务端必须有一个对应的客户端,所以这个Local session主要是为了方便某些行为而实现的。
ClientSession.getChannel(String)
取得Channel对象,向里面写消息;消息首先通过客户端的extensions
,进行第一次处理;然后消息被送往客户端的transport
,该模块负责将消息转化为JSON格式(JSONContext.Client
),与server端的transport
通信,将消息发送出去;服务端的transport
将消息转回普通格式(JSONContext.Server
),将消息交给BayeuxServer
处理;后者先使用BayeuxServer
的extensions
进行第一次过滤,然后使用ServerSession
的extensions
进行第二次过滤,随后,调用认证模块检测安全测试和认证子进行第三次过滤。最后,如果是广播或者服务消息,服务器将会使用BayeuxServer
进行发送。Server方的listeners
拥有最后一次修改消息的权利,随后消息内容被冻结。如果消息是广播消息,服务器负责推送到订阅的客户端。SecurityPolicy
,重载canHandshake
方法,最后注册该模块。/service/**
Channel,服务端这边的listener可以从信息里面取出userId等标识字段,然后使用对应的ServerSession.deliver
方法进行转发;LocalSession
帮忙了,新建一个与之握手,在收到外部事件后,在该server上需要的channel上publish
即可;略
Java部分的客户端和服务端都是用Jetty框架搭建的。
客户端,如前所述,主要是用org.cometd.bayeux.client.ClientSession
接口的实现类BayeuxClient
。典型使用如下:
//新建Jetty的http客户端
HttpClient httpClient=new HttpClient();
//启动
httpClient.start();
//选项的字典
Map<String, Object> options=new HashMap<String,Object>();
//长轮询接口
ClientTransport transport=new LongPollingTransport(options,httpClient);
//新建session,绑定该接口;必须知道服务器对应的绝对路径
ClientSession client=new BayeuxClient("http://localhost:8080/cometd",transport);
//握手,异步回调
client.handshake(new ClientSessionChannel.MessageListener()
{
public void onMessage(ClientSessionChannel channel,Message message)
{
if(message.isSuccessful())
{...}
}
});
//也可以通过在调用`handshake()`之前使用
//client.getChannel(Channel.META_HANDSHAKE).addListener
//添加一个新的监听器来达到该目的。
//或者干脆使用同步的方法:
//boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
//订阅channel,必须在握手成功后才能执行
//FooListener实现`ClientSessionChannel.MessageListener`接口
//可以在`subscribe`函数的第二个参数再传入一个回调接口,检测是否订阅成功
static final String CHANNEL="/foo";
ClientSessionChannel.MessageListener fooListener=new FooListener();
client.getChannel(CHANNEL).subscribe(fooListener);
//如果在客户端监听meta通道,可以处理服务器的握手响应
client.getChannel(Channel.META_HANDSHAKE).addListener(new ...)
{...}
//发布信息,填充消息字典data,使用下面的语句在指定channel上发布
client.getChannel("/game/table/1").publish(data);
//你也可以在`publish`的第二个参数上使用一个回调接口来确定publish是否成功
//批量发送,即batch
client.batch( new Runnable()
{
public void run()
{
Map<String, Object> data1 = new HashMap<string,Object>();
client.getChannel("/game/table/1").publish(data1);
Map<String,Object> data2 = new HashMap<String,Object>();
client.getChannel("/game/chat/1").publish(data2);
}
});
//或者你可以使用`StartBatch()`和`endBatch`方法,后者最好在`finally`中调用
//断开连接
client.disconnect(new ClientSessionChannel.MessageListener()
{...}
//也可以使用阻塞形式的waitFor方法
如果使用websocket协议,必须在绑定long-pulling之前绑定websocket,因为后者可能不被支持。
WebSocketClientFactory wsFactory=new WebSocketClientFactory();
wsFactory.start();
ClientTransport wsTransport=new WebSocketTransport(null, wsFactory, null);
//http transport
...
BayeuxClient client=new BayeuxClient("http://localhost:8080/cometd",wsTransport,httpTransport);
client.handshake();
在options中需要填入各个协议相关参数,用以优化配置。
如果使用maven,客户端的依赖包有:org.cometd.java:cometd-java-client
和一个SLF4J的日志库,比如org.slf4j-log4jl2
,这样就可以使用long-polling接口了;如果需要使用websocket,则必须再添加org.cometd.java:cometd-websocket-jetty
模块。
BayeuxServer
和接口的参数一般在web.xml
中进行,作为org.cometd.server.CometdServlet
,如果使用maven构建,这里会自动生成的。当然你也可以自己填入相关参数。async-supported
元素并进行配置;
BayeuxServer bayeuxServer=...;
boolean created = bayeuxServer.createIfAbsent("/my/channel",new ServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
}
});
而通过channel.setPersistent(true)
可以将channel设为永久存在。
4.服务端也是通过bayeuxServer.getChannel(name)
来获取channel对象,其使用方法类似客户端。
5.服务端一般通过继承org.cometd.server.AbstractService
,来实现服务。其一般格式如下:
public class EchoService extends AbstractService
{
public EchoService(BayeuxServer bayeuxServer)
{
//构造函数,给父类传入服务名
super(bayeuxServer,"echo")
//映射channel和处理方法
addService("/echo","processEcho")
}
//处理函数(委托),必须满足4中形式中的一种,且第一个参数都是ServerSession
//JSON数据默认被解串行到HashMap中
//如果在`addService`中使用了通配符,第二个参数应该是具体的channelName
//`addService`和`removeService`都是可以动态调用的
//每次add一个新service,CometD都会新建一个LocalSession,可以使用getLocalSession()取得
public void processEcho(ServerSession remote, Map<String,Object> data)
{
remote.deliver(getServerSession(),"/echo",data,null);
}
}
除了使用继承方法外,也可以使用注解注入的方法。同Spring
等泛用型框架不同,CometD只有部分支持。常用的方式如下:
@Service("echoService")
public class EchoService
{
//general
@Inject
private BayeuxServer bayeux;
//equal getLocalSession()
@org.cometd.annotation.Session
private LocalSession localSession;
//equal getServerSession()
@Session
private ServerSession serverSession;
@Configure("/echo")
public void configure(ConfigurableServerChannel channel)
{
channel.setLazy(true);
channel.addAuthorizer(GrantAuthorizer.GRANT_PUBLISH);
}
@Listener("/echo")
public void echo(ServerSession remote, ServerMessage.Mutable message)
{
String channel = message.getChannel();
Object data=message.getData();
remote.deliver(serverSession.channel,data,null);
}
@Subscription("/echo")
public void echo(Message message)
{
System.out.println("Echo service published"+message);
}
}
//注入的解析依赖`ServerAnnotationProcesser`类
BayeuxServer bayeux=...;
serverAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
Echoservice service=new EchoService();
processor.process(service);
//processor.deprocess(service);
客户端也可以使用这种注入解析的形式,这里从略。
方式不止一种:
web.xml
,
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>org.cometd.server.CometdServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet>
<servlet-name>configuration</servlet-name>
<servlet-class>com.acme.cometd.ConfigurationServlet</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>
然后在代码中写入如下类:
public class ConfigurationServlet extends GenericServlet
{
public void init() throws ServletException
{
BayeuxServer bayeux=(BayeuxServer)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
new EchoService(bayeux);
}
public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException
{
throw new ServletException();
}
}
web.xml
中添加listener
,如下:
<listener>
<listener-class>com.acme.cometd.BayeuxInitializer</listener-class>
</listener>
然后实现ServletContextAttributeListener
接口,代码如下:
public class BayeuxInitializer implements ServletContextAttributeListener
{
public void attributeAdded(ServletContextAttributeEvent event)
{
if (Bayeux.ATTRIBUTE.equals(event.getName()))
{
// Grab the Bayeux object
BayeuxServer bayeux = (BayeuxServer)event.getValue();
new EchoService(bayeux);
// Create other services here
// This is also the place where you can configure the Bayeux object
// by adding extensions or specifying a SecurityPolicy
}
}
public void attributeRemoved(ServletContextAttributeEvent event)
{
}
public void attributeReplaced(ServletContextAttributeEvent event)
{
}
}
注解注入的集成方法与上面类似,从略。
为了对Bayeux协议执行过程进行控制,你可以继承一个org.cometd.bayeux.server.DefalutSecurityPolicy
,并重载某些动作(canHandshake
, canCreate
, canSubscribe
, canPublish
)对应的函数。
服务端在创建时可以传入Channel的Initializer()
,使用channel.addAuthorizer(..)
,添加验证子。后者必须实现public Result authorize(Operation operation, ChannelId channel, ServerSession session, ServerMessage message);
这个方法。
该接口应该返回Result.grant()
, Result.ignore()
或者Result.deny(xxx)
。
验证子与Channel对应,可以对ServerChannel
对象直接调用addAuthorizer
方法。
Server端如果使用websocket,需要在web.xml
中加入以下选项:
<init-param>
<param-name>transports</param-name>
<param-value>org.cometd.websocket.server.WebSocketTransport</param-value>
</init-param>
如果使用maven,记得在pom.xml
中加入依赖org.cometd.java:cometd-websocket-jetty
,并在WEB-INF/lib
中放入需要的cometd-websocket-jetty-<version>.jar
文件。
可以通过bayeuxServer.getContext()
得到上下文对象,该对象可以用来获取客户端链接信息等。注意不要缓存该对象,每次使用时重新调用较为妥当。
如果消息不需要立刻发送,可以使用message.setLazy(true)
将消息标记为惰性的;也可以使用channel.setLazy(true)
将整个信道设置为惰性的。也可以使用setLazyTimeout(xxx)
设置超时时间。
从略
在web.xml
中添加
<context-param>
<param-name>org.eclipse.jetty.server.context.ManagedAttributes</param-name>
<param-value>org.cometd.bayeux,org.cometd.oort.Oort</param-value>
</context-param>
Oort是CometD自带的服务器集群解决方案,如果服务器down掉,前端均衡服务器会自动将所有连接与其他节点重新握手。
节点必须知道一个已经在集群中的节点才能动态加入集群,初始节点可以通过静态配置。
在web.xml
中添加如下配置:
<servlet>
<servlet-name>oort</servlet-name>
<servlet-class>org.cometd.oort.OortMulticastConfigServlet</servlet-class>
<load-on-startup>2</load-on-startup>
<init-param>
<param-name>oort.url</param-name>
<param-value>http://host:port/context/cometd</param-value>
</init-param>
</servlet>
Oort依赖BayeuxServer
,所以加载必须在后者之后。除了oort.url
之外,还有其他的一系列参数可供配置,比如oort.cloud
是一个逗号分割的一系列其他Orrt的URL,供本Oort在启动时加载。
也可以在代码中进行初始化:
public class OortConfigurationServlet extends GenericServlet
{
public void init() throws ServletException
{
// Grab the Oort object
Oort oort = (Oort)getServletContext().getAttribute(Oort.OORT_ATTRIBUTE);
// Figure out the URLs to connect to, using other discovery means
List<String> urls = ...;
// Connect to the other Oort comets
for (String url : urls)
{
OortComet oortComet = oort.observeComet(url);
if (!oortComet.waitFor(1000, BayeuxClient.State.CONNECTED))
throw new ServletException("Cannot connect to Oort comet " + url);
}
}
public void service(ServletRequest request, ServletResponse response) throws ServletException, IOException
{
throw new ServletException();
}
}
静态发现的策略比较死板,不利于集群的动态扩展。可以通过配置达到动态增减oort node的目的。框架通过“多播”来达到这个目的,在web.xml
中配置oort.multicast.*
来配置多播参数,并保证操作系统、路由器等正确配置了多播支持。这样,所有的结点每隔一段时间会进行一次多播,收到该信息的其他结点自动尚未连接的新的结点。
如果同一user的不同client连接到了不同的server,其publish或者send到指定client的信息显然必须经过转发。server通过指定oort.channels
来指定要求转发的channel,相同于一种“订阅”。
Seti
是Oort的一个组件,用来追踪所有连接到集群上的客户端,其主要作用是透明化向指定useid client发送消息的过程。
使用seti.associate(userId,session)
来将客户端的某种标识与ServerSession进行关联,也可以使用BayeuxServer.Extension
完成该任务。推荐在handshake后完成关联。
当某个seti
第一次与userId
关联时,会在/seti/all
信道上进行广播。这样,所有的cometd都知道userId
与哪个seti
关联了,以后同一个userId
与同一个seti
进行关联时就不会做任何广播了。同一个userId可以关联不同的seti任意多次,因此seti可以完成broadcast和deliver的功能。
取消关联时,Seti
会计算连接数,如果userId
的关联数降为0,就会广播该信息(类似引用计数)。
可以通过在Seti
中订阅PresenceAdded
事件来监视上述变化。当userId
与ServerSession
关联后,可以使用seti.sendMessage(String userId,String channel, Object data)
来向一个特定用户发送信息。
node之间可能需要共享某些数据,比如连接到各自node的client数目,这可以通过OortObject
来实现。
创建oort后,可以通过以下代码创建一个OortObject
:
Oort oort = ...;
OortObject.Factory<List<String>> factory = OortObjectFactories.forList();
OortObject<List<String>> users = new OortObject<List<String>>(oort, "users", factory);
users.start();
每个node会复制其他结点OortObejct的数据,存放于本地。为了避免效率太低,cometD提供了OortMap
等数据结构。可以通过以下代码分享数据:
OortObject.Factory<List<String>> factory=users.getFactory();
List<String> names=factory.newObject(null);
names.add("B1");
users.setAndShare(names);
或
OortStringMap<UserInfo> userInfos = ..;
userInfos.putAndShare("B1", new UserInfo("B1", ...));
...
userInfos.removeAndShare("B1");
同理,OortList
也有类似的addAndShare
和removeAndShare
方法。为了保证每次修改都能被其他node感知,必须使用这些指定的方法,而不是直接操作容器。
默认情况下,Oort将接受到的通过toString()
方法将非原生类型串行为JSON字符串,如果需要持久化自定义类型,必须继承一个JSON.Convertor
类,实现与JSON的相互转换。然后在web.xml中的oort和cometd的初始化参数中键入jsonContxt
指定转换器。
每个node都本地存放有其他所有node的share object,可以通过org.cometd.oort.OortObjectMergers
来一次性取出所有node的指定字段。
可以对OortObject
添加监听器,以对数值变化作出响应,不同的动作(remove, add, update)对应不同的监听器(委托|回调函数)。
由于能修改OortObject
的只有其所属的node,其他node如果想要修改某个node,就只能利用该node提供的OortService
。
其实现方式是继承一个OortService
,并重载某些指定的method,具体可以参照官方文档。
该部分主要用来修改协议的ext
字段。