标签:local env window link adt count() imp code 运行
1.运行环境
有以下三种方式获取当前运行环境:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host, port, jarFiles...)
2.批处理
// Obtain the batch execution environment for the current context.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read the text file into a DataSet with one String element per line.
DataSet<String> text = env.readTextFile("file:///o.txt");
// Print the elements; in the DataSet API print() also triggers execution of the job.
text.print();
3.map操作
// Transform each element s of text into the pair (s, 1).
// NOTE(review): the value returned by text.map(...) is not assigned to anything
// in this snippet — confirm whether the mapped result should be kept.
text.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
4.table操作
import org.apache.flink.table.api.* import static org.apache.flink.table.api.Expressions.* // environment configuration ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); // register Orders table in table environment // ... // specify table program Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table counts = orders .groupBy($("a")) .select($("a"), $("b").count().as("cnt")); // conversion to DataSet DataSet<Row> result = tEnv.toDataSet(counts, Row.class); result.print();
// Source table; schema (a, b, c, rowtime).
Table orders = tEnv.from("Orders");

// Keep only the rows in which a, b and c are all non-null.
Table completeRows = orders.filter(
        and($("a").isNotNull(), $("b").isNotNull(), $("c").isNotNull()));

// Lower-case column a and carry b and rowtime forward.
Table normalized = completeRows.select($("a").lowerCase().as("a"), $("b"), $("rowtime"));

// Group by a one-hour tumbling window on rowtime and by a, then average b per group.
Table result = normalized
        .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
        .groupBy($("hourlyWindow"), $("a"))
        .select(
                $("a"),
                $("hourlyWindow").end().as("hour"),
                $("b").avg().as("avgBillingAmount"));
5.batch和stream
Table orders = tableEnv.from("Orders");

// Distinct aggregation on group by
Table groupByDistinctResult = orders
        .groupBy($("a"))
        .select($("a"), $("b").sum().distinct().as("d"));

// Distinct aggregation on time window group by
// (on(...) and as(...) belong to the Tumble window definition, inside window(...))
Table groupByWindowDistinctResult = orders
        .window(Tumble
                .over(lit(5).minutes())
                .on($("rowtime"))
                .as("w"))
        .groupBy($("a"), $("w"))
        .select($("a"), $("b").sum().distinct().as("d"));

// Distinct aggregation on over window
Table result = orders
        .window(Over
                .partitionBy($("a"))
                .orderBy($("rowtime"))
                .preceding(UNBOUNDED_RANGE)
                .as("w"))
        .select(
                $("a"),
                $("b").avg().distinct().over($("w")),
                $("b").max().over($("w")),
                $("b").min().over($("w")));
标签:local env window link adt count() imp code 运行
原文地址:https://www.cnblogs.com/yangyang12138/p/13352682.html