@zero1036
2017-09-04T06:46:52.000000Z
字数 2677
阅读 2768
RabbitMQ
参考源码可知,RabbitMQ Client是通过javax.net.SocketFactory建立tcp长连接,实现consumer client端与mq server端通讯。通过订阅方法被动从Queue中拉取消息,伪代码如下:
ConnectionFactory factory = new ConnectionFactory();Connection conn = factory.newConnection();Channel channel=conn.createChannel();
注意:
创建Connection需要指定MQ的物理地址和端口,是socket > > tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ 。
创建Socket核心源码如下:
import javax.net.SocketFactory;public class SocketFrameHandlerFactory extends AbstractFrameHandlerFactory {private final SocketFactory factory;...public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {super(connectionTimeout, configurator, ssl);this.factory = factory;this.shutdownExecutor = shutdownExecutor;}public FrameHandler create(Address addr) throws IOException {String hostName = addr.getHost();int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), this.ssl);Socket socket = null;try {//通过javax.net.SocketFactory建立tcp长连接socket = this.factory.createSocket();this.configurator.configure(socket);socket.connect(new InetSocketAddress(hostName, portNumber), this.connectionTimeout);return this.create(socket);} catch (IOException var6) {quietTrySocketClose(socket);throw var6;}}...}
创建Connection连接后,通过启动MainLoop后台线程,无间断从socket(FrameHandler)中获取数据包Frame。注意readFrame()可能抛出SocketTimeoutException超时异常。但注意,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间。这个超时时间默认为10秒,由AMQConnection.HANDSHAKE_TIMEOUT常量指定。源码简化后逻辑如下:
private class MainLoop implements Runnable {private MainLoop() {}public void run() {//while(true)实现无限循环while(true) {try {//退出条件是:AMQConnection状态非运行时,获取捕捉异常if(AMQConnection.this._running) {Frame ex = AMQConnection.this._frameHandler.readFrame();AMQConnection.this.readFrame(ex);continue;}} catch (Throwable var5) {...}return;}}}class SocketFrameHandler implements FrameHandler{public Frame readFrame() throws IOException {//通过DataInputStream实现数据获取DataInputStream var1 = this._inputStream;synchronized(this._inputStream) {int type;try {// 从“数据输入流”中读取“无符号的Byte类型”的值,即读取值为正数的byte值type = is.readUnsignedByte();} catch (SocketTimeoutException, var6) {//注意:readFrame()此处会抛出SocketTimeoutException,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间,默认为10秒,由`AMQConnection.HANDSHAKE_TIMEOUT`常量指定return null;}if(type == 65) {protocolVersionMismatch(is);}//读取出输入流中的UTF-8数据的长度int channel = is.readUnsignedShort();// 从“数据输入流”中读取“int类型”的值int payloadSize = is.readInt();byte[] payload = new byte[payloadSize];// 从“数据输入流”中读取“long类型”的值is.readFully(payload);int frameEndMarker = is.readUnsignedByte();if(frameEndMarker != 206) {throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);} else {return new Frame(type, channel, payload);}}}}
更多关于DataInputStream技巧参考:java io系列14之 DataInputStream(数据输入流)的认知、源码和示例