码迷,mamicode.com
首页 > Windows程序 > 详细

Flink 1.8 DataStream API Programming Guide 数据流API编程指南

时间:2019-08-18 11:58:03      阅读:153      评论:0      收藏:0      [点我收藏+]

标签:cut   state   tca   程序开发   nconf   也会   table   flat   ocs   

 
Flink中的DataStream程序是实现数据流转换的常规程序(例如,过滤,更新状态,定义窗口,聚合)。 最初从各种源(例如,消息队列,套接字流,文件)创建数据流。 结果通过接收器(sink)返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。 Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。 执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

示例程序(Example Program)

以下程序是流窗口字数统计应用程序的完整工作示例,它在5秒窗口中对来自Web套接字的单词进行计数。 您可以复制并粘贴代码以在本地运行它。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
要运行示例程序,首先从终端使用netcat启动输入流:
nc -lk 9999
只需键入一些单词就可以返回一个新单词。 这些将是字数统计程序的输入。 如果要查看大于1的计数,请在5秒内反复键入相同的单词(如果不能快速输入,则将窗口大小从5秒增加?)。
 

数据源(Data Sources)

源是您的程序从中读取输入的位置。 您可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序。 Flink附带了许多预先实现的源函数,但您可以通过为非并行源实现SourceFunction,或者通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来编写自己的自定义源。
 
可以从StreamExecutionEnvironment访问几个预定义的流源:
基于文件(File-based):
  • readTextFile(path)
  • readFile(fileInputFormat, path)
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
基于套接字(Socket-based):
  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
基于集合(Collection-based):
  • fromCollection(Collection)
  • fromCollection(Iterator, Class)
  • fromElements(T ...)
  • fromParallelCollection(SplittableIterator, Class)
  • generateSequence(from, to)
定制(Custom):
  • addSource - 附加新的源函数。
 

数据流转换(DataStream Transformations)

有关可用流转换的概述,请参阅运算符
 

数据接收器(Data Sinks)

数据接收器使用DataStream并将它们转发到文件,套接字,外部系统或打印它们。 Flink带有各种内置输出格式,这些格式封装在DataStreams上的操作后面:
  • writeAsText() / TextOutputFormat
  • writeAsCsv(...) / CsvOutputFormat
  • print() / printToErr()
  • writeUsingOutputFormat() / FileOutputFormat
  • writeToSocket
  • addSink 调用自定义接收器功能。 Flink绑定了其他系统(如Apache Kafka)的连接器,这些系统实现为接收器函数。

迭代(Iterations)

迭代流程序实现步进功能并将其嵌入到IterativeStream中。 由于DataStream程序可能永远不会完成,因此没有最大迭代次数。 相反,您需要指定流的哪个部分反馈到迭代,哪个部分使用拆分(split)转换或过滤器(filter)向下游转发。 在这里,我们展示了使用过滤器的示例。 首先,我们定义一个IterativeStream。
IterativeStream<Integer> iteration = input.iterate();
然后,我们使用一系列转换指定将在循环内执行的逻辑(这里是一个简单的映射(map)转换)
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */); 
要关闭迭代并定义迭代尾部,请调用IterativeStream的closeWith(feedbackStream)方法。 给closeWith函数的DataStream将反馈给迭代头。 常见的模式是使用过滤器来分离反馈的流的一部分和向前传播的流的一部分。 这些过滤器可以例如定义“终止”逻辑,其中允许元件向下游传播而不是反馈。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
例如,这里是从一系列整数中连续减去1直到它们达到零的程序:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(value -> value - 1);

DataStream<Long> stillGreaterThanZero = minusOne.filter(value -> (value > 0));

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(value -> (value <= 0));

执行参数(Execution Parameters)

StreamExecutionEnvironment包含ExecutionConfig,它允许为运行时设置特定于作业的配置值。
有关大多数参数的说明,请参阅执行配置。 这些参数特别适用于DataStream API:
  • setAutoWatermarkInterval(long milliseconds):设置自动水印输出的间隔。 你可以使用long getAutoWatermarkInterval()获取当前值
 

容错(Fault Tolerance)

State&Checkpointing描述了如何启用和配置Flink的检查点机制。
 

控制延迟(Controlling Latency)

默认情况下,元素不会逐个传输到网络上(这会导致不必要的网络流量),但会被缓冲。 可以在Flink配置文件中设置缓冲区的大小(实际在计算机之间传输)。 虽然此方法有利于优化吞吐量(throughput),但当传入流速度不够快时,可能会导致延迟问题。 要控制吞吐量和延迟,可以在执行环境(或单个运算符)上使用env.setBufferTimeout(timeoutMillis)来设置缓冲区填充的最长等待时间。 在此之后,即使缓冲区未满,也会自动发送缓冲区。 此超时的默认值为100毫秒。
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
要最大化吞吐量,请设置setBufferTimeout(-1),这将删除超时,缓冲区只有在满时才会刷新。 要最小化延迟,请将超时设置为接近0的值(例如5或10 ms)。 应避免缓冲区超时为0,因为它可能导致严重的性能下降。
 

调试(Debugging)

在分布式集群中运行流处理程序之前,最好确保实现的算法按预期工作。 因此,实现数据分析程序通常是检查结果,调试和改进的增量过程。
 
Flink通过支持IDE内的本地调试,测试数据的注入和结果数据的收集,提供了显著简化数据分析程序开发过程的功能。 本节提供了一些如何简化Flink程序开发的提示。
 

本地执行环境(Local Execution Environment)

LocalStreamEnvironment在其创建的同一JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。
创建LocalEnvironment并使用如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

集合数据源(Collection Data Sources)

Flink提供了特殊的数据源,这些数据源由Java集合支持,以方便测试。 一旦程序经过测试,源和接收器可以很容易地被读取/写入外部系统的源和接收器替换。
 
集合数据源可以如下使用:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
注意:目前集合数据源要求数据类型和迭代器实现Serializable。 此外,集合数据源不能并行执行(并行度= 1)。

迭代数据接收器(Iterator Data Sink)

Flink还提供了一个接收器来收集DataStream结果,以便进行测试和调试。 它可以使用如下:
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult) 
注意:flink-streaming-contrib模块已从Flink 1.5.0中删除。 它的类已经转移到了flink-streaming-java和flink-streaming-scala。

Flink 1.8 DataStream API Programming Guide 数据流API编程指南

标签:cut   state   tca   程序开发   nconf   也会   table   flat   ocs   

原文地址:https://www.cnblogs.com/sxpujs/p/11371561.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!