spdlog study notes: the async_logger call path

The async_logger code lives in async_logger.h and async_logger-inl.h, as the async_logger class. async_logger inherits from logger, so the front-end work of receiving a log call and packing it into a log_msg object happens exactly as before; what changes is that the calls into the sinks (both sink->log(msg) and sink->flush()) are handed off to a thread pool, and that hand-off is what makes the logger asynchronous. The code:

void spdlog::async_logger::sink_it_(const details::log_msg &msg)
{
  if (auto pool_ptr = thread_pool_.lock()) {
      pool_ptr->post_log(shared_from_this(), msg, overflow_policy_);
  }
  else {
      throw_spdlog_ex("async log: thread pool doesn't exist anymore");
  }
}
// declaration of thread_pool_ in async_logger
std::weak_ptr<details::thread_pool> thread_pool_;
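The weak_ptr/lock pattern above can be sketched in isolation. The types below (pool, logger_like) are hypothetical stand-ins, not spdlog types; the point is that lock() yields an empty shared_ptr once the pool has been destroyed, so the logger fails gracefully instead of dereferencing a dangling pointer.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for spdlog's thread pool.
struct pool {
    int posted = 0;
    void post_log(const std::string &) { ++posted; }
};

// Mirrors async_logger: a weak_ptr never keeps the pool alive on its own.
struct logger_like {
    std::weak_ptr<pool> pool_;
    // Returns false when the pool is already gone (spdlog throws instead).
    bool sink_it(const std::string &msg) {
        if (auto p = pool_.lock()) {
            p->post_log(msg);
            return true;
        }
        return false;
    }
};
```

Because the registry owns the pool via shared_ptr and the logger holds only a weak_ptr, destroying the registry tears the pool down even while loggers still exist.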

The thread pool needs a thread-safe multi-producer multi-consumer queue to buffer log messages: several async_loggers (the producers) push log messages in, while several pool threads (the consumers) pop them out. The queue's capacity is bounded, so there must be a policy for what happens when a producer hits a full queue; spdlog offers three: block, discard the new message, or overwrite the oldest message. A circular queue is a natural fit for these requirements.

Circular queue

The circular queue is implemented in circular_q.h:

  • circular_q is a class template, so it can store elements of any type;
  • the std::vector<T> vec_ that actually holds the data is one slot larger than the queue's capacity; that spare slot is what makes a full queue distinguishable from an empty one.
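A minimal sketch of that spare-slot trick (simplified; spdlog's circular_q also supports overwriting the oldest element, which is omitted here): with head_ == tail_ meaning empty, the extra slot lets (tail_ + 1) % vec_.size() == head_ mean full, with no separate size counter.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Minimal ring buffer: one extra slot disambiguates full vs empty.
template <typename T>
class ring {
    size_t head_ = 0, tail_ = 0;
    std::vector<T> vec_;
public:
    explicit ring(size_t max_items) : vec_(max_items + 1) {}  // +1 spare slot
    bool empty() const { return head_ == tail_; }
    bool full() const { return (tail_ + 1) % vec_.size() == head_; }
    void push_back(T &&item) {
        vec_[tail_] = std::move(item);
        tail_ = (tail_ + 1) % vec_.size();
    }
    const T &front() const { return vec_[head_]; }
    void pop_front() { head_ = (head_ + 1) % vec_.size(); }
};
```

Without the spare slot, head_ == tail_ would be ambiguous: it would hold both when the queue is empty and when it is full.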

The multi-producer multi-consumer thread-safe queue

template <typename T>
class mpmc_blocking_queue {
  ...
  // try to enqueue and block if no room left
  void enqueue(T &&item) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    pop_cv_.wait(lock, [this] { return !this->q_.full(); }); // block until q_ has room
    q_.push_back(std::move(item));
    push_cv_.notify_one();
  }

  // blocking dequeue without a timeout.
  void dequeue(T &popped_item) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    push_cv_.wait(lock, [this] { return !this->q_.empty(); }); // block until q_ is non-empty
    popped_item = std::move(q_.front());
    q_.pop_front();
    pop_cv_.notify_one();
  }

private:
  std::mutex queue_mutex_;
  std::condition_variable push_cv_;
  std::condition_variable pop_cv_;
  spdlog::details::circular_q<T> q_;
  std::atomic<size_t> discard_counter_{0};
};
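The snippet above only shows the blocking enqueue, but post_async_msg_ (shown further down) also relies on enqueue_nowait and enqueue_if_have_room. Below is a simplified, self-contained sketch of all three policies on one bounded queue; std::deque stands in for circular_q, and the counter handling is condensed, so this is not spdlog's actual implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Sketch of the three enqueue policies on a bounded MPMC queue.
template <typename T>
class bounded_queue {
    std::mutex queue_mutex_;
    std::condition_variable push_cv_, pop_cv_;
    std::deque<T> q_;
    size_t max_items_;
    std::atomic<size_t> overrun_counter_{0};
    std::atomic<size_t> discard_counter_{0};
public:
    explicit bounded_queue(size_t max_items) : max_items_(max_items) {}

    // "block" policy: wait until there is room.
    void enqueue(T &&item) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            pop_cv_.wait(lock, [this] { return q_.size() < max_items_; });
            q_.push_back(std::move(item));
        }
        push_cv_.notify_one();
    }

    // "overrun_oldest" policy: never block, drop the oldest item instead.
    void enqueue_nowait(T &&item) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (q_.size() == max_items_) {
                q_.pop_front();  // overwrite oldest
                overrun_counter_.fetch_add(1, std::memory_order_relaxed);
            }
            q_.push_back(std::move(item));
        }
        push_cv_.notify_one();
    }

    // "discard_new" policy: drop the incoming item when full.
    void enqueue_if_have_room(T &&item) {
        bool pushed = false;
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (q_.size() < max_items_) {
                q_.push_back(std::move(item));
                pushed = true;
            }
        }
        if (pushed) push_cv_.notify_one();
        else discard_counter_.fetch_add(1, std::memory_order_relaxed);
    }

    void dequeue(T &popped) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            push_cv_.wait(lock, [this] { return !q_.empty(); });
            popped = std::move(q_.front());
            q_.pop_front();
        }
        pop_cv_.notify_one();
    }

    size_t discard_counter() const { return discard_counter_.load(); }
    size_t overrun_counter() const { return overrun_counter_.load(); }
    size_t size() { std::unique_lock<std::mutex> l(queue_mutex_); return q_.size(); }
};
```

The counters let callers observe silent message loss under the two non-blocking policies, mirroring spdlog's overrun_counter()/discard_counter() accessors.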

The spdlog thread pool

thread_pool buffers log messages in an mpmc_blocking_queue (a multi-producer multi-consumer blocking queue). Multiple front-end threads (producers) can push messages into the queue concurrently, and multiple back-end threads (consumers) can pop them concurrently. A front-end thread is any thread in which the user calls the logging API: when the user logs through an async logger, the message is wrapped in an async_msg object and pushed into the queue.

thread_pool maintains a set of back-end threads that pop messages from the queue and process them. Processing a log message means calling async_logger::backend_sink_it_, which writes the message to the registered sinks (the output targets: files, the console, and so on).

The types involved form a small hierarchy (members as they appear in the headers; "+" marks public members, "-" private):

log_msg — the raw message record; its string_view_t fields only reference the caller's buffers:
    +string_view_t logger_name
    +level::level_enum level
    +log_clock::time_point time
    +size_t thread_id
    +source_loc source
    +string_view_t payload
    +mutable size_t color_range_start
    +mutable size_t color_range_end
    +log_msg(log_clock::time_point log_time, source_loc loc, string_view_t logger_name, level::level_enum lvl, string_view_t msg)
    +log_msg(source_loc loc, string_view_t logger_name, level::level_enum lvl, string_view_t msg)
    +log_msg(string_view_t logger_name, level::level_enum lvl, string_view_t msg)

log_msg_buffer (inherits log_msg) — owns a copy of the text in buffer, so the message stays valid after the caller returns:
    -memory_buf_t buffer
    +log_msg_buffer()
    +explicit log_msg_buffer(const log_msg &orig_msg)
    +log_msg_buffer(const log_msg_buffer &other)
    +log_msg_buffer(log_msg_buffer &&other)
    +log_msg_buffer &operator=(const log_msg_buffer &other)
    +log_msg_buffer &operator=(log_msg_buffer &&other)
    -void update_string_views()

async_msg (inherits log_msg_buffer) — the element type stored in the queue, tagged with its kind (log / flush / terminate):
    +async_msg_type msg_type
    +async_logger_ptr worker_ptr
    +std::promise<void> flush_promise
    +async_msg()
    +~async_msg()
    +async_msg(const async_msg &)
    +async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
    +async_msg(async_logger_ptr &&worker, async_msg_type the_type)
    +async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::promise<void> &&promise)
    +explicit async_msg(async_msg_type the_type)

thread_pool — owns the queue and the worker threads:
    -q_type q_ (q_type is mpmc_blocking_queue<async_msg>)
    -std::vector<std::thread> threads_
    +thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start, std::function<void()> on_thread_stop)
    +thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start)
    +thread_pool(size_t q_max_items, size_t threads_n)
    +void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy)
    +std::future<void> post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
    +size_t overrun_counter()
    +void reset_overrun_counter()
    +size_t discard_counter()
    +void reset_discard_counter()
    +size_t queue_size()
    -void post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy)
    -void worker_loop_()
    -bool process_next_msg_()

The constructors validate the thread count and spawn the worker threads:
SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items,
                                       size_t threads_n,
                                       std::function<void()> on_thread_start,
                                       std::function<void()> on_thread_stop)
    : q_(q_max_items) {
    if (threads_n == 0 || threads_n > 1000) {
        throw_spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)");
    }
    for (size_t i = 0; i < threads_n; i++) {
        threads_.emplace_back([this, on_thread_start, on_thread_stop] {
            on_thread_start();
            this->thread_pool::worker_loop_();
            on_thread_stop();
        });
    }
}
SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items,
                                       size_t threads_n,
                                       std::function<void()> on_thread_start)
    : thread_pool(q_max_items, threads_n, on_thread_start, [] {}) {}

SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n)
    : thread_pool(q_max_items, threads_n, [] {}, [] {}) {}

void SPDLOG_INLINE thread_pool::worker_loop_() {
    while (process_next_msg_()) {
    }
}

// process next message in the queue, return true if this thread should 
// still be active (while no terminate msg was received)
bool SPDLOG_INLINE thread_pool::process_next_msg_() {
  async_msg incoming_async_msg;
  q_.dequeue(incoming_async_msg);
  switch (incoming_async_msg.msg_type) {
    case async_msg_type::log: {
      incoming_async_msg.worker_ptr->backend_sink_it_(incoming_async_msg);
      return true;
    }
    case async_msg_type::flush: {
      incoming_async_msg.worker_ptr->backend_flush_();
      incoming_async_msg.flush_promise.set_value();
      return true;
    }
    case async_msg_type::terminate: {
      return false;
    }
    default: {
      assert(false);
    }
  }
  return true;
}
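One consequence of the switch above is that each terminate message stops exactly one worker, so shutting the pool down means posting one terminate per thread before joining. A toy version of the dispatch (process_next is a made-up name; a plain std::queue stands in for the blocking queue):

```cpp
#include <cassert>
#include <queue>

enum class async_msg_type { log, flush, terminate };

// Returns false only on a terminate message, ending that worker's loop.
bool process_next(std::queue<async_msg_type> &q, int &logged) {
    async_msg_type t = q.front();
    q.pop();
    switch (t) {
        case async_msg_type::log:       ++logged; return true;
        case async_msg_type::flush:     return true;
        case async_msg_type::terminate: return false;
    }
    return true;
}
```

Running the loop over [log, log, terminate] processes both log messages and then exits, just like worker_loop_ does.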

std::future<void> SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
                                                        async_overflow_policy overflow_policy) {
    std::promise<void> promise;
    std::future<void> future = promise.get_future();
    post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)), 
                   overflow_policy);
    return future;
}
void SPDLOG_INLINE thread_pool::post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy) {
    if (overflow_policy == async_overflow_policy::block) {
        q_.enqueue(std::move(new_msg));
    } else if (overflow_policy == async_overflow_policy::overrun_oldest) {
        q_.enqueue_nowait(std::move(new_msg));
    } else {
        assert(overflow_policy == async_overflow_policy::discard_new);
        q_.enqueue_if_have_room(std::move(new_msg));
    }
}
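The flush path is a classic promise/future handshake: post_flush creates the promise, ships it to a worker inside the flush message, and returns the future so the caller can wait until the flush has actually happened on the back end. Stripped of the queue, the pattern looks like this (post_flush_sketch is a made-up name, and a raw thread stands in for the pool's worker):

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// The caller receives a future; the worker fulfills the matching promise
// after performing the flush, which unblocks the caller.
std::future<void> post_flush_sketch(bool &flushed) {
    std::promise<void> promise;
    std::future<void> future = promise.get_future();
    std::thread([p = std::move(promise), &flushed]() mutable {
        flushed = true;   // backend_flush_() would run here
        p.set_value();    // wakes the waiting caller
    }).detach();
    return future;
}
```

set_value() synchronizes with the future's wait(), so the caller is guaranteed to observe everything the worker did before fulfilling the promise.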