DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., filtering, updating state, defining windows, aggregating). The data streams are initially created from various sources (e.g., message queues, socket streams, files). Results are returned via sinks, which may, for example, write the data to a file system or to standard output such as the command line terminal. Flink programs run in a variety of contexts: standalone, using only the Flink API, or embedded in other programs. The execution can happen in a local JVM or on clusters of many machines. To create your own streaming program, we encourage you to start with the program skeleton and gradually add your own transformations. The remaining sections describe additional operations and advanced features.
The following is a complete, runnable example of a windowed streaming WordCount program that counts the words coming from a socket in 5-second windows. You can copy & paste the code and run it locally.
public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
To run the example program, first start the input stream with netcat in a terminal:
nc -lk 9999
Type a few words and press Enter; these words are the input to the example program. If you type the same word repeatedly within 5 seconds, its count will rise above one. (Increase the window size from 5 seconds if you type slower than that.)
To write Flink programs, you need to add the DataStream dependency for your language of choice to your project.
The simplest way is to use one of the quickstart scripts: either for Java or for Scala. They create an empty project from a template (a Maven Archetype), which sets up everything you need for programming. You can also use the archetype to create a project manually by typing:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.0-SNAPSHOT
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.0-SNAPSHOT
The archetypes work for stable releases as well as the current preview version (-SNAPSHOT).
If you want to add Flink to an existing Maven project, add the following dependencies to your pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
As you can see in the example, Flink DataStream programs look like most regular Java programs with a main() method. Each program consists of the same basic parts: obtaining a StreamExecutionEnvironment, connecting to data stream sources, specifying transformations on the data streams, specifying output for the processed data, and triggering the program execution. We now give an overview of each of those steps; please refer to the respective sections for more details.
The StreamExecutionEnvironment is the basis of all Flink DataStream programs. You can obtain one using any of these static methods on the class StreamExecutionEnvironment:
getExecutionEnvironment()
createLocalEnvironment()
createLocalEnvironment(int parallelism)
createLocalEnvironment(int parallelism, Configuration customConfiguration)
createRemoteEnvironment(String host, int port, String... jarFiles)
createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
Typically, you only need to use getExecutionEnvironment(), since this will do the right thing depending on the context: if you execute your program inside an IDE like a regular Java program, it creates a local environment that executes your program on your local machine. If you package your program into a JAR and invoke it through the command line or the web interface, the Flink cluster manager executes your main method, and getExecutionEnvironment() returns an execution environment for executing your program on a cluster.
The execution environment has several methods for defining data sources, including file systems, sockets, and external systems. To simply read data from a socket (useful for debugging), you can use:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("localhost", 9999);
This returns a DataStream on which you can apply transformations. For more information on data sources, see the Data Sources section.
Once you have a DataStream, you can apply transformations to create a new DataStream, which you can then write back to a socket, transform again, combine with other DataStreams, or push to an external system (e.g., a message queue or a file system). You apply transformations by calling methods on DataStream and plugging in your own custom functions. For example, a map transformation looks like this:
DataStream<String> input = ...;

DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
This creates a new DataStream by converting every String in the original stream into an Integer. For more details, see the Transformations section.
Once you have a DataStream containing your final results, you can push it to an external system (e.g., HDFS, Kafka, Elasticsearch), write it back to a socket, write it to a file, or print it:
writeAsText(String path, ...) writeAsCsv(String path, ...) writeToSocket(String hostname, int port, ...) print() addSink(...)
Once you have specified the transformations and sinks, you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of StreamExecutionEnvironment, this either runs the program on the local machine or submits it for execution on a cluster.
env.execute();
The following is the same as above, except that the examples are written in Scala.
As presented in the example, Flink DataStream programs look like regular Scala programs with a main()
method. Each program consists of the same basic parts: obtaining a StreamExecutionEnvironment, connecting to data stream sources, specifying transformations on the data streams, specifying output for the processed data, and triggering the program execution. We will now give an overview of each of those steps; please refer to the respective sections for more details.
The StreamExecutionEnvironment is the basis for all Flink DataStream programs. You can obtain one using these static methods on class StreamExecutionEnvironment:
def getExecutionEnvironment def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()) def createRemoteEnvironment(host: String, port: Int, jarFiles: String*) def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
Typically, you only need to use getExecutionEnvironment
, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the command line or the web interface, the Flink cluster manager will execute your main method and getExecutionEnvironment()
will return an execution environment for executing your program on a cluster.
For specifying data sources the execution environment has several methods to read from files, sockets, and external systems using various methods. To just read data from a socket (useful also for debugging), you can use:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
This will give you a DataStream on which you can then apply transformations. For more information on data sources and input formats, please refer to Data Sources.
Once you have a DataStream you can apply transformations to create a new DataStream which you can then write to a file, transform again, combine with other DataStreams, or push to an external system. You apply transformations by calling methods on DataStream with your own custom transformation function. For example, a map transformation looks like this:
val input: DataStream[String] = ... val mapped = input.map { x => x.toInt }
This will create a new DataStream by converting every String in the original set to an Integer. For more information and a list of all the transformations, please refer to Transformations.
Once you have a DataStream containing your final results, you can push the result to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file, or print it.
writeAsText(path: String, ...) writeAsCsv(path: String, ...) writeToSocket(hostname: String, port: Int, ...) print() addSink(...)
Once you specified the complete program you need to trigger the program execution by calling execute
on StreamExecutionEnvironment
. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
env.execute()
A DataStream is a potentially unbounded collection of elements of the same type.
Transformations return different subtypes of DataStream, on which further transformations can be applied. For example, keyBy(…) returns a KeyedDataStream, which is still a data stream, logically partitioned by a key, and which additionally supports windowed operations.
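For illustration, here is a minimal sketch of keying, windowing, and aggregating a stream, reusing the Tuple2<String, Integer> pairs and the helpers from the WindowWordCount example above (the variable name pairs is only illustrative):

// A minimal sketch, assuming a DataStream<Tuple2<String, Integer>> named "pairs"
// (word in field 0, count in field 1), as in the example above.
DataStream<Tuple2<String, Integer>> counts = pairs
    .keyBy(0)                                   // logically partition the stream by the word
    .timeWindow(Time.of(5, TimeUnit.SECONDS))   // group each key's elements into 5-second windows
    .sum(1);                                    // aggregate the counts within each window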
All Flink DataStream programs are executed lazily: when the main method runs, the data loading and transformations do not happen immediately. Instead, each operation is added to an execution plan, and the plan is only executed when execute() is called on the execution environment, regardless of whether execution happens locally or on a cluster.
Lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
Data transformations turn one or more DataStreams into a new DataStream. Programs can combine multiple streams and transformations into sophisticated application topologies.
This section gives a description of all the available transformations.
Transformation | Description |
---|---|
Map DataStream → DataStream |
Takes one element and produces one element (one in, one out). A map function that doubles the values of the input stream: DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); |
FlatMap DataStream → DataStream |
Takes one element and produces zero, one, or more elements (the return type is void; output elements are emitted via the Collector). A flatmap function that splits sentences into words:
|
Filter DataStream → DataStream |
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
|
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each containing elements of the same key. Internally this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream.
|
Reduce KeyedStream → DataStream |
A "rolling" reduce specific to keyed data streams (many-to-one): it combines all elements with the same key using the supplied function and emits the aggregated value.
|
Fold DataStream→ DataStream |
The same as above, but starting from an initial value (e.g., 0): a "rolling" fold specific to keyed data streams that combines the elements of the same key with the last folded value and emits the new value.
|
Aggregations KeyedStream → DataStream |
A rolling aggregation specific to keyed data streams. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the specified field.
|
Window KeyedStream → WindowedStream |
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). A later section describes windows in detail.
|
WindowAll DataStream → AllWindowedStream |
Windows can also be defined on regular DataStreams, not only on KeyedStreams. These windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). WARNING: this is in many cases a non-parallel transformation; all records will be gathered in one task for the windowAll operator.
|
(Window) Apply WindowedStream → DataStream AllWindowedStream → DataStream |
Applies a function to each window of a WindowedStream. Below is a function that sums the elements of a window. Note: if you are using the windowAll transformation above, you need to pass an AllWindowFunction instead of a WindowFunction.
|
(Window) Reduce WindowedStream → DataStream |
Applies a reduce function to a window and returns the reduced value.
|
(Window) Fold WindowedStream → DataStream |
Applies a fold function to a window and returns the folded value.
|
Aggregations on windows WindowedStream → DataStream |
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the specified field.
|
Union DataStream* → DataStream |
Unions two or more data streams, creating a new stream that contains all the elements from all of the streams. Note: if you union a data stream with itself, you will still only get each element once.
|
Window Join DataStream,DataStream → DataStream |
Joins two data streams on a given key and a common window.
|
Window CoGroup DataStream,DataStream → DataStream |
Cogroups two data streams on a given key and a common window.
|
Connect DataStream,DataStream → ConnectedStreams |
"Connects" two data streams, retaining their types; the two connected streams can share state.
|
CoMap, CoFlatMap ConnectedStreams → DataStream |
Similar to map and flatMap, but on a connected data stream.
|
Split DataStream → SplitStream |
Splits a stream into two or more streams according to some criterion.
|
Select SplitStream → DataStream |
Selects one or more streams from a split stream.
|
Iterate DataStream → IterativeStream → DataStream |
Creates a "feedback" loop in the flow , by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
|
Extract Timestamps DataStream → DataStream |
Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.
|
Transformation | Description |
---|---|
Map DataStream → DataStream |
Takes one element and produces one element. A map function that doubles the values of the input stream:
|
FlatMap DataStream → DataStream |
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
|
Filter DataStream → DataStream |
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
|
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream.
|
Reduce KeyedStream → DataStream |
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
|
Fold DataStream → DataStream |
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
|
Aggregations KeyedStream → DataStream |
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
|
Window KeyedStream → WindowedStream |
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.
|
WindowAll DataStream → AllWindowedStream |
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
|
Window Apply WindowedStream → DataStream AllWindowedStream → DataStream |
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
|
Window Reduce WindowedStream → DataStream |
Applies a functional reduce function to the window and returns the reduced value.
|
Window Fold WindowedStream → DataStream |
Applies a functional fold function to the window and returns the folded value.
|
Aggregations on windows WindowedStream → DataStream |
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
|
Union DataStream* → DataStream |
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will still only get each element once.
|
Window Join DataStream,DataStream → DataStream |
Join two data streams on a given key and a common window.
|
Window CoGroup DataStream,DataStream → DataStream |
Cogroups two data streams on a given key and a common window.
|
Connect DataStream,DataStream → ConnectedStreams |
"Connects" two data streams retaining their types, allowing for shared state between the two streams.
|
CoMap, CoFlatMap ConnectedStreams → DataStream |
Similar to map and flatMap on a connected data stream
|
Split DataStream → SplitStream |
Split the stream into two or more streams according to some criterion.
|
Select SplitStream → DataStream |
Select one or more streams from a split stream.
|
Iterate DataStream → IterativeStream → DataStream |
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.
|
Extract Timestamps DataStream → DataStream |
Extracts timestamps from records in order to work with windows that use event time semantics. See working with time.
|
The following transformations are available on data streams of Tuples:
Transformation | Description |
---|---|
Project DataStream → DataStream |
Selects a subset of fields from the tuples
|
Transformation | Description |
---|---|
Project DataStream → DataStream |
Selects a subset of fields from the tuples
|
Flink also provides the following functions for low-level control (if desired) over how a stream is partitioned after a transformation:
Transformation | Description |
---|---|
Hash partitioning DataStream → DataStream |
Identical to keyBy, but returns a DataStream instead of a KeyedStream.
|
Custom partitioning DataStream → DataStream |
Uses a user-defined Partitioner to select the target task for each element.
|
Random partitioning DataStream → DataStream |
Partitions elements randomly according to a uniform distribution.
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
Partitions elements round-robin, creating equal load per partition; useful for performance optimization in the presence of data skew.
|
Broadcasting DataStream → DataStream |
Broadcasts every element to every partition.
|
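As a rough sketch of how these controls are applied (reusing the Tuple2<String, Integer> stream from the word-count example; the exact method names follow this version of the API and may differ in later releases):

// "pairs" is assumed to be a DataStream<Tuple2<String, Integer>>.
pairs.partitionByHash(0);   // hash partitioning on field 0; returns a DataStream, not a KeyedStream
pairs.shuffle();            // random (uniform) partitioning
pairs.rebalance();          // round-robin partitioning, evens out load under data skew
pairs.broadcast();          // every element is sent to every partition

// Custom partitioning with a user-defined Partitioner over the key in field 0:
pairs.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;   // pick the target task for each element
    }
}, 0);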
Transformation | Description |
---|---|
Hash partitioning DataStream → DataStream |
Identical to keyBy but returns a DataStream instead of a KeyedStream.
|
Custom partitioning DataStream → DataStream |
Uses a user-defined Partitioner to select the target task for each element.
|
Random partitioning DataStream → DataStream |
Partitions elements randomly according to a uniform distribution.
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
|
Broadcasting DataStream → DataStream |
Broadcasts elements to every partition.
|
Task chaining and resource groups
Chaining two subsequent transformations means co-locating them within the same thread for better performance. By default, Flink chains operators wherever possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining:
Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining in the whole job. The functions below provide more fine-grained control. Note that these functions can only be used right after a DataStream transformation, because they refer to the previous transformation. For example, you can write someStream.map(...).startNewChain(), but not someStream.startNewChain().
A resource group is a slot in Flink (see slots). If needed, you can manually isolate operators in separate slots.
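A hedged sketch of these controls (the MyFilter/MyMapper/OtherMapper function classes are only placeholders, not part of this guide):

// The two maps below are chained to each other, but not to the preceding filter:
someStream.filter(new MyFilter()).map(new MyMapper()).startNewChain().map(new OtherMapper());

// Exclude a single map from any chain:
someStream.map(new MyMapper()).disableChaining();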
Transformation | Description |
---|---|
Start new chain |
Begin a new chain, starting with this operator: the two mappers will be chained, and the filter will not be chained to the first mapper.
|
Disable chaining |
Do not chain the map operator.
|
Start a new resource group |
Start a new resource group containing the map and the subsequent operators.
|
Isolate resources |
Isolate the operator in its own slot.
|
Transformation | Description |
---|---|
Start new chain |
Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
|
Disable chaining |
Do not chain the map operator
|
Start a new resource group |
Start a new resource group containing the map and the subsequent operators.
|
Isolate resources |
Isolate the operator in its own slot.
|
The keyBy transformation requires that a key is defined on its DataStream. It is used as follows:
DataStream<...> input = // [...]
DataStream<...> windowed = input
    .keyBy(/*define key here*/)
    .window(/*define window here*/);
Flink's data model is not based on key-value pairs, so you do not need to physically pack the data stream types into keys and values. Keys are "virtual": they are defined as functions over the actual data to guide the grouping operations.
See the relevant section of the DataSet API documentation on how to specify keys; just replace DataSet with DataStream, and groupBy with keyBy.
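As a sketch, keys can be specified in the same ways as in the DataSet API (the POJO class WordCount with a public word field is only an assumed example; pairs is the Tuple2 stream used above):

pairs.keyBy(0);              // by tuple field position
wordCounts.keyBy("word");    // by field expression on a POJO
pairs.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
    @Override
    public String getKey(Tuple2<String, Integer> value) {
        return value.f0;     // extract the key with a KeySelector function
    }
});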
Some transformations take user-defined functions as parameters.
See the relevant section of the DataSet API documentation.
Flink places some restrictions on the type of elements that are used in DataStreams and in results of transformations. The reason for this is that the system analyzes the types to determine efficient execution strategies.
See the relevant section of the DataSet API documentation.
Sources can be created with StreamExecutionEnvironment.addSource(sourceFunction). You can either use one of the source functions that come with Flink, or write a custom source by implementing the SourceFunction interface for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending RichParallelSourceFunction for parallel sources.
There are several predefined stream sources available on the StreamExecutionEnvironment:
File-based:
readTextFile(path)
/ TextInputFormat
- Reads files line wise and returns them as Strings.
readTextFileWithValue(path)
/ TextValueInputFormat
- Reads files line wise and returns them as StringValues. StringValues are mutable strings.
readFile(path)
/ Any input format - Reads files as dictated by the input format.
readFileOfPrimitives(path, Class)
/ PrimitiveInputFormat
- Parses files of new-line (or another char sequence) delimited primitive data types such as String
or Integer
.
readFileStream
- create a stream by appending elements when there are changes to a file
Socket-based:
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.
Collection-based:
fromCollection(Collection)
- Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator, Class)
- Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(T ...)
- Creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator, Class)
- Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to)
- Generates the sequence of numbers in the given interval, in parallel.
Custom:
addSource
- Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...))
. See connectors for more details.
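For reference, a minimal non-parallel custom source might look like the following sketch (the emitted values and pacing are arbitrary, not part of the Flink documentation):

// Emits one increasing Long per second until the job is cancelled.
DataStream<Long> ticks = env.addSource(new SourceFunction<Long>() {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long i = 0;
        while (running) {
            ctx.collect(i++);    // emit one element
            Thread.sleep(1000);  // roughly one element per second
        }
    }

    @Override
    public void cancel() {
        running = false;         // called by Flink when the job is cancelled
    }
});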
Sources can be created by using StreamExecutionEnvironment.addSource(sourceFunction)
. You can either use one of the source functions that come with Flink or write a custom source by implementing the SourceFunction
for non-parallel sources, or by implementing the ParallelSourceFunction
interface or extending RichParallelSourceFunction
for parallel sources.
There are several predefined stream sources accessible from the StreamExecutionEnvironment
:
File-based:
readTextFile(path)
/ TextInputFormat
- Reads files line wise and returns them as Strings.
readTextFileWithValue(path)
/ TextValueInputFormat
- Reads files line wise and returns them as StringValues. StringValues are mutable strings.
readFile(path)
/ Any input format - Reads files as dictated by the input format.
readFileOfPrimitives(path, Class)
/ PrimitiveInputFormat
- Parses files of new-line (or another char sequence) delimited primitive data types such as String
or Integer
.
readFileStream
- create a stream by appending elements when there are changes to a file
Socket-based:
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.
Collection-based:
fromCollection(Seq)
- Creates a data stream from the Java Java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator)
- Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(elements: _*)
- Creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator)
- Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to)
- Generates the sequence of numbers in the given interval, in parallel.
Custom:
addSource
- Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer082<>(...))
. See connectors for more details.
The StreamExecutionEnvironment provides the ExecutionConfig for setting job-specific runtime configuration values.
See the relevant section of the DataSet API documentation.
The relevant APIs are:
enableTimestamps() / disableTimestamps(): attach a timestamp to the events emitted from the sources. areTimestampsEnabled() returns whether timestamps are currently enabled.
setAutoWatermarkInterval(long milliseconds): set the interval for automatic watermark emission. You can get the current interval with getAutoWatermarkInterval().
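A small sketch of how these settings are typically applied (assuming, as in this version of the API, that they live on the ExecutionConfig returned by getConfig()):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableTimestamps();              // attach timestamps to events emitted by the sources
env.getConfig().setAutoWatermarkInterval(1000);  // emit watermarks every 1000 ms
long interval = env.getConfig().getAutoWatermarkInterval();  // read back the current interval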
Data sinks consume DataStreams and forward them to files, sockets, or external systems, or print them. Flink comes with a variety of built-in output formats:
writeAsText()
/ TextOutputFormat
- Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...)
/ CsvOutputFormat
- Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
print()
/ printToErr()
- Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
write()
/ FileOutputFormat
- Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
writeToSocket
- Writes elements to a socket according to a SerializationSchema
addSink
- Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:
writeAsText()
/ TextOutputFormat
- Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
writeAsCsv(...)
/ CsvOutputFormat
- Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
print()
/ printToErr()
- Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
write()
/ FileOutputFormat
- Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
writeToSocket
- Writes elements to a socket according to a SerializationSchema
addSink
- Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
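For a custom sink, addSink takes a SinkFunction; here is a minimal sketch (assuming a DataStream<String> named stream; the body is only illustrative, a real sink would talk to an external system):

stream.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) throws Exception {
        System.err.println("sink received: " + value);  // replace with a call to the external system
    }
});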
Before running a streaming program in a distributed cluster, it is a good idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis programs is usually an incremental process of checking results, debugging, and improving.
Flink provides features to significantly ease the development process of data analysis programs by supporting local debugging from within an IDE, injection of test data, and collection of result data. This section give some hints how to ease the development of Flink programs.
A LocalStreamEnvironment
starts a Flink system within the same JVM process it was created in. If you start the LocalStreamEnvironment from an IDE, you can set breakpoints in your code and easily debug your program.
A LocalEnvironment is created and used as follows:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()
Flink provides special data sources which are backed by Java collections to ease testing. Once a program has been tested, the sources and sinks can be easily replaced by sources and sinks that read from / write to external systems.
Collection data sources can be used as follows:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
Note: Currently, the collection data source requires that data types and iterators implement Serializable
. Furthermore, collection data sources can not be executed in parallel ( parallelism = 1).
A time window is, roughly, a group of events that occur within a certain period of time. The time characteristic defines how time is interpreted; Flink currently supports three kinds of time:
Processing time: the wall-clock time of the machine on which a transformation is executed. Processing time is the simplest notion of time and offers the best performance. However, in distributed and asynchronous environments the machine clocks often disagree, so processing time is non-deterministic.
Event time: the time at which each event actually occurred, usually attached to the record before it enters Flink, or extracted from one of its fields. With event time, out-of-order events can be handled properly; for example, a 12-minute window transformation will correctly process an event carrying a 10th-minute timestamp even if it arrives during the 12th minute. Event time processing provides predictable results, but incurs higher latency, because out-of-order events need to be buffered in memory.
Ingestion time: the time at which an event enters Flink, more precisely the wall-clock time of the machine running the source task at the moment the event is ingested; that time is assigned to the event. Ingestion time is more deterministic and predictable than processing time and has lower latency than event time, since it does not depend on external systems. It therefore offers a middle ground. Ingestion time is in fact a special case of event time; internally, Flink handles the two identically.
When using event time, transformations must avoid waiting indefinitely for events to arrive. Watermarks provide the mechanism that bounds how far event time may lag. Watermarks are emitted by the sources; a watermark carries a certain timestamp (a long, e.g. corresponding to 2015-12-03 14:17:30) and denotes that no element with an earlier event time will arrive anymore.
You can choose the time semantics you need as follows:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
The default is TimeCharacteristic.ProcessingTime, so nothing further needs to be done to write a program with processing time semantics.
To write a program with event time semantics, you need to follow these steps:
1: Set the event time characteristic: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2: Use DataStream.assignTimestamps(...) to tell Flink how timestamps relate to events, e.g. which field of the record holds the timestamp.
3: Enable timestamps with enableTimestamps(), and set the watermark emission interval with setAutoWatermarkInterval(long milliseconds), both in the ExecutionConfig.
For example, assume that we have a data stream of tuples whose first field carries the timestamp (assigned by the system that generates the messages, not by Flink), and that we know the lag between processing time and that timestamp is never more than 1 second:
DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
    @Override
    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0;
    }

    @Override
    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0 - 1000;
    }

    @Override
    public long getCurrentWatermark() {
        return Long.MIN_VALUE;
    }
});
val stream: DataStream[(Long,Int,Double,String)] = null;
stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1

  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000

  override def getCurrentWatermark: Long = Long.MinValue
})
If you know that the timestamps are strictly ascending, i.e. elements arrive in order, you can use the AscendingTimestampExtractor, and the system will emit watermarks automatically:
DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
    @Override
    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
        return element.f0;
    }
});
stream.extractAscendingTimestamp(record => record._1)
To use ingestion time semantics, you need to:
1:env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
.
You can think of this setting as a shorthand for event time in which the sources assign timestamps from their current machine time and Flink itself performs the timestamp injection and watermark emission, so Flink can infer the remaining settings automatically.
Flink provides several methods for defining windows on a KeyedStream. Each of these windows groups elements with the same key.
Flink offers both flexible general-purpose windows and predefined windows that cover the common cases. Check whether the predefined windows fit your needs before defining custom windows.
Transformation | Description |
---|---|
Tumbling time window KeyedStream → WindowedStream |
Defines a tumbling window of 5 seconds: elements are grouped according to their timestamps in groups of 5-second duration, and every element belongs to exactly one window. The notion of time is determined by the TimeCharacteristic set on the environment (see above).
|
Sliding time window KeyedStream → WindowedStream |
Defines a window of 5 seconds that slides by 1 second: elements are grouped according to their timestamps in groups of 5-second duration, and an element can belong to more than one window.
|
Tumbling count window KeyedStream → WindowedStream |
Defines a window of 1000 elements; every element belongs to exactly one window.
|
Sliding count window KeyedStream → WindowedStream |
Defines a window of 1000 elements that slides; an element can belong to more than one window.
|
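As a sketch, the four predefined keyed windows above correspond to the following calls (keyedStream is assumed to be the result of a keyBy, as in the examples above):

keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));                                // tumbling time window
keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));  // sliding time window
keyedStream.countWindow(1000);        // tumbling count window of 1000 elements
keyedStream.countWindow(1000, 100);   // sliding count window of 1000 elements, sliding by 100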
Transformation | Description |
---|---|
Tumbling time window KeyedStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time).
|
Sliding time window KeyedStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at most 4 seconds) The notion of time is specified by the selected TimeCharacteristic (see time).
|
Tumbling count window KeyedStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.
|
Sliding count window KeyedStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at most 900 elements).
|
This mechanism can define much richer windows, at the cost of writing more code. For example, below is a custom window that holds the last 5 seconds of data and slides every 1 second, but the window's execution function is evaluated only once 100 elements have been added to the window, and on every evaluation only the last 10 elements are retained:
keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10));
keyedStream
    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10))
The general recipe for building a custom window is to specify (1) a WindowAssigner, (2) optionally a Trigger, and (3) optionally an Evictor.
The WindowAssigner defines how elements are organized into windows (by time or by count). A window is a logical grouping of elements with a begin value and an end value; for timestamped elements, these correspond to a begin time and an end time.
For example, the sliding time window assigner with a size of 5 seconds sliding every 1 second defines, assuming millisecond granularity and time starting at 0, the windows [0, 5000], [1000, 6000], [2000, 7000], [3000, 8000], [4000, 9000], [5000, 10000], and so on. Each incoming element is assigned to windows according to its timestamp and may appear in several of them; an element with timestamp 2000, for instance, is assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases, and you can define your own window types by extending the WindowAssigner class.
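For example, choosing a window assigner explicitly might look like this sketch (TumblingTimeWindows is assumed to be available alongside the SlidingTimeWindows assigner used above):

keyedStream
    .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))  // the assigner decides which window(s) each element joins
    .sum(1);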
Transformation | Description |
---|---|
Global window KeyedStream → WindowedStream |
All incoming elements with a given key are assigned to the same window. The window has no default trigger, so it will never be evaluated unless a trigger is explicitly specified.
|
Tumbling time windows KeyedStream → WindowedStream |
Incoming elements are assigned, according to their timestamps, to non-overlapping windows: every element belongs to exactly one window. The window comes with a default trigger: for event/ingestion time, the window fires when a watermark with a value higher than its end value is received; for processing time, it fires when the current processing time exceeds its current end value.
|
Sliding time windows KeyedStream → WindowedStream |
Incoming elements are assigned, according to their timestamps, to windows that may overlap. The window comes with a default trigger: for event/ingestion time, the window fires when a watermark with a value higher than its end value is received; for processing time, it fires when the current processing time exceeds its current end value.
|
Transformation | Description |
---|---|
Global window KeyedStream → WindowedStream |
All incoming elements of a given key are assigned to the same window. The window does not contain a default trigger, hence it will never be triggered if a trigger is not explicitly specified.
|
Tumbling time windows KeyedStream → WindowedStream |
Incoming elements are assigned to a window of a certain size (1 second below) based on their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window. The notion of time is specified by the selected TimeCharacteristic (see time). The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value.
|
Sliding time windows KeyedStream → WindowedStream |
Incoming elements are assigned to a window of a certain size (5 seconds below) based on their timestamp. Windows "slide" by the provided value (1 second in the example), and hence overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a watermark with value higher than its end-value is received, whereas for processing time when the current processing time exceeds its current end value.
|
The Trigger specifies when the function that follows the window definition (e.g., sum, count) is evaluated ("fires") for each window. If a trigger is not specified, a default trigger for each type of window is used. Flink comes bundled with a set of triggers, and you can write your own by implementing the Trigger interface. Note that specifying a trigger overrides the default trigger of the window type.
Transformation | Description |
---|---|
Processing time trigger |
A window is fired when the current processing time exceeds its end value. The elements on the triggered window are thereafter discarded.
|
Watermark trigger |
A window is fired when a watermark with a value exceeding its end value is received. The elements on the triggered window are thereafter discarded.
|
Continuous processing time trigger |
A window is periodically considered for firing; it actually fires only when the current processing time exceeds its end value. The elements on the triggered window are retained.
|
Continuous watermark time trigger |
A window is periodically considered for firing; it actually fires when a watermark with a value exceeding its end value is received. The elements on the triggered window are retained.
|
Count trigger |
A window is fired when it contains more than a certain number of elements (e.g., 1000). The elements of the triggered window are retained.
|
Purging trigger |
Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.
|
Delta trigger |
A window is periodically considered for firing; it actually fires when the value of the last added element exceeds the value of the first element inserted in the window, according to a DeltaFunction.
|
Transformation | Description |
---|---|
Processing time trigger |
A window is fired when the current processing time exceeds its end-value. The elements on the triggered window are henceforth discarded.
|
Watermark trigger |
A window is fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are henceforth discarded.
|
Continuous processing time trigger |
A window is periodically considered for being fired (every 5 seconds in the example). The window is actually fired only when the current processing time exceeds its end-value. The elements on the triggered window are retained.
|
Continuous watermark time trigger |
A window is periodically considered for being fired (every 5 seconds in the example). A window is actually fired when a watermark with value that exceeds the window's end-value has been received. The elements on the triggered window are retained.
|
Count trigger |
A window is fired when it has more than a certain number of elements (1000 below). The elements of the triggered window are retained.
|
Purging trigger |
Takes any trigger as an argument and forces the triggered window elements to be "purged" (discarded) after triggering.
|
Delta trigger |
A window is periodically considered for being fired (every 5000 milliseconds in the example). A window is actually fired when the value of the last added element exceeds the value of the first element inserted in the window according to a `DeltaFunction`.
|
After the trigger fires, and before the window function (e.g., sum, count) is applied, an optional Evictor can remove retained elements. Flink comes bundled with a set of evictors, and you can write your own by implementing the Evictor interface.
Transformation | Description |
---|---|
Time evictor |
Evicts elements from the beginning of the window so that only the elements from end-value minus 1 second up to the end value are retained.
|
Count evictor |
Retains the last 1000 elements of the window and evicts all others.
|
Delta evictor |
Starting from the beginning of the window, evicts elements until an element is found whose delta to the last element is below a given threshold (e.g., with a threshold of 5 and a last element of 15, eviction stops at an element with value 8), using a threshold and a DeltaFunction.
|
Transformation | Description |
---|---|
Time evictor |
Evict all elements from the beginning of the window, so that elements from end-value - 1 second until end-value are retained (the resulting window size is 1 second).
|
Count evictor |
Retain 1000 elements from the end of the window backwards, evicting all others.
|
Delta evictor |
Starting from the beginning of the window, evict elements until an element with value lower than the value of the last element is found (by a threshold and a DeltaFunction).
|
The mechanisms of window assigner, trigger, and evictor are very powerful, and they allow you to define many different kinds of windows. Flink's basic windows are in fact implemented as a thin layer on top of these three mechanisms. Below is how some common window types can be constructed from them:
Window type | Definition |
---|---|
Tumbling count window
|
|
Sliding count window
|
|
Tumbling event time window
|
|
Sliding event time window
|
|
Tumbling processing time window
|
|
Sliding processing time window
|
|
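As a rough sketch of the idea, a sliding count window of 1000 elements sliding by 100 could be composed from the three mechanisms like this (GlobalWindows and the Count helper are assumed to be the same classes used in the trigger/evictor examples above):

keyedStream
    .window(GlobalWindows.create())   // assigner: one global window per key
    .trigger(Count.of(100))           // evaluate the window function every 100 elements
    .evictor(Count.of(1000));         // keep only the most recent 1000 elements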
You can also define windows on regular (non-keyed) data streams using the windowAll transformation. These windowed streams have all the capabilities of keyed windowed streams, but are evaluated in a single task (on a single computing node). The syntax for defining triggers and evictors is exactly the same:
nonKeyedStream
    .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10));
nonKeyedStream
    .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
    .trigger(Count.of(100))
    .evictor(Count.of(10))
The basic window definitions are also available for non-keyed windows:
Transformation | Description |
---|---|
Tumbling time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.
|
Sliding time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment.
|
Tumbling count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.
|
Sliding count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements).
|
Transformation | Description |
---|---|
Tumbling time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "tumbles". This means that elements are grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window. The notion of time used is controlled by the StreamExecutionEnvironment.
|
Sliding time window all DataStream → WindowedStream |
Defines a window of 5 seconds, that "slides" by 1 seconds. This means that elements are grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than one window (since windows overlap by at least 4 seconds) The notion of time used is controlled by the StreamExecutionEnvironment.
|
Tumbling count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "tumbles". This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element belongs to exactly one window.
|
Sliding count window all DataStream → WindowedStream |
Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements, and every element can belong to more than one window (as windows overlap by at least 900 elements).
|
Flink has a checkpointing mechanism to recover jobs that run into problems. The mechanism requires the source to persist its data so that records can be read again later when needed.
The checkpointing mechanism stores the progress in the data sources, as well as the user-defined state, to provide exactly-once processing semantics.
Checkpointing is enabled by calling enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:
setNumberOfExecutionRetries()
This method defines how many times the job is restarted after a failure. When checkpointing is enabled but this value is not explicitly set, the job is restarted infinitely often. The docs on streaming fault tolerance describe Flink's fault tolerance mechanism in detail.
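A minimal sketch of enabling checkpointing and bounding restarts (the interval and retry count are arbitrary example values):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);        // draw a checkpoint of the topology every 5000 ms
env.setNumberOfExecutionRetries(3);   // restart the job at most 3 times after a failure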
Flink can guarantee exactly-once updates to user-defined state only when the source participates in the snapshotting mechanism. Currently this is guaranteed for the Kafka source (and internal number generators), but not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
Source | Guarantees | Notes |
---|---|---|
Apache Kafka | exactly once | Use the appropriate Kafka connector for your version |
RabbitMQ | at most once | |
Twitter Streaming API | at most once | |
Collections | at most once | |
Files | at least once | At failure the file will be read from the beginning |
Sockets | at most once |
To guarantee end-to-end exactly-once record delivery (in addition to exactly-once updates), the data sink needs to take part in the snapshotting mechanism. The following table lists the delivery guarantees (assuming exactly-once state updates) of Flink coupled with bundled sinks:
Sink | Guarantees | Notes |
---|---|---|
HDFS rolling sink | exactly once | Implementation depends on Hadoop version |
Elasticsearch | at least once | |
Kafka producer | at least once | |
File sinks | at least once | |
Socket sinks | at least once | |
Standard output | at least once |
You can control the number of parallel instances created for each operator by calling the operator.setParallelism(int)
method.
By default, elements are not transferred on the network one-by-one (which would cause unnecessary network traffic) but are buffered. The size of the buffers (which are actually transferred between machines) can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough. To control throughput and latency, you can use env.setBufferTimeout(timeoutMillis)
on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time, the buffers are sent automatically even if they are not full. The default value for this timeout is 100 ms.
Usage:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
To maximize throughput, set setBufferTimeout(-1)
which will remove the timeout and buffers will only be flushed when they are full. To minimize latency, set the timeout to a value close to 0 (for example 5 or 10 ms). A buffer timeout of 0 should be avoided, because it can cause severe performance degradation.
All transformations in Flink may look like functions (in the functional processing terminology), but are in fact stateful operators. You can make every transformation (map, filter, etc.) stateful by declaring local variables or using Flink's state interface. You can register any local variable as managed state by implementing an interface. In this case, and also in the case of using Flink's native state interface, Flink will automatically take consistent snapshots of your state periodically, and restore its value in the case of a failure.
The end effect is that updates to any form of state are the same under failure-free execution and execution under failures.
First, we look at how to make local variables consistent under failures, and then we look at Flink’s state interface.
By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system), which can be configured in the flink-conf.yaml
or via StreamExecutionEnvironment.setStateBackend(…).
Local variables can be checkpointed by using the Checkpointed
interface.
When the user-defined function implements the Checkpointed
interface, the snapshotState(…)
and restoreState(…)
methods will be executed to draw and restore function state.
In addition to that, user functions can also implement the CheckpointNotifier
interface to receive notifications on completed checkpoints via the notifyCheckpointComplete(long checkpointId)
method. Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications.
For example the same counting, reduce function shown for OperatorState
s by using the Checkpointed
interface instead:
public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {

    // persistent counter
    private long counter = 0;

    @Override
    public Long reduce(Long value1, Long value2) {
        counter++;
        return value1 + value2;
    }

    // regularly persists state during normal operation
    @Override
    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter;
    }

    // restores state on recovery from failure
    @Override
    public void restoreState(Long state) {
        counter = state;
    }
}
The state interface gives access to key/value states, which are a collection of key/value pairs. Because the state is partitioned by the keys (distributed accross workers), it can only be used on the KeyedStream
, created via stream.keyBy(…)
(which means also that it is usable in all types of functions on keyed windows).
The handle to the state can be obtained from the function’s RuntimeContext
. The state handle will then give access to the value mapped under the key of the current record or window - each key consequently has its own value.
The following code sample shows how to use the key/value state inside a reduce function. When creating the state handle, one needs to supply a name for that state (a function can have multiple states of different types), the type of the state (used to create efficient serializers), and the default value (returned as a value for keys that do not yet have a value associated).
public class CounterSum extends RichReduceFunction<Long> {

    /** The state handle */
    private OperatorState<Long> counter;

    @Override
    public Long reduce(Long value1, Long value2) throws Exception {
        counter.update(counter.value() + 1);
        return value1 + value2;
    }

    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}
State updated this way is usually kept locally inside the Flink process (unless one explicitly configures an external state backend). This means that lookups and updates are process-local and therefore very fast.
The important implication of having the keys set implicitly is that it forces programs to group the stream by key (via the keyBy()
function), making the key partitioning transparent to Flink. That allows the system to efficiently restore and redistribute keys and state.
The Scala API has shortcuts that for stateful map()
or flatMap()
functions on KeyedStream
, which give the state of the current key as an option directly into the function, and return the result with a state update:
val stream: DataStream[(String, Int)] = ... val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c + in._2) ) case None => ( (in._1, 0), Some(in._2) ) })
Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.
Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the flink.streaming.api.checkpoint.CheckpointNotifier
interface.
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing:env.enableCheckpointing(interval, force = true)
.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
Iterative streaming programs implement a step function and embed it into an IterativeStream
. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split
transformation or a filter
. Here, we show an example using filters. First, we define an IterativeStream
IterativeStream<Integer> iteration = input.iterate();
Then, we specify the logic that will be executed inside the loop using a series of trasformations (here a simple map
transformation)
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
To close an iteration and define the iteration tail, call the closeWith(feedbackStream)
method of the IterativeStream
. The DataStream given to the closeWith
function will be fed back to the iteration head. A common pattern is to use a filter to separate the part of the stream that is fed back, and the part of the stream which is propagated forward. These filters can, e.g., define the "termination" logic, where an element is allowed to propagate downstream rather than being fed back.
iteration.closeWith(iterationBody.filter(/* one part of the stream */)); DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith
method.
For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value > 0);
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return (value <= 0);
    }
});
Iterative streaming programs implement a step function and embed it into an IterativeStream
. As a DataStream program may never finish, there is no maximum number of iterations. Instead, you need to specify which part of the stream is fed back to the iteration and which part is forwarded downstream using a split
transformation or a filter
. Here, we show an example iteration where the body (the part of the computation that is repeated) is a simple map transformation, and the elements that are fed back are distinguished by the elements that are forwarded downstream using filters.
val iteratedStream = someDataStream.iterate( iteration => { val iterationBody = iteration.map(/* this is executed many times */) (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */)) })
By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the closeWith
method.
For example, here is a program that continuously subtracts 1 from a series of integers until they reach zero:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000) val iteratedStream = someIntegers.iterate( iteration => { val minusOne = iteration.map( v => v - 1) val stillGreaterThanZero = minusOne.filter (_ > 0) val lessThanZero = minusOne.filter(_ <= 0) (stillGreaterThanZero, lessThanZero) } )
Connectors provide code for interfacing with various third-party systems.
Currently these systems are supported:
To run an application using one of these connectors, additional third party components are usually required to be installed and launched, e.g. the servers for the message queues. Further instructions for these can be found in the corresponding subsections. Docker containers are also provided encapsulating these services to aid users getting started with connectors.
This connector provides access to event streams served by Apache Kafka.
Flink provides special Kafka Connectors for reading and writing data to Kafka topics. The Flink Kafka Consumer integrates with Flink’s checkpointing mechanisms to provide different processing guarantees (most importantly exactly-once guarantees).
For exactly-once processing Flink can not rely on the auto-commit capabilities of the Kafka consumers. The Kafka consumer might commit offsets to Kafka which have not been processed successfully.
Please pick a package (maven artifact id) and class name for your use-case and environment. For most users, the flink-connector-kafka-083
package and the FlinkKafkaConsumer082
class are appropriate.
Package | Supported since | Class name | Kafka version | Checkpointing behavior | Notes |
---|---|---|---|---|---|
flink-connector-kafka | 0.9, 0.10 | KafkaSource | 0.8.1, 0.8.2 | Does not participate in checkpointing (no consistency guarantees) | Uses the old, high level KafkaConsumer API, autocommits to ZK via Kafka |
flink-connector-kafka | 0.9, 0.10 | PersistentKafkaSource | 0.8.1, 0.8.2 | Does not guarantee exactly-once processing, element order, or strict partition assignment | Uses the old, high level KafkaConsumer API, offsets are committed into ZK manually |
flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer081 | 0.8.1 | Guarantees exactly-once processing | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually |
flink-connector-kafka-083 | 0.9.1, 0.10 | FlinkKafkaConsumer082 | 0.8.2 | Guarantees exactly-once processing | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK manually |
Then, import the connector in your maven project:
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
~~~
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.
Note that the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine’s IP address.

The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to one topic.
The following parameters have to be provided for the `FlinkKafkaConsumer082(...)` constructor:

* The topic name
* A DeserializationSchema
* Properties for the Kafka consumer; as in the example below, the `"bootstrap.servers"` (comma-separated list of Kafka brokers), `"zookeeper.connect"` (comma-separated list of Zookeeper servers) and `"group.id"` (the id of the consumer group) properties are required
Example:
~~~java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer082<>("topic", new SimpleStringSchema(), properties));
stream.print();
~~~
~~~scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
stream.print()
~~~
As Kafka persists all the data, a fault tolerant Kafka consumer can be provided.
The FlinkKafkaConsumer082 can read a topic, and if the job fails for some reason, the source will continue reading from where it left off after a restart. For example, if there are 3 partitions in the topic and offsets 31, 122 and 110 had been read at the time of the job failure, then after the restart the source will continue reading from those offsets, regardless of whether these partitions have received new messages in the meantime.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
~~~java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
~~~
Also note that Flink can only restart the topology if enough processing slots are available. So if the topology fails due to the loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
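As a minimal sketch of how these pieces fit together (the retry count of 3 is an arbitrary assumption, and `setNumberOfExecutionRetries` is assumed to be the restart knob available in this Flink version), a fault-tolerant setup might look like this:

~~~java
// Sketch only: enable checkpointing and allow a bounded number of restarts.
// The value 3 and the use of setNumberOfExecutionRetries are assumptions for illustration.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);       // checkpoint the topology every 5 seconds
env.setNumberOfExecutionRetries(3);  // attempt up to 3 restarts before failing the job permanently
~~~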
A class providing an interface for sending data to Kafka.
The following arguments have to be provided for the `KafkaSink(...)` constructor in order:

* The broker address (in `hostname:port` form, as in the example below)
* The topic name
* A serialization schema
Example:
stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
The user can also define custom Kafka producer configuration for the KafkaSink with the constructor:
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema)
public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, SerializationSchema serializationSchema)
If this constructor is used, the user needs to make sure to set the broker(s) with the `"metadata.broker.list"` property. Also, the serializer configuration should be left at its default, and the serialization should be set via the SerializationSchema.
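For illustration, a sketch of using this constructor might look as follows; the addresses, the topic name and the extra `request.required.acks` setting are assumptions, not values prescribed by the connector:

~~~java
// Sketch only: custom producer configuration for the KafkaSink constructor shown above.
Properties producerConfig = new Properties();
producerConfig.setProperty("metadata.broker.list", "localhost:9092"); // required when using this constructor
producerConfig.setProperty("request.required.acks", "1");             // assumed extra Kafka 0.8 producer setting

stream.addSink(new KafkaSink<String>(
    "localhost:9092",            // address, matching the earlier KafkaSink example
    "test",                      // topic
    producerConfig,
    new SimpleStringSchema()));  // serialization is configured via the SerializationSchema
~~~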
The Apache Kafka official documentation can be found here.
This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
~~~
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating a Sink for writing to your cluster.
The connector provides a Sink that can send data to an Elasticsearch Index.
The sink can use two different methods for communicating with Elasticsearch:

* An embedded Node
* A TransportClient
See here for information about the differences between the two modes.
This code shows how to create a sink that uses an embedded Node for communication:
~~~java
DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
  @Override
  public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .type("my-type")
        .source(json);
  }
}));
~~~
~~~scala
val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
~~~
Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the `cluster.name` parameter, which must correspond to the name of your cluster.
Internally, the sink uses a `BulkProcessor` to send index requests to the cluster. This will buffer elements before sending a request to the cluster. The behaviour of the `BulkProcessor` can be configured using these config keys:

* `bulk.flush.max.actions`: Maximum amount of elements to buffer
* `bulk.flush.max.size.mb`: Maximum amount of data (in megabytes) to buffer
* `bulk.flush.interval.ms`: Interval at which to flush data regardless of the other two settings, in milliseconds
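For example, a configuration that buffers more aggressively than the single-element flushes used above could look like the following sketch; the concrete values are arbitrary assumptions:

~~~java
// Sketch: tuning the BulkProcessor buffering with the documented keys (values are illustrative only).
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
config.put("bulk.flush.max.actions", "1000");   // flush after 1000 buffered index requests
config.put("bulk.flush.max.size.mb", "5");      // ... or once 5 MB of data has accumulated
config.put("bulk.flush.interval.ms", "60000");  // ... or at least once every 60 seconds
~~~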
This example code does the same, but with a `TransportClient`:
~~~java
DataStream<String> input = ...;

Map<String, String> config = Maps.newHashMap();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress("node-1", 9300));
transports.add(new InetSocketTransportAddress("node-2", 9300));

input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
  @Override
  public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .type("my-type")
        .source(json);
  }
}));
~~~
~~~scala
val input: DataStream[String] = ...

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

val transports = new util.ArrayList[TransportAddress]
transports.add(new InetSocketTransportAddress("node-1", 9300))
transports.add(new InetSocketTransportAddress("node-2", 9300))

input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
  override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
    val json = new util.HashMap[String, AnyRef]
    json.put("data", element)
    println("SENDING: " + element)
    Requests.indexRequest.index("my-index").`type`("my-type").source(json)
  }
}))
~~~
The difference is that we now need to provide a list of Elasticsearch nodes to which the sink should connect using a `TransportClient`.
More information about Elasticsearch can be found here.
This connector provides a Sink that writes rolling files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project:
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
~~~
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
The rolling behaviour as well as the writing can be configured but we will get to that later. This is how you can create a default rolling sink:
~~~java
DataStream<String> input = ...;

input.addSink(new RollingSink<String>("/base/path"));
~~~

~~~scala
val input: DataStream[String] = ...

input.addSink(new RollingSink("/base/path"))
~~~
The only required parameter is the base path where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, writer and batch size.
By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets. This pattern is passed to `SimpleDateFormat` with the current system time to form a bucket path. A new bucket will be created whenever the bucket path changes. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file, and when part files get too big the sink will also create a new part file next to the others. To specify a custom bucketer, use `setBucketer()` on a `RollingSink`.
The default writer is `StringWriter`. This will call `toString()` on the incoming elements and write them to part files, separated by newlines. To specify a custom writer, use `setWriter()` on a `RollingSink`. If you want to write Hadoop SequenceFiles you can use the provided `SequenceFileWriter`, which can also be configured to use compression.
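As a sketch of the compression option (the codec name "Default", the block compression type and the exact `SequenceFileWriter` constructor used here are assumptions, not values prescribed by the connector), a compressed SequenceFile writer could be configured roughly like this:

~~~java
// Sketch only: enable compression for SequenceFiles; codec name and compression type are assumptions.
RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
sink.setWriter(new SequenceFileWriter<IntWritable, Text>(
    "Default",                            // Hadoop compression codec name to look up
    SequenceFile.CompressionType.BLOCK)); // compress blocks of records rather than single records
~~~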
The last configuration option is the batch size. This specifies when a part file should be closed and a new one started. (The default part file size is 384 MB).
Example:
~~~java
DataStream<Tuple2<IntWritable, Text>> input = ...;

RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB

input.addSink(sink);
~~~
~~~scala
val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new RollingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB

input.addSink(sink)
~~~
This will create a sink that writes to bucket files that follow this schema:
/base/path/{date-time}/part-{parallel-task}-{count}
Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index of the parallel sink instance and `count` is the running number of part files that were created because of the batch size.
For in-depth information, please refer to the JavaDoc for RollingSink.
This connector provides access to data streams from RabbitMQ. To use this connector, add the following dependency to your project:
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-rabbitmq</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
~~~
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.
Follow the instructions from the RabbitMQ download page. After the installation the server automatically starts, and the application connecting to RabbitMQ can be launched.
A class providing an interface for receiving data from RabbitMQ.
The following have to be provided for the `RMQSource(...)` constructor in order:

* The hostname of the RabbitMQ server
* The queue name
* A deserialization schema
Example:
~~~java
DataStream<String> stream = env
    .addSource(new RMQSource<String>("localhost", "hello", new SimpleStringSchema()));
stream.print();
~~~
~~~scala
val stream = env
  .addSource(new RMQSource[String]("localhost", "hello", new SimpleStringSchema))
stream.print()
~~~
A class providing an interface for sending data to RabbitMQ.
The following have to be provided for the `RMQSink(...)` constructor in order:

* The hostname of the RabbitMQ server
* The queue name
* A serialization schema
Example:
stream.addSink(new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
stream.addSink(new RMQSink[String]("localhost", "hello", new StringToByteSerializer))
More about RabbitMQ can be found here.
The Twitter Streaming API provides the opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
~~~xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
~~~
Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution here.
In order to connect to the Twitter stream, the user has to register their program and acquire the necessary information for the authentication. The process is described below.
First of all, a Twitter account is needed. Sign up for free at twitter.com/signup or sign in at Twitter’s Application Management and register the application by clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions. After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) are located on the “API Keys” tab. The necessary access token data (`token` and `secret`) can be acquired here. Remember to keep these pieces of information secret and do not push them to public repositories.
Create a properties file, and pass its path in the constructor of `TwitterSource`. The content of the file should be similar to this:
~~~bash
#properties file for my app
secret=***
consumerSecret=***
token=***-***
consumerKey=***
~~~
The `TwitterSource` class has two constructors:

* `public TwitterSource(String authPath, int numberOfTweets);` to emit a finite number of tweets
* `public TwitterSource(String authPath);` for streaming
for streamingBoth constructors expect a String authPath
argument determining the location of the properties file containing the authentication information. In the first case, numberOfTweets
determines how many tweet the source emits.
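For instance, a bounded source that emits only the first 100 tweets could be created as in the following sketch; the count of 100 and the file path are arbitrary placeholders:

~~~java
// Sketch: the finite-tweet constructor; 100 is an arbitrary, assumed value.
DataStream<String> firstTweets = env.addSource(
    new TwitterSource("/PATH/TO/myFile.properties", 100));
~~~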
In contrast to other connectors, the `TwitterSource` depends on no additional services. For example, the following code should run gracefully:
DataStream<String> streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"));
streamSource = env.addSource(new TwitterSource("/PATH/TO/myFile.properties"))
The `TwitterSource` emits strings containing JSON. To retrieve information from the JSON you can add a FlatMap or a Map function that handles the JSON. For example, there is a `JSONParseFlatMap` abstract class among the examples. `JSONParseFlatMap` is an extension of the `FlatMapFunction` and has a `getField` function (Java: `String getField(String jsonText, String field);`, Scala: `getField(jsonText : String, field : String) : String`) which can be used to acquire the value of a given field.
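As an illustrative sketch (not taken from the examples themselves), extracting the language field of each tweet with this helper might look roughly like the following; the field name `"lang"` and the exact generic signature of `JSONParseFlatMap` are assumptions:

~~~java
// Hypothetical sketch: pull a single field out of each tweet using the getField helper described above.
DataStream<String> languages = streamSource
    .flatMap(new JSONParseFlatMap<String, String>() {
      @Override
      public void flatMap(String tweet, Collector<String> out) throws Exception {
        String lang = getField(tweet, "lang"); // "lang" is an assumed JSON field name
        if (lang != null) {
          out.collect(lang);
        }
      }
    });
~~~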
There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.
`TwitterLocal` is an example of how to use `TwitterSource`. It implements a language frequency counter program.
A Docker container is provided with all the required configurations for test running the connectors of Apache Flink. The servers for the message queues will be running on the docker container while the example topology can be run on the user’s computer.
The official Docker installation guide can be found here. After installing Docker an image can be pulled for each connector. Containers can be started from these images where all the required configurations are set.
For the easiest setup, create a jar with all the dependencies of the flink-streaming-connectors project.
~~~bash
cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
mvn assembly:assembly
~~~

This creates an assembly jar under *flink-streaming-connectors/target*.

#### RabbitMQ

Pull the docker image:

~~~bash
sudo docker pull flinkstreaming/flink-connectors-rabbitmq
~~~
To run the container, type:
sudo docker run -p 127.0.0.1:5672:5672 -t -i flinkstreaming/flink-connectors-rabbitmq
Now a terminal has started running from the image with all the necessary configurations to test run the RabbitMQ connector. The -p flag binds the localhost’s and the Docker container’s ports so RabbitMQ can communicate with the application through these.
To start the RabbitMQ server:
sudo /etc/init.d/rabbitmq-server start
To launch the example on the host computer, execute:
~~~bash
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.rabbitmq.RMQTopology \
    > log.txt 2> errorlog.txt
~~~
There are two connectors in the example. One that sends messages to RabbitMQ, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:
~~~
<DATE> INFO rabbitmq.RMQTopology: String: <one> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <two> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <three> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <four> arrived from RMQ
<DATE> INFO rabbitmq.RMQTopology: String: <five> arrived from RMQ
~~~
#### Apache Kafka

Pull the image:
sudo docker pull flinkstreaming/flink-connectors-kafka
To run the container type:
~~~bash
sudo docker run -p 127.0.0.1:2181:2181 -p 127.0.0.1:9092:9092 -t -i \
    flinkstreaming/flink-connectors-kafka
~~~
Now a terminal has started running from the image with all the necessary configurations to test run the Kafka connector. The -p flag binds the localhost’s and the Docker container’s ports so Kafka can communicate with the application through these. First start a zookeeper in the background:
~~~bash
/kafka_2.9.2-0.8.1.1/bin/zookeeper-server-start.sh /kafka_2.9.2-0.8.1.1/config/zookeeper.properties \
    > zookeeperlog.txt &
~~~
Then start the kafka server in the background:
~~~bash
/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /kafka_2.9.2-0.8.1.1/config/server.properties \
    > serverlog.txt 2> servererr.txt &
~~~
To launch the example on the host computer, execute:
~~~bash
java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.kafka.KafkaTopology \
    > log.txt 2> errorlog.txt
~~~
In the example there are two connectors. One that sends messages to Kafka, and one that receives messages from the same queue. In the logger messages, the arriving messages can be observed in the following format:
~~~
<DATE> INFO kafka.KafkaTopology: String: (0) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (1) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (2) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (3) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (4) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (5) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (6) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (7) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (8) arrived from Kafka
<DATE> INFO kafka.KafkaTopology: String: (9) arrived from Kafka
~~~
See the relevant section of the DataSet API documentation.
Original source: http://www.cnblogs.com/Chuck-wu/p/5017436.html