无名管道的C++封装

xpipe-无名管道的C++封装类

无名管道的C++封装类,用于父子进程进行通信

基础介绍


unix下一切皆文件,管道也不例外。无名管道pipe定义在<unistd.h>中。

 #include <unistd.h>
 int pipe(int fd[2]);
其中fd[0]是读端,fd[1]是写端,fd[1]的输出是fd[0]的输入,因此管道是一个有向的半双工通信方式。使用`write(fd[1],...)`和`read(fd[0],...)`对管道中的信息进行读写。无名管道通常运用于父子进程间通信。关闭读端或者写端是使用`close`函数,同文件句柄一样,关闭后不能重新打开。如果关闭后使用该端,系统会发送一个`SIGPIPE`的信号。作为一个文件,管道有一个缓存大小限制,这是一个运行时限制,使用`fpathconf`函数可以查看其大小,类型名为`_PC_PIPE_BUF`.
如:
 cout<<fpathconf(fd[0],_PC_PIPE_BUF)<<endl;
在我的 Ubuntu10.10 下为4096字节,刚好一页大小。而在AIX服务器上,管道大小的限制则为32768字节。

读写管道使用系统函数read和write,如:

 write(m_fd[1],content.c_str(),content.length());
这能体现管道作为文件的本质,但不能体现通信的意图,因此我将管道的读写封装为与socket中发送和接收。
 
ssize_t xpipe::send(void *buf, size_t n)
 {
  return write(m_fd[1], buf, n);
 }
 ssize_t xpipe::recv(void *buf, size_t nbytes)
 {
  return read(m_fd[0], buf, nbytes);
 }

使用中,通信的内容常常为字符串,上述两个函数不仅能满足这个要求,还能传递一些简单结构体消息(稍后在讨论),但是每次都要输入长度。为简化开发,我将send和recv重载,作为特化方法,方便字符串的传递。使用方法非常简单,如:
 xpipe x;
 x.send("Whose your daddy?");
 string rs;
 x.recv(rs);

关于简单结构体,需要作个说明,这里指的是由C++基本类型组合而成的结构体,如:
 class child
 {
 public:
  long id;
  char name[20];
 };

注意: string不是基本类型 。传递结构体消息示例如下:
 xpipe x;
 child cc;
 cc.id=10;
 strcpy(cc.name,"PAYBY");
 x.send((child *)&cc,sizeof(child));
 /*-------------------------*/
 child dd;
 x.recv((child *)&dd,sizeof(child)); 

通信设计


文件是常见的通信媒介。但对文件的读写必须要加上读写的角色信息才能体现通信的过程。一个简单的单向通信包含消息发送方和消息接收方。对管道读写时常常有可以看到接收进程(读进程)关闭写端口,发送进程(写进程)关闭读端口,这样做是为了确保信息流向的单一,以免信息从接收进程流向发送进程。对通信而言,这依然不够直观。单向的信息流动中,一方仅仅是发送者(senderonly),另一方仅仅是接收者(receiveronly)。因此,在父子进程通信过程中,给他们指定角色就可以了。
示例代码如下:
 xpipe x;
 pid_t pid=fork();
 string item="whose your daddy";
 if (pid==0)
 {//child process
  x.receiveronly();
  
  string rs;
  x.recv(rs);
  //check point
  assert(rs==item);
  exit(0); 
 }
 else if (pid>0)
 {//parent process
  int ret;
  x.senderonly();
  x.send(item);
  wait(&ret);
 }

在示例代码中父进程被指定为发送者(x.senderonly();),不能通过`x`管道进行接收信息,子进程被指定为接收者(x.receiveronly();),不能通过x管道进行发送信息。要实现父子进程的互相通信。可以在指定另一个管道,将子进程指定为发送者,父进程指定为接收者。(见使用示例)

使用示例


父子进程互相通信

 
xpipe x;
 xpipe y;
 pid_t pid=fork();
 string x_item="whose your daddy?";
 string y_item="my father is Ligang!";
 if (pid==0)
 {//child process
  x.receiveronly();
  y.senderonly();
  string rs;
  x.recv(rs);
  //check point
  assert(rs==x_item);
  
  y.send(y_item);
  cout<<"child process:"<<y_item<<endl;
  exit(0); 
 }
 else if (pid>0)
 {//parent process
  int ret;
  x.senderonly();
  y.receiveronly();
  x.send(x_item);
  cout<<"parent process:"<<x_item<<endl;
  
  string ts;
  y.recv(ts);
  assert(ts==y_item);
  wait(&ret);
 }

预期结果为:

parent process:whose your daddy?
child process:my father is Ligang!


代码一览



头文件xpipe.h
#ifndef __XPIPEH__
#define __XPIPEH__

#include <unistd.h>
#include <string>
#include <string.h>
#include <stdio.h>
using namespace std;
/*
	无名管道的C++封装类,用于父子进程进行通信
	时间 :2013年7月15日 20:30:58
	邮 箱:chen_xueyou@163.com
*/
class xpipe
{
public:
	xpipe();
	~xpipe();
///核心方法
	ssize_t send(void *buf, size_t n);
	ssize_t recv(void *buf, size_t nbytes);
///常用方法特化
	void 	send(const string &content);
	void 	recv(string &content);
//确定通信角色
	void	senderonly(){DisReadable();}
	void	receiveronly(){DisWriteable();}

//属性操作
	string	role() const;

	long 	Bufsize(long newbufsize=0);
private:
	//读写关闭操作
	void 	DisReadable();
	void 	DisWriteable();

	/* data */
private:
	int 		m_fd[2];
	bool 	m_readable;
	bool 	m_writeable;
	
	long 	m_bufsize;
	char *	m_buf;
};
#endif

xpipe.cpp
#ifndef __XPIPECPP__
#define __XPIPECPP__

#include "xpipe.h"

xpipe::xpipe()
:m_readable(true),m_writeable(true),m_buf(NULL)
{
	int success=pipe(m_fd);
	if(success<0)
	{
		throw puts("create pipe failed!");
	}
	//检测系统设置的管道限制大小
	m_bufsize=fpathconf(m_fd[0],_PC_PIPE_BUF);
	
}
xpipe::~xpipe()
{
	if(m_readable)
		close(m_fd[0]);
	if(m_writeable)
		close(m_fd[1]);
	if(m_buf!=NULL)
		delete m_buf;
}
ssize_t xpipe::send(void *buf, size_t n)
{
	return write(m_fd[1], buf, n);
}
ssize_t xpipe::recv(void *buf, size_t nbytes)
{
	return read(m_fd[0], buf, nbytes);
}
void xpipe::send(const string &content)
{
	write(m_fd[1],content.c_str(),content.length());
}
void  xpipe::recv(string &content)
{
	if (m_buf==NULL)
	{//lazy run
		m_buf=new char[m_bufsize];
		if (m_buf==NULL)
		{
			throw puts("memory not enough!");
		}
	}
	memset(m_buf,0,m_bufsize);
	read(m_fd[0],m_buf,m_bufsize);
	content=string(m_buf);
}
//返回当前管道所扮演到角色
string xpipe::role() const
{
	if (m_writeable&&m_readable)
	{
		return "sender and receiver";
	}
	if (m_writeable)
	{
		return "sender";
	}
	if (m_readable)
	{
		return "receiver";
	}

	return "none";
	
}
/*关闭读端口*/
void xpipe::DisReadable()
{
	if(m_readable)
	{
		close(m_fd[0]);
		m_readable=false;
	}	
}
/*关闭写端口*/
void xpipe::DisWriteable()
{
	if (m_writeable)
	{
		close(m_fd[1]);
		m_writeable=false;
	}
		
}
/*如果输入大于0:调整缓存区大小,并返回调整后缓存区大小
  如果输入小于等于0,则不设置,只返回缓存区大小
  缓存区大小构造时默认设置为系统对管道的限制大小
  默认参数为0
 */
long xpipe::Bufsize(long newbufsize)
{
	//大于0才设置
	if (newbufsize>0)
	{
		m_bufsize=newbufsize;
		delete m_buf;
		//重新申请缓存区
		m_buf=new char[m_bufsize];
		if (m_buf==NULL)
		{
			throw puts("memory not enough!");
		}
	}
	
	return m_bufsize;
}
#endif


测试文件test.cpp
#include <iostream>
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <stdlib.h>
#include "xpipe.h"
using namespace std;


/*test Bufszie*/
void test1()
{
	xpipe x;
	int fd[2];
	pipe(fd);
	//check point
	assert(x.Bufsize()==fpathconf(fd[0],_PC_PIPE_BUF));
	x.Bufsize(20);
	//check point
	assert(x.Bufsize()==20);
}

/*test read/recv*/
/////////////////////////////////////
class childreq
{
public:
	long recid;
	char billtype[20];

};
void test2()
{
	xpipe x;
	pid_t pid=fork();
	if (pid==0)
	{
		x.receiveronly();

		childreq dd;
		x.recv((childreq *)&dd,sizeof(childreq));
		//check point
		assert(dd.recid==10);
		assert(!strcmp(dd.billtype,"PAYBY"));
		exit(0);
	}
	else if (pid>0)
	{
		x.senderonly();

		childreq cc;
		cc.recid=10;
		strcpy(cc.billtype,"PAYBY");

		x.send((childreq *)&cc,sizeof(childreq));
		int ret;
		wait(&ret);
	}
}

/*test read/recv*/
void test3()
{
	xpipe x;
	pid_t pid=fork();
	string item="whose your daddy";
	if (pid==0)
	{//child process
		x.receiveronly();
		
		string rs;
		x.recv(rs);
		//check point
		assert(rs==item);
		exit(0);	
	}
	else if (pid>0)
	{//parent process
		int ret;
		x.senderonly();
		x.send(item);
		wait(&ret);
	}
}
/*test role*/
void test4()
{
	xpipe x;
	assert(x.role()=="sender and receiver");
	x.senderonly();
	assert(x.role()=="sender");
	x.receiveronly();
	assert(x.role()=="none");

	xpipe y;
	y.receiveronly();
	assert(y.role()=="receiver");

}
/*test read/recv*/
void test5()
{
	xpipe x;
	xpipe y;
	pid_t pid=fork();
	string x_item="whose your daddy?";
	string y_item="my father is Ligang!";
	if (pid==0)
	{//child process
		x.receiveronly();
		y.senderonly();

		string rs;
		x.recv(rs);
		//check point
		assert(rs==x_item);
		
		y.send(y_item);
		cout<<"child process:"<<y_item<<endl;
		exit(0);	
	}
	else if (pid>0)
	{//parent process
		int ret;
		x.senderonly();
		y.receiveronly();

		x.send(x_item);
		cout<<"parent process:"<<x_item<<endl;
		
		string ts;
		y.recv(ts);
		assert(ts==y_item);

		wait(&ret);
	}
}
int main(int argc, char const *argv[])
{
	test1();
	test2();
	test3();
	test4();
	test5();
	cout<<"pass all the tests"<<endl;
}

makefile文件
CXX=g++
all:
        $(CXX) -c xpipe.cpp
        $(CXX) test.cpp -o test xpipe.o
clean:
        rm xpipe.o test



有问题,请给我留言,我的0CSDN博客 | 新浪微博 |  个人网站

源码在Github上,点击下载

上一篇:利用c#制作托盘程序,并禁止多个应用实例运行


下一篇:tar 解压时提示 Archive contains obsolescent base-64 headers