[关闭]
@zero1036 2017-09-04T14:46:52.000000Z 字数 2677 阅读 2537

RabbitMQ Client连接方式分析

RabbitMQ


Socket长连接or长轮询

参考源码可知,RabbitMQ Client是通过javax.net.SocketFactory建立tcp长连接,实现consumer client端与mq server端通讯。通过订阅方法被动从Queue中拉取消息,伪代码如下:

  1. ConnectionFactory factory = new ConnectionFactory();
  2. Connection conn = factory.newConnection();
  3. Channel channel=conn.createChannel();

注意:
创建Connection需要指定MQ的物理地址和端口,是socket > > tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ 。


关键步骤源码分析

创建TCP连接

创建Socket核心源码如下:

  1. import javax.net.SocketFactory;
  2. public class SocketFrameHandlerFactory extends AbstractFrameHandlerFactory {
  3. private final SocketFactory factory;
  4. ...
  5. public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
  6. super(connectionTimeout, configurator, ssl);
  7. this.factory = factory;
  8. this.shutdownExecutor = shutdownExecutor;
  9. }
  10. public FrameHandler create(Address addr) throws IOException {
  11. String hostName = addr.getHost();
  12. int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), this.ssl);
  13. Socket socket = null;
  14. try {
  15. //通过javax.net.SocketFactory建立tcp长连接
  16. socket = this.factory.createSocket();
  17. this.configurator.configure(socket);
  18. socket.connect(new InetSocketAddress(hostName, portNumber), this.connectionTimeout);
  19. return this.create(socket);
  20. } catch (IOException var6) {
  21. quietTrySocketClose(socket);
  22. throw var6;
  23. }
  24. }
  25. ...
  26. }

读取消息

创建Connection连接后,通过启动MainLoop后台线程,无间断从socket(FrameHandler)中获取数据包Frame。注意readFrame()可能抛出SocketTimeoutException超时异常。但注意,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间。这个超时时间默认为10秒,由AMQConnection.HANDSHAKE_TIMEOUT常量指定。源码简化后逻辑如下:

  1. private class MainLoop implements Runnable {
  2. private MainLoop() {
  3. }
  4. public void run() {
  5. //while(true)实现无限循环
  6. while(true) {
  7. try {
  8. //退出条件是:AMQConnection状态非运行时,获取捕捉异常
  9. if(AMQConnection.this._running) {
  10. Frame ex = AMQConnection.this._frameHandler.readFrame();
  11. AMQConnection.this.readFrame(ex);
  12. continue;
  13. }
  14. } catch (Throwable var5) {
  15. ...
  16. }
  17. return;
  18. }
  19. }
  20. }
  21. class SocketFrameHandler implements FrameHandler{
  22. public Frame readFrame() throws IOException {
  23. //通过DataInputStream实现数据获取
  24. DataInputStream var1 = this._inputStream;
  25. synchronized(this._inputStream) {
  26. int type;
  27. try {
  28. // 从“数据输入流”中读取“无符号的Byte类型”的值,即读取值为正数的byte值
  29. type = is.readUnsignedByte();
  30. } catch (SocketTimeoutException var6) {
  31. //注意:readFrame()此处会抛出SocketTimeoutException,socket读取超时并非mq实例的连接超时,是从socket中读取字节流的等待时间,默认为10秒,由`AMQConnection.HANDSHAKE_TIMEOUT`常量指定
  32. return null;
  33. }
  34. if(type == 65) {
  35. protocolVersionMismatch(is);
  36. }
  37. //读取出输入流中的UTF-8数据的长度
  38. int channel = is.readUnsignedShort();
  39. // 从“数据输入流”中读取“int类型”的值
  40. int payloadSize = is.readInt();
  41. byte[] payload = new byte[payloadSize];
  42. // 从“数据输入流”中读取“long类型”的值
  43. is.readFully(payload);
  44. int frameEndMarker = is.readUnsignedByte();
  45. if(frameEndMarker != 206) {
  46. throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
  47. } else {
  48. return new Frame(type, channel, payload);
  49. }
  50. }
  51. }
  52. }

更多关于DataInputStream技巧参考:java io系列14之 DataInputStream(数据输入流)的认知、源码和示例

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注