码迷,mamicode.com
首页 > 其他好文 > 详细

自动统计Kafka集群日志

时间:2016-01-04 18:26:44      阅读:439      评论:0      收藏:0      [点我收藏+]

标签:kafka   python   

编写python脚本 statistic.py

#!/usr/bin/python
"""pip install kazoo"""
"""pip install kafka-python"""
import time
import threading
from kazoo.client import KazooClient
from  kafka.consumer import KafkaConsumer
import elasticsearch
# Module-level dict, presumably a cache for early offset results.
# NOTE(review): appears unused in the visible code — confirm before removing.
EARLYRES={}
def _get_partitions_logsize(topic, zookeepers, broker_list):
    """Return {partition_number: logsize} for one Kafka topic.

    Looks up the topic's partitions in ZooKeeper, then fetches the current
    log-end offset for each partition through a throwaway consumer.

    Args:
        topic: Kafka topic name.
        zookeepers: ZooKeeper connection string (host:port[/chroot]).
        broker_list: comma-separated Kafka broker host list.

    Returns:
        dict mapping int partition id -> logsize, or {} when the topic is
        missing or any error occurs (best-effort: callers treat {} as "skip").
    """
    zk = KazooClient(hosts=zookeepers, read_only=True)
    try:
        zk.start()
        path = "/brokers/topics/" + topic + "/partitions"
        if not zk.exists(path):
            return {}
        partitions = zk.get_children(path)
        eachsave = {}
        # group_id is only used for offset bookkeeping of this monitor itself.
        consumer = KafkaConsumer(topic, group_id="kafka_monitor",
                                 metadata_broker_list=broker_list.split(","))
        # NOTE(review): _offsets is a private attribute of old kafka-python
        # (<0.9 API); verify against the installed kafka-python version.
        fetch = consumer._offsets.fetch
        for partition in partitions:
            eachsave[int(partition)] = fetch[(topic, int(partition))]
        return eachsave
    except Exception:
        # Best-effort: a broken topic must not abort the whole sweep.
        return {}
    finally:
        zk.stop()
def analyze_logsize(zookeepers, broker_list):
    """Sum the logsize of every partition of every topic in the cluster.

    Args:
        zookeepers: ZooKeeper connection string (host:port[/chroot]).
        broker_list: comma-separated Kafka broker host list.

    Returns:
        int total logsize across all topics; 0 when the topics path is
        missing or an error occurs (so callers can safely do arithmetic).
    """
    zk = KazooClient(hosts=zookeepers, read_only=True)
    try:
        zk.start()
        path = "/brokers/topics/"
        total = 0  # avoid shadowing builtin sum()
        if not zk.exists(path):
            return 0
        for topic in zk.get_children(path):
            partitions = _get_partitions_logsize(topic, zookeepers, broker_list)
            # {} means the topic could not be read; skip it.
            for logsize in partitions.values():
                total += logsize
        return total
    except Exception as e:
        print(e)
        return 0
    finally:
        zk.stop()
if __name__ == '__main__':
    # Daily job: compute the cluster-wide logsize, diff it against the value
    # recorded by the previous run, and write the increment to a per-day file.
    base = "/usr/home/shixi_kaiwen/script/develop/logsizecheck/2016"
    total_logsize = analyze_logsize("127.0.0.1:2181/kafka/1001", "127.0.0.1")
    utc = time.localtime(time.time())
    # Previous total; tolerate an empty/fresh state file (echo "" > lastlogsize)
    # which would otherwise make int() raise ValueError.
    with open(base + "/lastlogsize", "r") as state_file:
        content = state_file.read().strip()
    last = int(content) if content else 0
    increment = total_logsize - last
    # One output file per day named after month-day.
    with open(base + "/increment_day." + time.strftime('%m-%d ', utc), "w") as day_file:
        day_file.write(str(increment))
    # Persist the new total for tomorrow's run.
    with open(base + "/lastlogsize", "w") as state_file:
        state_file.write(str(total_logsize))
    print("last = %s" % last)
    print("now_logsize = %s" % total_logsize)
    print("increment = %s" % increment)

在其目录下创建lastlogsize文件

echo "0" > ./lastlogsize

添加crontab任务

   crontab -e   

    1 16 * * * python /usr/home/shixi_kaiwen/script/develop/logsizecheck/2016/statistic.py >> /usr/home/shixi_kaiwen/script/develop/logsizecheck/2016/output.log 2>&1


本文出自 “谦虚” 博客,请务必保留此出处http://openex.blog.51cto.com/6280069/1731406

自动统计Kafka集群日志

标签:kafka   python   

原文地址:http://openex.blog.51cto.com/6280069/1731406

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!