@zero1036
2017-09-04T14:46:52.000000Z
字数 2677
阅读 2529
RabbitMQ
参考源码可知,RabbitMQ Client是通过javax.net.SocketFactory建立tcp长连接,实现consumer client端与mq server端通讯。通过订阅方法被动从Queue中拉取消息,伪代码如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
注意:
创建Connection需要指定MQ的物理地址和端口,是socket > > tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ 。
创建Socket核心源码如下:
import javax.net.SocketFactory;
public class SocketFrameHandlerFactory extends AbstractFrameHandlerFactory {
private final SocketFactory factory;
...
public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
super(connectionTimeout, configurator, ssl);
this.factory = factory;
this.shutdownExecutor = shutdownExecutor;
}
public FrameHandler create(Address addr) throws IOException {
String hostName = addr.getHost();
int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), this.ssl);
Socket socket = null;
try {
//通过javax.net.SocketFactory建立tcp长连接
socket = this.factory.createSocket();
this.configurator.configure(socket);
socket.connect(new InetSocketAddress(hostName, portNumber), this.connectionTimeout);
return this.create(socket);
} catch (IOException var6) {
quietTrySocketClose(socket);
throw var6;
}
}
...
}
创建Connection连接后,通过启动MainLoop后台线程,无间断从socket(FrameHandler)中获取数据包Frame。注意readFrame()可能抛出SocketTimeoutException超时异常。但注意,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间。这个超时时间默认为10秒,由AMQConnection.HANDSHAKE_TIMEOUT
常量指定。源码简化后逻辑如下:
private class MainLoop implements Runnable {
private MainLoop() {
}
public void run() {
//while(true)实现无限循环
while(true) {
try {
//退出条件是:AMQConnection状态非运行时,获取捕捉异常
if(AMQConnection.this._running) {
Frame ex = AMQConnection.this._frameHandler.readFrame();
AMQConnection.this.readFrame(ex);
continue;
}
} catch (Throwable var5) {
...
}
return;
}
}
}
class SocketFrameHandler implements FrameHandler{
public Frame readFrame() throws IOException {
//通过DataInputStream实现数据获取
DataInputStream var1 = this._inputStream;
synchronized(this._inputStream) {
int type;
try {
// 从“数据输入流”中读取“无符号的Byte类型”的值,即读取值为正数的byte值
type = is.readUnsignedByte();
} catch (SocketTimeoutException, var6) {
//注意:readFrame()此处会抛出SocketTimeoutException,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间,默认为10秒,由`AMQConnection.HANDSHAKE_TIMEOUT`常量指定
return null;
}
if(type == 65) {
protocolVersionMismatch(is);
}
//读取出输入流中的UTF-8数据的长度
int channel = is.readUnsignedShort();
// 从“数据输入流”中读取“int类型”的值
int payloadSize = is.readInt();
byte[] payload = new byte[payloadSize];
// 从“数据输入流”中读取“long类型”的值
is.readFully(payload);
int frameEndMarker = is.readUnsignedByte();
if(frameEndMarker != 206) {
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
} else {
return new Frame(type, channel, payload);
}
}
}
}
更多关于DataInputStream
技巧参考:java io系列14之 DataInputStream(数据输入流)的认知、源码和示例