1.4.x之前的版本可以参考博客,使用的是树来实现:
mosquito从版本1.5.x开始,主题订阅的数据结构有变化。采用哈希表来存储。每一层都有一个哈希表来存储。
/src/database.c
这里初始化了两个主题,一个是业务主题“”,为空;另一个是系统主题“$SYS”
函数sub__add_hier_entry很重要,新增哈希key-value都靠它来实现。
int db__open(struct mosquitto__config *config, struct mosquitto_db *db) { struct mosquitto__subhier *subhier; if(!config || !db) return MOSQ_ERR_INVAL; db->last_db_id = 0; db->contexts_by_id = NULL; db->contexts_by_sock = NULL; db->contexts_for_free = NULL; #ifdef WITH_BRIDGE db->bridges = NULL; db->bridge_count = 0; #endif // Initialize the hashtable db->clientid_index_hash = NULL; db->subs = NULL; subhier = sub__add_hier_entry(NULL, &db->subs, "", strlen(""));//业务子树根节点 if(!subhier) return MOSQ_ERR_NOMEM; subhier = sub__add_hier_entry(NULL, &db->subs, "$SYS", strlen("$SYS"));//系统子树根节点 if(!subhier) return MOSQ_ERR_NOMEM; subs.c,会创建内存 child = mosquitto__malloc(sizeof(struct mosquitto__subhier)); struct mosquitto__subhier *sub__add_hier_entry(struct mosquitto__subhier *parent, struct mosquitto__subhier **sibling, const char *topic, size_t len) { struct mosquitto__subhier *child; assert(sibling); child = mosquitto__malloc(sizeof(struct mosquitto__subhier)); if(!child){ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return NULL; } child->parent = parent; child->topic_len = len; if(UHPA_ALLOC_TOPIC(child) == 0){ child->topic_len = 0; mosquitto__free(child); log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return NULL; }else{ strncpy(UHPA_ACCESS_TOPIC(child), topic, child->topic_len+1); } child->subs = NULL; child->children = NULL; child->retained = NULL; if(child->topic_len+1 > sizeof(child->topic.array)){ if(child->topic.ptr){ HASH_ADD_KEYPTR(hh, *sibling, child->topic.ptr, child->topic_len, child); }else{ mosquitto__free(child); return NULL; } }else{ HASH_ADD(hh, *sibling, topic.array, child->topic_len, child); } return child; }
截图1
每一层都有一个哈希表来存储。
例如,系统消息的主题表示为:
$SYS/broker/version
$SYS/broker/uptime
$SYS/broker/clients/total
$SYS/broker/clients/inactive
$SYS/broker/clients/disconnected
哈希表结构我用草图绘制,如截图2所示
截图2
subs.c源文件的static int sub__add_recurse()函数是核心所在,负责添加哈希元素。
对于业务主题(非系统主题)的订阅,其流程如下,请观看截图3的下半部分,有详细的函数跳转流程:
截图3
订阅的最终目标就是执行语句HASH_ADD(hh, *sibling, topic.array, child->topic_len, child); 往哈希表添加元素!