设计一个C/S结构的程序,在开辟的一片共享内存中读写程序,server端负责写数据(每次写一个长度为100的数组),client端负责读数据,并把100个数组的元素以tab符号分割的输出到一个文件中。在不使用任何锁和信号量的情况下完成数据读写的同步(既server在对数组中元素赋值的过程中,client端绝对不能读)。
解决思路:
首先,在不加锁的情况下,为同步数据的读写,共享内存多申请一个int变量,作为数据读写权限的标志位(0-空闲,此时生产者可写,1-繁忙,两者均不能访问,2-新数据,此时消费者可读);Server/Client在读取标志位判断自己得到权限后首先应该修改标志位以告诉对方自己要对数据进行操作,然对方等待。
两个程序配合正常,Server/Client进程中看到的分享内存的首地址是不同的,因为操作系统会为每个单独运行的程序分配自己的进程空间,进程空间以一段地址来表示,每个进程只能运行和操作分配给他的那段地址空间,而不能跨进程地址访问(出于安全考虑);而共享存储的目的也是为了让这样的进程能相互共享数据,内核会为共享存储空间单独开辟一段空间(独立于任何进程),而挂接到该共享存储空间的进程则将共享空间的实际地址映射到自己的地址空间中,这样进程对共享空间的访问就和对自己空间访问一样,也因为这样所以每个进程看到的共享内存的地址是不同的(只看到映射后的地址)。以上为个人理解,具体还需参查资料来确认。
实际效果如图所示:
可以看到,在不采用任何缓冲的情况下,Server端每写一个数组,都需要等待Client端去读取后才能继续写,这样两个程序都无法高效运行,空转、轮询和等待耗费大部分时间。
改进,采用一个长度为1000的循环队列来缓存数据,并引入两个游标。不加锁实现两个程序对一个队列的异步读取操作,通过定义两个数组游标来实现(readIndex,writeIndex),一个表示消费者可读取数据的起始位置,一个表示为生产者可写数据的起始位置;
对于生产者,能操作的队列范围为[writeIndex, readIndex-1)
对于消费者,能操作的队列范围为[readIndex, writeIndex)
为方便判断队列状态,生成者最大可写数据包数为(队列长度-1),那么当readIndex == writeIndex 代表队列空,(writeIndex + 1) % 队列长度 == readIndex代表队列满,其他情况代表消费者可以对队列进行写操作;
而消费者只要在writeIndex != ReadIndex的情况下都可以对队列进行读操作。
这里,消费者和生产者需要在数据操作完之后才修改相应游标。
方法2和方法3时间对比:
明显后者效率要远远高于前者,采用队列的方法在访问一百万条数据只需18秒cpu时间(用户时间也仍然需要18s,应该是对文件写操作耗时),但方法2在优化过后访问一千条数据都需要4秒cpu时间,十万条则需要6分40秒。
改进后效果如图所示:
读取一百万条数据也只要18秒的时间,速度提升非常明显。
另外关于性能优化:程序在运行时空转会耗费大量时间,以消费者为例,轮到它执行的时候会不断轮询对应标志位看自己是否可读,可如果生产者还没有生产数据的时候这种轮询其实是无谓的,还暂用时间片,这个时候消费者可以主动让出时间休眠一会,等待对方生产数据再来读取;对于生产者也是一样的道理。在加入休眠后发现程序的性能都提高了很多。还有包括使用字符串拼接这种自定义缓存都会降低程序效率。
附代码:
***********************myshare.h************************
#include <stdio.h>
#include <sys/types.h>
#include
<sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <memory.h>
#include
<signal.h>
#include <string.h>
#define SHM_MODE (SHM_R | SHM_W | IPC_CREAT)
#define PATHNAME
"/etc/ssh/sshd_config"
#define CLASSID 1
#define
ITEMLEN 100
#define QEUELEN 1000
#define QUIT 1
typedef struct _stItem
{
unsigned int
items[ITEMLEN];
}stItem;
typedef struct _stctrl
{
int quit;
int windex;
int rindex;
}stCtrl;
typedef struct _stCItem
{
stItem item[QEUELEN];
int quit;
int windex;
int rindex;
}stCItem;
*************producer.c****************
#include "myshare.h"
/* 自定义接受键盘信号处理函数(ctrl+c,ctrl+\) */
static void sig_quit(int signo);
/* 标志位,如果有键盘中断置1,否则为0 */
int quit;
int main(int argc, char *argv[])
{
unsigned long
int i;
int idx;
/* 共享存储的标志ID */
int shmid;
/* 共享存储指针 */
stCItem *shmptr;
/* 用来生成标志ID的KEY
*/
key_t shmkey;
/* 使用ftok函数拉生产KEY
*/
if ( (shmkey = ftok(PATHNAME, CLASSID)) == -1)
return my_error_sys("ftok
error");
/* 根据KEY来申请共享存储,确保server/client挂载相同空间 */
if ( (shmid = shmget(shmkey, sizeof(stCItem), SHM_MODE))
< 0)
return
my_error_sys("shmget error");
/* 连接共享存储
*/
if ( (shmptr = shmat(shmid, 0, 0))
== (void *)-1)
return
my_error_sys("shmat error");
/* 注册处理函数,监听键盘信号 */
if (signal(SIGINT, sig_quit) == SIG_ERR)
return my_error_sys("signal
error");
if (signal(SIGQUIT, sig_quit) == SIG_ERR)
return my_error_sys("signal
error");
/* server每次启动先初始化共享存储 */
memset((void *)shmptr, 0, sizeof(stCItem));
i = 0;
quit = 0;
printf("produce data
start...\n");
while (!quit && !shmptr->quit)
{
if
((shmptr->windex + 1) % QEUELEN != shmptr->rindex)
{
/*
如果队列有空槽则生产数据 */
for (idx
= 0; idx < ITEMLEN; ++idx)
((shmptr->item)[shmptr->windex].items)[idx] = i;
++i;
/*
写完数据后修改游标 */
shmptr->windex = (shmptr->windex + 1) % QEUELEN;
}
else
/*
避免空转,主动放弃时间片 */
usleep(1);
}
/* 通知对方退出 */
shmptr->quit = 1;
/* 清除对应共享存储ID
*/
if (shmctl(shmid, IPC_RMID, 0) < 0)
return my_error_sys("shmctrl data
error");
printf("produce datas i : %d\n", i);
printf("produce data stop...\n");
return 0;
}
int my_error_sys(const char *pstr)
{
printf(pstr);
printf("\n");
return -1;
}
static void sig_quit(int signo)
{
quit = 1;
}
****************consumer.c********************
#include "myshare.h"
static void sig_quit(int signo);
int quit;
int main(int argc, char *argv[])
{
unsigned long
int idx;
unsigned long int i;
unsigned long int datanum;
/* 共享存储的标志ID */
int shmid;
/* 共享存储指针 */
stCItem *shmptr;
/* 用来生成标志ID的KEY */
key_t shmkey;
FILE *fp;
char *resultfile =
"data.txt";
/* 使用参数来控制读取的数据量,方便测试 */
if( argc != 2)
return my_error_sys("usage:
consumer <data number>");
sscanf(argv[1], "%d",
&datanum);
if ( (fp = fopen(resultfile, "w")) == NULL )
return my_error_sys("open file
error");
/* 使用ftok函数拉生产KEY */
if (
(shmkey = ftok(PATHNAME, CLASSID)) == -1)
return my_error_sys("ftok
error");
/* 根据KEY来申请共享存储,确保server/client挂载相同空间 */
if ( (shmid = shmget(shmkey, sizeof(stCItem), SHM_MODE))
< 0)
return
my_error_sys("shmget error");
/* 连接共享存储
*/
if ( (shmptr = shmat(shmid, 0, 0))
== (void *)-1)
return
my_error_sys("shmat error");
/* 注册处理函数,监听键盘信号 */
if (signal(SIGINT, sig_quit) == SIG_ERR)
return my_error_sys("signal
error");
if (signal(SIGQUIT, sig_quit) == SIG_ERR)
return my_error_sys("signal
error");
quit = 0;
i = 0;
memset((void *)shmptr, 0, sizeof(stCItem));
printf("start consume data....\n");
while (!quit)
{
/* 如果队列有数据则消费数据 */
if( shmptr->rindex !=
shmptr->windex )
{
for (idx
= 0; idx < ITEMLEN; ++idx)
{
fprintf(fp, "%u\t", (shmptr->item)[shmptr->rindex].items[idx]);
}
/*
用换行符刷新缓存区 */
fprintf(fp,"\n");
/*
读取完数据后修改游标 */
shmptr->rindex = (shmptr->rindex + 1) % QEUELEN;
++i;
if (i
>= datanum)
break;
}
else
/*
避免空转,主动放弃时间片 */
usleep(1);
}
/* 通知对方退出 */
shmptr->quit = 1;
fclose(fp);
/* 清除对应共享存储ID */
if (shmctl(shmid,
IPC_RMID, 0) < 0)
return
my_error_sys("shmctl data error");
printf("consumer datas
i : %d\n", i);
printf("quit consume...\n");
return 0;
}
int my_error_sys(const char *pstr)
{
printf(pstr);
printf("\n");
return -1;
}
static void sig_quit(int signo)
{
quit = 1;
}