SystemV行列步队的报文构造哀求如下:
{ long type;------------------------------------------------------类型 XXXX msg_payload;-----------------------------------------内容}sysV_msg_t;
现有约五六个用户进程,每个进程背后是一个自己的业务功能,每个进程都须要随时能访问对方的资源,这样便是说每个进程都向其他进程供应做事,也须要其他进程为自己供应做事,以是也便是C/S机制,而且每个进程对其他进程的访问都可能是同步的,也便是说须要被访问者给予访问者以回答,当然也可以是异步的;除此以外,每个进程可能会有不同的线程,它们大概都会向某一进程发起访问。
基于以上的哀求,在采取systemV办法的条件下,须要一种机制能够知足以上的哀求,根据systemV接口特点,操持如下:

systemV办法的行列步队以行列步队ID、类型为区分,根据这个特点,可以让须要访问其他进程资源的每一个线程(把稳是线程)注册一个行列步队ID,作为其自身的“通讯地址”,每次访问其他进程后,被访问者都要以该ID的行列步队回答给这个访问者线程,同时根据访问资源的不同,确定不同的类型,这样就可以将收发报文清晰的区分,详细如下:
上面说了,每一个线程都要注册一个行列步队ID,为了能够统一的管理,让每一个线程都知道给供应做事的是谁,怎么发送给他,须要一个机制,我们把形成这个机制所需的函数放在一个叫mipc.c的文件里,并且把该文件编成一个动态库给所有进程连接,这里的第一个机制便是注册,所谓注册便是把本线程的信息注册在一个条款表项中,这个表项的构造是:
typedef struct{ int flag_is_hold;---------------------------------------------------标志位,表示是否是有效条款 pthread_t thread_id;--------------------------------------------所在线程ID,主要区分标识 char name[MIPC_NAME_MAX_LEN];-----------------------条款名称,主要区分标识 int mqd;------------------------------------------------------------行列步队ID} MIPC_THREAD_INFO_T;
这样,当线程A须要访问另一进程的线程B的资源时,只需知道线程B的条款名称即可,就可以查表知道它的行列步队ID,这样就能够知道该把发到哪个行列步队,这里每个线程的条款名称是预知的。
大概你会问,动态库对付不同进程是同一份拷贝,那为什么线程A还可以查表获取另一个进程的线程B的条款内容,这是须要通过一个收发包进程实现,线程A须要发给另一个进程的线程B时,首先须要发行列步队给收发包进程,而发送报文的内容包括线程B的条款名称,由收发包进程解析报文内容获取线程A发送报文的目的地是那里,然后由收发包进程再把发送给线程B;线程B吸收报文并做处理后,须要回答给线程A,也是发送给收发包进程,并由收发包进程解析报文内容获取到报文目的地是线程A,再把报文发送给线程A,线程A就可以收到另一个进程的线程B的回答了。这样一轮收发行列步队,就实现了线程A和另一个进程的线程B的同步访问。
下面是目前规定的报文格式:
typedef struct sysV_msg{ long type; sysV_msg_payload_t msg_payload;}sysV_msg_t;sysV_msg_t是顶层的行列步队报文格式:前四个字节type标识了发送线程发送的行列步队类型,同时也可以是吸收线程要吸收的行列步队类型,这是MIPC能达到不同进程多个MIPC条款做到同步通讯的根本;msg_payload是报文内容,见接下来的描述:typedef struct sysV_msg_payload{ char src_name[MIPC_NAME_MAX_LEN]; char dst_name[MIPC_NAME_MAX_LEN]; MIPC_MSG_HDR_T msg_hdr; unsigned char payload[MIPC_MSG_MAX_LEN];}sysV_msg_payload_t;
更多linux内核视频教程文档资料免费领取后台私信【内核】自行获取。
sysV_msg_payload_t是报文详细内容的构造格式:
src_name填入发送线程注册时的条款名称;dst_name是报文发送目的地吸收线程注册时的名称;msg_hdr是报文负载的头,记录该报文类型、长度、是否须要回答;payload是报文实际负载,目前规定最大长度为2048字节。
typedef struct{ unsigned int msg_type; //-1:notice(add a new item); other:api_id unsigned int msg_size; int sync_flag;//0:no sync, >0:sync, timeout value; -1:sync, wait forever} MIPC_MSG_HDR_T;
msg_hdr是报文负载的头,记录该报文类型、长度、是否须要回答
每个进程的每个须要访问其他进程的线程,都须要注册一个MIPC条款,否则发起的进程间通讯将被丢弃掉,注册函数为mipc_init,注册过程是给收发中转进程发送一个分外的,其代码片段如下:
while(root_msg_mqd < 0){ root_msg_mqd = msgget(MIPC_MSG_ROOT_KEY, 0);}
root_msg_mqd是收发中转进程的行列步队ID,收发中转进程必须首先启动建立这个行列步队,否则其他进程无法通过MIPC通讯;当该行列步队建立好后,其他进程将给收发中转进程发送分外哀求注册一个MIPC条款;
memset(&mipc_info, 0, sizeof(MIPC_THREAD_INFO_T));mipc_info.flag_is_hold = MIPC_TRUE;mipc_info.thread_id = pthread_self();memcpy(mipc_info.name ,name, strlen(name));mipc_info.mqd = mipc_mq_create();
上面这些便是要发送给收发中转进程的分外的注册,其本色便是把要注册的内容按前面描述过的MIPC_THREAD_INFO_T构造类型写入报文的实际负载中,包括置位有效标志、写入自己的线程ID、写入自己的条款名称、写入自己的吸收行列步队ID,函数mipc_mq_create便是调用msgget系统调用从内核申请一个行列步队ID;
msgq->type = MIPC_MSG_TYPE_NOTICE;memcpy(msgq->msg_payload.src_name, name, strlen(name));memcpy(msgq->msg_payload.dst_name, "mipc_main", strlen("mipc_main"));msgq->msg_payload.msg_hdr.msg_size = sizeof(MIPC_THREAD_INFO_T);msgq->msg_payload.msg_hdr.msg_type = MIPC_MSG_TYPE_NOTICE;msgq->msg_payload.msg_hdr.sync_flag = 0;memcpy(msgq->msg_payload.payload, &mipc_info, sizeof(MIPC_THREAD_INFO_T));
上面这段是填写报文的头的过程,在类型域中写入MIPC_MSG_TYPE_NOTICE,这是个分外的类型标识是要注册一个条款;src_name域填写要注册条款标名称,这也是mipc_init函数的唯一参数;dst_name域填写mipc_main,这是预设的中转收发进程的名字;然后报文长度为构造MIPC_THREAD_INFO_T的大小;报文类型同样填写MIPC_MSG_TYPE_NOTICE;同步标识置为0意味中转收发进程收到后无需回答,由于没有太大回答的必要;末了把注册的内容复制到实际负载域中。
ize += msgq->msg_payload.msg_hdr.msg_size;ret = mipc_mq_send(root_msg_mqd, buffer, size, 0);
末了把全体报文长度重新更新好,并发送,函数mipc_mq_send实际调用系统调用msgsnd发送行列步队;中转收发进程在收到后就把该条款内容写入表中用于往后查询。
仅仅上面这些是不足的,由于一个进程可能会有多个线程都要发起对其他进程的访问,也便是每个线程都须要注册一个条款,如果每个线程发起访问时还须要吸收回答,那么它必须知道自己所在线程条款标行列步队ID,每个线程很难做到随时可以找出属于自己的行列步队ID,以是还须要再次注册在属于本进程的一个条款表,如下:
user_thread_mipc_info_t user_thread_info[MIPC_USER_INFO_NUM];ret = mipc_user_thread_info_register(name, mipc_info.mqd, pthread_self());
这个方法利用的是,库代码的数据段,对付,每个进程都有一份拷贝,以是每个进程都利用该函数配置libmipc.so中的全局变量数值user_thread_info[MIPC_USER_INFO_NUM]是不会冲突的,即是各自进程独占的;mipc_user_thread_info_register函数实现很大略,仍旧是操作线程ID、行列步队ID、条款名称、有效标志的一个表,代码片段如下:
for (index = 0; index < MIPC_USER_INFO_NUM; index++){ if (!user_thread_info[index].flag) {undefined find_flag = 1; break; }}
在这个全局变量数组中找到一个空的条款,该数值预设条款个数为10,该当肯定能知足每个进程的须要;
if (find_flag){ if (src_name) memcpy(user_thread_info[index].myname, src_name, strlen(src_name)); user_thread_info[index].mqd = mqd; user_thread_info[index].thread_id = tid; user_thread_info[index].flag = 1; return MIPC_OK;}return MIPC_ERROR;
如果真的找不到空余条款,则只能返回缺点即不能增加条款;正常情形下是可以找到的,这时就把线程ID、行列步队ID、条款名称、有效标志写入该数组条款即可。
上面就完成了全体注册过程,所注册的线程在须要访问其他进程时,只须要首先查询自己的行列步队ID,然后以被访问线程的条款名称为参数就可以实现进程间通讯
每个进程都有自己的一个唯一的server,它是该进程唯一的server,由一个线程用MIPC条款吸收处理对它的访问,也便是说,其他所有线程多该进程的访问都必须访问该server所在的MIPC条款,访问该进程其他MIPC条款无法得到精确的回答;每个进程的server所在MIPC条款代码片段如下:
osTaskCreate(&apm_mq_thread, "apm_mq_thread", (GLFUNCPTR) apm_mq_recv, 0, 0, 47 - 1,0x2800) != IAMBA_OK)
每个进程都必须先建立一个线程,然后在这个线程内再建立MIPC条款用于吸收处理所有对该进程的访问,上面便是先建立一个线程apm_mq_recv,其函数内容片段如下:
int mipc_fd = mipc_init("new_apm");
首先建立一个MIPC条款,这便是之后用于吸收处理对该进程全部访问的MIPC条款;
mthread_register_mq(mipc_fd, apm_new_mipc_server_handler, &thread_index_apm, &mq_index_apm);
然后调用函数mthread_register_mq注册收到报文后的hook处理函数,这里处理函数是apm_new_mipc_server_handler;
while(1){ size = mipc_mq_receive(mipc_fd, buffer, MIPC_DEFAULT_MAX_MSG_SIZE, 0, MIPC_MSG_SYNC_WAIT_FOREVER); if (size > 0) {undefined mthread_check_mq_msg(thread_index_apm, mq_index_apm, (char )msgq, size); memset(buffer, 0, sizeof(buffer)); }}
最后进入永久循环,将壅塞地吸收所有发给该MIPC条款标行列步队ID的全部报文,把稳函数mipc_mq_receive的第四个参数为0,意为吸收任何发送给该行列步队ID的第一个报文,也便是不区分类型;每收到报文都调用上一步注册的hook处理函数去处理报文。实际的处理函数,便是每个进程自己的server api函数,这些函数末了都会根据报文中的src_name、msg_type确定该当以什么样的类型回答给哪一个行列步队ID,这样做到了报文不会稠浊,不会导致须要吸收回答的收不到,不须要吸收回答的反而收到的征象。
末了描述一下收发中转进程,之以是须要改进程,是由于,每个发起进程间通讯的MIPC条款所在线程,无法获悉目的进程的行列步队ID,由于每个进程依赖调用动态库libmipc.so建立条款,而库代码的数据段对付每个进程是独占的,以是它只能知道自己进程内部线程所建立的条款,无法知道其他进程的条款,以是将无法实现跨进程间的通讯;这就须要一个能够掩护所有条款表的东西,这便是收发中转进程,由它来记录掩护全部注册的条款,而发起进程间通讯的任何线程只需知道被访问者的MIPC条款名称即可,下面是收发中转进程的代码片段:
osTaskCreate(&mipc_process_thread, "mipc_process_thread", (GLFUNCPTR) mipc_process, 0, 0, 47 - 1, 0x2800) != IAMBA_OK)
首先建立一个线程只用于处理报文,主线程只卖力吸收,如下;
while (1){ size = msgrcv(root_msg_mqd, (void )buffer, MIPC_DEFAULT_MAX_MSG_SIZE, 0, 0); if (size > 0) {undefined ret = mipc_msg_check(buffer, size); if (ret) {undefined fprintf(stderr, "wrong mpic msg\n\r"); continue; } //fprintf(stderr, "mipc_main: recv an msg, size:%d!\n\r", size); mipc_msg_list_add(buffer, size); }}
主线程以壅塞办法吸收所有发给它的报文,对报文进行大略判断后就加入到报文吸收链表中,这样是为了提高效率,避免报文被积压在内核行列步队中,函数mipc_msg_list_add代码片段如下:
list_mipc_msg_t newmsg = (list_mipc_msg_t )malloc(sizeof(list_mipc_msg_t));INIT_LIST_HEAD(&newmsg->list);memcpy(newmsg->buffer, buffer, size);newmsg->size = size;pthread_mutex_lock(&mipc_msg_sem);LIST_ADD_TAIL(&newmsg->list, &mipc_msg_list.list);mipc_msg_list.addnum++;mipc_msg_list.curnum++;pthread_mutex_unlock(&mipc_msg_sem);
为每一个报文都动态申请一个报文构造,然后把它加入到链表尾部,并更新统计值;
报文处理线程的代码片段如下;
while(1){ if (LIST_EMPTY(&mipc_msg_list.list)) usleep(50); else {undefined mipc_msg_list_del(buffer, &size); ret = mipc_msg_process(buffer, size); if (ret) {undefined fprintf(stderr, "mipc msg process error\n\r"); } memset(buffer, 0, MIPC_DEFAULT_MAX_MSG_SIZE); }}
它在报文吸收链表不为空时,取出报文链表头部开始的第一个报文,将其拷贝到一个固定缓存处,同时开释该报文动态申请的空间,调用函数mipc_msg_process处理报文,报文的处理办法非常大略,除了分外的注册和解注册外,别的都是根据报文的目的MIPC名称查表找到对应的行列步队ID并发送。
示例1:
下面是一个同步进程间通讯的示例:
某两个线程已经分别注册了MIPC条款,名称分别为"main_apm"和"alarm_apm",现在条款名称为"main_apm"对"alarm_apm"发起进程间通讯,调用测试函数mipc_ping:
mipc_ping("alarm_apm", MIPC_MSG_SYNC_WAIT_FOREVER);
第二个参数MIPC_MSG_SYNC_WAIT_FOREVER的意思是“将一贯等待直到收到回答为止”,mipc_ping的实现如下:
mipc_send_sync_msg(dest_name, MIPC_MSG_TYPE_PING, NULL, 0, &result, sizeof(int), timeout);
mipc_ping的实现便是调用函数mipc_send_sync_msg,参数dest_name便是"alarm_apm"即被访问线程的条款名称。
第二个参数代码了报文类型为“PING”,第三个参数是实际负载的指针,这里为NULL表示实际负载无需任何内容,第四个参数是实际负载的长度,这里为0表示实际负载无需任何内容,第5个参数代表用于吸收回答的指针,这里是个int型变量result的地址,第6个参数代表吸收回答的长度,这里是int型变量的长度;
下面是函数mipc_send_sync_msg的实现片段:
mipc_user_thread_info_find(src_name, &mqd, pthread_self());
首先调用函数mipc_user_thread_info_find,找到自己所在线程的MIPC条款标条款名称和行列步队ID,依赖的便是自己的线程ID(这个可以随时获取到),通过查表找到;
mipc_clear_resp_msq(src_name, mqd, api_id);
然后根据自己的行列步队ID和所要发送报文的类型,先将发给自己的这样的报文全部读取干净,这是为了避免已有的发给自己的这样类型的报文和一会收到的回答报文稠浊;
mipc_send_msg(src_name, dest_name, api_id, inMsg, inSize, timeout)
接下来是发送报文,把稳函数mipc_send_msg实际是把报文发送给收发中转进程,由于它不知道被访问进程对应的行列步队ID,它只能把自己的MIPC名称和被访问进程的MIPC名称作为索引,由收发中转进程去判断该当发给哪个行列步队;
mipc_receive_response_msg(src_name, mqd, api_id, outMsg, outSize, timeout)
末了是等待回答,该函数实际内容是通过自己的MIPC条款名称得出自己的MIPC条款行列步队ID,吸收发给该行列步队ID且报文类型为api_id的,这样就避免了收错;当收到回答报文后把报文的实际负载内容取出拷贝到吸收回答的地址处,这样就完成了一次同步的进程间通讯;其余可以设置超市价,默认为-1即永久等待;
中转收发进程在收到后,根据报文中指示的dst_name查表得出该当发往的行列步队ID并立即发送,这时相应进程的server处理线程就会收到该报文,并调用其hook处理函数处理,任何hook处理函数的子分支都会根据报文中的msg_type域找到该当调用的下一级处理函数,处理后得出返回值填入报文的实际负载,以对“PING”类型的报文处理为例:
case MIPC_MSG_TYPE_PING:{ int result = 0; mipc_response_msg(pMsg->msg_payload.src_name, pMsg->msg_payload.dst_name, msgq_header->msg_type, &result, sizeof(int)); return MIPC_OK;}
上面便是每个进程的server处理线程的hook处理函数对付“PING“的处理,它调用函数mipc_response_msg发送回答报文,同样发送给中转收发进程并且把回答内容填入实际负载,这里便是一个值为0的int型变量,符合发送者的哀求;调用结束后,server处理完毕。
示例2:
下面的例子更加实用,以利用最多的中间件midware为例,在文件midware_mipc_client.c文件中包含了每一个访问midware进程资源的client api函数,以个中一个为例:
ONU_STATUS midware_insert_entry(MIDWARE_TABLE_ID_E table_id, VOID entry, UINT32 size){ midware_insert_entry_IN_T input; midware_insert_entry_OUT_T output; memset(&output, 0, sizeof(output)); input.table_id = table_id; if (NULL != entry) memcpy(&input.entry, entry, size); input.size = size; // Call message queue or socket to send out the parameters if (MIPC_OK != mipc_send_sync_msg("midware", 1, &input, sizeof(input), &output, sizeof(output), -1)) {undefined printf("%s: failed to send message\n", __FUNCTION__); } return output.ret;}
任何一个注册了MIPC条款标线程都可以调用该api函数访问midware进程,每一个api都是在调用mipc_send_sync_msg函数实现访问midware进程,第一个参数标识了被访问MIPC条款标名称,第二个参数是类型也是报文类型,第三、第四个参数分别是输入参数内容及长度,第五、第六参数是吸收回答的地址和长度,它们都是要被填入实际负载域;
由2.3节已知server进程是如何收到报文和大致的处理流程,下面是处理细节:
if (MIPC_ERROR != mipc_receive_msg(source_module, &api_id, inMsg)){ // Call APIs midware_mipc_server_call(source_module, dest_module, api_id, msgq->msg_payload.payload);}…………………….void midware_mipc_server_call(char source_module, char dest_module, int api_id, char input){ switch (api_id) {undefined case 1: {undefined midware_insert_entry_IN_T in = (midware_insert_entry_IN_T)input; midware_insert_entry_OUT_T out; //实际实行须要做的处理 out.ret = midware_insert_entry(in->table_id, (void)&in->entry, in->size); //把回答内容回答给发送者 // send back everyting in out data structure if (MIPC_OK != mipc_response_msg(source_module, dest_module, 1, &out, sizeof(out))) {undefined printf("%s: failed to send response message\n", __FUNCTION__); } break; }
首先通过调用函数mipc_receive_msg解析出报文的发送MIPC条款名称,这个是用来回复报文利用的;如何根据报文类型得出该当实行哪个详细处理函数,这里只列出case 1的情形,事实上有非常多的报文类型,在实行完毕后将回答内容作为参数调用函数mipc_response_msg,回答内容将被填入实际负载并发送给中转收发进程,中转收发进程会把该报文发给正在壅塞等待的访问midware进程的线程MIPC条款。
总结:
上面是整改后marvell OMCI SDK内部利用的进程间通讯办法的描述,目前测试过程中运转正常,GPON也可以注册,但整改过程大幅度增删了原有代码,以是还有待于进一步考验。这种进程间通讯办法的效率并不高(进程A到进程B须要发送两次行列步队,如果须要吸收回答则须要发送四次行列步队),但总体而言比较安全稳定,实现了类似我们1.45代码的RPC办法的功能,即任何代码只要注册了client都可以向其他进程的server发送访问,担保了顺序访问同一进程,此外中转收发进程的报文吸收和处理分开,最大程度的提高效率避免积压。