《Storm分布式实时计算模式》——3.4 Trident运算

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第3章,第3.4节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.4 Trident运算

时间戳已经生成好了,下一步是加入处理事件的逻辑组件。在Trident中,这些组件称为运算(operation)。在我们的topology中,使用两种不同的运算:filter和function。
运算通过Stream对象的方法来调用。这个例子中,我们使用了Stream对象的下述方法:


《Storm分布式实时计算模式》——3.4 Trident运算https://yqfile.alicdn.com/46e7bea64528ebdb3f64b3cb53140489e03591ff.png
" >

注意前面代码中列出的方法返回形式为Stream对象或者TridentState对象,返回可以用来创建新的数据流。因此,运算可以连在一起使用流式接口形式的Java代码。让我们再看看示例topology中的关键代码:

《Storm分布式实时计算模式》——3.4 Trident运算https://yqfile.alicdn.com/f0d2d1524271a5239e587c3c0700475aeec9a7b1.png
" >

通常,应用运算需要声明一个输入域集合和一个输出域集合,也就是funcition域。上面代码中topology第二行声明我们需要CityAssignment对数据流中的每个tuple执行操作。在每个tuple中,CityAssignment会在event字段上运算并且增加一个叫做city的新字段,这个字段会附在tuple中向后发射。
每个操作在流式风格的语法上略有不同,这取决于操作需要哪些信息。下面将介绍不同操作的详细语法和语义。
3.4.1 Trident filter
我们topology逻辑中的第一部分就是个过滤器filter,它会忽略掉我们不关心的疾病事件。在这个例子中,系统只关心脑膜炎(meningitis)的病情,从前面表格中看到,脑膜炎对应的疾病代码是320、321和322。
为了通过疾病代码过滤事件,我们需要利用Trident filter。Trident通过提供BaseFilter类,我们通过实现子类就可以方便地对tuple进行过滤,滤除系统不需要的tuple。BaseFilter类实现了Filter接口,这个接口如下面代码片段所示:


《Storm分布式实时计算模式》——3.4 Trident运算

为了在数据流中过滤tuple,应用需要通过继承BaseFilter类来实现这个接口。这个例子中,我们使用下述过滤器过滤事件:

《Storm分布式实时计算模式》——3.4 Trident运算

上面的代码中,我们从tuple中提取了DiagnosisEvent并且检查疾病代码。因为所有的脑膜炎代码小于等于322,我们也没有发送其他代码,所以只需要简单地检查代码是否小于322,就可以决定事件是否和脑膜炎有关。
Filter操作结果返回True的tuple将会被发送到下游进行操作。如果方法返回False,该tuple就不会发送到下游。
在我们的topology中,我们在数据流上使用each(inputFields,filter)方法,将这个过滤器应用到数据流的每个tuple中:

《Storm分布式实时计算模式》——3.4 Trident运算

3.4.2 Trident function
在filter之外,Storm还提供了一个更通用功能的接口function。function和Storm的bolt类似,读取tuple并且发送新的tuple。其中一个区别是,Trident function只能添加数据。function发送数据时,将新字段添加在tuple中,并不会删除或者变更已有的字段。
function接口如下代码片段所示:

《Storm分布式实时计算模式》——3.4 Trident运算https://yqfile.alicdn.com/beff14fd4d5d776524b631d21ff364c68ce26b01.png
" >

和Storm的bolt类似,function实现了一个包括实际逻辑的方法execute。function的实现也可以选用TridentCollector来发送tuple到新的function中。用这种方式,function也可以用来过滤tuple,起到filter的作用。
我们topology中的第一个function是CityAssignment,如下所示:

《Storm分布式实时计算模式》——3.4 Trident运算https://yqfile.alicdn.com/96f5af484cf5f33107478974af7e25235245791e.png
" >


《Storm分布式实时计算模式》——3.4 Trident运算

在这个function中,我们使用静态初始化的方式建立了一个我们关心的城市的地图。示例中,function包括一个地图,存储了的坐标信息包括:Philadelphia(PHL)、New York City(NYC)、San Francisco(SF)和LosAngeles(LA)。
在execute()方法中,函数遍历城市计算事件和城市之间的距离。现实系统中,地理空间的索引效率会高很多。
function声明的字段数量必须和它发射出值的字段数一致。如果不一致,Storm就会抛出IndexOutOfBoundsException异常。
我们topology中的下一个function是HourAssignment,用来转化Unix时间戳为纪元时间的小时,可以用来对事件发生进行时间上的分组操作。HourAssignment的代码如下:

《Storm分布式实时计算模式》——3.4 Trident运算


《Storm分布式实时计算模式》——3.4 Trident运算

我们重写了这个function,同时发射了小时的数值,以及由城市、疾病代码、小时组合而成的key。实际上,这个组合值会作为聚合计数的唯一标识符,后面会详细解释。
我们topology中最后两个funciton是用来侦测疾病暴发并且告警的。OutbreakDetector类的代码如下:

《Storm分布式实时计算模式》——3.4 Trident运算

这个function提取出了特定城市、疾病、时间的发生次数,并且检查计数是否超过了设定的阈值。如果超过,发送一个新的字段包括一条告警信息。在上面代码里,注意这个function实际上扮演了一个过滤器的角色,但是却作为一个function的形式来实现,是因为需要在tuple中添加新的字段。因为filter不能改变tuple,当我们既想过滤又想添加字段时必须使用function。
最后一个function的功能就是发布一个告警(并且结束程序)。代码如下:


《Storm分布式实时计算模式》——3.4 Trident运算


《Storm分布式实时计算模式》——3.4 Trident运算https://yqfile.alicdn.com/f6fa126f5a8efa4d10f9909240f12104e8f1e658.png" >

这个方法非常简单,提取了告警的内容,并写入日志,最后结束整个程序。
上一篇:python的工作记录A


下一篇:沫沫金讲:Eclipse卡死cup内存双爆,取消验证无效--看这里修改.project跟我做