Recently, several readers have left comments asking about consuming Kafka with Flink. In this post I will walk through a small example that consumes data from Kafka with a Flink job and stores it on HDFS.
Here is a typical scenario for consuming Kafka data: user data generated by an e-commerce or gaming platform is written to a Kafka topic, and a Flink job consumes it in real time and accumulates it on HDFS. The accumulated data can then feed a data warehouse (such as Hive) for analysis, or be used as training data for algorithm models. The overall flow is: platform data -> Kafka topic -> Flink job -> HDFS -> data warehouse / model training.
The whole pipeline depends on Kafka, Flink, and Hadoop. Because the Flink job is submitted to Hadoop for its compute and storage resources, both YARN and HDFS need to be running. The component versions are as follows:
| Component | Version |
| --------- | ------- |
| Kafka | 2.4.0 |
| Flink | 1.10.0 |
| Hadoop | 2.10.0 |
To consume data from the Kafka cluster, the Flink job needs the following dependencies. The ${flink.connector.version}, ${flink.kafka.version}, and ${flink.streaming.version} placeholders are Maven properties defined in the POM; with the versions listed above they would normally all resolve to 1.10.0.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>${flink.connector.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>${flink.kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.streaming.version}</version>
</dependency>
Next comes the Flink code that consumes the topic. No business logic is applied to the records here; they are simply consumed and written to HDFS:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink consumes topic data and stores it into HDFS.
 *
 * @author smartloli.
 *
 * Created by Mar 15, 2020
 */
public class Kafka2Hdfs {

    private static final Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class);

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be provided.");
            return;
        }
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Consume the topic; the consumer class matches the flink-connector-kafka-0.11 dependency above
        DataStream<String> transaction = env.addSource(new FlinkKafkaConsumer011<>(
                "test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));

        // Storage into HDFS, bucketed by day
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));
        sink.setBatchSize(1024 * 1024 * 1024); // roll a part file once it reaches 1 GB
        sink.setBatchRolloverInterval(1000 * 60 * 60); // or produce at most one file per hour
        transaction.addSink(sink);

        env.execute("Kafka2Hdfs");
    }

    private static Properties configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
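As a side note, two knobs that are easy to miss when wiring up the consumer are the checkpoint mode and the start position. The fragment below is only a sketch of mine, not part of the original job: it would slot into the main method above, it additionally needs org.apache.flink.streaming.api.CheckpointingMode imported, and the interval and start-position choices are illustrative assumptions.

// A minimal sketch (not from the original post): explicit checkpoint semantics
// and Kafka start position for the consumer used in Kafka2Hdfs above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // exactly-once is also the default mode

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
        "test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer));

// Resume from the offsets committed for the consumer group (the default behaviour);
// setStartFromEarliest() or setStartFromLatest() are alternatives for backfills and tests.
consumer.setStartFromGroupOffsets();

DataStream<String> stream = env.addSource(consumer);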
For reference, the rolling and closing behaviour of BucketingSink can be traced in its source code. The onProcessingTime callback fires periodically and closes part files that have been inactive for too long:

public void onProcessingTime(long timestamp) throws Exception {
    long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
    // Close part files that have not been written to within the inactivity threshold
    closePartFilesByTime(currentProcessingTime);
    // Re-register the timer so the check keeps firing
    processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}

When the sink itself is closed, the in-progress part file of every bucket is closed as well:

public void close() throws Exception {
    if (state != null) {
        for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
            closeCurrentPartFile(entry.getValue());
        }
    }
}

And invoke() decides, for every incoming record, whether the current part file should be rolled before the record is written:

public void invoke(T value) throws Exception {
    Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
    long currentProcessingTime = processingTimeService.getCurrentProcessingTime();

    BucketState<T> bucketState = state.getBucketState(bucketPath);
    if (bucketState == null) {
        bucketState = new BucketState<>(currentProcessingTime);
        state.addBucketState(bucketPath, bucketState);
    }

    // Roll to a new part file when the batch size or rollover interval is exceeded
    if (shouldRoll(bucketState, currentProcessingTime)) {
        openNewPartFile(bucketPath, bucketState);
    }

    bucketState.writer.write(value);
    bucketState.lastWrittenToTime = currentProcessingTime;
}
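Putting these snippets together, the inactivity check performed by onProcessingTime can also be tuned from the job code. The fragment below is only an illustration; setInactiveBucketCheckInterval and setInactiveBucketThreshold are existing BucketingSink setters, but the interval values are assumptions of mine and should be adjusted to your own latency and file-size requirements.

// Illustrative tuning of the BucketingSink from Kafka2Hdfs above; the numbers are assumptions.
BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));
sink.setBatchSize(1024 * 1024 * 1024);          // roll a part file once it reaches 1 GB
sink.setBatchRolloverInterval(1000 * 60 * 60);  // or after one hour at the latest
sink.setInactiveBucketCheckInterval(60 * 1000); // run the onProcessingTime check every minute
sink.setInactiveBucketThreshold(5 * 60 * 1000); // close buckets that have been idle for 5 minutes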
The code for consuming Kafka with Flink and writing to HDFS is fairly short, with no complex logic. When implementing and running it, pay attention to a few points: the Kafka bootstrap servers and the key/value deserializers must be set in the consumer properties, and when the job is submitted it should run in yarn-cluster mode with appropriate memory, CPU, and HDFS storage path settings.
That is all I want to share in this post. If you run into problems while studying this, you can join the discussion group or send me an email, and I will do my best to answer. Keep learning!
In addition, I have written the books "Kafka并不难学" and "Hadoop大数据挖掘从入门到进阶实战". If you are interested, you can purchase them through the links in the announcement bar; thank you for your support. You can also follow the official account below and, following the prompts, get the accompanying video tutorials for free.
Original post: https://www.cnblogs.com/smartloli/p/12499142.html