用Nifi合并二个API、计算并生成新的API

1. 全景图

用Nifi合并二个API、计算并生成新的API

用Nifi合并二个API、计算并生成新的API

用Nifi合并二个API、计算并生成新的API

 

2. 合并

根据attribute合并flowfile:

用Nifi合并二个API、计算并生成新的API

 

合并 json, 并增加code,message等:

用Nifi合并二个API、计算并生成新的API

 

3. 计算方差:

在ExecuteScript里只能用纯python, 很多第三方包都不能用;并把计算的值插入到json里,输出。

 

import simplejson as json
#from scipy.stats import f_oneway
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
jsonData = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(jsonData)
values = [float(i['fltValue']) for i in data["data"]]
firsts = [float(i['first']) for i in data["data"]]
seconds = [float(i['second']) for i in data["data"]] def stdDeviation(a):
count = len(a)
if count < 2: return 0
avg = sum(a)/count
result = 0.0
for i in a: result += (i - avg)**2
return (result/(count - 1))**0.5 v = stdDeviation(values)
f = stdDeviation(firsts)
s = stdDeviation(seconds) data["valueDev"] = v
data["firstDev"] = f
data["secondDev"] = s outputStream.write(bytearray(json.dumps(data, indent=4).encode('utf-8'))) flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)

 

4. 最终效果:

第一个API:

用Nifi合并二个API、计算并生成新的API

第二个API:

用Nifi合并二个API、计算并生成新的API

 

最后合并生成的API:

用Nifi合并二个API、计算并生成新的API

用Nifi合并二个API、计算并生成新的API

可视化图:

用Nifi合并二个API、计算并生成新的API

 

 

NIFI 中国社区 QQ群:595034369

上一篇:程序设计入门-C语言基础知识-翁恺-第七周:指针与字符串-详细笔记(七)


下一篇:利用ant脚本 自动构建svn增量/全量 系统程序升级包【转】