2021SC@SDUSC
Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。
这篇文章来查看Eventloop之间具体是如何实现连接的
异步连接到给定的套接字地址。
public void connect(SocketAddress address, @NotNull Callback<SocketChannel> cb) {
connect(address, 0, cb);
}
public void connect(SocketAddress address, @Nullable Duration timeout, @NotNull Callback<SocketChannel> cb) {
connect(address, timeout == null ? 0L : timeout.toMillis(), cb);
}
以指定的超时值异步连接到给定的套接字地址。超时为零被解释为默认系统超时
address是socketChannel的地址;
timeout是要使用的超时值,以毫秒为单位,0 为默认系统连接超时。
public void connect(@NotNull SocketAddress address, long timeout, @NotNull Callback<SocketChannel> cb) {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
SocketChannel channel;
try {
channel = SocketChannel.open();
} catch (IOException e) {
try {
cb.accept(null, e);
} catch (Throwable e1) {
handleError(eventloopFatalErrorHandler, e1, cb);
}
return;
}
try {
channel.configureBlocking(false);
channel.connect(address);
if (timeout == 0) {
channel.register(ensureSelector(), SelectionKey.OP_CONNECT, cb);
} else {
ScheduledRunnable scheduledTimeout = delay(timeout, () -> {
closeChannel(channel, null);
cb.accept(null, new AsyncTimeoutException("Connection timed out"));
});
channel.register(ensureSelector(), SelectionKey.OP_CONNECT,
(Callback<SocketChannel>) (result, e) -> {
scheduledTimeout.cancel();
cb.accept(result, e);
});
}
if (selector != null) {
selector.wakeup();
}
} catch (IOException e) {
closeChannel(channel, null);
try {
cb.accept(null, e);
} catch (Throwable e1) {
handleError(eventloopFatalErrorHandler, e1, cb);
}
}
}
public long tick() {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
return (long) loop << 32 | tick;
}
将新任务发布到 localTasks的头部。推荐使用此方法,因为任务将尽快执行且不会使 CPU 缓存失效。
runnale代表可运行的任务。
*/
public void post(@NotNull @Async.Schedule Runnable runnable) {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
localTasks.addFirst(runnable);
}
新任务将发布到 localTasks 的末尾。
public void postLast(@NotNull @Async.Schedule Runnable runnable) {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
localTasks.addLast(runnable);
}
public void postNext(@NotNull @Async.Schedule Runnable runnable) {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
nextTasks.add(runnable);
}
从其他线程发布新任务。这是与来自其他线程的 eventloop 通信的首选方法。
@Override
public void execute(@NotNull @Async.Schedule Runnable runnable) {
concurrentTasks.offer(runnable);
if (selector != null) {
selector.wakeup();
}
}
安排新任务。使用此可运行对象返回ScheduledRunnable。
@Override
public @NotNull ScheduledRunnable schedule(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
return addScheduledTask(timestamp, runnable, false);
}
安排新的后台任务。使用此可运行对象返回ScheduledRunnable。如果 eventloop 只包含后台任务,它将被关闭
@Override
public @NotNull ScheduledRunnable scheduleBackground(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
if (CHECK)
checkState(inEventloopThread(), "Not in eventloop thread");
return addScheduledTask(timestamp, runnable, true);
}
private @NotNull ScheduledRunnable addScheduledTask(long timestamp, Runnable runnable, boolean background) {
ScheduledRunnable scheduledTask = ScheduledRunnable.create(timestamp, runnable);
PriorityQueue<ScheduledRunnable> taskQueue = background ? backgroundTasks : scheduledTasks;
taskQueue.offer(scheduledTask);
return scheduledTask;
}
通知事件循环有关其他线程中的并发操作。在所有外部任务完成之前,Eventloop 不会退出。
public void startExternalTask() {
externalTasksCount.incrementAndGet();
}
通知事件循环其他线程中相应操作的完成。不调用此方法将阻止事件循环退出。
public void completeExternalTask() {
externalTasksCount.decrementAndGet();
}
public long refreshTimestampAndGet() {
refreshTimestamp();
return timestamp;
}
private void refreshTimestamp() {
timestamp = timeProvider.currentTimeMillis();
}
返回此事件循环的当前时间
@Override
public long currentTimeMillis() {
return timestamp;
}
@Override
public @NotNull Eventloop getEventloop() {
return this;
}
将Runnable 提交给eventloop 执行 Runnable 在eventloop 线程中执行
computation 是要执行的计算;
返回CompletableFuture当计算完成;
@Override
public @NotNull CompletableFuture<Void> submit(@NotNull RunnableEx computation) {
CompletableFuture<Void> future = new CompletableFuture<>();
execute(() -> {
try {
computation.run();
} catch (Exception ex) {
handleError(eventloopFatalErrorHandler, ex, computation);
future.completeExceptionally(ex);
return;
}
future.complete(null);
});
return future;
}
@Override
public <T> @NotNull CompletableFuture<T> submit(AsyncComputation<? extends T> computation) {
CompletableFuture<T> future = new CompletableFuture<>();
execute(() -> {
try {
computation.run((result, e) -> {
if (e == null) {
future.complete(result);
} else {
future.completeExceptionally(e);
}
});
} catch (Exception ex) {
handleError(eventloopFatalErrorHandler, ex, computation);
future.completeExceptionally(ex);
}
});
return future;
}