摘自:https://blog.csdn.net/jasonchen_gbd/article/details/46055885
上一篇文章介绍了ubus的组件和实现原理,本文通过代码实例介绍使用ubus进行进程间通信的三种方式。
1. invoke的方式实现端对端通信
最简单的情景就是一个提供服务的server端,一个请求服务的client端,client请求server的服务。 下面的例子中,server注册了一个名为“scan_prog”的对象,该对象中提供一个“scan”方法: ubus_invoke.h:#ifndef __UBUS_INVOKE_H__ #define __UBUS_INVOKE_H__ #include <json/json.h> #include <libubox/blobmsg_json.h> struct prog_attr { char name[64]; int chn_id; }; #define PROG_MAX 8 #endif /* __UBUS_INVOKE_H__ */
invoke_server.c:
1 #include <libubox/uloop.h> 2 #include <libubox/ustream.h> 3 #include <libubox/utils.h> 4 #include <libubus.h> 5 #include <json/json.h> 6 #include <libubox/blobmsg_json.h> 7 #include "ubus_invoke.h" 8 9 static struct ubus_context * ctx = NULL; 10 static struct blob_buf b; 11 static const char * sock_path; 12 13 static struct prog_attr uri_list[PROG_MAX] = 14 { 15 {"program_beijing", 1}, 16 {"program_guangzhou", 2}, 17 {"program_shanghai", 3}, 18 {"", -1}, 19 }; 20 21 enum 22 { 23 SCAN_CHNID, 24 SCAN_POLICY_MAX, 25 }; 26 27 static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = { 28 [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32}, 29 }; 30 31 static int ubus_start_scan(struct ubus_context *ctx, struct ubus_object *obj, 32 struct ubus_request_data *req, const char *method, 33 struct blob_attr *msg) 34 { 35 int ret = -1; 36 void * json_list = NULL; 37 void * json_uri = NULL; 38 int idx; 39 40 blob_buf_init(&b, 0); 41 42 struct blob_attr *tb[SCAN_POLICY_MAX]; 43 blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg)); 44 45 /* 46 本例子中,如果请求特定的节目号,返回节目名称。 47 如果请求节目号是0,则返回所有节目列表。 48 */ 49 if (tb[SCAN_CHNID] != NULL) 50 { 51 int chnid = blobmsg_get_u32(tb[SCAN_CHNID]); 52 53 if (chnid == 0) 54 { 55 json_uri = blobmsg_open_array(&b, "prog_list"); 56 for (idx = 0; idx < PROG_MAX; idx++) 57 { 58 if ('\0' != uri_list[idx].name[0]) 59 { 60 json_list = blobmsg_open_table(&b, NULL); 61 blobmsg_add_string(&b, "name", uri_list[idx].name); 62 blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id); 63 blobmsg_close_table(&b, json_list); 64 } 65 } 66 blobmsg_close_array(&b, json_uri); 67 ret = 0; 68 } 69 else 70 { 71 for (idx = 0; idx < PROG_MAX; idx++) 72 { 73 if ('\0' != uri_list[idx].name[0] && uri_list[idx].chn_id == chnid) 74 { 75 blobmsg_add_string(&b, "name", uri_list[idx].name); 76 ret = 0; 77 } 78 } 79 } 80 } 81 82 blobmsg_add_u32(&b, "result", ret); 83 ubus_send_reply(ctx, req, b.head); 84 85 return 0; 86 } 87 88 /* 方法列表 */ 89 static struct ubus_method scan_methods[] = 90 { 91 UBUS_METHOD("scan", ubus_start_scan, scan_policy), 92 }; 93 94 /* type目前没有实际用处 */ 95 static struct ubus_object_type scan_obj_type 96 = UBUS_OBJECT_TYPE("scan_prog", scan_methods); 97 98 static struct ubus_object scan_obj = 99 { 100 .name = "scan_prog", /* 对象的名字 */ 101 .type = &scan_obj_type, 102 .methods = scan_methods, 103 .n_methods = ARRAY_SIZE(scan_methods), 104 }; 105 106 static void ubus_add_fd(void) 107 { 108 ubus_add_uloop(ctx); 109 110 #ifdef FD_CLOEXEC 111 fcntl(ctx->sock.fd, F_SETFD, 112 fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC); 113 #endif 114 } 115 116 static void ubus_reconn_timer(struct uloop_timeout *timeout) 117 { 118 static struct uloop_timeout retry = 119 { 120 .cb = ubus_reconn_timer, 121 }; 122 int t = 2; 123 124 if (ubus_reconnect(ctx, sock_path) != 0) { 125 uloop_timeout_set(&retry, t * 1000); 126 return; 127 } 128 129 ubus_add_fd(); 130 } 131 132 static void ubus_connection_lost(struct ubus_context *ctx) 133 { 134 ubus_reconn_timer(NULL); 135 } 136 137 static int display_ubus_init(const char *path) 138 { 139 uloop_init(); 140 sock_path = path; 141 142 ctx = ubus_connect(path); 143 if (!ctx) 144 { 145 printf("ubus connect failed\n"); 146 return -1; 147 } 148 149 printf("connected as %08x\n", ctx->local_id); 150 ctx->connection_lost = ubus_connection_lost; 151 152 ubus_add_fd(); 153 154 /* 向ubusd注册对象和方法,供其他ubus客户端调用。 */ 155 if (ubus_add_object(ctx, &scan_obj) != 0) 156 { 157 printf("ubus add obj failed\n"); 158 return -1; 159 } 160 161 return 0; 162 } 163 164 static void display_ubus_done(void) 165 { 166 if (ctx) 167 ubus_free(ctx); 168 } 169 170 int main(int argc, char * argv[]) 171 { 172 char * path = NULL; 173 174 if (-1 == display_ubus_init(path)) 175 { 176 printf("ubus connect failed!\n"); 177 return -1; 178 } 179 180 uloop_run(); 181 182 display_ubus_done(); 183 184 return 0; 185 }
客户端代码invoke_client.c去调用server端的"scan_prog"对象的"scan"方法:
#include <libubox/uloop.h> #include <libubox/ustream.h> #include <libubox/utils.h> #include <libubus.h> #include <json/json.h> #include <libubox/blobmsg_json.h> #include "ubus_invoke.h" static struct ubus_context * ctx = NULL; static struct blob_buf b; static const char * cli_path; enum { SCAN_CHNID, SCAN_POLICY_MAX, }; static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = { [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32}, }; static int timeout = 30; static bool simple_output = false; static void scanreq_prog_cb(struct ubus_request *req, int type, struct blob_attr *msg) { char *str; if (!msg) return; /* 在这里处理返回的消息。 本例子只是将返回的消息打印出来。 */ str = blobmsg_format_json_indent(msg, true, simple_output ? -1 : 0); printf("%s\n", str); free(str); } static int client_ubus_call() { unsigned int id; int ret; blob_buf_init(&b, 0); /* 需要传递的参数 */ blobmsg_add_u32(&b, scan_policy[SCAN_CHNID].name, 0); /* 向ubusd查询是否存在"scan_prog"这个对象, 如果存在,返回其id */ ret = ubus_lookup_id(ctx, "scan_prog", &id); if (ret != UBUS_STATUS_OK) { printf("lookup scan_prog failed\n"); return ret; } else { printf("lookup scan_prog successs\n"); } /* 调用"scan_prog"对象的"scan"方法 */ return ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000); } /* ubus_invoke()的声明如下: int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout); 其中cb参数是消息回调函数,其函数类型定义为: typedef void (*ubus_data_handler_t)(struct ubus_request *req, int type, struct blob_attr *msg); 参数type是请求消息的类型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。 参数msg存放从服务端得到的回复消息,struct blob_attr类型,同样用blobmsg_parse()来解析。 参数req保存了请求方的消息属性,其中req->priv即ubus_invoke()中的priv参数, 用这个参数可以零活的传递一些额外的数据。 */ static int client_ubus_init(const char *path) { uloop_init(); cli_path = path; ctx = ubus_connect(path); if (!ctx) { printf("ubus connect failed\n"); return -1; } printf("connected as %08x\n", ctx->local_id); return 0; } static void client_ubus_done(void) { if (ctx) ubus_free(ctx); } int main(int argc, char * argv[]) { /* ubusd创建的unix socket,默认值为"/var/run/ubus.sock" */ char * path = NULL; /* 连接ubusd */ if (UBUS_STATUS_OK != client_ubus_init(path)) { printf("ubus connect failed!\n"); return -1; } /* 调用某个ubus方法并处理返回结果 */ client_ubus_call(); client_ubus_done(); return 0; }
先执行server程序,再执行client程序,可以看到client发出请求后,server返回了相应的节目号,在client打印出了接收到的消息。 也可以通过shell命令来请求server的服务,例如: ubus call scan_prog scan '{"chnID": 0}'
这条命令和执行client程序的作用相同。
2. subscribe/notify的方式实现订阅
ubus支持以订阅的方式进行进程间通信,通信方式如下图:
被订阅者(server)又称为notifier,订阅者(client)又称为subscriber。这样一来,可以同时有多个client订阅server的某个服务,当server通过该服务广播消息时,所有client都会被通知,并执行各自的处理。
主要过程为:
server进程:
- 连接ubusd。
- 注册一个object,用于client订阅。
- server可以向订阅者广播消息。
- 等待消息。
client进程:
- 连接ubusd。
- 向server订阅object,并定义收到server的消息时的处理函数。
- 等待消息。
代码示例:下面这个例子中,server注册了一个名为“test”的对象,并定期广播消息。而client去订阅这个对象,并对server发出的消息做处理。
notify_server.c:
1 #include <unistd.h> 2 #include <libubox/blobmsg_json.h> 3 #include <libubox/uloop.h> 4 #include <libubus.h> 5 6 static struct ubus_context *ctx; 7 8 static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj) 9 { 10 fprintf(stderr, "Subscribers active: %d\n", obj->has_subscribers); 11 } 12 13 /* 这个空的method列表,只是为了让object有个名字,这样client可以通过object name来订阅。 */ 14 static struct ubus_method test_methods[] = {}; 15 16 static struct ubus_object_type test_obj_type = 17 UBUS_OBJECT_TYPE("test", test_methods); 18 19 static struct ubus_object test_object = { 20 .name = "test", /* object的名字 */ 21 .type = &test_obj_type, 22 .subscribe_cb = test_client_subscribe_cb, 23 }; 24 25 static void notifier_main(void) 26 { 27 int ret; 28 29 /* 注册一个object,client可以订阅这个object */ 30 ret = ubus_add_object(ctx, &test_object); 31 if (ret) { 32 fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret)); 33 return; 34 } 35 36 /* 在需要的时候,向所有客户端发送notify消息 */ 37 38 /* step1: 如果需要传递参数,则保存到struct blob_attr类型的结构体中。 */ 39 40 /* 41 int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, int timeout); 42 type是一个字符串,自定义的。msg是需要携带的参数。如果需要等待回复,timeout需设置为>=0。 43 */ 44 while (1) { 45 sleep(2); 46 /* step2: 广播notification消息。 */ 47 ubus_notify(ctx, &test_object, "say Hi!", NULL, -1); 48 } 49 } 50 51 int main(int argc, char **argv) 52 { 53 const char *ubus_socket = NULL; 54 55 uloop_init(); 56 57 ctx = ubus_connect(ubus_socket); 58 if (!ctx) { 59 fprintf(stderr, "Failed to connect to ubus\n"); 60 return -1; 61 } 62 63 ubus_add_uloop(ctx); 64 65 notifier_main(); 66 67 uloop_run(); 68 69 ubus_free(ctx); 70 uloop_done(); 71 72 return 0; 73 }
notify_client.c:客户端订阅“test”对象,在收到3次消息后,随即取消对“test”对象的订阅。
1 #include <unistd.h> 2 #include <libubox/blobmsg_json.h> 3 #include <libubox/uloop.h> 4 #include <libubus.h> 5 6 static struct ubus_context *ctx; 7 8 static int counter = 0; 9 static uint32_t obj_id; 10 static struct ubus_subscriber test_event; 11 12 static int test_notify(struct ubus_context *ctx, struct ubus_object *obj, 13 struct ubus_request_data *req, 14 const char *method, struct blob_attr *msg) 15 { 16 printf("notify handler...\n"); 17 counter++; 18 if (counter > 3) 19 ubus_unsubscribe(ctx, &test_event, obj_id); /* 取消订阅 */ 20 return 0; 21 } 22 23 static void test_handle_remove(struct ubus_context *ctx, 24 struct ubus_subscriber *obj, uint32_t id) 25 { 26 printf("remove handler...\n"); 27 } 28 29 static void subscriber_main(void) 30 { 31 int ret; 32 33 /* 通知到来时的处理函数。 */ 34 test_event.cb = test_notify; 35 test_event.remove_cb = test_handle_remove; //server主动发起删除该client的订阅的cb函数(如server退出的时候) 36 37 /* 注册test_event */ 38 ret = ubus_register_subscriber(ctx, &test_event); 39 if (ret) 40 fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret)); 41 42 /* 得到要订阅的object的id */ 43 ret = ubus_lookup_id(ctx, "test", &obj_id); 44 if (ret) 45 fprintf(stderr, "Failed to lookup object: %s\n", ubus_strerror(ret)); 46 47 /* 订阅object */ 48 ret = ubus_subscribe(ctx, &test_event, obj_id); 49 if (ret) 50 fprintf(stderr, "Failed to subscribe: %s\n", ubus_strerror(ret)); 51 } 52 53 int main(int argc, char **argv) 54 { 55 const char *ubus_socket = NULL; 56 57 uloop_init(); 58 59 ctx = ubus_connect(ubus_socket); 60 if (!ctx) { 61 fprintf(stderr, "Failed to connect to ubus\n"); 62 return -1; 63 } 64 65 ubus_add_uloop(ctx); 66 67 subscriber_main(); 68 69 uloop_run(); 70 71 ubus_free(ctx); 72 uloop_done(); 73 74 return 0; 75 }
先运行server&,注册可订阅的对象“test”,并随即每2秒向外广播通知消息。这时还没有client订阅这个对象。
运行多个client程序,由于每个client都订阅了“test”对象,则所有client都会收到server发出的消息。当client取消订阅后,则不再收到server端的消息。
event的方式实现事件通知
event机制从一对一的进程之间通信来讲,和invoke机制类似。不过event机制中,发送方不需要知道谁要接收这个消息,实际上就是一个广播消息。这类似于U盘的热插拔:当插上或拔出U盘时,内核会广播一个NETLINK事件,之后内核继续做自己的事情,而不关心谁会去监听和处理这个事件。
下面的例子中,client端同时监听了“add_device”和“rm_device”两个事件,而server端会触发“add_device”事件并携带device的一些信息发送出去。
event_client.c:
1 #include <libubox/uloop.h> 2 #include <libubox/ustream.h> 3 #include <libubox/utils.h> 4 #include <libubus.h> 5 #include <json/json.h> 6 #include <libubox/blobmsg_json.h> 7 8 static struct ubus_context * ctx = NULL; 9 static const char * cli_path; 10 11 #define UBUS_EVENT_ADD_DEVICE "add_device" 12 #define UBUS_EVENT_REMOVE_DEVICE "rm_device" 13 14 static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev, 15 const char *type, struct blob_attr *msg) 16 { 17 char *str; 18 19 if (!msg) 20 return; 21 22 /* 23 在这里实现收到事件后的动作。 24 event也可以传递消息,放在msg中。 25 26 本例子只是将返回的消息打印出来。 27 */ 28 str = blobmsg_format_json(msg, true); 29 printf("{ \"%s\": %s }\n", type, str); 30 free(str); 31 } 32 33 static int client_ubus_register_events() 34 { 35 static struct ubus_event_handler listener; 36 int ret = 0; 37 38 /* 注册特定event的listener。多个event可以使用同一个listener */ 39 memset(&listener, 0, sizeof(listener)); 40 listener.cb = ubus_probe_device_event; 41 42 ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE); 43 if (ret) 44 { 45 fprintf(stderr, "register event failed.\n"); 46 return -1; 47 } 48 49 ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE); 50 if (ret) 51 { 52 ubus_unregister_event_handler(ctx, &listener); 53 fprintf(stderr, "register event failed.\n"); 54 return -1; 55 } 56 57 return 0; 58 } 59 60 static void ubus_add_fd(void) 61 { 62 ubus_add_uloop(ctx); 63 64 #ifdef FD_CLOEXEC 65 fcntl(ctx->sock.fd, F_SETFD, 66 fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC); 67 #endif 68 } 69 70 static void ubus_reconn_timer(struct uloop_timeout *timeout) 71 { 72 static struct uloop_timeout retry = 73 { 74 .cb = ubus_reconn_timer, 75 }; 76 int t = 2; 77 78 if (ubus_reconnect(ctx, cli_path) != 0) { 79 uloop_timeout_set(&retry, t * 1000); 80 return; 81 } 82 83 ubus_add_fd(); 84 } 85 86 static void ubus_connection_lost(struct ubus_context *ctx) 87 { 88 ubus_reconn_timer(NULL); 89 } 90 91 static int client_ubus_init(const char *path) 92 { 93 uloop_init(); 94 cli_path = path; 95 96 ctx = ubus_connect(path); 97 if (!ctx) 98 { 99 printf("ubus connect failed\n"); 100 return -1; 101 } 102 103 printf("connected as %08x\n", ctx->local_id); 104 ctx->connection_lost = ubus_connection_lost; 105 106 ubus_add_fd(); 107 108 return 0; 109 } 110 111 static void client_ubus_done(void) 112 { 113 if (ctx) 114 ubus_free(ctx); 115 } 116 117 int main(int argc, char * argv[]) 118 { 119 char * path = NULL; 120 121 /* 连接ubusd */ 122 if (UBUS_STATUS_OK != client_ubus_init(path)) 123 { 124 printf("ubus connect failed!\n"); 125 return -1; 126 } 127 128 /* 注册某个事件的处理函数 */ 129 client_ubus_register_events(); 130 131 uloop_run(); 132 133 client_ubus_done(); 134 135 return 0; 136 }
event_server.c:
1 #include <libubox/uloop.h> 2 #include <libubox/ustream.h> 3 #include <libubox/utils.h> 4 #include <libubus.h> 5 #include <json/json.h> 6 #include <libubox/blobmsg_json.h> 7 8 static struct ubus_context * ctx = NULL; 9 static struct blob_buf b; 10 static const char * sock_path; 11 12 static int server_ubus_send_event(void) 13 { 14 blob_buf_init(&b, 0); 15 16 /* 需要传递的参数 */ 17 blobmsg_add_u32(&b, "major", 3); 18 blobmsg_add_u32(&b, "minor", 56); 19 blobmsg_add_string(&b, "name", "mmc01"); 20 21 /* 广播名为"add_device"的事件 */ 22 return ubus_send_event(ctx, "add_device", b.head); 23 } 24 25 static int display_ubus_init(const char *path) 26 { 27 uloop_init(); 28 sock_path = path; 29 30 ctx = ubus_connect(path); 31 if (!ctx) 32 { 33 printf("ubus connect failed\n"); 34 return -1; 35 } 36 37 printf("connected as %08x\n", ctx->local_id); 38 39 return 0; 40 } 41 42 static void display_ubus_done(void) 43 { 44 if (ctx) 45 ubus_free(ctx); 46 } 47 48 int main(int argc, char * argv[]) 49 { 50 char * path = NULL; 51 52 if (-1 == display_ubus_init(path)) 53 { 54 printf("ubus connect failed!\n"); 55 return -1; 56 } 57 58 server_ubus_send_event(); 59 60 display_ubus_done(); 61 62 return 0; 63 }
先运行client &注册事件。我们同时启动两个client程序。
再执行server主动触发"add_device"事件,则凡是注册了这个事件的client都会收到该事件并执行各自的处理。
root@NVR:~# ./server connected as fdecbdc1 { "add_device": { "major": 3, "minor": 56, "name": "mmc01" } } { "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }
也可以通过命令行的ubus send命令触发事件:
root@NVR:~# ubus send "rm_device" '{ "major": 3, "minor": 56, "name": "mmc01" }' { "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } } { "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }
在使用ubus时,可根据实际的场景来选择用哪种方式进行进程间通信。如之前所说,ubus是为发送消息而设计的,不合适传输大量数据