Linux 进程间通信(包含一个经典的生产者消费者实例代码)

前言:编写多进程程序时,有时不可避免的需要在多个进程之间传递数据,我们知道,进程的用户的地址空间是独立,父进程中对数据的修改并不会反映到子进程中,但内核是共享的,大多数进程间通信方式都是在内核中建立一块存储区域,用来实现进程间的通信(也可以将数据写进文件,通过文件操作,但文件操作的开销会比较大)。

一.管道通信方式:管道通信具有单向,无结构,先进先出的字节流特点;管道有2个端点,一个端点写入数据,一个端点读取数据,当数据从管道中读出时,这些数据将被移走。当进程从空管道中读取数据或向已满的管道写入数据时,进程将被挂起,直到有进程向管道中写入数据或从管道中读取数据,此时,进程将由等待状态变为就绪状态。对管道的操作和对文件的操作差不多。根据管道提供的应用接口的不同,管道可分为命名管道和无名管道。

1.无名管道

#include <unistd.h>

int pipe(int fd[2])

// 创建一个无名管道fd包含2个文件描述符的数组,fd[0]用于读,fd[1]用于写若成功返回0,否则反回-1

一般某个进程用于读,另一个进程用于写,用于读的进程需要close(fd[1]),用于写的进程需要close(fd[0]);

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
int main(void)
{
pid_t pid;
int fd[],i,n;
char chr;
pipe(fd);
pid=fork();
if(pid==) //子进程
{
close(fd[]);
for(i=;i<;i++)
{
read(fd[],&chr,);
printf("%c\n",chr);
}
close(fd[]);
exit();
}
close(fd[]); //父进程
for(i=;i<;i++)
{
chr='a'+i;
write(fd[],&chr,);
sleep();
}
close(fd[]);
return ;
}

若只想读取某个进程的结果,或写入某个进程的输入可以使用popen函数,popen函数首先调用pipe创建管道,然后调用fork函数创建子进程,在子进程中调用execve函数

执行相关命令。

#include <stdio.h>

FILE *popen(const char *command,const char *type)

//  command执行的shell命令,type命令的输入输出类型(r:打开命令执行的标准输出w:打开命令执行的标准输入),成功返回文件I/O流否则返回NULL

int pclose(FILE* stream)       //返回命令执行的返回状态

#include <stdio.h>
int main()
{
FILE *fp;
fp=popen("/bin/ls -a","r");
char buf[];
int line=;
while(fgets(buf,,fp)!=NULL)
{
printf("%d: %s",line++,buf);
}
pclose(fp);
return ;
}

命名管道

命名管道作为一个特殊的文件保存在文件系统中,任意一个进程都可以对命名管道进行读写,只要进程具有相应的读写权限

#include <sys/types.h>

#include <sys/stat.h>

int mkfifo(const char *pathname,mode_t mode);

//创建命名管道 pathname文件路径名,mode存取权限,若成功返回0,否则返回-1

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
int main()
{
mkfifo("fifo",); //创建一个命令管道,属主和用户组具有读写权限
pid_t pid;
pid=fork();
if(pid==)
{
char buf[];
int fd=open("fifo",O_RDONLY); //子进程读管道中的数据
read(fd,buf,);
buf[]=;
printf("%s",buf);
close(fd);
exit();
}
int fd=open("fifo",O_WRONLY); //父进程向管道写入数据
write(fd,"fifo test\n",);
close(fd);
return ;
}

IPC(进程间通信):IPC的内容包括信号量,消息队列和共享内存,即一个IPC对象可以是一个信号量也可以是一个消息队列也可以是一个共享内存。每一个IPC对象都有一个正整数标识,与文件描述符不同的是ipc对象的标识是全局的,用于识别整个系统中不同的ipc对象。ipc对象的标识由进程运行时决定,不是每次都相同。因此进程为了通信,不仅要知道使用的IPC对象类型,而且也需要知道ipc对象标识。但是进程在执行前并不知道IPC对象标识,下面给出2种办法

(1)创建IPC对象时,key使用IPC_PRIVATE,这样就保证返回一个新的IPC对象,然后将这一标识存放于某个文件中,其他进程便可打开文件获得该IPC对象标识

(2)创建某一个IPC对象时,key值随机使用一个数字,在不同的进程中,对于创建IPC对象的进程,根据key值得到一个IPC对象标识,对于获取IPC对象的进程,使用相同的key值就可以得到与该key值相对应的IPC对象的标识

下面一段代码说明了key值的作用

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>
#define MY_SEM_ID 120
int main ( int argc, char *argv[] )
{ int semid;
semid=semget(MY_SEM_ID,,|IPC_CREAT);//如果把MY_SEM_ID换成IPC_PRIVATE那么每次程序的执行
printf("semid=%d",semid); //结果都不相同,若是MY_SEM_ID,则结果相同即使在不同的
return ; //进程中,即只要key相同,则返回的标识就相同 }

IPC相关API

信号量(用与多进程程序在存取共享资源时的同步控制)

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/sem.h>

int semget(key_t key,int nsems,int semflg)

函数说明:功能:获得或创建信号量,key:根据key生成信号量标识,nsems:创建的信号量集中的信号量的个数,该参数只在创建信号量集时有效。,semflg:存取权限或创建条件若为0则用于获得已存在的信号量若为IPC_CREAT|perm perm为存取权限,则用于创建信号量,成功返回信号量标识,出错返回-1

int semop(int semid,struct sembuf* sops,unsigned nsops)

函数说明:功能:获得或者释放信号量,semid:信号量标识,sops指向由sembuf组成的数组,nsops信号量的个数,成功返回0,否则返回-1

struct sembuf{

ushort sem_num;   //在信号量数组中的索引

short sem_op;       //要执行的操作,若sem_op大于0那么操作为将sem_op加入到信号量的值中,并唤醒等待信号增加的进程;若sem_op等于0,当信号量的值也是0时, 函数返回,否则阻塞直到信号量的值为0;若sem_op小于0,则判断信号量的值加上sem_op的值,如果结果为0,唤醒等待信号量为0的进程,如果小于0,调用该函数的进程阻塞,如果大于0,那么从信号量里减去这个值并返回。

short sem_flg;         //操作标致SEM_UNDO会阻塞,IPC_NOWAIT不会阻塞

};

int semctl(int semid,int semnum,int cmd,union semun arg);

函数说明:功能:在信号量集上的控制操作,semid信号量集的标识,semnum信号量集的第几个信号量,撤销信号量集时,次参数可缺省,cmd用于指定操作类别,值为GETVAL获得信号量的值,SETVAL设置信号量的值,GETPID获得最后一次操作信号量的进程,GETNCNT获得正在等待信号量的进程数,GETZCNT获得等待信号量值变为0的进程数,IPC_RMID 删除信号量或信号量数组

共享内存

共享内存是内核中的一块存储空间,这块内存被映射至多个进程的虚拟地址空间。共享内存在不同进程虚拟地址空间中映射地址未必相同。

相关API

#include <sys/ipc.h>

#include <sys/shm.h>

int shmget(key_t key,int size,int shmflg)

函数说明:功能:创建或获得共享内存,key:作用同上,size:共享内存的大小,shmflg:存取权限或创建条件,若为IPC_CREAT|perm perm为存取权限,则表示创建共享内存,为0表示获得共享内存

void * shmat(int shmid,const void *shmaddr,int shmflg)

函数说明:功能:将创建的共享内存映射到进程虚拟地址空间的某个位置,shmid:共享内存标识,shmaddr要映射到的进程虚拟空间地址,若为NULL,则由系统决定对应的地址,shmflg:指定如何使用共享内存,若指定了SHM_RDONLY位则表示以只读的方式使用此段,否则以读写的方式使用此段.成功返回映射的地址失败返回-1

int shmdt(const void* shmaddr);

函数说明:解除共享内存的映射,shmaddr:共享内存的映射地址,成功返回0,否则返回-1

int shmctl(int shmid,int cmd,struct shmid_ds *buf)

函数说明:对以存在的共享内存进行操作,shmid:共享内存标识,cmd:操作类型:cmd 为IPC_STAT 获取共享内存的状态,IPC_/SET设置共享内存的权限,IPC_RMID删除共享内存,IPC_LOCK 锁定共享内存,使共享内存不被置换出去,IPC_UNLOCK解锁

    struct shmid_ds{

    struct ipc_perm   shm_perm;   //存取权限

    int        shm_segsz; //共享内存大小

    __kernel_time_t       shm_atime;  //最后映射时间

      __kernel_time_t       shm_dtime;  //最后解除映射时间

    __kernel_time_t       shm_ctime;  //最后修改时间

    __kernel_ipc_pid_t  shm_cpid;    //创建进程ID

    __kernel_ipc_pid_t  shm_lpid;    //最近操作进程ID

    unsigned short         shm_nattch; //建立映射的进程数

    }

下面是一个经典的生产者消费者代码实例   

//semshm.h 封装了创建信号量删除信号量,P操作,V操作
#ifndef SEMSHM_H
#define SEMSHM_H #define SHM_KEY 9494
#define SEM_KEY_1 9399
#define SEM_KEY_2 9595
#define BUF_SIZE 1024
#include <string.h>
union semnum
{
int val;
struct semid_ds *buf;
unsigned short *array;
}; int sem_create(key_t key,int val) //创建一个信号量并置处值为val
{
int semid;
semid=semget(key,,|IPC_CREAT);
if(semid==-)
{
perror("semget");
exit(-);
}
union semnum arg; //联合类型的变量初始化必须用{},赋值使用 arg.val=0
arg.val=val; //设信号量的初始值
if(semctl(semid,,SETVAL,arg)==-)
{
perror("semctl");
exit(-);
}
return semid;
}
void sem_del(int semid) //删除信号量
{
union semnum arg;
arg.val=;
if(semctl(semid,,IPC_RMID,arg)==-)
{
perror("semctl");
exit(-);
}
} int P(int semid) //P操作,使信号量的值减1
{
struct sembuf sops={,-,SEM_UNDO};//第三个参数设置为SEM_UNDO时当信号量小于0时会阻塞,设置为IPC_NOWAIT则不会阻塞
return (semop(semid,&sops,)); } int V(int semid) //V操作,使信号量的值加一
{
struct sembuf sops={,+,SEM_UNDO};
return (semop(semid,&sops,));
} struct msg_data //定义一个共享内存存储的消息结构体
{
char data[BUF_SIZE];
}; void Productor(struct msg_data *msg,int i) //生产者
{
const char * str="productid:";
char *array[]={"","","","",""};
strcpy(msg->data,str);
strcat(msg->data,array[i]);
printf("Productor:%s\n",msg->data);
} void Customer(struct msg_data *msg) //消费者
{
printf("Customer:%s\n",msg->data);
}
#endif
//allsemshm.c   创建了2个信号量用于实现生产者和消费者之间的同步问题 ,并创建了一个共享内存作为共享资源
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "semshm.h"
int main()
{
int empty,full,shmid;
empty=sem_create(SEM_KEY_1,);//设置一个信号量置初值为1
full=sem_create(SEM_KEY_2,); //设置一个信号量置初值为0
shmid=shmget(SHM_KEY,sizeof(struct msg_data),|IPC_CREAT);
if(shmid==-)
{
perror("shmget");
exit(-);
} printf("empty=%d\tfull=%d\tshmid=%d\n",empty,full,shmid);
return ;
}
//semshm_s.c  生产者代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "semshm.h" int main()
{
int empty,full,shmid;
empty=semget(SEM_KEY_1,,); //获得信号量
full=semget(SEM_KEY_2,,);
shmid=shmget(SHM_KEY,,); //获得共享内存
if(empty==-||full==-||shmid==-)
{
perror("get");
exit(-);
} struct msg_data *buf;
void * tmp=shmat(shmid,NULL,); //映射共享内存
buf=(struct msg_data*)tmp;
int i=;
for(i=;i<;i++)
{
sleep();
P(empty);
Productor(buf,i);    //生产者向共享内存写入数据
V(full);
}
return ;
}
//semshm_c.c  消费者代码 
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "semshm.h" int main()
{
int empty,full,shmid;
empty=semget(SEM_KEY_1,,); //获取信号量
full=semget(SEM_KEY_2,,);
shmid=shmget(SHM_KEY,,); //获取共享内粗
if(empty==-||full==-||shmid==-)
{
perror("get");
exit(-);
} struct msg_data *buf;
buf=(struct msg_data*)shmat(shmid,NULL,); //映射共享内存地址
int i=;
for(i=;i<;i++)
{
P(full);
Customer(buf); //消费者从共享内存取数据
V(empty);
} return ;
}
//delsemshm.c  //删除所建的信号量和共享内存    
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "semshm.h"
int main()
{
int empty,full,shmid;
empty=semget(SEM_KEY_1,,);
full=semget(SEM_KEY_2,,);
shmid=shmget(SHM_KEY,,);
if(empty==-||full==-||shmid==-)
{
perror("del get");
exit(-);
}
sem_del(empty);
sem_del(full);
struct shmid_ds *ds=(struct shmid_ds*)malloc(sizeof(struct shmid_ds));
shmctl(shmid,IPC_RMID,ds);
printf("del success !\n");
return ;
}

运行:我们先运行allsemshm这个程序,建立信号量和共享内存,再运行semshm_s这个生产者程序(会sleep15秒),同时,运行2个semshm_c消费者程序,观察程序的输出,再调用delsemshm程序删除信号量和共享内存

运行截图

Linux 进程间通信(包含一个经典的生产者消费者实例代码)

Linux 进程间通信(包含一个经典的生产者消费者实例代码)

Linux 进程间通信(包含一个经典的生产者消费者实例代码)

分析这段代码,首先创建了empty和full2个信号量用来实现进程同步问题(这个生产者消费者只有一个缓冲去可以不考虑互斥问题,对于多个缓冲区的生产者消费者问题需要使用3个信号量完成同步和互斥操作),并创建了一个共享内存做为共享资源,接着生产者会生产5次,只有缓冲区内容被取走时才能进行生产操作,同时开启了2个消费者程序。只有缓冲区有内容时,消费者才能执行他的动作。对于生产者消费者问题的信号量操作可以参考操作系统教程中相关内容。

消息队列

消息队列是存在于内核中的消息列表,一个进程可以将消息发送到消息列表。另一个进程可以从消息列表接受消息。消息队列的操作方式为先进先出

相关API

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

int msgget(key_t key,int msgflg)

函数说明:功能:获取或者创建一个消息队列,key值同上,msgflg:存取或者创建条件值同上。成功返回消息队列标识,失败返回-1.

int msgsnd(int msgid,const void* msgp,size_t msgsz,int msgflg)

函数说明:功能:向消息队列中发送消息,msgid:消息队列标识,msgp消息结构体的地址,msgsz:消息结构体的字节,msgflg:操作标志,成功返回0,否则返回-1。在消息队列没有足够的空间容纳发送的消息时,该函数会阻塞,如果msgflg为IPC_NOWAIT ,则不管发送消息是否成功,该函数都不会阻塞。其中msgp必须指向这样一个结构体

struct msgbuf{

long mtype;   //必须有且大于0

char mtext[1];  //这个可以自己定以,也可以定义其他成员

}

size_t msgrcv(int msgid,void *msgp,size_t msgsz,long msgtyp,int msgflg)

函数说明:获取指定消息队列中,msgtyp类型的消息,该值要根发送消息的结构体中msgp->mtype值一样,msgsz,消息结构体的大小,msgflg操作标志,值同上

成功返回收到的字节个数,失败返回-1

int msgctl(int msqid,int cmd,struct msqid_ds* buf)

函数说明:cmd操作类型,IPC_RMID删除消息队列,IPC_STAT获取消息队列的状态,IPC_SET改变消息队列的状态,buf用来存放消息队列的属性信息,其结构体如下

struct msqid_ms{

struct ipc_perm msg_perm;   //权限

struct msg *msg_first;   //消息队列的首

struct msg *msg_last;     //消息队列的尾

__kernel_time_t msg_stime;   //最后发送时间

__kernel_time_t msg_rtime;  //最后接受时间

__kernel_time_t msg_ctime; //最后修改时间

unsigned short msg_cbytes;    //当前消息队列的字节数

unsigned short msg_qnum;  //消息队列中的消息数

unsigned short msg_qbytes; //消息队列的最大字节数

__kernel_ipc_pid_t  msg_lspid; //最后发送消息的进程ID

__kernel_ipc_pid_t  msg_lrpid; //最后接受消息的进程ID

};

演示代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#define MSG_KEY 111
#define BUFSIZE 4096
struct msgbuf
{
long mtype;
char mtext[BUFSIZE];
};
int main()
{
int mspid;
pid_t pid; mspid=msgget(MSG_KEY,IPC_CREAT|);
if(mspid==-)
{
perror("msgget");
exit(-);
} pid=fork();
if(pid<)
{
perror("fork");
exit(-);
}
else if(pid==)
{
sleep();
int msqid=msgget(MSG_KEY,);
if(msqid==-)
{
perror("msgget");
exit(-);
}
struct msgbuf buf;
if(msgrcv(msqid,(void *)&buf,sizeof(struct msgbuf),,)==-)
{
perror("msgrcv");
exit(-);
}
printf("child:rcv a msg is %s\n",buf.mtext);
exit();
}
else
{
struct msgbuf buf;
buf.mtype=;
strcpy(buf.mtext,"Hello World!");
if(msgsnd(mspid,(const void *)&buf,sizeof(struct msgbuf),)==-)
{
perror("msgsnd");
exit(-);
}
printf("parent:snd a msg is %s\n",buf.mtext);
sleep();
} // struct msqid_ds cbuf;
msgctl(mspid,IPC_RMID,NULL);
return ;
}

总结:以上就是一些常用的进程通信的方法了,关于一些具体的API和一些结构体,以及宏所代表的意义,如果不是很清楚可以在Linux下man相关函数,Linux多进程编程就到这里了,接下来就是多线程编程相关的知识了。

上一篇:Android进程机制


下一篇:linux centos安装编译phantomjs 2.0的方法