[关闭]
@khan-lau 2016-04-20T15:43:30.000000Z 字数 9046 阅读 3919

Mosquitto源码分析

C++


  mosquitto是一款实现了消息推送协议MQTT v3.1的开源消息代理软件,提供轻量级的,可支持发布/可订阅的消息推送模式。

出版/订阅模式

  出版/订阅模式定义了如何向一 个节点发布和订阅消息,这些节点被称作主题(topic)。主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者 (subscriber) 从主题订阅消息。这种模式使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。

该模式使用场景

  TCP协议提供一对一的可靠传输,每个TCP连接由五个元素唯一确定:源IP地址、源端口号、目的IP地址、目的端口、通信协议。但在实际过程中,通信的终端数目是多个,需要维持的通信关系是多对多的。此时,每个参与通信的客户端所需维持的链接数将很庞大,而出版/订阅模式就是一种解决方法。它通过增加一个中间件的方式简化问题,让中间件维护这种多对多的关系。

mosquitto源码中重要的数据结构

  1)struct mosquitto用来保存一个客户端连接的所有信息,如用户名、密码、用户ID、向该客户端发送的消息等。(在mosquitto_internal.h定义)

  1. struct mosquitto {
  2. #ifndef WIN32
  3. int sock;
  4. # ifndef WITH_BROKER
  5. int sockpairR, sockpairW;
  6. # endif
  7. #else
  8. SOCKET sock;
  9. # ifndef WITH_BROKER
  10. SOCKET sockpairR, sockpairW; //sockpairR用来在超市前跳出select()
  11. # endif
  12. #endif
  13. enum _mosquitto_protocol protocol;
  14. char *address;
  15. char *id;
  16. char *username;
  17. char *password;
  18. uint16_t keepalive;
  19. bool clean_session;
  20. enum mosquitto_client_state state;
  21. time_t last_msg_in;
  22. time_t last_msg_out;
  23. time_t ping_t;
  24. uint16_t last_mid;
  25. struct _mosquitto_packet in_packet;
  26. struct _mosquitto_packet *current_out_packet; //当前处理(发送)的节点
  27. struct _mosquitto_packet *out_packet; // 待发送的packet队列的头结点
  28. struct mosquitto_message *will;
  29. #ifdef WITH_TLS
  30. SSL *ssl;
  31. SSL_CTX *ssl_ctx;
  32. char *tls_cafile;
  33. char *tls_capath;
  34. char *tls_certfile;
  35. char *tls_keyfile;
  36. int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
  37. int tls_cert_reqs;
  38. char *tls_version;
  39. char *tls_ciphers;
  40. char *tls_psk;
  41. char *tls_psk_identity;
  42. bool tls_insecure;
  43. #endif
  44. bool want_write;
  45. #if defined(WITH_THREADING) && !defined(WITH_BROKER)
  46. pthread_mutex_t callback_mutex;
  47. pthread_mutex_t log_callback_mutex;
  48. pthread_mutex_t msgtime_mutex;
  49. pthread_mutex_t out_packet_mutex; //标记是否存在待发送的packet队列
  50. pthread_mutex_t current_out_packet_mutex;//标记当前是否有packet要发送
  51. pthread_mutex_t state_mutex;
  52. pthread_mutex_t in_message_mutex;
  53. pthread_mutex_t out_message_mutex;
  54. pthread_t thread_id;
  55. #endif
  56. #ifdef WITH_BROKER
  57. bool is_bridge;
  58. struct _mqtt3_bridge *bridge;
  59. struct mosquitto_client_msg *msgs;
  60. struct mosquitto_client_msg *last_msg;
  61. int msg_count;
  62. int msg_count12;
  63. struct _mosquitto_acl_user *acl_list;
  64. struct _mqtt3_listener *listener;
  65. time_t disconnect_t;
  66. int pollfd_index;
  67. int db_index;
  68. struct _mosquitto_packet *out_packet_last;
  69. bool is_dropping;
  70. #else
  71. void *userdata;
  72. bool in_callback;
  73. unsigned int message_retry;
  74. time_t last_retry_check;
  75. struct mosquitto_message_all *in_messages;
  76. struct mosquitto_message_all *in_messages_last;
  77. struct mosquitto_message_all *out_messages;
  78. struct mosquitto_message_all *out_messages_last;
  79. void (*on_connect)(struct mosquitto *, void *userdata, int rc);
  80. void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
  81. void (*on_publish)(struct mosquitto *, void *userdata, int mid);
  82. void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
  83. void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
  84. void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
  85. void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
  86. //void (*on_error)();
  87. char *host;
  88. int port;
  89. int in_queue_len;
  90. int out_queue_len;
  91. char *bind_address;
  92. unsigned int reconnect_delay;
  93. unsigned int reconnect_delay_max;
  94. bool reconnect_exponential_backoff;
  95. bool threaded;
  96. struct _mosquitto_packet *out_packet_last; //标记待发送packet队列的末尾一个元素
  97. int inflight_messages;
  98. int max_inflight_messages;
  99. # ifdef WITH_SRV
  100. ares_channel achan; // ares_init函数中初始化,返回一个交流通道
  101. # endif
  102. #endif
  103. };

  2)struct mosquitto_db在mosquitto_broker.h中定义,是mosquitto对所有内部数据的统一管理结构,可以认为是其内部的一个内存数据库,它保存了所有客户端,所有客户端的订阅关系等。

  1. struct mosquitto_db{
  2. dbid_t last_db_id;
  3. struct _mosquitto_subhier subs;
  4. struct _mosquitto_unpwd *unpwd;
  5. struct _mosquitto_acl_user *acl_list;
  6. struct _mosquitto_acl *acl_patterns;
  7. struct _mosquitto_unpwd *psk_id;
  8. struct mosquitto **contexts; //各个客户的情况,相当于数组,每创建一个客户就增加空间
  9. struct _clientid_index_hash *clientid_index_hash;
  10. int context_count; //初始值为1,与客户端的数量相关
  11. struct mosquitto_msg_store *msg_store;
  12. int msg_store_count;
  13. struct mqtt3_config *config;
  14. int persistence_changes;
  15. struct _mosquitto_auth_plugin auth_plugin;
  16. int subscription_count;
  17. int retained_count; //当前系统中retain标记的消息的条数
  18. };

  3)struct _mosquitto_subhier在mosquitto_broker.h中定义, 用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto对订阅树采用孩子-兄弟链表法的方式进行存储,该存储方式主要借助于数据结构struct _mosquitto_subhier来完成。

  1. struct _mosquitto_subhier {
  2. struct _mosquitto_subhier *children; //第一个孩子节点
  3. struct _mosquitto_subhier *next; //下一个兄弟节点
  4. struct _mosquitto_subleaf *subs; //订阅列表
  5. char *topic; //该节点对应的Topic片段
  6. struct mosquitto_msg_store *retained;//该topic下被retain标记的消息
  7. };

  4)struct _mosquitto_subleaf 在mosquitto_broker.h中定义,程序中,对某一个Topic的所有订阅者被组织成一个订阅列表,该订阅列表是一个双向链表,链表的每一个节点都保存有一个订阅者,该链表的节点即是:struct _mosquitto_subleaf

  1. struct _mosquitto_subleaf {
  2. struct _mosquitto_subleaf *prev;
  3. struct _mosquitto_subleaf *next;
  4. struct mosquitto *context;
  5. int qos;
  6. };

  5)struct mqtt3_config在mosquitto_broker.h中定义, 用于保存mosquitto 的所有配置信息, mosquitto程序在启动时将初始化该结构体并从配置文件中读取配置信息保存于该结构体变量内。

  1. struct mqtt3_config {
  2. char *config_file;
  3. char *acl_file;
  4. bool allow_anonymous;
  5. bool allow_duplicate_messages;
  6. bool allow_zero_length_clientid;
  7. char *auto_id_prefix;
  8. int auto_id_prefix_len;
  9. int autosave_interval;
  10. bool autosave_on_changes;
  11. char *clientid_prefixes;
  12. bool connection_messages;
  13. bool daemon;
  14. struct _mqtt3_listener default_listener;
  15. struct _mqtt3_listener *listeners;
  16. int listener_count;
  17. int log_dest;
  18. int log_type;
  19. bool log_timestamp;
  20. char *log_file;
  21. FILE *log_fptr;
  22. int message_size_limit;
  23. char *password_file;
  24. bool persistence;
  25. char *persistence_location;
  26. char *persistence_file;
  27. char *persistence_filepath;
  28. time_t persistent_client_expiration;
  29. char *pid_file;
  30. char *psk_file;
  31. bool queue_qos0_messages;
  32. int retry_interval;
  33. int store_clean_interval;
  34. int sys_interval;
  35. bool upgrade_outgoing_qos;
  36. char *user;
  37. bool verbose;
  38. #ifdef WITH_BRIDGE
  39. struct _mqtt3_bridge *bridges;
  40. int bridge_count;
  41. #endif
  42. char *auth_plugin;
  43. struct mosquitto_auth_opt *auth_options;
  44. int auth_option_count;
  45. };

  6)struct _sub_token 在src/subs.c文件中定义,是将Topic分成Topic片段组成链表节点的基本结构。

  1. struct _sub_token {
  2. struct _sub_token *next;
  3. char *topic;
  4. };

重要的数据结构(回顾)

  虽然在上一篇中做了详细介绍,但是为了后面的代码分析的方便,还是在这里列出这几个数据结构的功能。

NO struct名称 作用
1 mosquitto 用来保存一个客户端连接的所有信息,如用户名、密码、用户ID、向该客户端发送的消息等。
2 mosquitto_db 是mosquitto对所有内部数据的统一管理结构,可以认为是其内部的一个内存数据库,它保存了所有客户端,所有客户端的订阅关系等待
3 mosquitto_subhier 用于保存订阅树的节点(包括叶子节点和中间节点),mosquitto对订阅树采用孩子-兄弟链表法的方式进行存储,该存储方式主要借助于数据结构struct mosquitto_subhier来完成。
4 mosquitto_subleaf 在程序中,对某一个Topic的所有订阅者被组织成一个订阅列表,该订阅列表是一个双向链表,链表的每一个节点都保存有一个订阅者,该链表的节点即是:struct mosquitto_subleaf
5 mqtt3_config 用于保存mosquitto 的所有配置信息, mosquitto程序在启动时将初始化该结构体并从配置文件中读取配置信息保存于该结构体变量内。
6 _sub_token 在src/subs.c文件中定义,也是十分重要的数据结构,组成字符串topic拆分成topic片段后的链表节点的结构。

订阅树的概念

  Mosquitto通过订阅树的方式来管理所有的topic以及客户端的订阅关系,它首先将所有的topic按照/分割并组织成一棵树结构,从根节点到树中的每个节点即组成该节点所对应的一个topic,每个topic都保存一个订阅列表,该订阅列表中保存了所有订阅当前topic的客户端信息。例如有如下订阅关系:

User Operation
客户端a1,a2,a3 订阅了topic:A1/B1/C1m
客户端b1,b2 订阅了topic:A2/B2/C2
客户端c1,c2 订阅了topic:A1/B1/C3
客户端d1 订阅了topic:A2/B3

则上述订阅树如图。
此处输入图片的描述

  Mosquitto程序在实现中根据topic消息的性质将订阅树分为两颗子树:业务子树和系统子树;mosquitto程序中将topic分为两种类型来处理:系统topic和业务topic,前者主要用于发布和维护mosquitto内部的系统消息,后者的topic是用户订阅的业务topic,做这种区分的原因是因为这两种的类型的topic性质和实现方式上有许多差别,这种差别主要体现在以下3点:

1)生存周期不同,系统topic无论是否有用户订阅都会存在与订阅树中,而业务topic必须有客户端订阅才能存在(除非其消息字段retain设置为1)。

2)创建方式不同,系统topic在消息发布时进行创建,业务topic即可以在订阅时创建也可以在消息发布时创建(此时需要该消息retain字段设置为1)。

3)消息保存方式不同,凡是发布到系统topic的消息都会被保存下来,业务消息将直接挂到订阅列表的各context的消息队列中,如果没有连接订阅或未设置其retain字段,消息将不会被保存下来,消息的retain字段是否被设置在函数mqtt3_handle_publish进行检查。

订阅树的创建:(在src/database.c中的mqtt3_db_open函数实现)

  mosquitto程序启动时将创建订阅树,该过程将创建三个节点:订阅树总根节点、业务子树根节点和系统子树根节点,这两个子树根节点作为订阅树总根节点的两个子节点,其中订阅树总根节点和业务子树的根节点中topic成员的值为空字符串,而系统子树根节点中保存的值为“$SYS”,如图:

此处输入图片的描述

  订阅树采用孩子兄弟链表法保存,确切来说应该是说业务子树和系统子树都是采用孩子兄弟链表法保存,而这两个节点还是作为总树根节点的两个子节点。

搭建订阅树

1) 系统子树搭建过程
  Mosquitto中,系统子树在发布系统消息时,自动检测topic片段是否存在,如果不存在则在系统topic上创建节点以搭建订阅树。搭建过程如下:
  将Topic按照“/"分成 topic片段;根据第一个topic片段“$SYS”遍历订阅树的子节点找到系统子树的根节点;根据topic下一个片段查找系统子树,若没有则创建这个节点,依次方案处理直至topic片段解析完。
  所用到的函数调用: mqtt3_db_messages_easy_queue(在src/database.c中) --->mqtt3_db_messages_queue (在src/subs.c中) ---> _sub_add(在src/subs.c中)

2)业务子树搭建过程
  分为两种类型:订阅时创建和消息发布时创建。后者与系统Topic的方式类似。前者在收到订阅请求后将该客户端挂到对应的业务子树节点的订阅列表中,若不存在客户端所订阅的Topic,则会自动为之添加相应节点。
所用到的函数调用: mqtt3_handle_subscribe(在src/read_handle_server.c中) --->mqtt3_sub_add(在src/subs.c中) --->_sub_add(在src/subs.c中)

  可以看到,在上面都使用了_sub_add函数,而调用它的分别是mqtt3_db_messages_queue 和mqtt3_sub_add函数,而且这三个函数都是在src/subs.c中,不妨来看看它们的逻辑。
mqtt3_db_messages_queue:(系统子树的搭建)

此处输入图片的描述

mqtt3_sub_add(业务子树的搭建)
此处输入图片的描述

_sub_add:
此处输入图片的描述

  这几张图把订阅树的构建的大致逻辑勾勒出来。可以看见业务子树和系统子树的搭建大致逻辑相似,但是在局部处理上还是有区别,最大的区别就是如果创建业务子树的时候如果有没有找到topic片段,则会向订阅树中添加相应节点,而创建系统子树时则不会(原因见前面提到的二者的区别的第二点)。
  还有就是这里在根据用户发布的topic(一个字符串)来在树结构中查询,用到了一个技巧,就是调用_sub_topic_tokenise函数将这个字符串分解,并组成一个链表的形式,然后通过遍历这个链表逐步完成对树结构的查询、添加等工作,最后释放掉这个链表。这里的链表就相当于一个缓冲区,值得借鉴。
例如一个Topic: year/month/day
就被转换为如下链表结构:
此处输入图片的描述

  在client/sub_client.c文件中,有一个函数my_message_callback,为了弄清楚一个新创建的客户端内部定义的那些纷繁的变量,笔者希望把这些信息都打印出来。而这个函数my_message_callback非常合适。从其命名就可以知道它是在订阅客户端收到消息后的回调处理函数,在这里将所有收到的信息都打印出来再合适不过。

  笔者在这个函数中加入相应打印信息后编译,但编译不成功,具体报错如下:
此处输入图片的描述

  百度“invalid application of 'sizeof' to incomplete type 'struct mosquitto_client_msg'”,度娘告诉我们这是由于该结构体没有被定义的原因。笔者一看头文件,果然没有。添加相应的头文件到该文件中编译,还是报错,找不到添加的头文件。当然这就和查找路径相关了。这里就有两种办法解决了:
1)根据该文件已有的头文件的位置判断其头文件搜素路径,然后把添加的头文件添加进去。
2)把添加的头文件的路径增加到该文件的头文件搜素路径中。
笔者最开始选择后者,在cient/Makefile中添加了指定的路径,编译后还是报错:

  1. Makefile:21: *** missing separator (did you mean TAB instead of 8 spaces?). Stop.”

  原来Makefile文件规定,添加的命令语句前面要以Tab键开头。
  但修改后仍然不行。后来通过仔细阅读Makefile文件才发现,它是将该文件搜索的路径整个编译后作为一个.so文件提供使用(刚开始没仔细看,仅仅在搜索路径下添加),所以,这里不能使用第二种方法,只能选择第一种方法。此时编译通过,即可根据自己的要求来打印想看到的东西了。

  下图中框出来的部分是笔者在源码的基础上自己添加的打印:
此处输入图片的描述

  因为笔者刚开始猜测订阅端在多次接收到同一个主题的消息时会将所有消息以队列的形式保存在本地,所以,在打印时用到了while循环,但通过实际打印的效果来看,笔者的猜想是错误的,仅仅打印了每次接受到的那一条信息。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注