bufferevent_filter过滤器示例完成服务端接收消息

demo下载地址

链接:https://pan.baidu.com/s/1j21bCNiXHBSFxk2VUCxK6g
提取码:v9hn

#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
using namespace std;

int main()
{
#ifdef _WIN32 
    //初始化socket库
    WSADATA wsa;
    WSAStartup(MAKEWORD(2,2),&wsa);
#else
    //忽略管道信号,发送数据给已关闭的socket
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
        return 1;
#endif

    std::cout << "test server!\n"; 
    //创建libevent的上下文
    event_base * base = event_base_new();
    if (base)
    {
        cout << "event_base_new success!" << endl;
    }

    void Server(event_base*base);
    Server(base);
    void Client(event_base* base);
    Client(base);
    //事件分发处理
    if(base)
        event_base_dispatch(base);
    if(base)
        event_base_free(base);
#ifdef _WIN32
    WSACleanup();
#endif
    return 0;
}

zlib_server.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
#include <string>
using namespace std;
#define SPORT 5001
struct Status
{
    bool start = false;
    FILE *fp = 0;
    //string filename;
};
bufferevent_filter_result filter_in(evbuffer *s, evbuffer *d,
    ev_ssize_t limit,bufferevent_flush_mode mode,void *arg)
{
    //1 接收客户端发送的文件名
    char data[1024] = { 0 };
    int len = evbuffer_remove(s, data, sizeof(data) - 1);
    //cout << "server recv "<<data << endl;
    evbuffer_add(d, data, len);
    return BEV_OK;
}
static void read_cb(bufferevent *bev, void *arg)
{
    Status *status = (Status *)arg;
    if (!status->start)
    {
        //001接收文件名
        char data[1024] = { 0 };
        bufferevent_read(bev, data, sizeof(data) - 1);
        //status->filename = data;
        string out = "out\\";
        out += data;
        //打开写入文件
        status->fp = fopen(out.c_str(), "wb");
        if (!status->fp)
        {
            cout << "server open " << out << " failed!" << endl;
            return;
        }

        //002 回复OK
        bufferevent_write(bev, "OK", 2);
        status->start = true;
        return;
    }

    do
    {
        //写入文件
        char data[1024] = { 0 };
        int len = bufferevent_read(bev, data, sizeof(data));
        if (len >= 0)
        {
            fwrite(data, 1, len, status->fp);
            fflush(status->fp);
        }
    } while (evbuffer_get_length(bufferevent_get_input(bev)) >0 );
}

static void event_cb(bufferevent *bev,short events, void *arg)
{
    cout << "server event_cb " << events << endl;
    Status *status = (Status *)arg;
    if (events & BEV_EVENT_EOF)
    {
        cout << "server event BEV_EVENT_EOF" << endl;
        if (status->fp)
        {
            fclose(status->fp);
            status->fp = 0;
        }
        bufferevent_free(bev);
    }
}
static void listen_cb(struct evconnlistener * e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{
    cout << "listen_cb" << endl;
    event_base *base = (event_base *)arg;
    //1 创建一个bufferevent 用来通信
    bufferevent *bev = bufferevent_socket_new(base, s, BEV_OPT_CLOSE_ON_FREE);
    Status *status = new Status();

    //2 添加过滤 并设置输入回调
    bufferevent *bev_filter = bufferevent_filter_new(bev,
        filter_in,//输入过滤函数
        0,//输出过滤
        BEV_OPT_CLOSE_ON_FREE,//关闭filter同时管理bufferevent
        0, //清理回调
        status    //传递参数
        );

    //3 设置回调 读取 事件(处理连接断开) 
    bufferevent_setcb(bev_filter, read_cb, 0, event_cb, status);
    bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
}


void Server(event_base*base)
{
    cout << "begin Server" << endl;
    //监听端口
//socket ,bind,listen 绑定事件
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(SPORT);

    evconnlistener *ev = evconnlistener_new_bind(base,    // libevent的上下文
        listen_cb,                    //接收到连接的回调函数
        base,                        //回调函数获取的参数 arg
        LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,//地址重用,evconnlistener关闭同时关闭socket
        10,                            //连接队列大小,对应listen函数
        (sockaddr*)&sin,                            //绑定的地址和端口
        sizeof(sin)
    );

}

zlib_client.cpp

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <string.h>
#ifndef _WIN32
#include <signal.h>
#endif
#include <iostream>
using namespace std;
#define FILEPATH "001.bmp"

struct ClientStatus
{
    FILE *fp = 0;
    bool end = false;
};
bufferevent_filter_result filter_out(evbuffer *s, evbuffer *d,
    ev_ssize_t limit, bufferevent_flush_mode mode, void *arg)
{

    //cout << "filter_out" << endl;
    char data[1024] = { 0 };
    int len = evbuffer_remove(s, data, sizeof(data));
    evbuffer_add(d, data, len);
    return BEV_OK;
}

static void client_read_cb(bufferevent *bev, void *arg)
{
    //002 接收服务端发送的OK回复
    char data[1024] = { 0 };
    int len = bufferevent_read(bev, data, sizeof(data) - 1);
    if (strcmp(data, "OK") == 0)
    {
        cout << data << endl;

        //开始发送文件,触发写入回调
        bufferevent_trigger(bev, EV_WRITE, 0);
    }
    else
    {
        bufferevent_free(bev);
    }
    cout << "client_read_cb " << len << endl;
}
static void client_write_cb(bufferevent *bev, void *arg)
{
    ClientStatus *s = (ClientStatus *)arg;
    FILE *fp = s->fp;
    //判断什么时候清理资源
    if (s->end)
    {
        //判断缓冲是否有数据,如果有刷新
        //获取过滤器绑定的buffer
        bufferevent *be = bufferevent_get_underlying(bev);
        //获取输出缓冲及其大小
        evbuffer *evb = bufferevent_get_output(be);
        int len = evbuffer_get_length(evb);
        //cout << "evbuffer_get_length = " << len << endl;
        if (len <= 0)
        {
            //立刻清理 如果缓冲有数据,不会发送
            bufferevent_free(bev);
            delete s;
            return;
        }
        //刷新缓冲
        bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
        return;
    }

    if (!fp)return;
    //cout << "client_write_cb" << endl;
    //读取文件
    char data[1024] = { 0 };
    int len = fread(data, 1, sizeof(data), fp);
    if (len <= 0)
    {
        fclose(fp);
        s->end = true;
        //刷新缓冲
        bufferevent_flush(bev, EV_WRITE, BEV_FINISHED);
        return;
    }
    //发送文件
    bufferevent_write(bev, data, len);
}
static void client_event_cb(bufferevent*be, short events, void *arg)
{
    cout << "client_event_cb " << events << endl;
    if (events & BEV_EVENT_CONNECTED)
    {
        cout << "BEV_EVENT_CONNECTED" << endl;
        //001 发送文件名
        bufferevent_write(be, FILEPATH, strlen(FILEPATH));

        //创建输出过滤
        bufferevent * bev_filter = bufferevent_filter_new(be, 0, filter_out,
                BEV_OPT_CLOSE_ON_FREE| BEV_OPT_DEFER_CALLBACKS,0,0);
        FILE *fp = fopen(FILEPATH, "rb");
        if (!fp)
        {
            cout << "open file " << FILEPATH << " failed!" << endl;
            //return;
        }
        ClientStatus *s = new ClientStatus();
        s->fp = fp;
        
        //设置读取、写入和事件的回调
        bufferevent_setcb(bev_filter, client_read_cb, client_write_cb, client_event_cb, s);
        bufferevent_enable(bev_filter, EV_READ | EV_WRITE);
    }
}

void Client(event_base* base)
{
    cout << "begin Client" << endl;
    //连接服务端
    sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(5001);
    evutil_inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr.s_addr);
    bufferevent *bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
    
    //只绑定事件回调,用来确认连接成功
    bufferevent_enable(bev,EV_READ | EV_WRITE);
    bufferevent_setcb(bev, 0, 0, client_event_cb, 0);

    bufferevent_socket_connect(bev, (sockaddr*)&sin, sizeof(sin));
    //接收回复确认OK
}

 

上一篇:Netty那点事: 概述, Netty中的buffer, Channel与Pipeline


下一篇:Velero-K8S集群备份和恢复