
flink02------1. Custom source

Date: 2020-06-16 23:19:16

1. Custom sink

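  In Flink, a sink is responsible for the final output of the data. To use a custom sink, call the addSink method on a DataStream instance and pass in an instance of the custom sink class.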

Define a print sink (MyPrintSink) whose output shows the actual subtask index (by default, the built-in print() shows the subtask index + 1).

MyPrintSink

package cn._51doit.flink.day02;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Print sink that prefixes each record with the 0-based index of the subtask that wrote it. */
public class MyPrintSink<T> extends RichSinkFunction<T> {

    @Override
    public void invoke(T value, Context context) throws Exception {
        // Index of this parallel subtask, starting at 0
        int index = getRuntimeContext().getIndexOfThisSubtask();

        System.out.println(index + " > " + value);
    }
}

MyPrintSinkDemo

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyPrintSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
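        // Group by the word (field 0) and keep a running sum of the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = wordAndOne.keyBy(0).sum(1);

        // Use the custom sink instead of the built-in print()
        res.addSink(new MyPrintSink<>());

        env.execute();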
    }
}
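
To make the difference in numbering concrete, here is a minimal sketch in plain Java with no Flink dependency. It only models the two prefixes: builtInPrefix is a hypothetical helper that assumes the built-in print() prefixes each record with the subtask index + 1 (and adds no prefix at parallelism 1), while myPrintSinkPrefix mirrors the string built by MyPrintSink above. The parallelism of 4 and the sample record are made up for illustration.

```java
public class PrintPrefixSketch {

    // Assumed behaviour of the built-in print(): 1-based subtask number,
    // and no prefix at all when the sink runs with parallelism 1.
    static String builtInPrefix(int subtaskIndex, int parallelism) {
        return parallelism > 1 ? (subtaskIndex + 1) + "> " : "";
    }

    // Same prefix that MyPrintSink builds: the raw 0-based subtask index.
    static String myPrintSinkPrefix(int subtaskIndex) {
        return subtaskIndex + " > ";
    }

    public static void main(String[] args) {
        int parallelism = 4;            // hypothetical parallelism
        String record = "(flink,1)";    // hypothetical record
        for (int i = 0; i < parallelism; i++) {
            System.out.println("print():     " + builtInPrefix(i, parallelism) + record);
            System.out.println("MyPrintSink: " + myPrintSinkPrefix(i) + record);
        }
    }
}
```

To try MyPrintSinkDemo itself, something must be listening on port 8888 before the job starts, for example `nc -lk 8888` on Linux or macOS.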

2. StreamingFileSink

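 This sink is used quite often. It can write results to the local file system or to HDFS, and it supports exactly-once semantics when checkpointing is enabled.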

package cn._51doit.flink.day02;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class StreamFileSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
        String path = "E:\\flink";

        // Checkpoint every 10 s; part files are only finalized on successful checkpoints
        env.enableCheckpointing(10000);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
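                                // Maximum time a part file stays open before it is rolled
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(30))
                                // Roll the file if nothing has been written to it for this long
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
                                // Roll the file once it exceeds 1 GiB (use 1024 * 1024 for 1 MiB)
                                .withMaxPartSize(1024 * 1024 * 1024)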
                                .build())
                .build();
        upper.addSink(sink);
        env.execute();

    }
}
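
The three rolling conditions configured above can be read as a single "roll if any of these holds" check. The following plain-Java sketch, with no Flink dependency, models that decision with the same values as the demo. shouldRoll and its parameters are hypothetical names, and the exact comparison operators are an assumption rather than a copy of Flink's DefaultRollingPolicy. Note that 1024 * 1024 * 1024 bytes is 1 GiB; a 1 MiB limit would be 1024 * 1024.

```java
import java.util.concurrent.TimeUnit;

public class RollingPolicySketch {

    // Same values as in StreamFileSinkDemo
    static final long ROLLOVER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
    static final long INACTIVITY_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024; // 1 GiB

    // Hypothetical helper: true if the current part file should be closed
    // and a new one started.
    static boolean shouldRoll(long sizeBytes, long createdAtMs, long lastWriteMs, long nowMs) {
        boolean tooLarge = sizeBytes > MAX_PART_SIZE_BYTES;
        boolean tooOld = nowMs - createdAtMs >= ROLLOVER_INTERVAL_MS;
        boolean idle = nowMs - lastWriteMs >= INACTIVITY_INTERVAL_MS;
        return tooLarge || tooOld || idle;
    }

    public static void main(String[] args) {
        // Small, young, recently written file: keep writing to it
        System.out.println(shouldRoll(10, 0, 4_000, 5_000));
        // File has been open for 30 s: roll
        System.out.println(shouldRoll(10, 0, 29_000, 30_000));
        // No write for 10 s: roll
        System.out.println(shouldRoll(10, 0, 5_000, 15_000));
        // File larger than 1 GiB: roll
        System.out.println(shouldRoll(MAX_PART_SIZE_BYTES + 1, 0, 0, 1_000));
    }
}
```

With a socket source that is fed by hand, the 10-second inactivity interval is usually the condition that fires first, which is why the demo tends to produce many small part files.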

Original article: https://www.cnblogs.com/jj1106/p/13149681.html
