参考
https://dart.cn/guides/language/language-tour#asynchrony-support
https://dart.cn/guides/libraries/library-tour#dartasync---asynchronous-programming
https://dart.cn/codelabs/async-await
异步编程
在dart中是没有线程的概念的,取而代之的是异步编程。
可以使用两种方式创建异步函数:
l 通过async和await关键字
l 通过Future api来链式创建
async和await的操作和Future是分不开的,只是async和await更简化了逻辑,看起来像是同步代码一样。
在直接使用 Future API 前,首先应该考虑 await 来替代。代码中使用 await 表达式会比直接使用 Future API 更容易理解。
async和await
声明一个异步函数,在方法名后边添加上关键字async即可,同时返回值要修改成用Future<T>,T是函数体要返回的值,如下例:
Future<void> main() async {
var future = getName();
future.then((value) => print('getName:$value'));
print("after get name");
}
Future<String> getName() async {
print("getName start");
return "wz";
}
输出结果是:
getName start
after get name
getName:wz
上边的代码其实并不会异步执行,因为异步执行是在遇到第一await时会挂起函数,并直接返回Future<T>,同时函数会等待await指定的操作完成后才继续方法后边的代码。
修改一下上边的getName的代码:
Future<String> getName() async {
print("getName start");
await Future.delayed(Duration(seconds: 2));
print("after delay");
return "wz";
}
输出结果是:
getName start
after get name
after delay
getName:wz
await使用说明:
l await只能用在async中。
l await后可以跟普通函数(但没有意义的),也可以跟异步函数(返回值是Future<T>的函数)。
l await等待的操作结果是可以赋值给一个变量的,此时会把后边的Future<T>的T取出来,这样我们就可以用同步的方式编写异步调用,
var entrypoint = await findEntrypoint();
var exitCode = await runExecutable(entrypoint, args);
await flushThenExit(exitCode);
处理异常
使用 try、catch 以及 finally 来处理使用 await 导致的异常:
try {
version = await lookUpVersion();
。。。
} catch (e) {
// 无法找到版本时做出的反应
}
如果有异常发生的话,lookUpVersion下边的代码就不会执行了,会直接跳到catch中。
Future
https://api.dart.cn/stable/2.10.4/dart-async/Future-class.html
从 Dart 2.1 开始,使用 Future 和 Stream 不需要导入 dart:async ,因为 dart:core 库 export 了这些类。
上边通过async和await返回了一个Future对象,可以通过此对象注册一些监听,比如执行完后再去执行其他,异常处理等。
Future<int> future = getFuture();
future.then((value) => handleValue(value))
.catchError((error) => handleError(error));
FutureOr<T>
先说一下这个类,因为Future中有很多方法都有此类,
此类表示的是一个值,这个值可以是Future<T>或T。
此类声明是内部future或value泛型类型的公共代理。对此类的引用被解析为内部类型。
任何类extend、mixin或implement 此类都是一个编译时错误。
Future的构造函数
l Future(FutureOr<T> computation())
创建一个Future,其中包含Timer.run异步调用computation()的结果。
l Future.delayed(Duration duration, [FutureOr<T> computation()])
创建一个Future,会在指定的延时后执行computation()
l Future.microtask(FutureOr<T> computation())
其中包含scheduleMicrotask异步调用computation()的结果。
microtask是比普通的事件(比如Timer事件)更先执行的一种任务。
l Future.sync(FutureOr<T> computation())
包含立即调用computation()的结果。
If calling computation throws, the returned future is completed with the error.
If calling computation returns a Future<T>, that future is returned.
If calling computation returns a non-future value, a future is returned which has been completed with that value.
l Future.error(Object error, [StackTrace? stackTrace])
创建一个Future,此Future有一个error的结果。
l Future.value([FutureOr<T>? value])
如果value是一个Future,那么会创建一个等待value的future执行完成的Future。
如果value是一个值,那么就相当于new Future<T>.sync(() => value)
Future的成员方法
l asStream() → Stream<T>
Creates a Stream containing the result of this future. [...]
l catchError(Function one rror, {bool test(Object error)}) → Future<T>
Handles errors emitted by this Future. [...]
确保调用 catchError() 方式在 then() 的结果上,而不是在原来的 Future 对象上调用。否则的话,catchError() 就只能处理原来 Future 对象抛出的异常,而无法处理 then() 代码里面的异常。
l then<R>(FutureOr<R> onValue(T value), {Function? one rror}) → Future<R>
Register callbacks to be called when this future completes. [...]
l timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) → Future<T>
Time-out the future computation after timeLimit has passed. [...]
l whenComplete(FutureOr<void> action()) → Future<T>
Registers a function to be called when this future completes. [...]
Future的static方法
l any<T>(Iterable<Future<T>> futures) → Future<T>
Returns the result of the first future in futures to complete. [...]
l doWhile(FutureOr<bool> action()) → Future
Performs an operation repeatedly until it returns false. [...]
l forEach<T>(Iterable<T> elements, FutureOr action(T element)) → Future
Performs an action for each element of the iterable, in turn. [...]
l wait<T>(Iterable<Future<T>> futures, {bool eagerError: false, void cleanUp(T successValue)}) → Future<List<T>>
Waits for multiple futures to complete and collects their results. [...]
Stream
在 Dart API 中 Stream 对象随处可见,Stream 用来表示一系列数据。例如,HTML 中的按钮点击就是通过 stream 传递的。同样也可以将文件作为数据流来读取。
如果想从 Stream 中获取值,可以有两种选择:
l 使用 async 关键字和一个 异步循环(使用 await for 关键字标识)。
l 使用 Stream API。详情参考库概览。
await for
使用 await for 定义异步循环看起来是这样的:
await for (varOrType identifier in expression) {
// 每当 Stream 发出一个值时会执行
}
expression的类型必须是 Stream。执行流程如下:
1. 等待直到 Stream 返回一个数据。
2. 使用 1 中 Stream 返回的数据执行循环体。
3. 重复 1、2 过程直到 Stream close。
使用 break 和 return 语句可以停止接收 Stream 数据,这样就跳出了循环并取消注册监听 Stream。
Stream
https://api.dart.cn/stable/2.10.4/dart-async/Stream-class.html
Stream提供了一种接收事件序列的方法。每个事件要么是一个数据事件,也称为流的一个元素,要么是一个错误事件,它是一个失败的通知。
当Stream已发出其所有事件时,单个“done”事件将通知侦听器已到达结束。
你可以通过Stream.listen()来监听Stream的事件,它会返回一个StreamSubscription对象,你可以在此对象上添加一些监听,暂停恢复事件的监听,或cancel取消监听。
当“done”事件被触发时,订阅者在接收事件之前被取消订阅。事件发送后,Stream就没有订阅者了。允许在此时之后向广播Stream添加新订阅者,但他们将尽快接收到新的“done”事件。
有两种类型的Stream:
l 单订阅Stream,
是只能被listen一次,多次调用是不允许的,即使在取消第一个监听之后在调用也不允许。
并且此Stream是冷流,即只有当有监听者时才会开始产生事件,取消监听后Stream停止发送事件,即使事件源仍然可以提供更多。
单订阅流通常用于流式传输较大的连续数据块,如文件I/O。
l 广播Stream。
可以被多个监听者监听,
并且此Stream是热流,即只要有事件就发送,不管有没有监听者,
单订阅Stream也可以通过asBroadcastStream()来转换成广播Stream。
当注册监听者时,只能接收到注册之后发送的事件,而不能接收到之前已经发送过的。
Stream的Api
属性
l first → Future<T>
监听Stream,只要接收到第一个事件就立马取消监听。
l isBroadcast → bool
Whether this stream is a broadcast stream.
l last → Future<T>
The last element of this stream. [...]
l length → Future<int>
The number of elements in this stream. [...]
方法
l asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription)}) → Stream<T>
Returns a multi-subscription stream that produces the same events as this.
l drain<E>([E? futureValue]) → Future<E>
Discards all data on this stream, but signals when it is done or an error occurred.
l handleError(Function one rror, {bool test(dynamic error)}) → Stream<T>
Creates a wrapper Stream that intercepts some errors from this stream. [...]
l listen(void onData(T event), {Function? one rror, void onDone(), bool? cancelOnError}) → StreamSubscription<T>
Adds a subscription to this stream. [...]
l skip(int count) → Stream<T>
Skips the first count data events from this stream.
StreamSubscription
Future<void> cancel();
void onData(void handleData(T data)?);
void one rror(Function? handleError);
void onDone(void handleDone()?);
void pause([Future<void>? resumeSignal]);
void resume();
Future<E> asFuture<E>([E? futureValue]);
bool get isPaused;
Generators
当你需要延迟地生成一连串的值时,可以考虑使用 生成器函数。
Dart 内置支持两种形式的生成器方法:
l 同步 生成器:返回一个 Iterable 对象。
l 异步 生成器:返回一个 Stream 对象。
通过在函数上加 sync* 关键字并将返回值类型设置为 Iterable 来实现一个 同步 生成器函数,在函数中使用 yield 语句来传递值:
Iterable<int> naturalsTo(int n) sync* {
int k = 0;
while (k < n) yield k++;
}
实现 异步 生成器函数与同步类似,只不过关键字为 async* 并且返回值为 Stream:
Stream<int> asynchronousNaturalsTo(int n) async* {
int k = 0;
while (k < n) yield k++;
}
如果生成器是递归调用的,可是使用 yield* 语句提升执行性能:
Iterable<int> naturalsDownFrom(int n) sync* {
if (n > 0) {
yield n;
yield* naturalsDownFrom(n - 1);
}
}
Isolates
https://www.bilibili.com/video/BV1Y7411V7iP/?spm_id_from=333.788.videocard.0
大多数计算机中,甚至在移动平台上,都在使用多核 CPU。为了有效利用多核性能,开发者一般使用共享内存的方式让线程并发地运行。然而,多线程共享数据通常会导致很多潜在的问题,并导致代码运行出错。
为了解决多线程带来的并发问题,Dart 使用 isolates 替代线程。
l 每一个 isolate 有它自己的堆内存以确保其状态不被其它 isolates 访问。公共变量的值都是isolate 独有的,类似于java的ThreadLocal。
l 每一个 isolate都有自己的线程和一个eventloop事件循环系统。
l isolate之间想要沟通是通过ReceivePort, SendPort发送接收事件来解决的。都会发送到对方isolate的eventloop。
可以通过Isolate的spawn来创建一个isloate,
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? one rror,
@Since("2.3") String? debugName});
entryPoint:是新的isolate要运行的方法,或者初始化方法,它接收第二个参数类型的值。
message:用来发送给新isolate的一个消息,通常会把当前isolate创建的SendPort包装在message内传递给新的isolate,那么新isolate就可以用SendPort去通知父isolate。
debugName:是debug时的名字,一般Isolate.current.debugName来获取到当前Isolate的名字。
static属性
external static Isolate get current;
Return an Isolate object representing the current isolate. [...]
方法
kill({int priority: beforeNextEvent}) → void
Requests the isolate to shut down. [...]
ReceivePort
abstract class ReceivePort implements Stream<dynamic>
可以看到它实现了Stream,所以可以调用Stream.listen来监听SendPort发送过来的信息,
此Stream不是广播Stream,所以如果需要的话调用asBroadcastStream()来转换成广播Stream。
l close() → void
Closes this. [...]
SendPort
l send(Object? message) → void
Sends an asynchronous message through this send port, to its corresponding ReceivePort. [...]
例子
int age = 18;
void main() {
age++;
print("Isolate:${Isolate.current.debugName},age:$age");
ReceivePort receivePort = ReceivePort();
receivePort.listen((message) {
print("receive data:$message");
receivePort.close();
});
Isolate.spawn(initSubIsolate, Data(receivePort.sendPort, "from main", 1),
debugName: "sub_isolate");
}
void initSubIsolate(Data data) {
print("Isolate:${Isolate.current.debugName},age:$age");
print("Isolate:${Isolate.current.debugName}:$data");
Future.delayed(Duration(seconds: 2), () {
data.sendPort.send(Data(null, "fuck you", data.requestCode));
});
}
class Data {
final SendPort sendPort;
final String msg;
final int requestCode;
Data(this.sendPort, this.msg, this.requestCode);
@override
String toString() {
return 'Data{msg: $msg, requestCode: $requestCode}';
}
}
输出:
Isolate:main,age:19
Isolate:sub_isolate,age:18
Isolate:sub_isolate:Data{msg: from main, requestCode: 1}
receive data:Data{msg: fuck you, requestCode: 1}