Camel - DefaultReactiveExecutor
DefaultReactiveExecutor
作用:
用于创建Worker,并对监控Worker的运行情况;监控内容包括:
- 已创建Worker
- 正在执行的Worker
- 当前挂起的任务数
一个Worker 对应一个Thread。
每个Worker创建时会记录自己的编号,并且会创建一个双端队列(Deque),如果是异步消息往队列头部添加任务,如果是同步消息往尾部添加任务。
org.apache.camel.impl.engine.DefaultReactiveExecutor.Worker#schedule
void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);
}
if (main) {
if (!queue.isEmpty()) {
if (back == null) {
back = new ArrayDeque<>();
}
back.push(queue);
queue = new ArrayDeque<>();
}
}
if (first) {
queue.addFirst(runnable);
executor.pendingTasks.incrementAndGet();
} else {
queue.addLast(runnable);
executor.pendingTasks.incrementAndGet();
}
if (!running || sync) {
running = true;
executor.runningWorkers.incrementAndGet();
try {
for (;;) {
final Runnable polled = queue.pollFirst();
if (polled == null) {
if (back != null && !back.isEmpty()) {
queue = back.pollFirst();
continue;
} else {
break;
}
}
try {
executor.pendingTasks.decrementAndGet();
if (LOG.isTraceEnabled()) {
LOG.trace("Worker #{} running: {}", number, runnable);
}
polled.run();
} catch (Throwable t) {
LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.",
t);
}
}
} finally {
running = false;
executor.runningWorkers.decrementAndGet();
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Queuing reactive work: {}", runnable);
}
}
}