大多数流程图属于以下两类之一:
- 数据流图。 在这种类型的图中,数据沿着图的边缘传递。 节点接收、转换然后传递数据消息。
- 依赖图。 在这种类型的图中,节点操作的数据直接通过共享内存获取,不沿边传递。
Data Flow Graph
在数据流图中,节点是发送和接收数据消息的计算。 一些节点可能只发送消息,其他节点可能只接收消息,而其他节点可能会发送消息以响应它们接收到的消息。
在下面的数据流图中,最左边的节点生成从 1 到 10 的整数值并将它们传递给两个后继节点。 其中一个后继者对其接收的每个值进行平方并将结果传递到下游。 第二个后继者将它接收到的每个值立方化,并将结果传递到下游。 最右边的节点从两个中间节点接收值。 当它接收到每个值时,它会将其添加到值的运行总和中。 当应用程序运行完成时,sum
的值将等于从 1 到 10 的正方形和立方体序列的总和。
以下代码片段显示了上面显示的简单数据流图的实现:
int sum = 0;
graph g;
function_node< int, int > squarer( g, unlimited, [](const int &v) {
return v*v;
} );
function_node< int, int > cuber( g, unlimited, [](const int &v) {
return v*v*v;
} );
function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
return sum += v;
} );
make_edge( squarer, summer );
make_edge( cuber, summer );
for ( int i = 1; i <= 10; ++i ) {
squarer.try_put(i);
cuber.try_put(i);
}
g.wait_for_all();
cout << "Sum is " << sum << "\n";
在上面的实现中,创建了以下 function_node
:
- 计算平方值
- 计算立方值
- 将值添加到全局
sum
由于squarer
和cuber
节点没有副作用,因此它们是在无限并发的情况下创建的。summer
节点通过对全局变量的引用来更新sum
,因此并行执行是不安全的。 因此,它的并发限制为1
。上面简单数据流图中的节点 F 被实现为一个循环,将消息同时发送到squarer
和cuber
节点。
对第一个实现的一个小小的改进是引入了一个额外的节点类型,一个broadcast_node
。broadcast_node
将它收到的任何消息广播给它的所有后继者。
这可以用单个try_put
替换循环中的两个try_put
:
broadcast_node<int> b(g);
make_edge( b, squarer );
make_edge( b, cuber );
for ( int i = 1; i <= 10; ++i ) {
b.try_put(i);
}
g.wait_for_all();
一个更好的选择是引入一个 input_node
,这将使实现更像上面的简单数据流图。 input_node
,顾名思义,只发送消息,不接收消息。 它的构造函数接受两个参数:
template< typename Body > input_node( graph &g, Body body)
Body
是一个函数对象或 lambda 表达式,它包含一个函数运算符:
Output Body::operator()( oneapi::tbb::flow_control &fc );
你可以用 input_node
替换示例中的循环
input_node< int > src( g, src_body(10) );
make_edge( src, squarer );
make_edge( src, cuber );
src.activate();
g.wait_for_all();
运行时库将重复调用 src_body
的函数运算符 operator ()
,直到在其函数运算符内部调用 fc.stop()
才停止。 因此,你需要创建类似于上面简单数据流图中的循环体的主体。 所有这些更改后的最终实现如下所示:
class src_body {
const int my_limit;
int my_next_value;
public:
src_body(int l) : my_limit(l), my_next_value(1) {}
int operator()( oneapi::tbb::flow_control& fc ) {
if ( my_next_value <= my_limit ) {
return my_next_value++;
} else {
fc.stop();
return int();
}
}
};
int main() {
int sum = 0;
graph g;
function_node< int, int > squarer( g, unlimited, [](const int &v) {
return v*v;
} );
function_node< int, int > cuber( g, unlimited, [](const int &v) {
return v*v*v;
} );
function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
return sum += v;
} );
make_edge( squarer, summer );
make_edge( cuber, summer );
input_node< int > src( g, src_body(10) );
make_edge( src, squarer );
make_edge( src, cuber );
src.activate();
g.wait_for_all();
cout << "Sum is " << sum << "\n";
}
这个最终实现具有来自上面简单数据流图的所有节点和边。 在这个简单的例子中,与显式循环相比,使用 input_node
没有太大优势。 但是,因为 input_node
能够对下游节点的行为做出反应,所以它可以限制更复杂图中的内存使用。
Dependence Graph
在依赖图中,节点调用 Body
对象来执行计算,边创建这些计算的偏序。 在运行时,库会根据指定的偏序在合法的情况下生成并调度任务以执行主体对象。 下图显示了可以使用依赖图表示的应用程序示例。
依赖图是数据流图的一种特殊情况,其中节点之间传递的数据类型为 oneapi::tbb::flow::continue_msg
。 与一般数据流图不同,依赖图中的节点不会为它们收到的每条消息生成一个任务。 相反,他们知道他们拥有的前辈的数量,计算他们收到的消息,并且只有当这个计数等于他们的前辈的总数时才会产生一个任务来执行他们的主体。
下图显示了依赖图的另一个示例。 它具有与上图相同的拓扑结构,但用简单的功能代替了三明治制作步骤。 在这种偏序中,函数 A 必须在任何其他计算开始执行之前完成执行。 函数 B 必须在 C 和 D 开始执行之前完成; E 必须在 D 和 F 开始执行之前完成。 这是一个偏序,因为例如,在 B 和 E 或 C 和 F 之间没有明确的排序要求。
为了将其实现为流图,continue_node
对象用于节点,continue_msg
对象用作消息。 continue_node
构造函数有两个参数:
template< typename Body > continue_node( graph &g, Body body)
第一个参数是它所属的图,第二个参数是函数对象或 lambda 表达式。 与 function_node
不同,continue_node
始终被假定为具有无限的并发性,并且在满足其依赖关系时会立即生成任务。
以下代码片段是该图中示例的实现。
typedef continue_node< continue_msg > node_t;
typedef const continue_msg & msg_t;
int main() {
oneapi::tbb::flow::graph g;
node_t A(g, [](msg_t){ a(); } );
node_t B(g, [](msg_t){ b(); } );
node_t C(g, [](msg_t){ c(); } );
node_t D(g, [](msg_t){ d(); } );
node_t E(g, [](msg_t){ e(); } );
node_t F(g, [](msg_t){ f(); } );
make_edge(A, B);
make_edge(B, C);
make_edge(B, D);
make_edge(A, E);
make_edge(E, D);
make_edge(E, F);
A.try_put( continue_msg() );
g.wait_for_all();
return 0;
}
该图的一种可能执行如下所示。 D 的执行直到 B 和 E 都完成后才开始。 当一个任务在 wait_for_all
中等待时,它的线程可以参与执行 oneTBB 工作池中的其他任务。
同样,重要的是要注意流程图中的所有执行都是异步发生的。 对 A.try_put
的调用在增加计数器并生成一个任务来执行 A 的主体后,快速将控制权返回给调用线程。 同样,主体任务执行 lambda 表达式,然后将 continue_msg
放到所有后继节点(如果有)。只有对 wait_for_all
的调用会阻塞,因为它应该阻塞,即使在这种情况下,调用线程也可以在等待时用于执行 oneTBB 工作池中的任务。
上面的时间线显示了当有足够的线程来执行所有可以并行并发执行的任务时的顺序。 如果线程较少,那么产生的一些任务将需要等待,直到有线程可以执行它们。
Predefined Node Types
预定义节点类型 | 描述 |
---|---|
input_node | 具有通用输出类型的单输出节点。激活后,它会执行用户代码(Body 模板参数)以生成其输出。如果下游节点已接受先前生成的输出,则调用其 Body 。否则,先前的输出会被临时缓冲,直到它被下游接受,然后再次调用 Body 。 |
function_node | 一个单输入单输出节点,将其输出广播给所有后继节点。具有通用输入和输出类型。执行一个用户代码,并发级别和缓冲策略可控。对于每个输入,只返回一个输出。 |
continue_node | 一个单输入单输出节点,将其输出广播给所有后继节点。它有一个输入,需要 1 个或多个 continue_msg 类型的输入,并具有通用输出类型。当它在其输入处接收到 N 个 continue_msg 对象时,它会执行一次用户代码。 N 等于前驱数加上在构造时分配的任何额外偏移量。 |
multifunction_node | 单输入多输出节点。它有一个通用输入类型和几个通用输出类型。它执行一次用户代码,并且具有可控的并发级别和缓冲策略。正文可以在每个输出端口上输出零个或多个消息。 |
broadcast_node | 一个单输入单输出节点,将收到的每条消息广播给所有后继节点。它的输入和输出是相同的泛型类型。它不缓冲消息。 |
buffer_node、queue_node、priority_queue_node 和 sequencer_node | 单输入单输出节点,缓冲消息并将其输出发送给一个后继节点。发送消息的顺序是特定于节点的。这些节点的独特之处在于它们仅发送给单个后继而不是所有后继。 |
join_node | 多输入单输出节点。有几种通用输入类型,输出类型是这些通用类型的元组。该节点组合来自每个输入端口的一条消息以创建一个向所有后继者广播的元组。用于组合消息的策略可以选择为排队、保留或标签匹配。 |
split_node | 单输入多输出节点。输入类型是泛型类型的元组,元组中的每种类型都有一个输出端口。该节点接收一个值元组并在相应的输出端口上输出该元组的每个元素。 |
write_once_node, overwrite_node | 单输入单输出节点缓冲单个消息并将其输出广播给所有后继节点。广播后,节点保留收到的最后一条消息,因此它可供任何未来的后继者使用。 write_once_node 将只接受它收到的第一条消息,而 overwrite_node 将接受所有消息,将它们广播给所有后继者,并用新值替换旧值。 |
limiter_node | 一个多输入、单输出节点,将其输出广播给所有后继节点。主输入类型和输出类型是相同的泛型类型。当节点广播消息时,它会增加一个内部计数器。如果增量导致它达到用户分配的阈值,它将不再广播消息。一个特殊的输入端口可用于调整内部计数,允许广播更多的消息。该节点不缓冲消息。 |
indexer_node | 一个多输入、单输出的节点,将其输出消息广播给它的所有后继节点。输入类型是一个泛型类型列表,输出类型是一个 tagged_msg 。消息是输入中列出的类型之一,标签标识接收消息的端口。消息在到达输入端口时被单独广播。 |
composite_node | 可能有 0、1 或多个端口用于输入和输出的节点。 composite_node 将一组其他节点打包在一起,并维护一个对与其相邻的端口的引用的元组。这允许复合节点的相应端口用于制作迄今为止由复合节点中的实际节点制作的边。 |
async_node(预览功能) | 允许流图与用户或其他运行时管理的外部活动进行通信的节点。该节点接收通用类型的消息,调用用户代码将消息提交给外部活动。外部活动可以使用一个特殊的接口来返回一个泛型类型并把它放到 async_node 的所有后继者中。 |