Dart3_异步编程

参考

https://dart.cn/guides/language/language-tour#asynchrony-support

https://dart.cn/guides/libraries/library-tour#dartasync---asynchronous-programming

https://dart.cn/codelabs/async-await

异步编程

在dart中是没有线程的概念的,取而代之的是异步编程。

可以使用两种方式创建异步函数:

l 通过async和await关键字

l 通过Future api来链式创建

async和await的操作和Future是分不开的,只是async和await更简化了逻辑,看起来像是同步代码一样。

在直接使用 Future API 前,首先应该考虑 await 来替代。代码中使用 await 表达式会比直接使用 Future API 更容易理解。

async和await

声明一个异步函数,在方法名后边添加上关键字async即可,同时返回值要修改成用Future<T>,T是函数体要返回的值,如下例:

Future<void> main() async {
var future = getName();
future.then((value) => print('getName:$value'));
print("after get name");
}

Future<String> getName() async {
  print("getName start");
return "wz";
}

输出结果是:

getName start

after get name

getName:wz

上边的代码其实并不会异步执行,因为异步执行是在遇到第一await时会挂起函数,并直接返回Future<T>,同时函数会等待await指定的操作完成后才继续方法后边的代码。

修改一下上边的getName的代码:

Future<String> getName() async {
  print("getName start");
  await Future.delayed(Duration(seconds: 2));
print("after delay");
  return "wz";
}

输出结果是:

getName start

after get name

after delay

getName:wz

await使用说明:

l await只能用在async中。

l await后可以跟普通函数(但没有意义的),也可以跟异步函数(返回值是Future<T>的函数)。

l await等待的操作结果是可以赋值给一个变量的,此时会把后边的Future<T>的T取出来,这样我们就可以用同步的方式编写异步调用,

var entrypoint = await findEntrypoint();

var exitCode = await runExecutable(entrypoint, args);

await flushThenExit(exitCode);

处理异常

使用 try、catch 以及 finally 来处理使用 await 导致的异常:

try {

  version = await lookUpVersion();

。。。

} catch (e) {

  // 无法找到版本时做出的反应

}

如果有异常发生的话,lookUpVersion下边的代码就不会执行了,会直接跳到catch中。

Future

https://api.dart.cn/stable/2.10.4/dart-async/Future-class.html

从 Dart 2.1 开始,使用 Future 和 Stream 不需要导入 dart:async ,因为 dart:core 库 export 了这些类。

上边通过async和await返回了一个Future对象,可以通过此对象注册一些监听,比如执行完后再去执行其他,异常处理等。

Future<int> future = getFuture();

future.then((value) => handleValue(value))

      .catchError((error) => handleError(error));

FutureOr<T>

先说一下这个类,因为Future中有很多方法都有此类,

此类表示的是一个值,这个值可以是Future<T>或T。

此类声明是内部future或value泛型类型的公共代理。对此类的引用被解析为内部类型。

任何类extend、mixin或implement 此类都是一个编译时错误。

Future的构造函数

l Future(FutureOr<T> computation())

创建一个Future,其中包含Timer.run异步调用computation()的结果。

l Future.delayed(Duration duration, [FutureOr<T> computation()])

创建一个Future,会在指定的延时后执行computation()

l Future.microtask(FutureOr<T> computation())

其中包含scheduleMicrotask异步调用computation()的结果。

microtask是比普通的事件(比如Timer事件)更先执行的一种任务。

l Future.sync(FutureOr<T> computation())

包含立即调用computation()的结果。

If calling computation throws, the returned future is completed with the error.

If calling computation returns a Future<T>, that future is returned.

If calling computation returns a non-future value, a future is returned which has been completed with that value.

l Future.error(Object error, [StackTrace? stackTrace])

创建一个Future,此Future有一个error的结果。

l Future.value([FutureOr<T>? value])

如果value是一个Future,那么会创建一个等待value的future执行完成的Future。

如果value是一个值,那么就相当于new Future<T>.sync(() => value)

Future的成员方法

l asStream() → Stream<T>

Creates a Stream containing the result of this future. [...]

l catchError(Function one rror, {bool test(Object error)}) → Future<T>

Handles errors emitted by this Future. [...]

确保调用 catchError() 方式在 then() 的结果上,而不是在原来的 Future 对象上调用。否则的话,catchError() 就只能处理原来 Future 对象抛出的异常,而无法处理 then() 代码里面的异常。

l then<R>(FutureOr<R> onValue(T value), {Function? one rror}) → Future<R>

Register callbacks to be called when this future completes. [...]

l timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) → Future<T>

Time-out the future computation after timeLimit has passed. [...]

l whenComplete(FutureOr<void> action()) → Future<T>

Registers a function to be called when this future completes. [...]

Future的static方法

l any<T>(Iterable<Future<T>> futures) → Future<T>

Returns the result of the first future in futures to complete. [...]

l doWhile(FutureOr<bool> action()) → Future

Performs an operation repeatedly until it returns false. [...]

l forEach<T>(Iterable<T> elements, FutureOr action(T element)) → Future

Performs an action for each element of the iterable, in turn. [...]

l wait<T>(Iterable<Future<T>> futures, {bool eagerError: false, void cleanUp(T successValue)}) → Future<List<T>>

Waits for multiple futures to complete and collects their results. [...]

Stream

在 Dart API 中 Stream 对象随处可见,Stream 用来表示一系列数据。例如,HTML 中的按钮点击就是通过 stream 传递的。同样也可以将文件作为数据流来读取。

如果想从 Stream 中获取值,可以有两种选择:

l 使用 async 关键字和一个 异步循环(使用 await for 关键字标识)。

l 使用 Stream API。详情参考库概览。

await for

使用 await for 定义异步循环看起来是这样的:

await for (varOrType identifier in expression) {

  // 每当 Stream 发出一个值时会执行

}

expression的类型必须是 Stream。执行流程如下:

1. 等待直到 Stream 返回一个数据。

2. 使用 1 中 Stream 返回的数据执行循环体。

3. 重复 1、2 过程直到 Stream close。

使用 break 和 return 语句可以停止接收 Stream 数据,这样就跳出了循环并取消注册监听 Stream。

Stream

https://api.dart.cn/stable/2.10.4/dart-async/Stream-class.html

Stream提供了一种接收事件序列的方法。每个事件要么是一个数据事件,也称为流的一个元素,要么是一个错误事件,它是一个失败的通知。

当Stream已发出其所有事件时,单个“done”事件将通知侦听器已到达结束。

你可以通过Stream.listen()来监听Stream的事件,它会返回一个StreamSubscription对象,你可以在此对象上添加一些监听,暂停恢复事件的监听,或cancel取消监听。

当“done”事件被触发时,订阅者在接收事件之前被取消订阅。事件发送后,Stream就没有订阅者了。允许在此时之后向广播Stream添加新订阅者,但他们将尽快接收到新的“done”事件。

有两种类型的Stream:

l 单订阅Stream,

是只能被listen一次,多次调用是不允许的,即使在取消第一个监听之后在调用也不允许。

并且此Stream是冷流,即只有当有监听者时才会开始产生事件,取消监听后Stream停止发送事件,即使事件源仍然可以提供更多。

单订阅流通常用于流式传输较大的连续数据块,如文件I/O。

l 广播Stream。

可以被多个监听者监听,

并且此Stream是热流,即只要有事件就发送,不管有没有监听者,

单订阅Stream也可以通过asBroadcastStream()来转换成广播Stream。

当注册监听者时,只能接收到注册之后发送的事件,而不能接收到之前已经发送过的。

Stream的Api

属性

l first → Future<T>

监听Stream,只要接收到第一个事件就立马取消监听。

l isBroadcast → bool

Whether this stream is a broadcast stream.

l last → Future<T>

The last element of this stream. [...]

l length → Future<int>

The number of elements in this stream. [...]

方法

l asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription)}) → Stream<T>

Returns a multi-subscription stream that produces the same events as this.

l drain<E>([E? futureValue]) → Future<E>

Discards all data on this stream, but signals when it is done or an error occurred.

l handleError(Function one rror, {bool test(dynamic error)}) → Stream<T>

Creates a wrapper Stream that intercepts some errors from this stream. [...]

l listen(void onData(T event), {Function? one rror, void onDone(), bool? cancelOnError}) → StreamSubscription<T>

Adds a subscription to this stream. [...]

l skip(int count) → Stream<T>

Skips the first count data events from this stream.

StreamSubscription

Future<void> cancel();

void onData(void handleData(T data)?);

void one rror(Function? handleError);

void onDone(void handleDone()?);

void pause([Future<void>? resumeSignal]);

void resume();

Future<E> asFuture<E>([E? futureValue]);

bool get isPaused;

Generators

当你需要延迟地生成一连串的值时,可以考虑使用 生成器函数。

Dart 内置支持两种形式的生成器方法:

l 同步 生成器:返回一个 Iterable 对象。

l 异步 生成器:返回一个 Stream 对象。

通过在函数上加 sync* 关键字并将返回值类型设置为 Iterable 来实现一个 同步 生成器函数,在函数中使用 yield 语句来传递值:

Iterable<int> naturalsTo(int n) sync* {

  int k = 0;

  while (k < n) yield k++;

}

实现 异步 生成器函数与同步类似,只不过关键字为 async* 并且返回值为 Stream:

Stream<int> asynchronousNaturalsTo(int n) async* {

  int k = 0;

  while (k < n) yield k++;

}

如果生成器是递归调用的,可是使用 yield* 语句提升执行性能:

Iterable<int> naturalsDownFrom(int n) sync* {

  if (n > 0) {

    yield n;

    yield* naturalsDownFrom(n - 1);

  }

}

Isolates

https://www.bilibili.com/video/BV1Y7411V7iP/?spm_id_from=333.788.videocard.0

大多数计算机中,甚至在移动平台上,都在使用多核 CPU。为了有效利用多核性能,开发者一般使用共享内存的方式让线程并发地运行。然而,多线程共享数据通常会导致很多潜在的问题,并导致代码运行出错。

为了解决多线程带来的并发问题,Dart 使用 isolates 替代线程。

l 每一个 isolate 有它自己的堆内存以确保其状态不被其它 isolates 访问。公共变量的值都是isolate 独有的,类似于java的ThreadLocal。

l 每一个 isolate都有自己的线程和一个eventloop事件循环系统。

l isolate之间想要沟通是通过ReceivePort, SendPort发送接收事件来解决的。都会发送到对方isolate的eventloop。

可以通过Isolate的spawn来创建一个isloate,

external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? one rror,
@Since("2.3") String? debugName});

entryPoint:是新的isolate要运行的方法,或者初始化方法,它接收第二个参数类型的值。

message:用来发送给新isolate的一个消息,通常会把当前isolate创建的SendPort包装在message内传递给新的isolate,那么新isolate就可以用SendPort去通知父isolate。

debugName:是debug时的名字,一般Isolate.current.debugName来获取到当前Isolate的名字。

static属性

external static Isolate get current;

Return an Isolate object representing the current isolate. [...]

方法

kill({int priority: beforeNextEvent}) → void

Requests the isolate to shut down. [...]

ReceivePort

abstract class ReceivePort implements Stream<dynamic>

可以看到它实现了Stream,所以可以调用Stream.listen来监听SendPort发送过来的信息,

此Stream不是广播Stream,所以如果需要的话调用asBroadcastStream()来转换成广播Stream。

l close() → void

Closes this. [...]

SendPort

l send(Object? message) → void

Sends an asynchronous message through this send port, to its corresponding ReceivePort. [...]

例子

int age = 18;
void main() {
  age++;
print("Isolate:${Isolate.current.debugName},age:$age");
ReceivePort receivePort = ReceivePort();
receivePort.listen((message) {
    print("receive data:$message");
receivePort.close();
});
Isolate.spawn(initSubIsolate, Data(receivePort.sendPort, "from main", 1),
debugName: "sub_isolate");
}
void initSubIsolate(Data data) {
  print("Isolate:${Isolate.current.debugName},age:$age");
print("Isolate:${Isolate.current.debugName}:$data");
Future.delayed(Duration(seconds: 2), () {
    data.sendPort.send(Data(null, "fuck you", data.requestCode));
});
}
class Data {
final SendPort sendPort;
  final String msg;
  final int requestCode;
Data(this.sendPort, this.msg, this.requestCode);
@override
String toString() {
return 'Data{msg: $msg, requestCode: $requestCode}';
}
}

输出:

Isolate:main,age:19

Isolate:sub_isolate,age:18

Isolate:sub_isolate:Data{msg: from main, requestCode: 1}

receive data:Data{msg: fuck you, requestCode: 1}

eventloop

Dart3_异步编程

上一篇:JavaSE 之 传统日期格式化的线程问题


下一篇:【C/C++】多线程编程