Camel - DefaultReactiveExecutor

Camel - DefaultReactiveExecutor

DefaultReactiveExecutor

作用:

用于创建Worker,并对监控Worker的运行情况;监控内容包括:

  • 已创建Worker
  • 正在执行的Worker
  • 当前挂起的任务数

一个Worker 对应一个Thread。

每个Worker创建时会记录自己的编号,并且会创建一个双端队列(Deque),如果是异步消息往队列头部添加任务,如果是同步消息往尾部添加任务。

org.apache.camel.impl.engine.DefaultReactiveExecutor.Worker#schedule

void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
 if (LOG.isTraceEnabled()) {
     LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);
 }
 if (main) {
     if (!queue.isEmpty()) {
         if (back == null) {
             back = new ArrayDeque<>();
         }
         back.push(queue);
         queue = new ArrayDeque<>();
     }
 }
 if (first) {
     queue.addFirst(runnable);
     executor.pendingTasks.incrementAndGet();
 } else {
     queue.addLast(runnable);
     executor.pendingTasks.incrementAndGet();
 }
 if (!running || sync) {
     running = true;
     executor.runningWorkers.incrementAndGet();
     try {
         for (;;) {
             final Runnable polled = queue.pollFirst();
             if (polled == null) {
                 if (back != null && !back.isEmpty()) {
                     queue = back.pollFirst();
                     continue;
                 } else {
                     break;
                 }
             }
             try {
                 executor.pendingTasks.decrementAndGet();
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Worker #{} running: {}", number, runnable);
                 }
                 polled.run();
             } catch (Throwable t) {
                 LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.",
                         t);
             }
         }
     } finally {
         running = false;
         executor.runningWorkers.decrementAndGet();
     }
 } else {
     if (LOG.isTraceEnabled()) {
         LOG.trace("Queuing reactive work: {}", runnable);
     }
 }
}
上一篇:Apache Camel 教程


下一篇:探索3种*「集成框架」Apache、Spring和Mule