ActiveJ框架学习——Async I/O之HTTP(二)

2021SC@SDUSC

上一篇文章介绍了HTTP的主要内容,那么接下来我们就来看源代码部分。

首先,先看AsyncServlet接口。

  • 基本上,它只是一个异步函数,将 HttpRequest 映射到 HttpResponse
  • 开箱即用的预定义 AsyncServlet 集合(StaticServlet、BlockingServlet、RoutingServlet 等)
  • 广泛支持 AsyncServlets 的功能组合。 RoutingServlet 用于构建 servlet 路由
    • HTTP 路径和方法到 AsyncServlets(包括其他 RoutingServlets)的灵活映射
    • 支持路径参数(如 /users/:id)和相对路径

简单来说,它是servlet 函数的接口,异步接收 HttpRequest,处理它,然后返回 HttpResponse。

public interface AsyncServlet {
	@NotNull Promisable<HttpResponse> serve(@NotNull HttpRequest request) throws Exception;

	default @NotNull Promise<HttpResponse> serveAsync(@NotNull HttpRequest request) throws Exception {
		return serve(request).promise();
	}

	static @NotNull AsyncServlet ofBlocking(@NotNull Executor executor, @NotNull BlockingServlet blockingServlet) {
		return request -> request.loadBody()
				.then(() -> Promise.ofBlocking(executor,
						() -> blockingServlet.serve(request)));
	}
}
方法ofBlocking()使用给定的 Executor 将给定的 BlockingServlet 包装成异步的。

BlockingServlet

AsyncServlet 的阻塞版本。

public interface BlockingServlet {
	@NotNull HttpResponse serve(@NotNull HttpRequest request) throws Exception;
}

HttpExceptionFormatter

这是用于 HTTP 的格式化程序功能的接口。它转换可能从服务器根部 servlet 返回的未处理的检查异常,并将它们转换为 HTTP 错误响应。其具有如下操作:

标准格式化程序将除 HttpException 之外的所有异常映射到具有 500 状态代码的空响应。 HttpExceptions 被映射到响应及其状态代码、消息和原因的堆栈跟踪(如果已指定)。

	HttpExceptionFormatter DEFAULT_FORMATTER = e -> {
		HttpResponse response;
		if (e instanceof HttpError) {
			int code = ((HttpError) e).getCode();
			response = HttpResponse.ofCode(code).withHtml(HTTP_ERROR_HTML.replace("{title}", HttpUtils.getHttpErrorTitle(code)).replace("{message}", e.toString()));
		} else {
			response = HttpResponse.ofCode(500).withHtml(INTERNAL_SERVER_ERROR_HTML);
		}
		// default formatter leaks no information about unknown exceptions
		return response
				.withHeader(CACHE_CONTROL, "no-store")
				.withHeader(PRAGMA, "no-cache")
				.withHeader(AGE, "0");
	};

此格式化程序将异常的堆栈跟踪打印到 HTTP 响应中。

	HttpExceptionFormatter DEBUG_FORMATTER = e ->
			DebugStacktraceRenderer.render(e, e instanceof HttpError ? ((HttpError) e).getCode() : 500)
					.withHeader(CACHE_CONTROL, "no-store")
					.withHeader(PRAGMA, "no-cache")
					.withHeader(AGE, "0");

此格式化程序是 DEFAULT_FORMATTER 或 DEBUG_FORMATTER 之一,取决于是否从 IntelliJ IDE 启动应用程序。

	HttpExceptionFormatter COMMON_FORMATTER =
			System.getProperty("java.class.path", "").contains("idea_rt.jar") ?
					DEBUG_FORMATTER :
					DEFAULT_FORMATTER;

HttpPathPart

标记接口由枚举实现,这些枚举存储要附加到 HTTP 请求路径的命令

public interface HttpPathPart {

	@Override
	String toString();
}

IAsyncHttpClient

异步 HTTP 客户端的接口。它就像一个异步函数一样简单,它接受 HttpRequest 并为其返回一个 HttpResponse,所以它基本上是 AsyncServlet 的倒数。

public interface IAsyncHttpClient {
	Promise<HttpResponse> request(HttpRequest request);
}

IAsyncWebSocketClient

public interface IAsyncWebSocketClient {
	Promise<WebSocket> webSocketRequest(HttpRequest request);
}

WebSocket

允许以帧或消息的形式发送和接收网络套接字数据的抽象类。由于此接口扩展了 AsyncCloseable 接口,因此可以使用适当的异常关闭网络套接字。如果异常是 WebSocketException 的实例,它将被转换为具有相应关闭代码和关闭原因的适当关闭帧。 任何其他异常都将导致在客户端和服务器上具有关闭代码 1001 和 1011 的关闭帧。

其中具体的方法有:

@NotNull Promise<Message> readMessage();

返回一个由一个或多个数据帧组成的完整网络套接字消息的promise。如果另一方在其他消息之间发送了代码 1000 的关闭帧,则可能返回 null 的promise。 null 消息表示流结束。在调用 readFrame() 后调用该方法没有返回消息的最后一帧是非法的。

	@NotNull Promise<Frame> readFrame();

返回一个网络套接字数据帧的promise。它可能包含整个 Web 套接字消息或只是消息的一部分。文本框架的任何 UTF-8 验证都应由此方法的用户完成。如果对方发送了一个关闭的代码帧 1000,可能会返回一个 null 的。 null 帧表示流结束。所有 readXXX 方法都应该被连续调用。在调用此方法并且没有返回消息的最后一帧之后调用readMessage() 是非法的。

	default @NotNull ChannelSupplier<Frame> frameReadChannel() {
		return ChannelSupplier.of(this::readFrame, this);
	}

允许获取 Frame 的频道供应商的快捷方式。

 

	default @NotNull ChannelSupplier<Message> messageReadChannel() {
		return ChannelSupplier.of(this::readMessage, this);
	}

允许获取Message 的频道供应商的快捷方式,

	@NotNull Promise<Void> writeMessage(@Nullable Message msg);

一种发送网络套接字消息的方法。如果将 null 传递给此方法,则表示流结束。相当于发送一个关闭帧,关闭代码为 1000,表示web socket连接正常关闭。

	@NotNull Promise<Void> writeFrame(@Nullable Frame frame);

一种发送网络套接字数据帧的方法。如果将 null 传递给此方法,则表示流结束。相当于发送一个关闭帧,关闭代码为 1000,表示web socket连接正常关闭。

	default @NotNull ChannelConsumer<Frame> frameWriteChannel() {
		return ChannelConsumer.of(this::writeFrame, this)
				.withAcknowledgement(ack -> ack.then(() -> writeFrame(null)));
	}

 允许获取 Frame 的频道使用者的快捷方式。

	default @NotNull ChannelConsumer<Message> messageWriteChannel() {
		return ChannelConsumer.of(this::writeMessage, this)
				.withAcknowledgement(ack -> ack
						.then(() -> writeMessage(null)));
	}

允许获取 Message 的频道消费者的快捷方式。

	HttpRequest getRequest();

一种用于检查与此 Web 套接字关联的 HTTP 请求的方法。

	HttpRequest getRequest();

一种用于检查与此 Web 套接字关联的 HTTP 请求的方法。

	HttpResponse getResponse();

一种检查与此 Web 套接字关联的 HTTP 响应的方法。

	boolean isClosed();

 指示这个套接字是否已经关闭

 最后,该接口含有两个内部类Message和Frame。

  • Message:完整的 Web 套接字消息的表示。它可能包含文本或二进制数据。
  • Frame:网络套接字数据帧的表示。它可以是文本、二进制或连续类型之一。
    	final class Message implements Recyclable {
    		private final MessageType type;
    		private final @Nullable ByteBuf binaryPayload;
    		private final @Nullable String textPayload;
    
    		Message(MessageType type, @Nullable ByteBuf binaryPayload, @Nullable String textPayload) {
    			this.type = type;
    			this.textPayload = textPayload;
    			this.binaryPayload = binaryPayload;
    		}
    
    		public static Message text(String payload) {
    			return new Message(TEXT, null, payload);
    		}
    
    		public static Message binary(ByteBuf payload) {
    			return new Message(BINARY, payload, null);
    		}
    
    		public MessageType getType() {
    			return type;
    		}
    
    		public ByteBuf getBuf() {
    			return checkNotNull(binaryPayload);
    		}
    
    		public String getText() {
    			return checkNotNull(textPayload);
    		}
    
    		@Override
    		public void recycle() {
    			if (binaryPayload != null) {
    				binaryPayload.recycle();
    			}
    		}
    
    		public enum MessageType {
    			TEXT, BINARY
    		}
    	}
    	final class Frame implements Recyclable {
    		private final FrameType type;
    		private final ByteBuf payload;
    		private final boolean isLastFrame;
    
    		Frame(FrameType type, ByteBuf payload, boolean isLastFrame) {
    			this.type = type;
    			this.payload = payload;
    			this.isLastFrame = isLastFrame;
    		}
    
    		public static Frame text(ByteBuf buf) {
    			return new Frame(FrameType.TEXT, buf, true);
    		}
    
    		public static Frame text(ByteBuf buf, boolean isLastFrame) {
    			return new Frame(FrameType.TEXT, buf, isLastFrame);
    		}
    
    		public static Frame binary(ByteBuf buf) {
    			return new Frame(FrameType.BINARY, buf, true);
    		}
    
    		public static Frame binary(ByteBuf buf, boolean isLastFrame) {
    			return new Frame(FrameType.BINARY, buf, isLastFrame);
    		}
    
    		public static Frame next(ByteBuf buf, boolean isLastFrame) {
    			return new Frame(CONTINUATION, buf, isLastFrame);
    		}
    
    		public FrameType getType() {
    			return type;
    		}
    
    		public ByteBuf getPayload() {
    			return payload;
    		}
    
    		public boolean isLastFrame() {
    			return isLastFrame;
    		}
    
    		@Override
    		public void recycle() {
    			payload.recycle();
    		}
    
    		public enum FrameType {
    			TEXT, BINARY, CONTINUATION
    		}
    	}

上一篇:<2021SC@SDUSC>netty常见编解码器(二)


下一篇:netty系列之:使用UDP协议