标签:split tuple exe ati single 加载 执行环境 int 指定
流式处理WordCount:
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//创建一个流处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//接受socket数据流
DataStreamSource<String> textDataSteam = env.socketTextStream("localhost",7777);
//逐一读取数据,打散之后进行WordCount(逻辑计算)
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountDataStream = textDataSteam
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.split(" ");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
})
.filter(new FilterFunction<Tuple2<String, Integer>>() {
public boolean filter(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
if (stringIntegerTuple2.equals(null)) {
return false;
}
return true;
}
})
.keyBy(0)
.sum(1);
//打印输出
wordCountDataStream.print();
//执行任务
env.execute("StreamWordCountJob");
//测试需要开启端口7777
}
}
整个Flink程序一共分为5步:设定Flink执行环境、创建和加载数据集、对数据集指定转换操作逻辑、指定计算结果输出位置
标签:split tuple exe ati single 加载 执行环境 int 指定
原文地址:https://www.cnblogs.com/xinjitu-001/p/13401491.html