libevent(十二)bufferevent filter zlib 压缩通信(二)

libevent(十二)bufferevent filter zlib 压缩通信(二)
使用zlib进行文件传输:
客户端:读取文件 -> 输出过滤器进行数据压缩 -> 发送数据
服务端:读取文件 -> 输入过滤器进行数据解压-> 存储数据

main.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;

int main()
{
#ifdef _WIN32 
	//初始化socket库
	WSADATA wsa;
	WSAStartup(MAKEWORD(2, 2), &wsa);
#else
	//忽略管道信号,发送数据给已关闭的socket
	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
		return 1;
#endif

	std::cout << "test server!\n";
	//创建libevent的上下文
	event_base* base = event_base_new();
	if (base)
	{
		cout << "event_base_new success!" << endl;
	}

	void Server(event_base * base);
	Server(base);
	void Client(event_base * base);
	Client(base);
	//事件分发处理
	if (base)
		event_base_dispatch(base);
	if (base)
		event_base_free(base);
#ifdef _WIN32
	WSACleanup();
#endif
	return 0;
}

zlib_server.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <string>
#include <zlib.h>

using namespace std;
#define SPORT 5001


struct Status
{
	bool start = false;
	FILE* fp = 0;
	z_stream* p;
	int recv_num = 0;
	int write_num = 0;
	~Status()
	{
		if (p)
		{
			inflateEnd(p);
		}
		delete p;
		p = 0;

		if (fp)
		{
			fclose(fp);
		}
		fp = 0;
	}
};


bufferevent_filter_result filter_in(evbuffer* s, evbuffer* d, 
	ev_ssize_t limit, bufferevent_flush_mode mode, void* arg) 
{
	//1 接收客户端发送的文件名
	Status* status = (Status*)arg;
	if (!status->start)
	{
		char data[1024] = { 0 };
		int len = evbuffer_remove(s, data, sizeof(data) - 1);
		evbuffer_add(d, data, len);
		return BEV_OK;
	}

	//解压
	evbuffer_iovec v_in[1];

	//读取数据,不清理缓冲
	int n = evbuffer_peek(s, -1, NULL, v_in, 1);
	if (n <= 0) 
	{
		return BEV_NEED_MORE;
	}
	z_stream* p = status->p;

	//zlib 输入数据大小
	p->avail_in = v_in[0].iov_len;

	//zlib 输入数据地址
	p->next_in = (Byte*)v_in[0].iov_base;

	//申请输出空间大小
	evbuffer_iovec v_out[1];
	evbuffer_reserve_space(d, 4096, v_out, 1);

	//zlib 输出空间大小
	p->avail_out = v_out[0].iov_len;

	//zlib 输出空间地址
	p->next_out = (Byte*)v_out[0].iov_base;

	//解压数据
	int re = inflate(p, Z_SYNC_FLUSH);
	if (re != Z_OK)
	{
		cerr << "inflate failed!" << endl;
	}


	//解压用了多少数据,从source evbuffer中移除
	//p->avail_in 未处理数据大小
	int n_read = v_in[0].iov_len - p->avail_in;

	//解压后数据大小 传入des evbuffer
	//p->avail_out 剩余空间大小
	int n_write = v_out[0].iov_len - p->avail_out;

	//移除source evbuffer中数据
	evbuffer_drain(s, n_read);

	//传入des evbuffer
	v_out[0].iov_len = n_write;
	evbuffer_commit_space(d, v_out, 1);
	cout << "Server n_read " << n_read << "\t n_write " << n_write << endl;
	status->recv_num += n_read;
	status->write_num += n_write;
	return BEV_OK;
}


static void read_cb(bufferevent* bev, void* arg)
{
	Status* status = (Status*)arg;
	if (!status->start)
	{
		//001接收文件名
		char data[1024] = { 0 };
		bufferevent_read(bev, data, sizeof(data) - 1);
		string out = "out/";
		out += data;

		//打开写入文件
		status->fp = fopen(out.c_str(), "wb");
		if (!status->fp)
		{
			cout << "server open " << out << " failed!" << endl;
			return;
		}

		//002 回复OK
		bufferevent_write(bev, "OK", 2);
		status->start = true;
		return;
	}

	do
	{
		//写入文件
		char data[1024] = { 0 };
		int len = bufferevent_read(bev, data, sizeof(data));
		if (len >= 0)
		{
			fwrite(data, 1, len, status->fp);
			fflush(status->fp);
		}
	} while (evbuffer_get_length(bufferevent_get_input(bev)) > 0);
}

static void event_cb(bufferevent* bev, short events, void* arg)
{
	cout << "server event_cb " << events << endl;
	Status* status = (Status*)arg;
	if (events & BEV_EVENT_EOF)
	{
		cout << "server event BEV_EVENT_EOF success!" << endl;
		cout << "Server recv = " << status->recv_num << endl;
		cout << "Server write = " << status->write_num << endl;
		delete status;
		bufferevent_free(bev);
	}
}


static void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg)
{
	cout << "listen_cb" << endl;
	event_base* base = (event_base*)arg;

	//1 创建一个bufferevent 用来通信
	bufferevent* bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
	Status* status = new Status();
	status->p = new z_stream();
	inflateInit(status->p);

	//2 添加过滤 并设置输入回调
	bufferevent* bev_filter = bufferevent_filter_new(bev,
		filter_in,               // 输入过滤函数
		0,                       // 输出过滤
		BEV_OPT_CLOSE_ON_FREE,   // 关闭filter同时管理bufferevent
		0,                       // 清理回调
		status                   // 传递参数
	);

	//3 设置回调 读取 事件(处理连接断开) 
	bufferevent_setcb(bev_filter, read_cb, 0, event_cb, status);
	bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}


void Server(event_base* base)
{
	cout << "----begin Server----" << endl;
	//监听端口(socket ,bind,listen 绑定事件)

	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(SPORT);

	evconnlistener* ev = evconnlistener_new_bind(base,    // libevent的上下文
		listen_cb,                                        // 接收到连接的回调函数
		base,                                             // 回调函数获取的参数 arg
		LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,        // 地址重用,evconnlistener关闭同时关闭socket
		10,                                               // 连接队列大小,对应listen函数
		(sockaddr*)&sin,                                  // 绑定的地址和端口
		sizeof(sin)
	);
}

zlib_client.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <zlib.h>
using namespace std;

#define FILEPATH "001.txt"

struct ClientStatus
{
	FILE* fp = 0;
	bool end = false;
	bool startSend = false;
	z_stream* z_output = 0;
	int readNum = 0;
	int sendNum = 0;
	~ClientStatus() 
	{
		if (z_output)
		{
			deflateEnd(z_output);
		}
		delete z_output;
		z_output = 0;

		if (fp)
		{
			fclose(fp);
		}
		fp = 0;
	}
};

bufferevent_filter_result filter_out(evbuffer* s, evbuffer* d,
	ev_ssize_t limit, bufferevent_flush_mode mode, void* arg)
{
	ClientStatus* sta = (ClientStatus*)arg;

	//压缩文件,发送文件名消息去掉
	if (!sta->startSend)
	{
		char data[1024] = { 0 };
		int len = evbuffer_remove(s, data, sizeof(data));
		evbuffer_add(d, data, len);
		return BEV_OK;
	}
	//开始压缩文件(取出buffer中数据的引用)
	evbuffer_iovec v_in[1];
	int n = evbuffer_peek(s, -1, 0, v_in, 1);
	if (n<=0)
	{
		//调用write回调, 清理空间
		if (sta->end)
		{
			return BEV_OK;
		}
		//没有数据 BEV_NEED_MORE 不会进入写入回调
		return BEV_NEED_MORE;
	}
	//记下zlib上下文
	z_stream* p = sta->z_output;
	if (!p)
	{
		return BEV_ERROR;
	}
	//zlib 输入数据大小
	p->avail_in = v_in[0].iov_len;
	//zlib 输入数据地址
	p->next_in = (Byte*)v_in[0].iov_base;

	//申请输出空间大小
	evbuffer_iovec v_out[1];
	evbuffer_reserve_space(d, 4096, v_out, 1);
	//zlib 输出空间大小
	p->avail_out = v_out[0].iov_len;
	//zlib 输出空间地址
	p->next_out = (Byte*)v_out[0].iov_base;

	//压缩
	int re = deflate(p, Z_SYNC_FLUSH);
	if (re != Z_OK)
	{
		cerr << "deflate failed!" << endl;
	}

	//压缩用了多少数据,从source evbuffer中移除
	//p->avail_in 未处理数据大小
	int n_read = v_in[0].iov_len - p->avail_in;

	//压缩后数据大小 传入des evbuffer
	//p->avail_out 剩余空间大小
	int n_write = v_out[0].iov_len - p->avail_out;

	//移除source evbuffer中数据
	evbuffer_drain(s, n_read);

	//传入des evbuffer
	v_out[0].iov_len = n_write;
	evbuffer_commit_space(d, v_out, 1);
	cout << "Client n_read " << n_read << "\t n_write " << n_write << endl;
	sta->readNum += n_read;
	sta->sendNum += n_write;
	return BEV_OK;
}


static void client_read_cb(bufferevent* bev, void* arg)
{

	ClientStatus* sta = (ClientStatus*)arg;

	//002 接收服务端发送的OK回复
	char data[1024] = { 0 };
	int len = bufferevent_read(bev, data, sizeof(data) - 1);
	if (strcmp(data, "OK") == 0)
	{
		cout << data << endl;
		sta->startSend = true;
		
		//开始发送文件,触发写入回调
		bufferevent_trigger(bev, EV_WRITE, 0);
	}
	else
	{
		bufferevent_free(bev);
	}
	cout << "client_read_cb " << len << endl;
}


static void client_write_cb(bufferevent* bev, void* arg)
{

	cout << "client_write_cb" << endl;
	ClientStatus* s = (ClientStatus*)arg;
	FILE* fp = s->fp;
	//判断什么时候清理资源
	if (s->end)
	{
		//判断缓冲是否有数据,如果有刷新
		//获取过滤器绑定的buffer
		bufferevent* be = bufferevent_get_underlying(bev);
		//获取输出缓冲及其大小
		evbuffer* evb = bufferevent_get_output(be);
		int len = evbuffer_get_length(evb);
		if (len <= 0)
		{

			cout << "Client readNum = " << s->readNum << endl;
			cout << "Client sendNum = " << s->sendNum << endl;

			//立刻清理 如果缓冲有数据,不会发送
			bufferevent_free(bev);
			delete s;
			return;
		}
		//刷新缓冲
		bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
		return;
	}

	if (!fp)return;
	
	//读取文件
	char data[1024] = { 0 };
	int len = fread(data, 1, sizeof(data), fp);
	if (len <= 0)
	{
		fclose(fp);
		s->end = true;
		//刷新缓冲
		bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
		return;
	}
	//发送文件
	bufferevent_write(bev, data, len);
}


static void client_event_cb(bufferevent* be, short events, void* arg)
{
	cout << "client_event_cb " << events << endl;
	if (events & BEV_EVENT_CONNECTED)
	{
		cout << "client BEV_EVENT_CONNECTED" << endl;
		//001 发送文件名
		bufferevent_write(be, FILEPATH, strlen(FILEPATH));

		//初始化文件句柄
		FILE* fp = fopen(FILEPATH, "rb");
		if (!fp)
		{
			cout << "open file " << FILEPATH << " failed!" << endl;
		}
		ClientStatus* s = new ClientStatus();
		s->fp = fp;

		//初始化zlib上下文
		s->z_output = new z_stream();
		deflate(s->z_output, Z_DEFAULT_COMPRESSION);

		//创建输出过滤
		bufferevent* bev_filter = bufferevent_filter_new(be, 0, filter_out,
			BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS, 0, s);
		

		//设置读取、写入和事件的回调
		bufferevent_setcb(bev_filter, client_read_cb, client_write_cb, client_event_cb, s);
		bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
	}
}


void Client(event_base* base)
{
	cout << "-----begin Client-----" << endl;
	//连接服务端
	sockaddr_in sin;
	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(5001);
	evutil_inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr.s_addr);
	bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);

	//只绑定事件回调,用来确认连接成功
	bufferevent_enable(bev, EV_READ | EV_WRITE);
	bufferevent_setcb(bev, 0, 0, client_event_cb, 0);

	bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
	//接收回复确认OK
}

Makefile

test_buffer_filter_zlib:main.cpp zlib_server.cpp zlib_client.cpp
        g++ $^ -o $@ -levent -lz
        ./$@

clean:
        rm -rf test_buffer_filter_zlib
        rm -rf *.o
上一篇:Velero-K8S集群备份和恢复


下一篇:网络编程06