Dependencies that need to be added to the POM file:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.7.1</version>
</dependency>
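The ${flink.version} placeholder above assumes a matching property in the POM's <properties> section; a minimal sketch, pinned here to 1.7.1 so it lines up with the two hard-coded Scala artifact versions:

<properties>
    <!-- assumed property: keep in sync with the hard-coded 1.7.1 versions above -->
    <flink.version>1.7.1</flink.version>
</properties>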
package flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Create a DataSet of strings
        DataSet<String> text = env.fromElements(
                "flink test",
                "I think I hear them. Stand, ho! Who's there?");

        // Split each line into words, group by word, and count occurrences per word
        DataSet<Tuple2<String, Integer>> wordCount = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        wordCount.print();
    }
}
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Emit a (word, 1) pair for every space-separated token
        for (String word : line.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
package flink

import org.apache.flink.api.scala._

object WordCountExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    // Lowercase, split on non-word characters, drop empty tokens, then count per word
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.out.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) {
                        for (String word : s.split("\\s")) {
                            collector.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(10), Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the result with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}
package flink;

// A simple POJO: Flink requires a public no-argument constructor and public
// fields (or getters/setters) so that keyBy("word") can look the field up by name.
public class WordWithCount {
    public String word;
    public long count;

    public WordWithCount() {
    }

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + ":" + count;
    }
}
package flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // the port to connect to
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        return
    }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", port, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCount = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCount.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }
}
Run the job, passing the port as a program argument:
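For example, from an IDE set the program arguments to --port 9999; to submit to a cluster, something along these lines works (the jar path here is a placeholder for your own build artifact):

flink run -c flink.WordCount target/flink-examples.jar --port 9999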
In a terminal, use the nc command to simulate sending data to port 9999:
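With most netcat variants the following keeps a listener open on port 9999, and each line typed into it is streamed to the job (some versions use nc -l 9999 instead):

nc -lk 9999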
Result:
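As an illustration only (not captured output): given the word:count format of WordWithCount.toString, typing "hello world hello" into the nc session would make the job print window results such as:

hello:2
world:1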
Notes:
Make absolutely sure you import the right packages: Java code should import the Java API, and Scala code the Scala API. If you import the wrong ones, the program will not run (the Scala imports are listed below for reference).
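These are the imports that bring in the Scala API and its type-information implicits; Java programs should instead stick to the org.apache.flink.api.java and org.apache.flink.streaming.api packages used in the examples above:

import org.apache.flink.api.scala._             // Scala batch (DataSet) API
import org.apache.flink.streaming.api.scala._   // Scala streaming (DataStream) API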
Original post: https://www.cnblogs.com/Gxiaobai/p/10290990.html