码迷,mamicode.com
首页 > 数据库 > 详细

Flink通过SQLClinet创建kafka源表并进行实时计算

时间:2020-04-09 19:09:25      阅读:458      评论:0      收藏:0      [点我收藏+]

标签:_id   fse   prope   bootstra   bin   category   bootstrap   技术   nec   

1.通过自建kafka的生产者来产生数据

/bin/kafka-console-producter.sh --broker-list 192.168.58.177:9092 --topic my_topic

数据

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

技术图片

2.在kafka进行消费

/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --topic my_topic --partition 0 --offset 0

技术图片

 

 

3.在Flink的sqlclient 创建表

CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    connector.type = kafka,
    connector.version = universal,
    connector.topic = my-topic-one,
    connector.startup-mode = earliest-offset,
    connector.properties.group.id = testGroup,
    connector.properties.zookeeper.connect = 192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181,
    connector.properties.bootstrap.servers = 192.168.58.177:9092,
    format.type = json
);

技术图片

 

实时计算 

select item_id,count(*) from user_log1 group by item_id;

技术图片

 

Flink通过SQLClinet创建kafka源表并进行实时计算

标签:_id   fse   prope   bootstra   bin   category   bootstrap   技术   nec   

原文地址:https://www.cnblogs.com/yaowentao/p/12668885.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!