标签:
kafka指定topic每分钟接收到的数据总量监控
需求:获取kafka每分钟接收到的数据总量,并以 时间戳-topicName-flow的格式存入Mysql
设计思路:
1.得到kafka当前时间点的sum(logsize),并存入指定的file文件。
2.一分钟后再次执行脚本,得到即时的sum(logsize),同时读取file文件里的数字x,sum(logsize)-x即为一分钟内kafka指定topic的flow(流量)。
3.将获取到的flow存入mysql,同时将file文件中的数字更新。
4.通过定时器crontab循环以上流程。
shell脚本:
#!/bin/bash
#mysql-host
mysql_host=‘192.168.60.161‘
#mysql-port
mysql_port=‘3306‘
#mysql-username
username=‘root‘
#mysql-password
#注意:此处如果mysql密码不为空,需要去下面切换一下mysql的执行代码,注释掉下面那一行,放开上面。
password=‘‘
#mysql-DBNAME
DBNAME=‘kafka_monitor‘
#mysql-tableName
tableName=‘kafka_flow_monitor‘
#zookeeper-ip
zookeeper=‘192.168.60.158:2181‘
#topicID
topic=‘nifi_test‘
#KAFKA_HOME(kafka安装目录)
KAFKA_HOME=‘/home/kafka‘
#filePath(中间结果存放文件路径)
file=‘/tmp/kafka/dataCount.txt‘
#group
group=$(grep ‘group.id=‘ $KAFKA_HOME/config/consumer.properties | cut -d ‘=‘ -f2)
if [ $# -eq 1 ];then
topic=$1
fi
out=$(sh $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper $zookeeper --group $group --topic $topic)
if [ $? -ne 0 ];then
sh $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest $KAFKA_HOME/config/consumer.properties $topic >> /dev/null
out=$(sh $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper $zookeeper --group $group --topic $topic)
fi
i=0
sum=0
for line in $(echo "$out"); do
let i++
if [ $i -gt 7 ]; then
b=$(( $i % 7 ))
if [ $b -eq 5 ];then
sum=$(($sum + $line))
fi
fi
done
x=$(cat $file)
dataCount=$[ $sum-$x ]
if [ $dataCount -lt 0 ];then
dataCount=$sum
fi
dateTime=$(date ‘+%Y-%m-%d %H:%M:%S‘)
insert_sql="insert into ${tableName} values(‘$dateTime‘,‘$topic‘,$dataCount)"
echo $insert_sql
之后将此脚本加入到linux的定时任务内即可
[root@slave158 bin]# crontab -e
编辑文件:
* * * * * sh /home/kafka/bin/mysh.sh <topicName> >> /dev/null
此处的为你想要监控的topicName,前面的5颗星从左到右分别代表分钟,小时,天,周,月。如果想5分钟执行一次则可以写为*/5 * * * * ,每小时的第10分钟执行,则写为10 * * * *。当然你还可以输出日志到指定的目录以便后期分析:
* * * * * sh /home/kafka/bin/getDataCount.sh <topicName> >> /home/kafka/bin/getDataCount.log
由于只需求分析kafka每分钟接收到的数据总量,由此来判别kafka或者负责发送数据的客户端是否出现问题,所以这里只监控了logsize属性,大家可以由此扩展,即可监控到kafka指定topic的偏移量offset以及冗余量lag,做为kafka性能分析的依据。
标签:
原文地址:http://blog.csdn.net/qq_28326077/article/details/51331564