ActiveJ框架学习——Async I/O之Eventloop(四)

2021SC@SDUSC

Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。
这篇文章来查看Eventloop之间具体是如何实现连接的

异步连接到给定的套接字地址。
	public void connect(SocketAddress address, @NotNull Callback<SocketChannel> cb) {
		connect(address, 0, cb);
	}

	public void connect(SocketAddress address, @Nullable Duration timeout, @NotNull Callback<SocketChannel> cb) {
		connect(address, timeout == null ? 0L : timeout.toMillis(), cb);
	}

以指定的超时值异步连接到给定的套接字地址。超时为零被解释为默认系统超时

address是socketChannel的地址;

timeout是要使用的超时值,以毫秒为单位,0 为默认系统连接超时。

	public void connect(@NotNull SocketAddress address, long timeout, @NotNull Callback<SocketChannel> cb) {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		SocketChannel channel;
		try {
			channel = SocketChannel.open();
		} catch (IOException e) {
			try {
				cb.accept(null, e);
			} catch (Throwable e1) {
				handleError(eventloopFatalErrorHandler, e1, cb);
			}
			return;
		}
		try {
			channel.configureBlocking(false);
			channel.connect(address);

			if (timeout == 0) {
				channel.register(ensureSelector(), SelectionKey.OP_CONNECT, cb);
			} else {
				ScheduledRunnable scheduledTimeout = delay(timeout, () -> {
					closeChannel(channel, null);
					cb.accept(null, new AsyncTimeoutException("Connection timed out"));
				});

				channel.register(ensureSelector(), SelectionKey.OP_CONNECT,
						(Callback<SocketChannel>) (result, e) -> {
							scheduledTimeout.cancel();
							cb.accept(result, e);
						});
			}

			if (selector != null) {
				selector.wakeup();
			}
		} catch (IOException e) {
			closeChannel(channel, null);
			try {
				cb.accept(null, e);
			} catch (Throwable e1) {
				handleError(eventloopFatalErrorHandler, e1, cb);
			}
		}
	}

	public long tick() {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		return (long) loop << 32 | tick;
	}

将新任务发布到 localTask​​s的头部。推荐使用此方法,因为任务将尽快执行且不会使 CPU 缓存失效。

runnale代表可运行的任务。

	 */
	public void post(@NotNull @Async.Schedule Runnable runnable) {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		localTasks.addFirst(runnable);
	}

新任务将发布到 localTask​​s 的末尾。

	public void postLast(@NotNull @Async.Schedule Runnable runnable) {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		localTasks.addLast(runnable);
	}

	public void postNext(@NotNull @Async.Schedule Runnable runnable) {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		nextTasks.add(runnable);
	}

从其他线程发布新任务。这是与来自其他线程的 eventloop 通信的首选方法。

	@Override
	public void execute(@NotNull @Async.Schedule Runnable runnable) {
		concurrentTasks.offer(runnable);
		if (selector != null) {
			selector.wakeup();
		}
	}

安排新任务。使用此可运行对象返回ScheduledRunnable。

	@Override
	public @NotNull ScheduledRunnable schedule(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		return addScheduledTask(timestamp, runnable, false);
	}

安排新的后台任务。使用此可运行对象返回ScheduledRunnable。如果 eventloop 只包含后台任务,它将被关闭

	@Override
	public @NotNull ScheduledRunnable scheduleBackground(long timestamp, @NotNull @Async.Schedule Runnable runnable) {
		if (CHECK)
			checkState(inEventloopThread(), "Not in eventloop thread");
		return addScheduledTask(timestamp, runnable, true);
	}

	private @NotNull ScheduledRunnable addScheduledTask(long timestamp, Runnable runnable, boolean background) {
		ScheduledRunnable scheduledTask = ScheduledRunnable.create(timestamp, runnable);
		PriorityQueue<ScheduledRunnable> taskQueue = background ? backgroundTasks : scheduledTasks;
		taskQueue.offer(scheduledTask);
		return scheduledTask;
	}

通知事件循环有关其他线程中的并发操作。在所有外部任务完成之前,Eventloop 不会退出。

	public void startExternalTask() {
		externalTasksCount.incrementAndGet();
	}

通知事件循环其他线程中相应操作的完成。不调用此方法将阻止事件循环退出。

	public void completeExternalTask() {
		externalTasksCount.decrementAndGet();
	}

	public long refreshTimestampAndGet() {
		refreshTimestamp();
		return timestamp;
	}

	private void refreshTimestamp() {
		timestamp = timeProvider.currentTimeMillis();
	}

返回此事件循环的当前时间

	@Override
	public long currentTimeMillis() {
		return timestamp;
	}

	@Override
	public @NotNull Eventloop getEventloop() {
		return this;
	}

将Runnable 提交给eventloop 执行 Runnable 在eventloop 线程中执行

computation 是要执行的计算;

返回CompletableFuture当计算完成;

	@Override
	public @NotNull CompletableFuture<Void> submit(@NotNull RunnableEx computation) {
		CompletableFuture<Void> future = new CompletableFuture<>();
		execute(() -> {
			try {
				computation.run();
			} catch (Exception ex) {
				handleError(eventloopFatalErrorHandler, ex, computation);
				future.completeExceptionally(ex);
				return;
			}
			future.complete(null);
		});
		return future;
	}

	@Override
	public <T> @NotNull CompletableFuture<T> submit(AsyncComputation<? extends T> computation) {
		CompletableFuture<T> future = new CompletableFuture<>();
		execute(() -> {
			try {
				computation.run((result, e) -> {
					if (e == null) {
						future.complete(result);
					} else {
						future.completeExceptionally(e);
					}
				});
			} catch (Exception ex) {
				handleError(eventloopFatalErrorHandler, ex, computation);
				future.completeExceptionally(ex);
			}
		});
		return future;
	}

上一篇:IOS学习笔记02---语言发展概述,计算机语言简介.


下一篇:广播、ARP协议