c – 在N秒内两次提升asio deadline_timer async_wait(N秒)导致操作被取消

我想要的是当一个消息队列收到一个int N时,处理函数将在N秒后调用.下面是我的代码.

如果两个近消息队列的持续时间大于int N,则它运行正常,但是当两个接收到的消息队列之间的持续时间小于N时,处理程序将在一个处理程序中打印“Operation canceled”,这不是我的想.

对于任何帮助,我都会非常感激.

#include <boost/asio.hpp>
#include <zmq.h>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;

void* context = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);


void handler(const boost::system::error_code &ec) {
    std::cout << "hello, world" << "\t" << ec.message() << std::endl;
}

void run() {
    io_service.run();
}

void thread_listener() {

     int nRecv;
     boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0));
     while( true ) {
         zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
         std::cout << nRecv << std::endl;
         timer.expires_from_now(boost::posix_time::seconds(nRecv));
         timer.async_wait(handler);
     }

 }

 int main(int argc, char* argv[]) {

     boost::asio::io_service::work work(io_service);

     zmq_bind(sock_pull, "tcp://*:60000");
     boost::thread tThread(thread_listener);
     boost::thread tThreadRun(run);
     tThread.join();
     tThreadRun.join();
     return 0;

 }

解决方法:

你打电话时

timer.expires_from_now(boost::posix_time::seconds(nRecv));

这个,as the documentation states,取消任何异步计时器挂起.

如果您希望在给定时间内在飞行中有重叠请求,则一个计时器显然是不够的.幸运的是,在Asio的绑定共享指针周围有一个众所周知的模式,你可以使用它来模仿每个响应的“会话”.

假设您定义一个会话以包含它自己的私人计时器:

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

现在,替换使用硬编码计时器实例的代码是微不足道的:

 timer.expires_from_now(boost::posix_time::seconds(nRecv));
 timer.async_wait(handler);

会话开始时:

 boost::make_shared<session>(io_service, nRecv)->start();

一个完全有效的例子(带有适当的存根ZMQ东西):Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <iostream>

boost::asio::io_service io_service;

/////////////////////////////////////////////////////////////////////////
// I love stubbing out stuff I don't want to install just to help others
enum { ZMQ_PULL };
static void* zmq_ctx_new()         { return nullptr; }
static void* zmq_socket(void*,int) { return nullptr; }
static void  zmq_bind(void*,char const*) {}
static void  zmq_recv(void*,int*data,size_t,int) 
{ 
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand()%1000));
    *data = 2;
}
// End of stubs :)
/////////////////////////////////////////////////////////////////////////

void* context  = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

    ~session() {
        std::cout << "bye (session end)\n";
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv = 0;
    for(int n=0; n<4; ++n) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;

        boost::make_shared<session>(io_service, nRecv)->start();
    }
}

int main() {
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);

    tThread.join();
    work.reset();

    tThreadRun.join();
}
上一篇:Java-ZeroMQ:消失的消息


下一篇:解析如何在C语言中调用shell命令的实现方法【转】