码迷,mamicode.com
首页 > Windows程序 > 详细

用Nifi合并二个API、计算并生成新的API

时间:2016-08-30 22:42:42      阅读:549      评论:0      收藏:0      [点我收藏+]

标签:

1. 全景图

技术分享

技术分享

技术分享

?

2. 合并

根据attribute合并flowfile:

技术分享

?

合并 json, 并增加code,message等:

技术分享

?

3. 计算方差:

在ExecuteScript里只能用纯python, 很多第三方包都不能用;并把计算的值插入到json里,输出。

?

import simplejson as json #from scipy.stats import f_oneway import java.io from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import StreamCallback class PyStreamCallback(StreamCallback): def __init__(self): pass def process(self, inputStream, outputStream): jsonData = IOUtils.toString(inputStream, StandardCharsets.UTF_8) data = json.loads(jsonData) values = [float(i[‘fltValue‘]) for i in data["data"]] firsts = [float(i[‘first‘]) for i in data["data"]] seconds = [float(i[‘second‘]) for i in data["data"]] def stdDeviation(a): count = len(a) if count < 2: return 0 avg = sum(a)/count result = 0.0 for i in a: result += (i - avg)**2 return (result/(count - 1))**0.5 v = stdDeviation(values) f = stdDeviation(firsts) s = stdDeviation(seconds) data["valueDev"] = v data["firstDev"] = f data["secondDev"] = s outputStream.write(bytearray(json.dumps(data, indent=4).encode(‘utf-8‘))) flowFile = session.get() if (flowFile != None): flowFile = session.write(flowFile,PyStreamCallback()) session.transfer(flowFile, REL_SUCCESS)

?

4. 最终效果:

第一个API:

技术分享

第二个API:

技术分享

?

最后合并生成的API:

技术分享

技术分享

可视化图:

技术分享

?

用Nifi合并二个API、计算并生成新的API

标签:

原文地址:http://www.cnblogs.com/fengwenit/p/5823827.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!