2021SC@SDUSC
该篇继上文继续来看Datastream的代码,在这之前,让我们回顾一下什么是Datastream。
Datastream is an extremely fast implementation of reactive streams. It is useful for intra and inter-server communication and asynchronous data processing.
Datastream是一个非常快速的反应式流的实现。 它对服务器内和服务器间的 通信和异步数据处理非常有用
- 一个现代的异步反应流的实现(不同于Java 8中的流和传统的基于线程的阻塞流)。
- 具有极其有效的背压控制的异步性,以处理数据源速度的自然不平衡问题
- 可组合的流操作(映射器、还原器、过滤器、分类器、合并/拆分器、压缩、序列化)。
- 在Eventloop模块之上的基于流的网络和文件I/O
- 与CSP模块相兼容
接下来,是AbstractStreamSupplier类。该类是StreamSupplier的抽象类,帮助其处理状态转换,并有助于实现基本行为。
主要方法有,如下:
protected void onInit() {
}
public final void send(T item) {
dataAcceptorBuffered.accept(item);
}
public final Promise<Void> sendEndOfStream() {
if (CHECK) checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
if (endOfStreamRequest) return flushPromise;
if (flushAsync > 0) {
asyncEnd();
}
endOfStreamRequest = true;
//noinspection unchecked
this.dataAcceptorBuffered = (StreamDataAcceptor<T>) NO_ACCEPTOR;
flush();
return getFlushPromise();
}
public final @NotNull Promise<Void> getFlushPromise() {
if (isEndOfStream()) {
return endOfStream;
} else if (flushPromise != null) {
return flushPromise;
} else if (dataAcceptor != null) {
return Promise.complete();
} else {
flushPromise = new SettablePromise<>();
return flushPromise;
}
}
private void ensureInitialized() {
if (!initialized) {
initialized = true;
onInit();
}
}
protected void onResumed() {
}
protected void onSuspended() {
}
public final @Nullable StreamDataAcceptor<T> getDataAcceptor() {
return dataAcceptor;
}
public final boolean isReady() {
return dataAcceptor != null;
}
protected void one rror(Exception e) {
}
protected void onCleanup() {
}
onInit():此方法将只调用一次:在创建此供应商之后的下一个eventloop中,或者在onStart()或onError(Exception)调用之前 send(T item):通过此供应商发送给定项目。如果供应商处于挂起状态,此方法将项目存储到内部缓冲区,并且在供应商到达sendEndOfStream()时不得调用此方法。 sendEndOfStream():将此供应商置于关闭状态,无错误。此操作是最终操作,无法撤消。只有第一次呼叫才会产生任何影响。 getFlushPromise():返回将在所有数据项传播到实际数据接受者时完成的承诺 ensureInitialized():仅当此供应商尚未初始化时,才通过调用onInit()来初始化此供应商。 onResumed():当此供应商从暂停状态更改为正常状态时调用。 onSuspended():当此供应商将正常状态更改为挂起状态时调用。 getDataAcceptor():当此供应商处于挂起状态时,返回当前数据接受器(使用updateDataAcceptor()方法设置的最后一个数据接受器)或返回null isReady():此供应商处于正常状态时返回true,暂停或关闭时返回false。 onError(Exception e):当此供应商错误地更改为关闭状态时,将调用此方法。 onCleanup():此供应商更改为关闭状态后,将异步调用此方法,而不考虑错误。
此外,还有flush()。导致此供应商尝试提供其缓冲项,并相应地更新当前状态。
private void flush() {
if (CHECK) checkState(eventloop.inEventloopThread(), "Not in eventloop thread");
flushRequest = true;
if (flushRunning || flushAsync > 0) return; // recursive call
if (endOfStream.isComplete()) return;
if (!isStarted()) return;
flushRunning = true;
while (flushRequest) {
flushRequest = false;
while (isReady() && !buffer.isEmpty()) {
T item = buffer.pollFirst();
this.dataAcceptor.accept(item);
}
if (isReady() && !isEndOfStream()) {
onResumed();
}
}
flushRunning = false;
if (flushAsync > 0) return;
if (!buffer.isEmpty()) return;
if (endOfStream.isComplete()) return;
if (!endOfStreamRequest) {
if (this.flushPromise != null) {
SettablePromise<Void> flushPromise = this.flushPromise;
this.flushPromise = null;
flushPromise.set(null);
}
return;
}
dataAcceptor = null;
if (flushPromise != null) {
flushPromise.set(null);
}
endOfStream.set(null);
}