@a604572782
2017-05-03T01:53:56.000000Z
字数 13160
阅读 3368
NetMQ ZeroMQ C#
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:
1. 消息队列NetMQ 原理分析1-Context和ZObject
2. 消息队列NetMQ 原理分析2-IO线程和完成端口
3. 消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
4. 消息队列NetMQ 原理分析4-Session、Option和Pipe
5. 消息队列NetMQ 原理分析5-Engine
6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现
7. 消息队列NetMQ 原理分析7-Device
8. 消息队列NetMQ 原理分析8-不同类型的Socket
9. 消息队列NetMQ 原理分析9-实战
友情提示: 看本系列文章时最好获取源码,更有助于理解。
Command定义如下
internal struct Command{public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this(){Destination = destination;CommandType = type;Arg = arg;}[CanBeNull]public ZObject Destination { get; }public CommandType CommandType { get; }[CanBeNull]public object Arg { get; private set; }public override string ToString(){return base.ToString() + "[" + CommandType + ", " + Destination + "]";}}
其包含了3个信息:调用者,命令类型和命令参数。
还记的《消息队列NetMQ 原理分析1-Context和ZObject》中我们介绍过NetMQ中的命令类型吗?待处理命令全部会存放着Socket的信箱中。当Socket有命令(连接完成、发送完成或接受完成等)需要处理时调用基类ZObject的SendCommand方法。
private void SendCommand([NotNull] Command cmd){m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);}

ZObject实际调用Context的SendCommand方法
public void SendCommand(int threadId, [NotNull] Command command){m_slots[threadId].Send(command);}
m_slots[threadId]保存的是当前IO线程的IO信箱IOThreadMailbox,在《消息队列NetMQ 原理分析2-IO线程和完成端口》
我们简单介绍了IOThreadMailbox的结构。
[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
IOThreadMailbox中维护这一个Command管道,该管道实际就是一个先进先出队列,详细解析会在第四章进行介绍。
public void Send(Command command){bool ok;lock (m_sync){//向管道写入命令m_commandPipe.Write(ref command, false);//成功写入会返回false,表示有命令需要处理ok = m_commandPipe.Flush();}if (!ok){//向完成端口传递信号m_proactor.SignalMailbox(this);}}public bool TryRecv(out Command command){return m_commandPipe.TryRead(out command);}public void RaiseEvent(){if (!m_disposed){m_mailboxEvent.Ready();}}
IOThreadMailbox的主要就是这三个方法
1. 当有命令来的时候调用Send方法向管道(队列)写入命令。写完时,会向完成端口传递信号。
2. 当有命令需要处理时调用TryRecv方法读取
3. 当完成端口接收到信号需要命令处理时,调用RaiseEvent(实际是信箱的IO线程的RaiseEvent方法)进行处理命令。
public void SignalMailbox(IOThreadMailbox mailbox){//该方法会向完成端口的队列中插入一个信号状态m_completionPort.Signal(mailbox);}
有关于完成端口介绍请查看《消息队列NetMQ 原理分析2-IO线程和完成端口》
当有命令需要处理时,完成端口会接收到信号。
private void Loop(){...int timeout = ExecuteTimers();int removed;if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;for (int i = 0; i < removed; i++){try{if (completionStatuses[i].OperationType == OperationType.Signal){var mailbox = (IOThreadMailbox)completionStatuses[i].State;mailbox.RaiseEvent();}...}...}...}
在线程轮询方法Loop中,当接收到需要处理的数据时,首先会判断是否是信号,若为信号,则将状态(参数)转化为IOThreadMailbox类型,同时调用RaiseEvent方法处理命令。
public void Ready(){Command command;while (m_mailbox.TryRecv(out command))command.Destination.ProcessCommand(command);}
当有命令需要处理时,会调用IOThreadMailbox的TryRecv方法从管道(队列,先进先出)中获取第一个命令进行处理。
在介绍回收线程工作之前,我们先看下创建一个新的Socket做了哪些工作,这里的Socket实际是NetMQ中的SocketBase。
RequestSocket socket = new RequestSocket();socket.Connect("tcp://127.0.0.1:12345");
NetMQSocket是NetMQ的Socket的基类。
public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect){}
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction){m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);m_netMqSelector = new NetMQSelector();Options = new SocketOptions(this);m_socketEventArgs = new NetMQSocketEventArgs(this);Options.Linger = NetMQConfig.Linger;if (!string.IsNullOrEmpty(connectionString)){var endpoints =connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries).Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));foreach (string endpoint in endpoints){if (endpoint[0] == '@'){Bind(endpoint.Substring(1));}else if (endpoint[0] == '>'){Connect(endpoint.Substring(1));}else if (defaultAction == DefaultAction.Connect){Connect(endpoint);}else{Bind(endpoint);}}}}
首先会根据Socket的类型创建对应的Socket,调用的是Context的CreateSocket方法。具体的请看创建SocketBase。最终创建方法是调用SocketBase的Create方法
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId){switch (type){...case ZmqSocketType.Req:return new Req(parent, threadId, socketId);...default:throw new InvalidException("SocketBase.Create called with invalid type of " + type);}}
创建完后,就对地址进行解析。若有多个地址,则可用,分隔。
var endpoints =connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries).Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
解析完成后则用默认的方式进行绑定或连接,如RequestSocket默认为连接,而ResponseSocket则为绑定。
首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》
private static void DecodeAddress([NotNull] string addr, out string address, out string protocol){const string protocolDelimeter = "://";int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);protocol = addr.Substring(0, protocolDelimeterIndex);address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);}
负载均衡选择一个IO线程。
Session,Socket和Session的关系如图所示 
Socket和Session,如上图所示。创建管道完毕后需要设置管道的回调事件,管道1设置回调为Socket的回调方法,管道2设置为Session的回调方法。具体关于
Session和Pipe的内容请查看《消息队列NetMQ 原理分析4-Session、Option和Pipe》。
Socket和Session的关系
protected void LaunchChild([NotNull] Own obj){// Specify the owner of the object.obj.SetOwner(this);// Plug the object into the I/O thread.SendPlug(obj);// Take ownership of the object.SendOwn(this, obj);}
Session的宿主设置为该Socket
private void SetOwner([NotNull] Own owner){Debug.Assert(m_owner == null);m_owner = owner;}
Session,当管道有数据交互时,Session的回调方法就会触发。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true){if (incSeqnum)destination.IncSeqnum();SendCommand(new Command(destination, CommandType.Plug));}
SessionBase的ProcessPlug会被触发
protected override void ProcessPlug(){m_ioObject.SetHandler(this);if (m_connect)StartConnecting(false);}
Session加入到Socket的Session集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj){destination.IncSeqnum();SendCommand(new Command(destination, CommandType.Own, obj));}
SocketBase的父类方法SendOwn(Own方法)方法会被触发,将Session加入到集合中
protected override void ProcessOwn(Own obj){...// Store the reference to the owned object.m_owned.Add(obj);}
首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》
private static void DecodeAddress([NotNull] string addr, out string address, out string protocol){const string protocolDelimeter = "://";int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);protocol = addr.Substring(0, protocolDelimeterIndex);address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);}
负载均衡选择一个IO线程。
处理Socket和Session的关系
protected void LaunchChild([NotNull] Own obj){// Specify the owner of the object.obj.SetOwner(this);// Plug the object into the I/O thread.SendPlug(obj);// Take ownership of the object.SendOwn(this, obj);}
Listener的宿主设置为该Socket
private void SetOwner([NotNull] Own owner){Debug.Assert(m_owner == null);m_owner = owner;}
Listener,当管道有数据交互是,Listener的回调方法就会触发。
protected void SendPlug([NotNull] Own destination, bool incSeqnum = true){if (incSeqnum)destination.IncSeqnum();SendCommand(new Command(destination, CommandType.Plug));}
Listener的ProcessPlug会被触发
protected override void ProcessPlug(){m_ioObject.SetHandler(this);m_ioObject.AddSocket(m_handle);//接收异步socketAccept();}
Listener加入到Socket的Listener集合中,
protected void SendOwn([NotNull] Own destination, [NotNull] Own obj){destination.IncSeqnum();SendCommand(new Command(destination, CommandType.Own, obj));}
SocketBase的父类方法SendOwn(Own方法)方法会被触发,将Listener加入到集合中
protected override void ProcessOwn(Own obj){...// Store the reference to the owned object.m_owned.Add(obj);}
SocketBase的创建处理就完成了
(垃圾)回收线程是专门处理(清理)异步关闭的Socket的线程,它在NetMQ中起到至关重要的作用。
internal class Reaper : ZObject, IPollEvents{...}
Reaper是一个ZObject对象,同时实现了IPollEvents接口,该接口的作用是当有信息接收或发送时进行处理。回收线程实现了InEvent方法。
internal interface IPollEvents : ITimerEvent{void InEvent();void OutEvent();}
InEvent方法实现和IO线程的Ready方法很像,都是遍历需要处理的命令进行处理。
public void InEvent(){while (true){Command command;if (!m_mailbox.TryRecv(0, out command))break;command.Destination.ProcessCommand(command);}}
public Reaper([NotNull] Ctx ctx, int threadId): base(ctx, threadId){m_sockets = 0;m_terminating = false;string name = "reaper-" + threadId;m_poller = new Utils.Poller(name);m_mailbox = new Mailbox(name);m_mailboxHandle = m_mailbox.Handle;m_poller.AddHandle(m_mailboxHandle, this);m_poller.SetPollIn(m_mailboxHandle);}
Poller对象,用于轮询回收SocketBase。Mailbox对象用于Command的收发
internal class Mailbox : IMailbox{...}
MailBox和IO线程的IOThreadMailbox一样,实现了IMailbox接口。
当有SocketBase需要释放时,会向完成端口发送Reap信号。
public void Close(){// Mark the socket as disposedm_disposed = true;//工作线程向Socket邮箱发送Reap信号//回收线程会做剩下的工作SendReap(this);}
向回收线程的邮箱发送当前SocketBase的回收命令
protected void SendReap([NotNull] SocketBase socket){SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));}
Reap接收到释放信号进行处理
protected override void ProcessReap(SocketBase socket){// Add the socket to the poller.socket.StartReaping(m_poller);++m_sockets;}
Socket的加入到回收线程的中,当Socket接收到数据时,由回收线程回调该Socket的处理事件进行处理。
internal void StartReaping([NotNull] Poller poller){m_poller = poller;m_handle = m_mailbox.Handle;m_poller.AddHandle(m_handle, this);m_poller.SetPollIn(m_handle);Terminate();CheckDestroy();}
Socket时,直接终止即可默认情况下
NetMQ的Linger值被设置为-1,就是说如果网络读写没有进行完是不能退出的。如果Linger被设置为0,那么中断时会丢弃一切未完成的网络操作。如果Linger被设置的大于0,那么将等待Linger毫秒用来完成未完成的网络读写,在指定的时间里完成或者超时都会立即返回。
Session,则需要发送请求清理关联Socket的当前Session对象
protected void Terminate(){...if (m_owner == null){// 释放的是Socket,Owner为空ProcessTerm(m_options.Linger);}else{// 释放的是Session则会关联一个SocketSendTermReq(m_owner, this);}}
SocketBase时,需要先中断当前SocketBase关联的SessionBaseSession集合Session全部终止后发送给当前Socket宿主终端响应(TermAck)
protected override void ProcessTerm(int linger){...// 断开所有session的连接foreach (Own it in m_owned){SendTerm(it, linger);}RegisterTermAcks(m_owned.Count);m_owned.Clear();CheckTermAcks();}
SessionLigner大于0 则等到N毫秒后再终止终止Socket和Session之间的管道
protected override void ProcessTerm(int linger){if (m_pipe == null){ProceedWithTerm();return;}m_pending = true;if (linger > 0){Debug.Assert(!m_hasLingerTimer);m_ioObject.AddTimer(linger, LingerTimerId);m_hasLingerTimer = true;}// 是否需要等待一定时间后消息处理完再终止管道.m_pipe.Terminate(linger != 0);// TODO: Should this go into pipe_t::terminate ?// In case there's no engine and there's only delimiter in the// pipe it wouldn't be ever read. Thus we check for it explicitly.m_pipe.CheckRead();}
管道状态如下所示
private enum State{/// <summary> Active 表示在中断命令开始前的状态 </summary>Active,/// <summary> Delimited 表示在终端命令接收前从管道接收到分隔符</summary>Delimited,/// <summary> Pending 表示中断命令已经从管道接收,但是仍有待定消息可读</summary>Pending,/// <summary> Terminating 表示所有待定消息都已经读取等待管道终止确认信号返回 </summary>Terminating,/// <summary> Terminated 表示终止命令是由用户显示调用 </summary>Terminated,/// <summary> Double_terminated 表示用户调用了终止命令同时管道也调用了终止命令 </summary>DoubleTerminated}
终止当前管道
若当前状态为Terminated、DoubleTerminated和Terminating不再处理终止命令
public void Terminate(bool delay){//判断当前状态是否可处理终止命令...if (m_state == State.Active){// 向另一个管道发送终止命令然后等待确认终止SendPipeTerm(m_peer);m_state = State.Terminated;}else if (m_state == State.Pending && !m_delay){// 若有待处理数据,但是不等待直接终止,则向另一个管道发送确认终止.m_outboundPipe = null;SendPipeTermAck(m_peer);m_state = State.Terminating;}else if (m_state == State.Pending){//若有待处理数据但是需要等到则不处理.}else if (m_state == State.Delimited){//若已经获取到限定符但是还没有收到终止命令则忽略定界符,然后发送终止命令给另一个管道SendPipeTerm(m_peer);m_state = State.Terminated;}else{// 没有其他状态Debug.Assert(false);}//停止向外发送的消息m_outActive = false;if (m_outboundPipe != null){//抛弃未发送出的消息.Rollback();// 这里不会再先查水位,所以即使管道满了也可再写入,向管道写入定界符 .var msg = new Msg();msg.InitDelimiter();m_outboundPipe.Write(ref msg, false);Flush();}}
protected override void ProcessPipeTerm()
{
// 这是一个简单的例子有道管道终止
//若没有更多待处理消息需要读取,或者这个管道已经丢去待处理数据,我们直接将状态设置为正在终止(terminating),否则我们搁置待处理状态直到所有待处理消息被发送
if (m_state == State.Active)
{
if (!m_delay)
{
//不需要等到消息处理
m_state = State.Terminating;
m_outboundPipe = null;
//发送终止确认
SendPipeTermAck(m_peer);
}
else
m_state = State.Pending;
return;
}
// 若定界符碰巧在终止命令之前到达,将状态改为正在终止
if (m_state == State.Delimited)
{
m_state = State.Terminating;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// 当管道并发关闭,则状态改为DoubleTerminated
if (m_state == State.Terminated)
{
m_state = State.DoubleTerminated;
m_outboundPipe = null;
SendPipeTermAck(m_peer);
return;
}
// pipe_term is invalid in other states.
Debug.Assert(false);
}
确认终止
protected override void ProcessPipeTermAck(){// 通知Socket或Session中断当前管道 .Debug.Assert(m_sink != null);m_sink.Terminated(this);// 若正则处理或double_terminated这里不做任何事// 简化释放管道,在已终止状态,我们必须在释放这个管道之前确认//其他状态都是非法的if (m_state == State.Terminated){m_outboundPipe = null;SendPipeTermAck(m_peer);}elseDebug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated);// 删除所有管道中的未读消息,然后释放流入管道var msg = new Msg();while (m_inboundPipe.TryRead(out msg)){msg.Close();}m_inboundPipe = null;}
整体回收Socket流程图如下:

public virtual void InEvent(){// 回收线程命令会调用此事件try{ProcessCommands(0, false);}catch{// ignored}finally{CheckDestroy();}}
private void CheckDestroy(){// socket释放完则做最后的清除和释放工作.if (m_destroyed){// 从回收线程移除轮询m_poller.RemoveHandle(m_handle);// 释放socke.DestroySocket(this);// 通知已释放.SendReaped();// Deallocate.base.ProcessDestroy();}}
该篇介绍命令处理方式和回收线程回收Socket,顺便介绍了下创建SocketBase的细节性问题。以便对释放Socket有更清晰的认识。