标签:socket tuple runtime code lov apache extends seconds static
1.自定义sink
在flink中,sink负责最终数据的输出。使用DataStream实例中的addSink方法,传入自定义的sink类
定义一个自定义的 MyPrintSink,使其打印的是真正的 subtask 编号(默认的 print() 显示的是 subtask 的编号 + 1)
MyPrintSink
package cn._51doit.flink.day02; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; public class MyPrintSink<T> extends RichSinkFunction<T> { @Override public void invoke(T value, Context context) throws Exception { int index = getRuntimeContext().getIndexOfThisSubtask(); System.out.println(index + " > " + value); } }
MyPrintSinkDemo
package cn._51doit.flink.day02; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class MyPrintSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } }); SingleOutputStreamOperator<Tuple2<String, Integer>> res = wordAndOne.keyBy(0).sum(1); res.addSink(new MyPrintSink<>()); env.execute(); } }
2. StreamingFileSink
StreamingFileSink 用得比较多,可以将结果输出到本地文件系统或者 HDFS 中,并且在开启 checkpoint 的情况下支持 exactly-once
package cn._51doit.flink.day02;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

/**
 * Demo that reads lines from a socket, upper-cases them, and writes them as
 * row-formatted text files through a {@link StreamingFileSink}.
 */
public class StreamFileSinkDemo {

    /**
     * Builds and runs the streaming job: socket source -> upper-case map ->
     * rolling file sink.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Text source: one record per line received on localhost:8888.
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);

        // Output directory for the part files.
        String path = "E:\\flink";

        // Checkpoint every 10 000 ms.
        env.enableCheckpointing(10000);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Maximum time a part file stays open before it is rolled: 30 s.
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(30))
                                // Roll the part file when nothing was written to it for 10 s.
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
                                // Roll when the part file exceeds 1 GiB (1024^3 bytes). Long
                                // literals keep the arithmetic out of int range if raised later.
                                .withMaxPartSize(1024L * 1024L * 1024L)
                                .build())
                .build();

        upper.addSink(sink);

        env.execute();
    }
}
标签:socket tuple runtime code lov apache extends seconds static
原文地址:https://www.cnblogs.com/jj1106/p/13149681.html