工厂方法创建流
- Backpressure : the ability for the consumer to signal the producer that the rate of emission is too high
push工厂方法
通过单线程生产者(在同一时间只有一个线程,可以调用next,complete或error)创建Flux实例,此方法适配于异步,单线程,多值Api,无须关注背压和取消。
同样可以桥接接口,示例见create示例(把create换成push)
-
push()并且create()两者都允许设置onRequest使用者以管理请求量并确保仅在有待处理的请求时才通过接收器推送数据。
-
onCancel 首先调用,仅用于取消信号。
-
onDispose 为完成,错误或取消信号而调用。
package com.ccand99.projectreactor.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.IntStream;
public class pushDemo {
private static final Logger log = LoggerFactory.getLogger(pushDemo.class);
public static void main(String[] args) throws InterruptedException {
push();
Thread.sleep(3000);
}
static void push() {
//IntStream.range(2000,3000) Stream API 生成1000个整数
//emitter 为FluxSink<T>类型,next发送到Flux实例
//push 方法不需要关心背压和取消
Flux.push(emitter -> IntStream.range(2000,3000).forEach(emitter::next))
//延迟模拟背压
.delayElements(Duration.ofMillis(1))
.subscribe(e -> log.info("onNext: {}",e));
}
}
create工厂方法
从不同线程发送事件,会序列化FluxSink。和push都支持重载溢出策略,能通过注册额外的处理程序清理资源。可以支持接口桥接(官网有示例)
此外,由于create可以桥接异步API并管理背压,因此您可以通过指示以下内容来完善如何进行背压行为(backpressure)
OverflowStrategy:
IGNORE
完全忽略下游背压请求。当队列下游充满时,可能会IllegalStateException。
ERRORIllegalStateException
在下游无法跟上时发出信号。
DROP
如果下游没有准备好接收它,则丢弃输入的信号。
LATEST
让下游只从上游获取最新信号。
BUFFER(默认值)
,以在下游无法跟上时缓冲所有信号。(这会实现无限缓冲,并可能导致OutOfMemoryError)。
package com.ccand99.projectreactor.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import java.util.List;
public class CreateDemo {
private static final Logger log = LoggerFactory.getLogger(CreateDemo.class);
public static void main(String[] args) throws InterruptedException {
create();
Thread.sleep(1000);
}
//和push都支持重载溢出策略,能通过注册额外的处理程序清理资源。
static void create() throws InterruptedException {
Disposable disposable = Flux.create(emitter -> {
emitter.onDispose(() -> log.info("Dispose"));
//将事件推送到发射器。
emitter.next("test");
}).subscribe(e -> log.info("onNext: {}", e));
Thread.sleep(500);
disposable.dispose();
}
//接口桥接官网示例
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
//此类只是为了编译
static class MyEventProcessor {
private MyEventListener myEventListener;
public MyEventProcessor() { }
void register(MyEventListener myEventListener){
this.myEventListener = myEventListener;
}
List<String> getHistory(long n){ return null; }
//...
}
static void officialDemo1() {
MyEventProcessor myEventProcessor = new MyEventProcessor();
Flux<String> bridge = Flux.create(sink -> {
//接口实现类桥接到Flux
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
}
//混合推拉,使用request
static void pullAndpush() {
MyEventProcessor myEventProcessor = new MyEventProcessor();
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
sink.onRequest(n -> {
// 发出请求时轮询消息。
List<String> messages = myEventProcessor.getHistory(n);
for(String s : messages) {
//如果消息立即可用,请将其推入接收器。
sink.next(s);
}
});
});
}
}
generate工厂方法
适用在基于生成器内部处理状态创建复杂序列。接收器为一个 SynchronousSink,其next()方法每次回调调用最多只能调用一次
package com.ccand99.projectreactor.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
public class GenerateDemo {
private static final Logger log = LoggerFactory.getLogger(GenerateDemo.class);
public static void main(String[] args) throws InterruptedException {
//generate();
//officialDemo1();
//officialDemo2();
officialDemo3();
Thread.sleep(1000);
}
static void generate() {
Flux.generate(
// 初始状态
() -> Tuples.of(0L, 1L),
//这里类似reduce操作符
(state, sink) -> {
log.info("generated value {}", state.getT2());
sink.next(state.getT2());
long newValue = state.getT1() + state.getT2();
return Tuples.of(state.getT2(), newValue);
})
//同delayElements模拟会执行BiFunction,再执行onNext
//.delayElements(Duration.ofMillis(1))
.take(7)
.subscribe(e -> log.info("onNext: {}", e));
}
//https://projectreactor.io/docs/core/release/reference/
static void officialDemo1() {
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
log.info("generated value {}", state);
sink.next("3 x " + state + " = " + 3 * state);
if (state == 10) sink.complete();
return state + 1;
});
flux.delayElements(Duration.ofMillis(1))
.take(7).subscribe(e -> log.info("onNext: {}", e));
}
//generate(Supplier<S>, BiFunction, Consumer<S>)
static void officialDemo2() {
Flux<String> flux = Flux.generate(
//这次,我们生成一个可变对象作为状态
AtomicLong::new,
(state, sink) -> {
log.info("generated value {}", state);
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
});
flux.take(7).subscribe(e -> log.info("onNext: {}", e));
}
//包含consumer
static void officialDemo3() {
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
log.info("generated value {}", state);
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));
flux.take(7).subscribe(e -> log.info("onNext: {}", e));
}
}
Using工厂方法
using工厂方法能根据一个disposable创建流,在响应式中实现了try-with-resources。
Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);
package com.ccand99.projectreactor.factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.Random;
public class UsingDemo {
private static final Logger log = LoggerFactory.getLogger(UsingDemo.class);
public static void main(String[] args) {
using();
}
//try-with-resources方式,此方式自动关闭代码块
static void CommandModel() {
try (Connection connection = Connection.newConnection()) {
connection.getData().forEach(
data -> log.info("Received data: {}",data)
);
} catch (Exception e){
log.info("Error: {}",e.getMessage()) ;
}
}
static void using() {
Flux<String> ioRequestResult = Flux.using(
//关联Connection生命周期
Connection::newConnection,
//转换方式fromIterable
connection -> Flux.fromIterable(connection.getData()),
//关闭方法
Connection::close
);
ioRequestResult.subscribe(
data -> log.info("Received data: {}",data),
e -> log.info("Error: {}",e.getMessage()),
() -> log.info("Stream finish")
);
}
//包装一个阻塞API(简化的Connection)
private static class Connection implements AutoCloseable {
private final Random random = new Random();
public Iterable<String> getData() {
if (random.nextInt(10) < 3) {
throw new RuntimeException("Communication error");
}
return Arrays.asList("Some","data");
}
@Override
public void close() {
log.info("IO Connection closed");
}
public static Connection newConnection() {
log.info("IO Connection created");
return new Connection();
}
}
}
usingWhen工厂
包装响应式事务。using通过Callable实例获取资源,usingWhen通过订阅Publisher。
package com.ccand99.projectreactor.factory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Random;
import java.util.function.BiFunction;
public class UsingWhenDemo {
private static final Logger log = LoggerFactory.getLogger(UsingWhenDemo.class);
public static void main(String[] args) throws InterruptedException {
Flux.usingWhen(
Transaction.beginTransaction(),
transaction -> transaction.insertRows(Flux.just("A", "B", "C")),
Transaction::commit,
new BiFunction<Transaction, Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Transaction transaction, Throwable throwable) {
transaction.rollback();
log.info("error {}",throwable.getMessage());
return Mono.empty();
}
},
Transaction::rollback
).subscribe(
d -> log.info("onNext: {}",d),
e -> log.info("onError: {}",e),
() -> log.info("onComplete")
);
Thread.sleep(1000);
}
}
//简化的事务处理模型
class Transaction {
private static final Logger log = LoggerFactory.getLogger(Transaction.class);
private static final Random random = new Random();
private final int id;
public Transaction(int id) {
this.id = id;
log.info("[T: {}] created",id);
}
//静态方法,生成事务
public static Mono<Transaction> beginTransaction() {
return Mono.defer(
() -> Mono.just(new Transaction(random.nextInt(1000)))
);
}
public Flux<String> insertRows(Publisher<String> rows) {
//模拟插入,利用随机模拟产生插入失败的行为。
return Flux.from(rows)
.delayElements(Duration.ofMillis(100))
.flatMap( r-> {
if (random.nextInt(10) < 2) {
return Mono.error(new RuntimeException("Error: "+ r) );
} else {
return Mono.just(r);
}
});
}
//异步提交,有时提交失败
public Mono<Void> commit() {
return Mono.defer( () -> {
log.info("[T: {}] commit",id);
if (random.nextBoolean()) {
return Mono.empty();
} else {
return Mono.error(new RuntimeException("conflict"));
}
});
}
//异步回滚,有时事务回滚失败
public Mono<Void> rollback() {
return Mono.defer( () -> {
log.info("[T: {}] rollback",id);
if (random.nextBoolean()) {
return Mono.empty();
} else {
return Mono.error(new RuntimeException("conflict"));
}
});
}
}
错误处理
Demo:
package com.ccand99.projectreactor.handleError;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class BadDemo {
public static void main(String[] args) throws InterruptedException {
handle();
Thread.sleep(8000);
}
static void handle() {
Flux.just("user-1")
.flatMap(user ->
recommendedBooks(user).retryWhen(Retry.backoff(5, Duration.ofMillis(100)))
.timeout(Duration.ofSeconds(4))
.onErrorResume(e -> Flux.just("The Martian"))
)
.subscribe(System.out::println,System.out::println,System.out::println);
}
static Flux<String> recommendedBooks(String userId) {
return Flux.defer(
() -> {
if (new Random().nextInt(10) < 7) {
return Flux.<String>error(new RuntimeException("err"))
.delaySequence(Duration.ofMillis(100));
} else {
return Flux.just("Blue Mars", "The Expanse")
.delayElements(Duration.ofMillis(50));
}
}).doOnSubscribe(s -> System.out.println(userId));
}
//官网retry示例:
static void offcial() {
AtomicInteger errorCount = new AtomicInteger();
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
int i = transientHelper.getAndIncrement();
if (i == 10) {
// 我们generate的消息源中含有大量错误。当计数器达到10时,它将成功完成
sink.next(i);
sink.complete();
}
else if (i % 3 == 0) {
// 如果transientHelper原子是的倍数3,我们将发射onNext并结束当前的脉冲串。
sink.next(i);
}
else {
// 在其他情况下,我们发出onError。那是三分之二,所以2的爆发onError被1中断onNext
sink.error(new IllegalStateException("Transient error at " + i));
}
})
.doOnError(e -> errorCount.incrementAndGet());
//如果没有transientErrors(true),2第二个脉冲串将达到配置的最大尝试次数,并且发出该序列后将失败onNext(3)。
transientFlux.retryWhen(Retry.max(2).transientErrors(true))
.blockLast();
//assertThat(errorCount).hasValue(6);
}
}
反应序列中的 任何错误都是终端事件。即使使用了错误处理运算符,它也不会让原始序列继续。相反,它将onError信号转换为新序列的开始(后备序列)。换句话说,它将替换其上游的终止序列。未经检查的异常总是通过传播onError.
错误处理运算符决策树
-
在subscribe中的onError定义处理程序(如果未定义,则onError抛出UnsupportedOperationException。您可以使用Exceptions.isErrorCallbackNotImplemented方法进一步对其进行检测和分类。)
-
通过onErrorReturn来捕获错误,用一个默认值或从异常出计算出的值替换,等效于try-catch
Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0 .onErrorReturn("Divided by zero :("); // error handling example
-
可以用onErrorResume捕获异常,并执行备用工作流
//如果您的标称进程正在从外部且不可靠的服务中获取数据,但是您还保留了同一数据的本地缓存,该缓存可能会过时但更可靠 Flux.just("key1", "key2") .flatMap(k -> callExternalService(k) .onErrorResume(e -> getFromCache(k)));
等效于
String v1; try { v1 = callExternalService("key1"); } catch (Throwable error) { v1 = getFromCache("key1"); } String v2; try { v2 = callExternalService("key2"); } catch (Throwable error) { v2 = getFromCache("key2"); }
如果本是一个带异常处理的Futrue:
erroringFlux.onErrorResume(error -> Mono.just( MyWrapper.fromError(error) ));
-
用onErrorMap转换为另一个异常处理
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
-
doOnError 运算符,等效于“捕获,记录特定于错误的消息并重新抛出”模式
LongAdder failureStat = new LongAdder(); Flux<String> flux = Flux.just("unknown") .flatMap(k -> callExternalService(k) .doOnError(e -> { failureStat.increment(); log("uh oh, falling back, service failed for key " + k); }) );
-
doFinally和using doFinally与序列终止(用onComplete或onError或取消)时要执行的副作用有关。它提示您哪种终止方式会引起副作用。using请查看工厂说明
Stats stats = new Stats(); LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doOnSubscribe(s -> stats.startTimer()) .doFinally(type -> { stats.stopTimerAndRecordTiming(); if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);
-
retry重新执行响应式流(原始流依然终止),retryBackoff提供指数退避算法。retryWhen需要一个Flux参数(可以用
Retry.from(Function)
工厂方法创建)。Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .retry(1) .elapsed() .subscribe(System.out::println, System.err::println); Thread.sleep(2100);
-
other:
-
去空数据流:
defaultIfEmpty
返回默认值,或者switchIfEmpty
返回不同的响应流。 -
可以用timeout操作符,可以抛出TimeoutException,然后处理可以抛出TimeoutException异常
-
Reactor有一个Exceptions实用程序类,您可以使用它来确保仅在检查了异常的情况下包装异常:
-
Exceptions.propagate如有必要,使用该方法包装异常。它还throwIfFatal先调用 ,并且不包装RuntimeException。
-
使用该Exceptions.unwrap方法来获取原始的未包装的异常(返回到特定于反应堆的异常的层次结构的根本原因):
Flux<String> converted = Flux .range(1, 10) .map(i -> { try { return convert(i); } catch (IOException e) { throw Exceptions.propagate(e); } }); converted.subscribe( v -> System.out.println("RECEIVED: " + v), e -> { if (Exceptions.unwrap(e) instanceof IOException) { System.out.println("Something bad happened with I/O"); } else { System.out.println("Something bad happened"); }});
-