openTSDB详解之Deferred
类
Deferred
类在github
中的介绍,如下:
简单翻译一下:Deferred
类是Java 库提供一些有用的构建模块去 构建高性能,多线程,异步的java应用。它的实现灵感来自Twisted
的异步库(twisted.internet.defer
)。
Deferred允许你轻松地构建异步的处理链,这个处理链必须触发,当一个异步的事件(I/O
,RPC
以及其它)完成。它能被广泛用于构建异步API,在一个多线程服务器或者是客户端中。
0.前言
为了简单的异步处理,延迟结果的一个线程安全的实现。(原文:A thread-safe implementation of a Deferred result for easy asynchronous processing.
)
这个实现是基于Twisted的Python代码中的Deferred
接口。
这个api【译者注:即Deffered
】用一个简单且优雅的方式去管理异步及动态管道(处理链),没有显式的定义一个有限状态机【译者注:什么是有限状态机?】。
我们都很繁忙,不一定有时间去仔细阅读使用手册(RTFM:Read The Funcking Manual
)。请将注意力放在你必须处理的,不变的东西上。除此之外,这里提供一个关于执行Deferred
的概要:
- 一个
Deferred
像是一个future
,具有与其相连的动态Callback chain
【译者注:我将此译作callback链 或是回调链】 - 当
Deferred
结果变成可用的,callback 链将会被触发【因为callback是绑定在Deferred
对象上的】 - 一个callback的结果将会被传递给下一个
- 当一个callback返回另一个
Deferred
时,在调用链中的下一个callback不能执行,直到其它的Deferred
对象变得用时【译者注:这个规则好像就是为了下述的Deferred
(B)给Deferred
(A)而制定的】 - 实际上有两个callback链。一个用于正常处理,一个用于错误处理/修复。 处理错误的Callback叫做
errback
。Deferred
是一种重要的构建块,可以以线程安全的方式编写易于使用的异步api
1.理解Deferred
的概念
其思想就是:一个Deferred
对象代表一个尚不可用的结果。 一个异步的操作(I/O,RPC···
)已经开始,并且在将来某个时刻将它们的处理结果(要么成功,要么失败)传递给Deferred
对象。Deferred
和Future
对象之间不同的关键点是有一个callback
链与Deferred
对象关联,然而对于一个future
对象,你需要在某些时候手动触发才能得到结果,这将造成一些问题,比如:你怎么知道这些结果是可用的?如果这些结果本身依赖于另外的future
对象时该怎么办?【译者注:而这两个问题,恰好能够在Deferred
对象中解决】
你开始执行一个异步操作,当操作完成时,你肯定想知道调用异步操作返回的结果。如果操作成功完成,你想你的callback使用它的结果去执行在你开启异步操作时做的事情。如果有一个错误,你就想触发一些错误处理代码。
但是对于一个Deferred
对象,不仅仅只有单个的callback。你能够添加任意数目的callbacks,这允许你能以一个相对简单的,优雅的方式有效且容易地构建复杂的处理管道。
2.理解callback chain
让我们举一个例子。为了让其他人使用你的简单的远程存储服务,你正在为他们写一个客户端库(client library)。当你的用户调用你的库中的get()方法时,你想从你的远端服务检索一些数据,并将它们交给用户,但是你想用异步的方式处理这件事儿。
当你的客户端库的用户调用get()方法时,你收集一个请求,并且通过一个socket包将它发送到远端的服务器。在发送请求给socket之前,为了保持请求和这个Deferred
对象之间的一个连接,你创建了一个Deferred
对象,然后将它存储在某个地方,例如,在一个map中。然后你返回这个Deferred
对象给用户, 这就是一旦RPC完成时, 他们为什么能够访问的到Deferred
结果的原因。
或早或晚,RPC调用将会成功完成,亦或失败,并且你的socket将会变得可读(或者可能会在一个失败的事件中关闭)【译者注:socket变的可读是什么意思?】。我们假设现在所有事都像期待那样运行,因此socket也是可读的,所以你从socket包中读取响应(回复)。在这个时候,**你提取远程调用的结果,然后你将其交给Deferred
对象,这个Deferred
对象是你为了这个请求而创建的(记住,你已经将它存储在某个地方,所以一旦你得到这个Deferred
对象,你就能给它远程调用的结果)。然后Deferred
对象存储这个结果并且触发任何添加到这个Deferred
对象的callback。**期待的结果是:你的客户端库的用户,在调用你的get()方法之后,将会添加一个Callback到你给的Deferred
对象上。这样,当Deferred
结果变得可用时,你将使用参数的方式来调用它。【因为你已经将远程调用的结果发送到Deferred
对象中】
到目前为止,我们已经解释了的东西并非超过带有一个callback的future
对象的范畴。但是Deferred
对象所能做的远非如此。现在让我们假设,为了避免再次通过网络得到相同的值, 其它一些人想在你的客户端库上构建一个缓冲层。想使用cache的用户将会调用cache库中的get方法而不是直接调用你的客户端库。【译者注:这个是没有问题的,因为先要从缓冲区取数据,如果没有的话,再通过RPC过程取得数据。】
假设缓冲库已经存储过一次调用的结果。 它将会创建一个Deferred
对象,并立即将缓冲的结果交付, 并返回这个Deferred
对象给用户。用户将会添加一个callback,这个callback将会立即调用因为Deferred
结果已经可用。所以整个调用过程立即完成,并且完全来自同一个线程。没有上下文切换(没有涉及到其他的线程,没有I/O以及其它的东西),没有什么被阻塞,一切都完成的十分迅速。
现在让我们假设缓冲库有一个缓冲丢失, 并且需要使用上述的源客户端库去做一个远程get()方法调用。RPC被发送给远端的服务器,并且客户端库返回一个Deferred
对象给缓冲库。这就是令人感到兴奋的事物了。在返回缓冲库给用户之前,缓冲库会添加它自己的callback给Deferred
对象。这个callback将会从远端服务器取到结果,将其添加到cache中,并且返回【译者注:这个就是缓冲库的callback的作用】。通常,用户添加他们自己的callback去处理结果。所以现在Deferred
对象有两个callbacks与其相联系。
1st callback 2nd callback
`Deferred`: add to cache --> user callback
当RPC完成,源客户端库将会从传输中反序列化结果,并将其传递给Deferred
对象。第一个callback将会被触发,这将添加结果到缓存库。然后,无论第一个callback返回什么,都会被传递给第二个callback。结果就是:缓存的callback返回得到的响应,并不会做任何更改,所以这将会被(完整地)传递给用户自定义的callback。
现在去理解第一个callback能够返回另外的任意的值就非常重要, 并且这个值能够被传递给第二个callback。第一次听到这个逻辑稍有不可思议,但是它的确是Deferred
背后的关键点。
为了解释原因,可以让事情更复杂一点儿。 假设得到请求的远端服务器是一个相对简单的,且低层次的存储服务(联想到分布式缓存),所以它仅仅使用字节数组工作,它并不在意内容是什么。所以原始的客户端库仅仅从网络中反序列化字节数组并且传递字节数组到Deferred
对象中。
现在你正在编写一个高层次(higher-level library
)的库,使用这个存储系统(上述的存储服务)去存储一些你自定义的对象。所以当你从服务器中得到字节数组时,你需要进一步反序列化字节数组成某种对象。你的高层次库用户不在意你使用何种远端存储系统,他们在意的事情仅仅是:异步得到这些对象。你的高层次库构建于一个源端的低层次库,这个低层次库只是用来做RPC通信。
当使用高层次库的用户调用get()方法,你调用低层次库的get()方法,这个低层次库中的get()方法将会发起一个RPC调用并且返回一个Deferred
对象给高层次库。 为了进一步的从字节数组到对象的反序列化,高层次库添加第一个callback。 然后高层次库的用户添加他们自己的callback,这个callback将使用这个对象一些事情。所以现在的整个流程看上去像是这个样子。
1st callback 2nd callback
`Deferred`: de-serialize to an object --> user callback
当结果来自网络时,字节数组反序列化自socket。第一个callback【译者注:第一个callback的作用就是从 字节数组—反序列化—>object】被调用并且它的参数是原始的结果——字节数组。所以1st callback进一步地反序列化字节数组成某些对象,并将其返回。 2nd callback然后被调用并且它的参数就是之前callback的结果——即反序列化而得到的对象。
现在回到缓存库,它和高层次库毫无联系。它做的所有是:某些实现某些接口的对象,带有一个get()方法,它保持一个映射——无论接收到什么参数到被缓存给这个特殊调用的对象。由于callback 链调用的方式,可能透明地使用缓存库以及高层次库。想使用缓存的用户仅仅需要使用缓存库和高层次库。现在当他们调用缓存库的get()方法时,且有一个cache丢失时,下面的这些将会依次发生:
-
缓存库调用高层次库中的get()方法,高层次库调用低层次库中的get()方法
-
低层次库创建一个
Deferred
对象,发起一个RPC过程并且返回它的Deferred
对象 -
高层次库添加它的callback(用于对象反序列化)到
Deferred
对象中,并且将这个对象返回 -
客户端添加自定义的callback(用于缓存更新)到
Deferred
对象中,并将其返回 -
用户得到
Deferred
对象并且添加自己的callback,使用从数据中心检索到的对象做一些事情1st callback 2nd callback 3rd callback `Deferred`: de-serialize --> add to cache --> user callback result: (none available)
一旦响应返回,第一个callback被调用,它反序列化得到一个对象,并返回这个对象。目前Deferred
对象的结果变成了一个反序列化的对象。Deferred
对象目前的状态如下所示:
2nd callback 3rd callback
`Deferred`: add to cache --> user callback
result: de-serialized object
因为在调用链中有许多callbacks,Deferred
对象使用当前的结果(反序列化而成的对象)作为参数调用下一个callback。callback添加对象到自己的缓存并且不做任何修改的返回。【译者注:这里的cache层的库的callback只做添加object到自己的缓存中,而不做任何其它的操作】
3rd callback
`Deferred`: user callback
result: de-serialized object
最终,用户的callback被调用,使用一个object作为参数(原文Finally, the user's callback is invoked with the object in argument.
)
`Deferred`: (no more callbacks)
result: (whatever the user's callback returned)
如果你认为这正在变得有趣,继续读下去, 因为你尚未读到Deferred
中最有趣的地方。
3.使用Deferred
构建动态处理链
让我们将之前的例子变得更加复杂一点儿。 假设为这些调用服务的远端存储服务是一个运行在很多台机器上的分布式存储服务。数据分区存储在许多节点上并且从一个节点中来回移动(由于机器损坏或者其它不知名的原因)【译者注:其实正在为Deferred
用于HBase的环境做铺垫】。为了执行一个get调用,低层次客户端库首先需要知道哪个服务器目前存储了请求数据。假设还有另外的服务器,同时也是分布式服务的一部分,这个服务器保存了索引以及每条数据移动的轨迹。低层次的客户端库首先需要使用第一台服务器找到数据的实际存储位置(这是第一个RPC过程)【译者注:这个第一台服务器就是HBase中-root-表所在的服务器】,然后再从存储节点中检索数据(这是第二次RPC过程)。末端用户不在意检索数据涉及两个阶段,它们仅仅想调用get()方法,当数据(一个字节数组)是可用时被返回。
这大概就是Deferred
最有用的特性。当用户调用get()方法时, 低层次库将会发起第一个RPC调用到索引服务器去定位用户需要的数据的位置。当发起这个RPC去查找时,一个Deferred
对象被创建。低层次库的get()代码添加了第一个callback去处理查询响应,然后返回给用户。
1st callback 2nd callback
`Deferred`: index lookup --> user callback
result: (none available)
最终,查找的RPC完成,然后Deferred
对象得到查找响应。所以在触发第一个callback之前,Deferred
对象将会是这个状态。
1st callback 2nd callback
`Deferred`: index lookup --> user callback
result: lookup response
第一个callback运行并且现在知道该去哪儿找最开始请求的数据了。它发起了一个get()请求到正确的存储节点上。这么做将会创建另一个Deferred
对象【译者注:为什么这里会创建另一个Deferred
对象?=》保存get()请求的结果】,让我们称其为B, 它由查询索引的callback返回。并且这就是奇迹发生的地方。现在我们所出的状态如下:
(A) 2nd callback | (B)
|
`Deferred`: user callback | `Deferred`: (no more callbacks)
result: `Deferred` (B) | result: (none available)
由于一个callback返回一个Deferred
,我们不能调用用户的callback,因为用户不想知道他们的callback接收的是一个Deferred
对象,他们想接收的是一个字节数组。目前的callback得到终止,并且停止处理callback 链。但是这个callback链需要被恢复,无论get()调用完成了什么Deferred
(原文:This callback chain needs to be resumed whenever the
Deferredof the get call [(B)] completes.
)。为了实现这个,一个callback被追加到其它的Deferred
,这个Deferred
对象将会恢复callback链的执行。
(A) 2nd callback | (B) 1st callback
|
`Deferred`: user callback | `Deferred`: resume (A)
result: `Deferred` (B) | result: (none available)
一旦(A)添加callback到(B)上,它会立即返回,没有阻塞一个线程或者像是这类的事情需要等待。所以接收查找请求的整个过程以及发送get的RPC请求非常迅速,没有任何阻塞。
现在当get响应从网络中返回时,通常,RPC层从字节数组中反序列化,并且将其交给(B)
(A) 2nd callback | (B) 1st callback
|
`Deferred`: user callback | `Deferred`: resume (A)
result: `Deferred` (B) | result: byte array
(B)的唯一一个callback 正准备设置(A)的结果,并且恢复(A)的callback 链
(A) 2nd callback | (B) 1st callback
|
`Deferred`: user callback | `Deferred`: resume (A)
result: byte array | result: byte array
【译者注:这里的使用了Deferred
(B) 用于设置Deferred
(A)的值】
所以现在(A)恢复了它的callback链,并且调用用户的callback 使用字节数组作为参数,而这正是用户所想要的
(A) | (B) 1st callback
|
`Deferred`: (no more cb) | `Deferred`: resume (A)
result: (return value of | result: byte array
the user's cb)
然后(B)移动到它callback 链的下一个,但是并不存在下一个callback链节点,所以(B)完成了。
(A) | (B)
|
`Deferred`: (no more cb) | `Deferred`: (no more cb)
result: (return value of | result: byte array
the user's cb)
读取获取响应的整个过程,恢复初始的Deferred
对象以及执行第二个Deferred
对象发生在同一个线程里,有序的,没有任何阻塞(前提是:用户的callback没有阻塞,它也一定不能阻塞)
我们做的本质上与动态构建一个隐式有限状态机去处理获取请求的生命流程是相同的。这个简单的API允许你去构建任意的复杂的处理管道,这个处理管道会动态的决定在每个管道的阶段关于下一步要做什么。
处理错误:
一个Deferred
对象实际上有两个而不是一个callback 链。第一个链是正常的处理流程链,第二个则是错误处理链。Twisted将错误处理callback称为errback,所以我们在这里也保持这相同的术语。当异步处理以一个错误结束时, Deferred
对象必须抛出捕获的异常而不是给出结果(或者如果没有异常被捕捉,一个异常必须被创建并且传递给Deferred
对象)。当目前的Deferred
结果是一个异常的实例时,下一个errback将会被调用。至于正常的callbacks,无论errback返回什么,都会是目前的结果。如果当前的结果仍然是一个异常的实例,那么下一个errback将会被调用。如果当前的结果不再是一个异常,下一个callback将会被调用。
当一个callback或者errback本身抛出一个异常时,它由Deferred
对象捕获,并且成为当前的结果,这就意味着链中的下一个errback将会被触发,以当前的这个异常作为参数。注意Deferred
对象仅仅捕获异常,并不会抛出或者是报错。
约定及规则
仔细阅读下面这个规则,因这些是你正确实现的保证
- 1.一个
Deferred
对象能够收到仅仅一个初始值 - 2.每次仅仅一个线程在执行callback链
- 3.每个由callback动作在下一个callback被调用之前发生。换句话说,如果一个callback链操作一个变量(并且没有别人操作它),是不需要同步要求的。
- 4.执行callback链的线程 传递初始值给
Deferred
对象。这个类不创建或者管理任何的线程或者执行者。 - 5.一旦callback被执行,
Deferred
对象将会失去对它的引用。 - 6.每个添加了callback到一个
Deferred
对象的方法的时间复杂度是O(1) - 7.一个
Deferred
对象不能接收本身作为一个初始的或者是中间值 ,因为这将引发一个无线循环。 - 8.你必须注意不能构建一个
Deferred
的循环,因为这会造成一个无限的循环(幸运的是,它会因为CallbackOverflowError而报错) - 9.Callbacks以及errbacks不能接收一个
Deferred
作为参数。这是因为它们总是接收之间的callback的结果,当这个结果作为一个Deferred
对象时,我们怀疑callback链的执行直到其它的Deferred
对象的结果是可用的 - 10.Callbacks不能接收一个异常作为参数。这是因为它们总是导致errbacks。
- 11.使用
Deferred
对象的监视器会导致死锁,所以不要使用,换句话说,写synchronized (some_
Deferred) { ... }
或者是任何等价的处理会让你的保证无效。原文如下:
Using the monitor of a Deferred can lead to a deadlock, so don't use it. In other words, writing
synchronized (some_Deferred) { ... }
(or anything equivalent) voids your warranty.
4.Deferred类字段详解
- result
/**
* The current result. This reference is either of type T or Exception.
* 当前的结果。这个引用要么是类型T,要么就是Exception
* This reference doesn't need to be volatile because it's guaranteed that
* only 1 thread at a time will access this reference, and all accesses
* happen in between a volatile access on the {@link #state} variable.
* 这个引用不需要使用volatile修饰,因为它保证每次仅仅只有1个线程访问这个引用,并且所有的访问
* 发生一个对state变量的 volatile access 访问。
*
* This reference isn't touched until the initial call to {@link #callback}
* occurs. The thread that invokes {@link #callback} is going to run through
* the callback chain in {@link #runCallbacks}, but it will first set the
* state to RUNNING (volatile access). When the method returns, the state
* is changed either to DONE or PAUSED (volatile access).
* 这个引用不会被创建直到对callback()的第一次调用发生。 调用callback的线程将运行callback链在runCallbacks,但是它将被首次设置为RUNNING(volatie access)。当其它线程返回时,状态要么被修改为DONE,要么被修改为PAUSED(volatile access)
When going out of
* PAUSED, the state is set back to RUNNING before updating the result
* (volatile access). Going from state DONE to RUNNING again also involves
* a volatile access.
当由PAUSED状态退出时,状态将会被设置成RUNNING,在更新结果(volatile access)。从DONE状态变为RUNNING状态同样会涉及一个volatile access.
*
* The new Java memory model thus guarantees that accesses to this reference
* will always be consistent, since they always happen after a volatile
* access to the state associated with this reference. See also the FAQ on
* JSR 133 (Java Memory Model) at http://su.pr/2WxCIb#volatile
* 新的java 内存模型因此保证了对这个引用的方法总是持续的,因为它们总是在volatile access 与这个引用相关联的状态之后发生。
*/
private Object result;
5.Deferred类方法详解
-
fromResult()
方法
这个方法十分重要,在OpentSDB中起到了关键作用。一定要好好理解,这里也十分欢迎对此方法理解的小伙伴来此交流。
/**
* Constructs a {@code Deferred} with a result that's readily available.
* 使用一个易读的结果构造一个Deferred对象。
*
* 这个方法与下面的写法相同
* This is equivalent to writing:
* Deferred<T> d = new Deferred<T>();
* d.callback(result);
* }
*
* Callbacks added to this {@code Deferred} will be immediately called.
* 添加到这个Deferred对象的Callbacks将会被立即调用
*
* @param result The "deferred" result.【这个意思应该是:回调后的值】
* @return a new {@code Deferred}.返回一个新的Deferred对象
*/
public static <T> Deferred<T> fromResult(final T result) {
return new Deferred<T>(result);
}
-
addCallbackDeferring()
方法
/**
* Registers a callback. 注册一个callback
* This has the exact same effect as {@link #addCallback}, but keeps the type
* information "correct" when the callback to add returns a {@code Deferred}.
这个方法有相同的作用于addCallback(),但是保持类型消息“correct”,当callback被添加到一个
Deferred对象时。
* @param cb The callback to register.
* 需要注册的callback
* @return {@code this} with an "updated" type.
*/
@SuppressWarnings("unchecked")
public <R, D extends Deferred<R>>
Deferred<R> addCallbackDeferring(final Callback<D, T> cb) {
return addCallbacks((Callback<R, T>) ((Object) cb), Callback.PASSTHROUGH);
}
-
Deferred()
方法
/** Private constructor used when the result is already available.
当结果可用时,使用私有构造器
*/
private Deferred(final Object result) {
this.result = result;
state = DONE;
}