Linux系统编程—进程间通信

目录

01. 管道

用于父子进程间(有亲缘关系的进程之间)的通信。

#include <unistd.h>

int pipe(int pipefd[2]);

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <fcntl.h>              /* Obtain O_* constant definitions */
#include <unistd.h>

int pipe2(int pipefd[2], int flags);

/*
On  success,  zero is returned.  On error, -1 is returned, and errno is
set appropriately.
*/

pipefd[0] 是读取端,pipefd[1] 是写入端;从 pipefd[0] 读到的就是往 pipefd[1] 写入的数据。

  • 当读一个写端已关闭的管道/FIFO时,在所有数据都被读取后,读操作返回 0;
  • 如果写一个读端已关闭的管道/FIFO,则会错误返回并产生 SIGPIPE 信号。

如果写操作一次写入的数据大小不超过 PIPE_BUF,则此写操作写入的数据与其他进程对同一管道/FIFO的写操作所写入的数据不会交叉;否则可能会发生数据交叉!

对于 pipe2flags 取值如下:

  • 0: 此时和 pipe 相同
  • O_CLOEXEC: 在执行 exec 系列函数时,关闭管道文件描述符
  • O_DIRECT: 以包模式写入和读取数据;如果写入的数据超过 PIPE_BUF,则数据会被分割为多个包;如果读取的数据小于包的大小,则包的余下部分会被丢弃
  • O_NONBLOCK: 非阻塞模式

02. FIFO

又叫命名管道,可用于没有亲缘关系的进程间的通信。

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);

#include <fcntl.h>           /* Definition of AT_* constants */
#include <sys/stat.h>

/*
 pathname 是相对于目录 dirfd(目录的文件描述符) 的相对路径;
 如果 pathname 为绝对路径,则忽略 dirfd
*/
int mkfifoat(int dirfd, const char *pathname, mode_t mode);

/*
On  success mkfifo() and mkfifoat() return 0.  In the case of an error,
-1 is returned (in which case, errno is set appropriately).
*/

首先通过 mkfifo/mkfifoat 创建 fifo,然后使用 open 打开它,即可通过读写操作进行通信。

如果以非阻塞模式打开 fifo,只读 open 会阻塞进程直到另一个进程以写方式 open 该 fifo,反之亦然。

03. 记录锁

使用 fcntl 能够在一个文件的任意一部分上放置一把锁(劝告式锁)。

#include <fcntl.h>

int fcntl(int fildes, int cmd, .../* struct flock* */);

/*
返回值:
	成功时,返回值不为 -1;
	失败时,返回 -1,并设置 errno
*/

cmd 取值:

  • F_SETLK: 获取或释放锁,如果无法加锁则失败返回(errno 置为 EAGAIN/EACCES
  • F_SETLKW: 获取或释放锁,如果无法加锁则阻塞直至可以加锁,可被信号中断
  • F_GETLK: 检测是否能够获取指定的锁,如果可以,则在 l_type 中返回 F_UNLCK;如果不可以则会返回已有锁的信息(起始偏移、大小、持有进程)

flock 结构定义了要获取/删除的锁:

struct flock {
   ...
   short l_type;    /* Type of lock: F_RDLCK, F_WRLCK, F_UNLCK */
   short l_whence;  /* How to interpret l_start: SEEK_SET, SEEK_CUR, SEEK_END */
   off_t l_start;   /* Starting offset for lock */
   off_t l_len;     /* Number of bytes to lock */
   pid_t l_pid;     /* PID of process blocking our lock (set by F_GETLK) */
   ...
};

l_type 取值:

  • F_RDLCK: 加读锁
  • F_WRLCK: 加写锁
  • F_UNLCK: 释放锁

l_whence 取值:

  • SEEK_SET: 锁区域的起始位置为 l_start
  • SEEK_CUR: 锁区域的起始位置为 当前文件偏移量 + l_start
  • SEEK_END: 锁区域的起始位置为 文件大小 + l_start

l_len 指定了锁区域的大小(字节),0 表示直到文件末尾。

如果一个进程对某个文件区间已持有一把锁,随后该进程在同一区间再加一把锁,那么新锁将替换已有锁!

在设置或释放锁时,系统会合并相邻同类型的锁区间或分裂一个锁区间为多个不同类型的锁区间。如,

// 在一个读锁之后再加一个读锁
|	读锁1		|	读锁2		|	--合并-->		|			读锁		 	|


// 在读锁中间加一个写锁
|		     读锁			 |	 --分裂--> 	 |	读锁1	  |  写锁	 | 读锁2 |

记录锁同时与一个进程和一个 i-node 关联,所以,

  • 当进程终止时,其建立的所有记录锁都将全部被释放。

  • 当一个进程关闭了一个文件描述符之后,进程持有的对应文件上的记录锁都将被释放,不管这些锁是通过哪个文件描述符获得的!如,

    fd1 = open("test", O_RDWR);
    fd2 = open("test", O_RDWR);
    
    // 通过 fd1 获取记录锁 ...
    
    close(fd2);		// 当前进程在 test 文件上所持有的记录锁都被释放掉!
    
    // ...
    

fork 创建的子进程不会继承父进程的记录锁。

如果文件描述符没有设置 close-on-exec 标志,则在执行 exec 系列函数时,新程序可以继续使用原有的记录锁;否则则无法使用。

例子:

#include <fcntl.h>

static inline int flock_op(int fd, short type, short whence, off_t offset, off_t len) {
	struct flock lock;
	int ret;

	lock.l_type		= type;
	lock.l_start	= offset;
	lock.l_whence	= whence;
	lock.l_len		= len;

	ret = fcntl(fd, F_SETLK, &lock);
	return ret;
}

int read_lock(int fd, off_t offset, off_t len) {
	return flock_op(fd, F_RDLCK, SEEK_SET, offset, len);
}

int write_lock(int fd, off_t offset, off_t len) {
	return flock_op(fd, F_WRLCK, SEEK_SET, offset, len);
}

int unlock(int fd, off_t offset, off_t len) {
	return flock_op(fd, F_UNLCK, SEEK_SET, offset, len);
}

04. POSIX命名信号量

链接时需指定 -pthread.

创建或打开

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>

// 打开
sem_t *sem_open(const char *name, int oflag);

// 创建
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);

/*
On  success,  sem_open() returns the address of the new semaphore;
On error,  sem_open()  returns  SEM_FAILED, with errno set to indicate the error.
*/

name 指定了信号量的名称,以 / 开头,后跟多个非 / 的字符,如 /mysemmode 参数与 open 中的 mode 参数一样,指定了信号量的访问权限(需要可读写);value 指定了信号量的初始值。

oflag 取值:

  • 0: 打开一个已有的信号量

  • O_CREAT: 如果信号量不存在,则创建之

  • O_EXCL: 结合 O_CREAT 使用,如果信号量已经存在,则失败返回

关闭

#include <semaphore.h>

int sem_close(sem_t *sem);

/*
On  success sem_close() returns 0; on error, -1 is returned, with errno
set to indicate the error.
*/

进程终止或执行 exec 系列函数时会自动关闭信号量。

删除

#include <semaphore.h>

int sem_unlink(const char *name);

/*
On success sem_unlink() returns 0; on error, -1 is returned, with errno
set to indicate the error.
*/

将信号量标记为"一旦所有进程都已关闭该信号量,则删除之"。

PV操作

#include <semaphore.h>

int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

/*
All of these functions return 0 on success; on error, the value of  the
semaphore  is left unchanged, -1 is returned, and errno is set to indi‐
cate the error.
*/

如果信号量的当前值大于 0,则减一返回,否则,

  • sem_wait: 阻塞直到大于0,然后减一返回
  • sem_trywait: 立即失败返回,errno 设为 EAGAIN
  • sem_timedwait: 最多阻塞指定的时长(绝对时间),超时则失败返回,并将 errno 设为 ETIMEDOUT,否则减一返回
#include <semaphore.h>

int sem_post(sem_t *sem);

/*
sem_post() returns 0 on success; on error, the value of  the  semaphore
is  left  unchanged,  -1  is returned, and errno is set to indicate the
error.
*/

将信号量的当前值加一。

05. POSXI匿名信号量

将信号量放在共享内存区域中,可实现进程间/线程间通信。

PV 操作和命名信号量的一致。

初始化

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

/*
sem_init() returns 0 on success; on error, -1 is returned, and errno is
set to indicate the error.
*/

value 指定了信号量的初始值。

pshared 取值:

  • 0: 信号量在调用进程的线程间共享
  • != 0: 信号量在进程间共享,此时 sem 需位于共享内存区域中

销毁

#include <semaphore.h>

int sem_destroy(sem_t *sem);

/*
sem_destroy() returns 0 on success; on error, -1 is returned, and errno
is set to indicate the error.
*/

销毁之后可以重新初始化。

06. POSIX消息队列

链接时需使用 -lrt 选项。

可以使用 pollselectepoll 来监控 POSIX消息队列。

创建或打开

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

// 打开
mqd_t mq_open(const char *name, int oflag);

// 创建
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

/*
On success, mq_open() returns a message queue  descriptor  for  use  by
other message queue functions.  On error, mq_open() returns (mqd_t) -1,
with errno set to indicate the error.
*/

name 指定了队列的名称,以 / 开头,后跟多个非 / 的字符,如 /myqueuemode 参数与 open 中的 mode 参数一样,指定了队列的访问权限(如可读写)。

oflag 取值:

  • O_RDONLY: 只读
  • O_WRONLY: 只写
  • O_RDWR: 读写
  • O_CLOEXEC: 设置 close-on-exec 标志
  • O_CREAT: 不存在时创建
  • O_EXCL: 结合 O_CREAT 使用,如果已存在则失败返回(errno 设为 EEXIST
  • O_NONBLOCK: 非阻塞模式打开

attr 指定了队列的属性,NULL 表示默认属性。

关闭

#include <mqueue.h>

int mq_close(mqd_t mqdes);

/*
On  success  mq_close() returns 0; on error, -1 is returned, with errno
set to indicate the error.
*/

会自动删除已注册的消息通知。

进程终止或执行 exec 时会自动关闭。

删除

#include <mqueue.h>

int mq_unlink(const char *name);

/*
On success mq_unlink() returns 0; on error, -1 is returned, with  errno
set to indicate the error.
*/

将队列标记为"一旦所有进程都已关闭该队列,则删除之"。

属性

#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

// oldattr 可为 NULL,不返回之前的属性
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

struct mq_attr {
   long mq_flags;       /* Flags: 0 or O_NONBLOCK */
   long mq_maxmsg;      /* Max. # of messages on queue */
   long mq_msgsize;     /* 每条消息的最大大小/字节 */
   long mq_curmsgs;     /* # of messages currently in queue */
};

/*
On  success  mq_getattr()  and  mq_setattr()  return 0; on error, -1 is
returned, with errno set to indicate the error.
*/

设置属性时,只能设置 mq_flags 字段。

发送消息

#include <mqueue.h>

int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);

#include <time.h>
#include <mqueue.h>

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio,
             const struct timespec *abs_timeout);

struct timespec {
   time_t tv_sec;        /* seconds */
   long   tv_nsec;       /* nanoseconds */
};

/*
On  success,  mq_send() and mq_timedsend() return zero; on error, -1 is
returned, with errno set to indicate the error.
*/

msg_prio 指定了消息的优先级(0最低),消息在队列中是按照优先级排序的,高优先级在前,相同优先级的多条消息按照发送时间排序。

abs_timeout 指定了阻塞等待的超时时间(绝对时间),超时时 errno 被设为 ETIMEDOUT

接收消息

#include <mqueue.h>

// msg_prio 可为 NULL,不返回优先级
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

#include <time.h>
#include <mqueue.h>

ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
                  size_t msg_len, unsigned int *msg_prio,
                  const struct timespec *abs_timeout);

/*
 On success, mq_receive() and mq_timedreceive()  return  the  number  of
bytes in the received message; on error, -1 is returned, with errno set
to indicate the error.
*/

获取并删除队列头消息。

msg_len 需大于等于队列的 mq_msgsize 属性,否则会失败返回(errnoEMSGSIZE )。

消息通知

#include <mqueue.h>

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

union sigval {          /* Data passed with notification */
   int     sival_int;         /* Integer value */
   void   *sival_ptr;         /* Pointer value */
};

struct sigevent {
   int          sigev_notify; /* Notification method */
   int          sigev_signo;  /* Notification signal */
   union sigval sigev_value;  /* Data passed with
                                 notification */
   void       (*sigev_notify_function) (union sigval);
                    /* Function used for thread notification (SIGEV_THREAD) */
   void        *sigev_notify_attributes;
                    /* Attributes for notification thread (SIGEV_THREAD) */
   pid_t        sigev_notify_thread_id;
                    /* ID of thread to signal (SIGEV_THREAD_ID) */
};

/*
On  success mq_notify() returns 0; on error, -1 is returned, with errno
set to indicate the error.
*/
  • 当消息进入队列时发送通知。

  • 一次只能有一个进程能够注册通知,否则会失败返回(errnoEBUSY)。

  • 向注册进程发送通知后便会删除该注册信息。

  • 如果 sevp 为 NULL,则撤销注册。

sevpsigev_notify 字段指定了通知方式:

  • SIGEV_NONE: 不通知,依旧会删除注册信息
  • SIGEV_SIGNAL: 生成 sigev_signo 中指定的信号
  • SIGEV_THREAD: 调用 sigev_notify_function 中指定的函数

07. XSI IPC

IPC标识符和键

每个 XSI IPC 对象在内核都通过一个非负整数的标识符 (内部名)加以引用;此外,每个 IPC 对象都与一个键(外部名)关联。

为了使不同的进程引用相同的 IPC 对象,可以有如下方式:

  1. 将键指定为 IPC_PRIVATE,在使用 XXXget 操作创建 IPC 对象后,将返回的 IPC 对象标识符存到其他进程可以访问到的地方(如,文件),随后其他进程便可通过该标识符引用相同的 IPC 对象;
  2. 所有的进程通过使用相同的参数调用 ftok 来生成相同的 IPC 键,然后使用该键调用 XXXget 操作来创建或打开既有的 IPC 对象;
#include <sys/types.h>
#include <sys/ipc.h>

// 只取 proj_id 的低 8 bits
key_t ftok(const char *pathname, int proj_id);

/*
On  success,  the  generated key_t value is returned.  On failure -1 is
returned, with errno indicating the error as  for  the  stat(2)  system
call.
*/

权限结构

struct ipc_perm {
   key_t          __key;       /* IPC Key */
   uid_t          uid;         /* Effective UID of owner */
   gid_t          gid;         /* Effective GID of owner */
   uid_t          cuid;        /* Effective UID of creator */
   gid_t          cgid;        /* Effective GID of creator */
   unsigned short mode;        /* Permissions */
   unsigned short __seq;       /* Sequence number */
};

08. XSI 共享内存

为了避免多个进程同时访问共享内存,需要使用额外的同步机制,如信号量等。

创建或打开

#include <sys/ipc.h>
#include <sys/shm.h>

int shmget(key_t key, size_t size, int shmflg);

/*
On success, a valid shared memory identifier is returned.  On error, -1
is returned, and errno is set to indicate the error.
*/

size 指定了共享内存的大小(字节),如果该共享内存已存在,则忽略 size.

shmflg 取值:

  • IPC_CREAT: 如果内存段不存在,则创建之
  • IPC_EXCL: 结合 IPC_CREAT 使用,如果内存段已存在,则失败返回(errno 设为 EEXIST
  • SHM_HUGETLB: 使用 huge page 来创建共享内存
  • SHM_HUGE_2MB, SHM_HUGE_1GB: 结合 SHM_HUGETLB 使用,指定 huge page 的大小
  • SHM_NORESERVE: 不为此共享内存保留交换空间

附加内存段

将共享内存段附加到进程的虚拟地址空间中。

#include <sys/types.h>
#include <sys/shm.h>

void *shmat(int shmid, const void *shmaddr, int shmflg);

/*
On  success,  shmat() returns the address of the attached shared memory
segment; on error, (void *) -1 is returned, and errno is set  to  indi‐
cate the cause of the error.
*/
  • 如果 shmaddr 为 NULL,则自动附加到一个合适的地方(推荐);
  • 如果 shmaddr 不为 NULL,且 shmflg 中没有设置 SHM_RND,则附加内存段到 shmaddr 指定的地方(必须是系统分页大小的整数倍);
  • 如果 shmaddr 不为 NULL,且 shmflg 中设置了 SHM_RND,则内存段附加到的地址为 shmaddr 舍入到最近的 SHMLBA (shared memory low boundary address)的整数倍处。

shmflg 还可取值:

  • SHM_EXEC: 内存段中的内容可执行
  • SHM_RDONLY: 内存段中的内容只读
  • SHM_REMAP: 替换 shmaddr 处已有的映射

fork 创建的子进程会继承父进程附加的共享内存段。

分离内存段

将共享内存从进程的地址空间中分离出去。

#include <sys/types.h>
#include <sys/shm.h>

int shmdt(const void *shmaddr);

/*
On  success,  shmdt()  returns 0; on error -1 is returned, and errno is
set to indicate the cause of the error.
*/

执行 exec 后,所有已附加的共享内存段都会被分离。

进程终止后,会自动分离共享内存段。

分离并不等于删除。

控制操作

#include <sys/ipc.h>
#include <sys/shm.h>

int shmctl(int shmid, int cmd, struct shmid_ds *buf);

struct shmid_ds {
   struct ipc_perm shm_perm;    /* Ownership and permissions */
   size_t          shm_segsz;   /* Size of segment (bytes) */
   time_t          shm_atime;   /* Last attach time */
   time_t          shm_dtime;   /* Last detach time */
   time_t          shm_ctime;   /* Last change time */
   pid_t           shm_cpid;    /* PID of creator */
   pid_t           shm_lpid;    /* PID of last shmat(2)/shmdt(2) */
   shmatt_t        shm_nattch;  /* No. of current attaches */
   ...
};

/*
A successful IPC_INFO or SHM_INFO operation returns the  index  of  the
highest used entry in the kernel's internal array recording information
about all shared memory segments.  (This information can be  used  with
repeated  SHM_STAT  operations  to  obtain information about all shared
memory segments  on  the  system.)   A  successful  SHM_STAT  operation
returns  the  identifier  of  the shared memory segment whose index was
given in shmid.  Other operations return 0 on success.

On error, -1 is returned, and errno is set appropriately.
*/

cmd 取值:

  • IPC_RMID: 标记删除;在所有进程都已分离该段之后会执行删除操作,忽略 buf

  • IPC_STAT: 在 buf 中返回共享内存段的信息。

  • IPC_SET: 使用 buf 中的信息更新内存段的信息。

  • IPC_INFO: 在 buf 中返回共享内存的系统限制信息,需要将 buf 转换为 struct shminfo* 类型,需要定义 _GNU_SOURCE 宏,会忽略 shmid 参数

    struct shminfo {
         unsigned long shmmax; /* Maximum segment size */
         unsigned long shmmin; /* Minimum segment size; always 1 */
         unsigned long shmmni; /* Maximum number of segments */
         unsigned long shmseg; /* Maximum number of segments
                                  that a process can attach;
                                  unused within kernel */
         unsigned long shmall; /* Maximum number of pages of
                                  shared memory, system-wide */
     };
    
  • SHM_INFO: 在 buf 中返回共享内存所使用的系统资源信息,需要将 buf 转换为 struct shm_info* 类型,需要定义 _GNU_SOURCE 宏,会忽略 shmid 参数

    struct shm_info {
         int           used_ids; /* # of currently existing segments */
         unsigned long shm_tot;  /* Total number of shared memory pages */
         unsigned long shm_rss;  /* # of resident shared memory pages */
         unsigned long shm_swp;  /* # of swapped shared memory pages */
         unsigned long swap_attempts; /* Unused since Linux 2.4 */
         unsigned long swap_successes; /* Unused since Linux 2.4 */
     };
    
  • SHM_STAT: 和 IPC_STAT 类似,但 shmid 参数是该共享内存在内核为所有共享内存维护的数组中的索引。

  • SHM_LOCK: 防止共享内存被交换出去,会在 shm_perm.mode 中设置 SHM_LOCKED.

  • SHM_UNLOCK: 允许共享内存被交换出去。

如,获取所有的共享内存段信息:(ipcs -m

#include <sys/ipc.h>
#include <sys/shm.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

static void fatal(const char* msg) {
	fprintf(stderr, "%s: %s\n", msg, strerror(errno));
	exit(-1);
}

static void waring(const char* msg) {
	fprintf(stderr, "%s: %s\n", msg, strerror(errno));
}

void print_shminfo(const struct shm_info* info ) {
	printf("used_ids: %d\n", info->used_ids);
	printf("shm_tot: %lu\n", info->shm_tot);
	printf("\n");
}

void print_shmds(const struct shmid_ds* ds) {
	printf("shm_segsz: %lu\n", ds->shm_segsz);
	printf("shm_nattach: %lu\n", ds->shm_nattch);
	printf("\n");
}

int main() {
	int ret, n;
	struct shm_info info;
	struct shmid_ds ds;

	n = shmctl(0, SHM_INFO, (struct shmid_ds*)&info);
	if (n == -1) {
		fatal("shmctl");
	}
	print_shminfo(&info);

	for (int i = 0; i < n; i++) {
		ret = shmctl(i, SHM_STAT, &ds);
		if (ret != -1) {
			print_shmds(&ds);
		}
		else {
			waring("shmctl SHM_STAT");
		}
	}
	
	return 0;
}

09. XSI 消息队列

创建或打开

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

/*
If successful, the return value will be the message queue identifier (a
nonnegative integer), otherwise -1 with errno indicating the error.
*/

msgflg 取值:

  • IPC_CREAT: 如果不存在则创建之
  • IPC_EXCL: 结合 IPC_CREAT 使用,如果已存在则失败返回(errno 设为 EEXIST

发送消息

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

/*
On  failure  msgsnd() returns -1 with errno indicating the error,
otherwise msgsnd() returns 0.
*/

msgp 指向的内容通常具有如下形式:

struct msgbuf {
   long mtype;       /* message type, must be > 0 */
   char mtext[];     /* message data */
};

msgsz 参数指定了 mtext 中的字节数。

msgflg 取值:

  • IPC_NOWAIT: 非阻塞发送,失败时将 errno 置为 EAGAIN

接收消息

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

/*
On  failure  msgrcv() returns -1 with errno indicating the error,
otherwise msgrcv() returns the number  of  bytes actually copied into the mtext array.
*/

msgsz 参数指定了 mtext 所指缓冲区的最大大小。如果消息的大小超过了 msgsz,则不会从队列中删除并返回消息,而是失败返回,并将 errno 置为 E2BIG.

msgtyp 指定了要接收的消息的类型,

  • == 0: 获取队列中的第一条消息
  • > 0: 指定类型的第一条消息
  • < 0: mtype 最小的、且其值小于等于 -msgtyp 的第一条消息

msgflg 取值:

  • IPC_NOWAIT: 非阻塞接收,如果没有消息则失败返回,并将 errno 置为 ENOMSG
  • MSG_COPY: 需和 IPC_NOWAIT 一起使用,获取队列中的第 msgtyp 条消息(从0开始计数),且不删除该消息
  • MSG_EXCEPT: msgtyp 大于 0 时起作用,返回队列中第一条 mtype 不为 msgtyp 的消息
  • MSG_NOERROR: 如果 msgsz 小于消息大小,则截断消息,不产生错误

控制操作

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

struct msqid_ds {
   struct ipc_perm msg_perm;     /* Ownership and permissions */
   time_t          msg_stime;    /* Time of last msgsnd(2) */
   time_t          msg_rtime;    /* Time of last msgrcv(2) */
   time_t          msg_ctime;    /* Time of last change */
   unsigned long   __msg_cbytes; /* Current number of bytes in queue (nonstandard) */
   msgqnum_t       msg_qnum;     /* Current number of messages in queue */
   msglen_t        msg_qbytes;   /* Maximum number of bytes allowed in queue */
   pid_t           msg_lspid;    /* PID of last msgsnd(2) */
   pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

/*
On success, IPC_STAT, IPC_SET, and IPC_RMID  return  0.   A  successful
IPC_INFO  or  MSG_INFO  operation returns the index of the highest used
entry in the kernel's internal array recording  information  about  all
message  queues.   (This information can be used with repeated MSG_STAT
operations to obtain information about all queues on  the  system.)   A
successful MSG_STAT operation returns the identifier of the queue whose
index was given in msqid.

On error, -1 is returned with errno indicating the error.
*/

cmd 取值:

  • IPC_RMID: 立即删除消息队列,唤醒所有等待的进程,忽略 buf 参数。

  • IPC_STAT: 在 buf 中返回消息队列的信息。

  • IPC_SET: 使用 buf 中的信息更新队列的信息。

  • IPC_INFO: 在 buf 中返回消息队列的系统限制信息,需要将 buf 转换为 struct msginfo* 类型,需要定义 _GNU_SOURCE 宏,会忽略 msqid 参数

    struct msginfo {
      int msgpool; /* Size in kibibytes of buffer pool
                      used to hold message data;
                      unused within kernel */
      int msgmap;  /* Maximum number of entries in message
                      map; unused within kernel */
      int msgmax;  /* Maximum number of bytes that can be
                      written in a single message */
      int msgmnb;  /* Maximum number of bytes that can be
                      written to queue; used to initialize
                      msg_qbytes during queue creation
                      (msgget(2)) */
      int msgmni;  /* Maximum number of message queues */
      int msgssz;  /* Message segment size;
                      unused within kernel */
      int msgtql;  /* Maximum number of messages on all queues
                      in system; unused within kernel */
      unsigned short int msgseg;
                   /* Maximum number of segments;
                      unused within kernel */
    };
    
  • MSG_INFO: 类似于 IPC_INFO,只是,

    struct msginfo {
      int msgpool; /* the number of message queues that cur‐
                  	  rently exist on the system */
      int msgmap;  /* total number  of  messages in all queues on the system */
      ...
      int msgtql;  /* the total number of bytes in all messages  
      				  in  all queues on the system */
      ...
    };
    
  • MSG_STAT: 和 IPC_STAT 类似,但 msqid 参数是队列在内核为所有消息队列维护的数组中的索引。

10. 内存映射

有两种内存映射:

  • 文件映射:将文件的一部分映射到进程的虚拟内存中;

  • 匿名映射:没有对应的文件,默认会将内容初始化为 0;

每种映射又可分为:

  • 私有映射:在映射内容上所作的修改对其他进程不可见,对于文件映射来说不会修改底层文件;
  • 共享映射:在映射内容上所作的修改对其他进程可见,对于文件映射来说会修改底层文件;

创建映射

#include <sys/mman.h>

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);

/*
On success, mmap() returns a pointer to the mapped area.  On error, the
value MAP_FAILED (that is, (void *) -1) is returned, and errno  is  set
to indicate the cause of the error.
*/

如果 addr 为 NULL,则会自动将映射放到进程地址空间中合适的地方。

length 指定了要映射的字节数。

prot 取值:

  • PROT_EXEC: 映射区域可执行
  • PROT_READ: 映射区域可读
  • PROT_WRITE: 映射区域可写
  • PROT_NONE: 映射区域不可访问

flags 取值:

  • MAP_SHARED: 共享映射
  • MAP_PRIVATE: 私有映射
  • MAP_ANONYMOUS: 匿名映射
  • MAP_FIXED: 将映射放到 addr 处(必须是分页大小的整数倍)
  • MAP_GROWSDOWN: 用于栈,映射需向下扩展
  • MAP_HUGETLB: 使用 huge page 来创建映射
  • MAP_HUGE_2MB, MAP_HUGE_1GB: 结合 MAP_HUGETLB 使用,指定 huge page 的大小
  • MAP_LOCKED: 将映射分页锁进内存,不被交换出去
  • MAP_NORESERVE: 不预留交换空间
  • MAP_UNINITIALIZED: 不清零匿名映射的分页

fd 指定要映射哪个文件、offset 指定要映射的文件区域的起始偏移量(匿名映射不使用)。

解除映射

#include <sys/mman.h>

int munmap(void *addr, size_t length);

/*
On  success,  munmap() returns 0.  On failure, it returns -1, and errno
is set to indicate the cause of the error (probably to EINVAL).
*/

addr 指定要删除的映射的起始地址,需是分页边界对齐。

length 指定要删除的映射的大小,会向上舍入到分页的整数倍。

进程终止或执行 exec 后会自动解除映射。

文件映射

  • 调用 mmap 之后便可以关闭传入的文件描述符。
  • 文件偏移量 offset 需是系统分页大小的整数倍。
  • 共享文件映射下,多个进程会共享相同的内存物理分页。

可使用如下操作来同步共享文件映射和底层文件内容:

#include <sys/mman.h>

int msync(void *addr, size_t length, int flags);

/*
On success, zero is returned.  On error, -1 is returned, and  errno  is
set appropriately.
*/

addrlength 指定了要同步的映射区域,addr 需是分页边界对齐的,length 会向上舍入到分页大小的整数倍。

flags 取值:

  • MS_ASYNC: 异步写入文件,不会阻塞
  • MS_SYNC: 同步写入文件,会阻塞
  • MS_INVALIDATE: 使相同文件区域的其他映射无效,让它们能够使用刚写入的内容来更新映射

匿名映射

  • fd 设为 -1.
  • 共享的匿名映射可以使父子进程共享同一块内存区域。

11. 套接字

创建

#include <sys/types.h>
#include <sys/socket.h>

int socket(int domain, int type, int protocol);

/*
On  success,  a  file  descriptor  for  the new socket is returned.  On
error, -1 is returned, and errno is set appropriately.
*/

domain 常用值:AF_INET, AF_INET6, AF_UNIX, AF_NETLINK, AF_PACKET.

type 常用值:SOCK_STREAM, SOCK_DGRAM, SOCK_RAW.

可以在 type 中位或如下值:

  • SOCK_NONBLOCK: 设置套接字描述符为非阻塞模式
  • SOCK_CLOEXEC: 设置 close-on-exec 标志

protocol 指定要使用的协议,对于 TCP、UDP、Unix域套接字,将其设为 0 即可。

绑定地址

#include <sys/types.h>
#include <sys/socket.h>

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

struct sockaddr {
   sa_family_t sa_family;
   char        sa_data[14];
}

/*
On  success,  zero is returned.  On error, -1 is returned, and errno is
set appropriately.
*/

ipv4地址结构:

struct sockaddr_in {
   sa_family_t    sin_family; /* address family: AF_INET */
   in_port_t      sin_port;   /* port in network byte order */
   struct in_addr sin_addr;   /* internet address */
};

/* Internet address. */
struct in_addr {
   uint32_t       s_addr;     /* address in network byte order */
};

ipv6地址结构:

struct sockaddr_in6 {
   sa_family_t     sin6_family;   /* AF_INET6 */
   in_port_t       sin6_port;     /* port number */
   uint32_t        sin6_flowinfo; /* IPv6 flow information */
   struct in6_addr sin6_addr;     /* IPv6 address */
   uint32_t        sin6_scope_id; /* Scope ID (new in 2.4) */
};

struct in6_addr {
   unsigned char   s6_addr[16];   /* IPv6 address */
};

unix地址结构:

struct sockaddr_un {
   sa_family_t sun_family;               /* AF_UNIX */
   char        sun_path[108];            /* pathname */
};

监听

#include <sys/types.h>
#include <sys/socket.h>

int listen(int sockfd, int backlog);

/*
On  success,  zero is returned.  On error, -1 is returned, and errno is
set appropriately.
*/

backlog 指定了等待连接队列的最大大小。

接受连接

#include <sys/types.h>
#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sys/socket.h>

int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);

/*
On  success,  these system calls return a nonnegative integer that is a
descriptor for the accepted socket.  On  error,  -1  is  returned,  and
errno is set appropriately.
*/

addrlen 需初始化为 addr 指向空间的大小,返回时,其中保存对端地址的大小。如对对端地址不感兴趣,可将 addraddrlen 设为 NULL.

flags 可取值:

  • SOCK_NONBLOCK: 将返回的描述符设为非阻塞模式
  • SOCK_CLOEXEC: 为返回的描述符设置 close-on-exec 标志

发起连接

#include <sys/types.h> 
#include <sys/socket.h>

int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

/*
If  the connection or binding succeeds, zero is returned.  On error, -1
is returned, and errno is set appropriately.
*/

对于基于连接的套接字,它会连接到 addr 指定的对端。

对于 SOCK_DGRAM 套接字,addr 指定了默认发送操作的对端。

关闭

#include <unistd.h>

int close(int fd);

/*
close()  returns  zero on success.  On error, -1 is returned, and errno
is set appropriately.
*/

半关闭

#include <sys/socket.h>

int shutdown(int sockfd, int how);

/*
On  success,  zero is returned.  On error, -1 is returned, and errno is
set appropriately.
*/

how 取值:

  • SHUT_RD: 关闭读端,之后的读操作将返回 0;对于 unix域套接字,对端将产生 SIGPIPE 信号,之后的写操作将失败返回(errno 置为 EPIPE
  • SHUT_WR: 关闭写端,之后的写操作将会产生 SIGPIPE 信号(errno 置为 EPIPE
  • SHUT_RDWR: 关闭读写端

shutdown 并不会关闭套接字描述符。

无论该套接字上是否还关联着其他的描述符,shutdown 都会关闭套接字通道,这意味着无法通过其他描述符进行读/写操作。如,

fd2 = dup(sockfd);

shutdown(sockfd, SHUT_RD);

read(fd2, buf, len);	// 失败!!

发送数据

也可以使用 write 系列操作。

send

#include <sys/types.h>
#include <sys/socket.h>

ssize_t send(int sockfd, const void *buf, size_t len, int flags);

/*
On success, it returns the number of bytes sent.  On error,  -1
is returned, and errno is set appropriately.
*/

send(sockfd, buf, len, flags) 等价于 sendto(sockfd, buf, len, flags, NULL, 0).

flags 取值:

  • MSG_CONFIRM: 提供链路层反馈以保持地址映射有效,不用再通过 ARP 探测邻居
  • MSG_DONTROUTE: 勿将数据包路由出本地网络
  • MSG_DONTWAIT: 非阻塞发送
  • MSG_EOR: 如果协议支持,标记记录结束
  • MSG_MORE: 延迟发送,以允许发送更多数据
  • MSG_NOSIGNAL: 如果对端已关闭连接,不要产生 SIGPIPE 信号
  • MSG_OOB: 如果协议支持,发送带外数据

sendto

#include <sys/types.h>
#include <sys/socket.h>

ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const struct sockaddr *dest_addr, socklen_t addrlen);

/*
On success, it returns the number of bytes sent.  On error,  -1
is returned, and errno is set appropriately.
*/

flagssend.

sendmsg

#include <sys/types.h>
#include <sys/socket.h>

ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

struct msghdr {
   void         *msg_name;       /* optional address */
   socklen_t     msg_namelen;    /* size of address */
   struct iovec *msg_iov;        /* scatter/gather array */
   size_t        msg_iovlen;     /* # elements in msg_iov */
   void         *msg_control;    /* ancillary data */
   size_t        msg_controllen; /* ancillary data buffer len */
   int           msg_flags;      /* flags (unused) */
};

struct iovec {                    /* Scatter/gather array items */
   void  *iov_base;              /* Starting address */
   size_t iov_len;               /* Number of bytes to transfer */
};

/*
On success, it returns the number of bytes sent.  On error,  -1
is returned, and errno is set appropriately.
*/

flagssend.

接收数据

也可使用 read 系列操作。

recv

#include <sys/types.h>
#include <sys/socket.h>

ssize_t recv(int sockfd, void *buf, size_t len, int flags);

/*
It returns  the  number  of bytes received, or -1 if an error
occurred.  In the event of an error,  errno  is  set  to  indicate  the
error.
*/

recv(fd, buf, len, flags) 等价于 recvfrom(fd, buf, len, flags, NULL, 0).

flags 取值:

  • MSG_DONTWAIT: 非阻塞接收
  • MSG_ERRQUEUE: 接收错误消息作为辅助数据
  • MSG_OOB: 如果数据支持,获取带外数据
  • MSG_PEEK: 返回数据包内容,但不取走(移除)数据包
  • MSG_TRUNC: 即使数据包被截断,也返回数据包的实际长度
  • MSG_WAITALL: 等到所有数据都可用再返回

recvfrom

#include <sys/types.h>
#include <sys/socket.h>

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        struct sockaddr *src_addr, socklen_t *addrlen);

/*
It returns  the  number  of bytes received, or -1 if an error
occurred.  In the event of an error,  errno  is  set  to  indicate  the
error.
*/

addrlen 需初始化为 src_addr 指向空间的大小,返回时,其中保存对端地址的大小。如对对端地址不感兴趣,可将 src_addraddrlen 设为 NULL.

flagsrecv.

recvmsg

#include <sys/types.h>
#include <sys/socket.h>

ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);

/*
It returns  the  number  of bytes received, or -1 if an error
occurred.  In the event of an error,  errno  is  set  to  indicate  the
error.
*/

flagsrecv.

返回的 msg->msg_flags:

  • MSG_EOR: 指示记录结束
  • MSG_TRUNC: 一般数据被截断
  • MSG_CTRUNC: 控制数据被截断
  • MSG_OOB: 接收到带外数据
  • MSG_ERRQUEUE: 接收到错误信息作为辅助数据

发送文件

#include <sys/sendfile.h>

ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

/*
If  the  transfer was successful, the number of bytes written to out_fd
is returned.  Note that a successful call to sendfile() may write fewer
bytes  than  requested; the caller should be prepared to retry the call
if there were unsent bytes.

On error, -1 is returned, and errno is set appropriately.
*/

in_fd 读取数据,并写到 out_fd 中,而这一切都是在内核空间完成的,不需要通过用户空间!

offset 指定了源文件的起始偏移量,为 NULL 表示当前偏移量;返回时会更新 offset 以指向新的偏移。但 sendfile 并不会更改源文件的偏移量,但会修改目标文件的偏移量。

count 指定了最多发送多少字节的数据。

自 Linux 2.6.33 之后,out_fd 可以是任何文件,而不必是套接字描述符!

上一篇:Cloudera Manager 术语和架构


下一篇:Entity Framework Code First数据库连接 转载 https://www.cnblogs.com/libingql/p/3351275.html