标签:日志 status remove method self timestamp env targe asc
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from sfo_common.agent import Agent
from sfo_common.import_common import *
class ElkLog(object):
"""
处理ELK数据类
"""
def __init__(self):
pass
def get_elk_log_json(self):
"""
通过调用elasticsearch接口查询指定索引数据,计算集群的平均响应时间
:return:
"""
try:
day = time.strftime("%Y.%m.%d",time.localtime(time.time()))
clusters = config.elk_index_name.split(‘,‘)
if clusters:
for cluster in clusters:
index_name="{}-swift-proxy-{}".format(cluster,day)
req_url = ‘{}{}/_search?pretty‘.format(config.elk_server_url,index_name)
headers = {‘content-type‘: "application/json"}
l_time = datetime.datetime.now() + datetime.timedelta(minutes=-5)
now_time = util.local2utc(datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S.%f‘))
now_time_5m = util.local2utc(l_time.strftime(‘%Y-%m-%d %H:%M:%S.%f‘))
body = {
"query": {
"bool":{
"must":{
"match_all":{}
},
"filter":{
"range":{
"@timestamp":{
"gte":now_time_5m,
"lte":now_time
}
}
}
}
},
"size": 10000,
"sort": {
"@timestamp": { "order": "asc" }
},
"_source": ["status", "method","client_ip","remote_ip","timestamp","request_time","@timestamp"]
}
#print req_url,body,headers
response = requests.post(req_url,data=json.dumps(body),headers=headers)
total_time=head_total_time=get_total_time=put_total_time=post_total_time=delete_total_time = 0.0
head_count=get_count=put_count=post_count=delete_count = 0
if response.status_code == 200:
tps = SfoClusterTps()
res_data = json.loads(response.text,encoding=‘UTF-8‘)
if res_data and res_data.has_key(‘hits‘):
hits = res_data[‘hits‘]
total = hits[‘total‘]
list = hits[‘hits‘]
if list and total > 0:
for obj in list:
if isinstance(obj,dict) and obj.has_key(‘_source‘):
source = obj[‘_source‘]
if source.has_key(‘request_time‘):
total_time += float(source[‘request_time‘])
if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘HEAD‘:
head_count += 1
if source.has_key(‘request_time‘):
head_total_time += float(source[‘request_time‘])
if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘GET‘:
get_count += 1
if source.has_key(‘request_time‘):
get_total_time += float(source[‘request_time‘])
if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘PUT‘:
put_count += 1
if source.has_key(‘request_time‘):
put_total_time += float(source[‘request_time‘])
if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘POST‘:
post_count += 1
if source.has_key(‘request_time‘):
post_total_time += float(source[‘request_time‘])
if source.has_key(‘method‘) and str(source[‘method‘]).strip().upper()==‘DELETE‘:
delete_count += 1
if source.has_key(‘request_time‘):
delete_total_time += float(source[‘request_time‘])
tps.guid = str(uuid.uuid1())
tps.cluster_name = cluster
if total > 0:
tps.avg_time = ‘%.2f‘%(total_time/total*1000)
else:
tps.avg_time = 0
if head_count > 0:
tps.head_time = ‘%.2f‘%(head_total_time/head_count*1000)
else:
tps.head_time = 0
if get_count > 0:
tps.get_time = ‘%.2f‘%(get_total_time/get_count*1000)
else:
tps.get_time = 0
if put_count > 0:
tps.put_time = ‘%.2f‘%(put_total_time/put_count*1000)
else:
tps.put_time = 0
if post_count > 0:
tps.post_time = ‘%.2f‘%(post_total_time/post_count*1000)
else:
tps.post_time = 0
if delete_count > 0:
tps.delete_time = ‘%.2f‘%(delete_total_time/delete_count*1000)
else:
tps.delete_time = 0
tps.add_time = time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time()))
db.session.add(tps)
db.session.commit()
else:
pass
else:
pass
else:
pass
except Exception as ex:
logger.exception("get_elk_log_json function execute exception:" + str(ex))
finally:
db.session.close()
db.session.remove()
#schedule tasks
def get_elklog_json_schl(executor):
"""
起线程执行日志分析
:param executor:
:return:
"""
try:
el = ElkLog()
executor.submit(el.get_elk_log_json)
#threading.Thread(target=el.get_elk_log_json).start()
except Exception as ex:
logger.exception("get_elklog_json_schl function execute exception:" + str(ex))
class ElklogUnitAgnet(Agent):
def __init__(self, pidfile):
Agent.__init__(self, pidfile)
def run(self):
try:
sys.stdout.flush()
hostname = socket.getfqdn()
hostip = socket.gethostbyname(hostname)
logger.info("hostname is {}, ip is {}".format(hostname, hostip))
#use schedule
with ThreadPoolExecutor(config.thread_workers) as executor:
schedule.every(config.upload_refresh).seconds.do(get_elklog_json_schl,executor)
schedule.run_all(0)
while True:
schedule.run_pending()
time.sleep(0.1)
except Exception as ex:
logger.exception("elk log agent run exception:" + str(ex))
def main():
agent = ElklogUnitAgnet(config.elklog_agnet_pfile)
try:
if len(sys.argv) == 3:
if ‘elklog‘ == sys.argv[1]:
if ‘start‘ == sys.argv[2]:
agent.start()
if ‘stop‘ == sys.argv[2]:
agent.stop()
else:
print("Unknown command")
sys.exit(2)
else:
print("usage: %s" % (sys.argv[0],))
sys.exit(2)
except Exception as ex:
logger.exception("elk log process run exception:" + str(ex))
if __name__ == ‘__main__‘:
main()
###########################################################################################
更多查询方式接口:
查询一条记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"size": 1}‘
查询offset为20的记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match": { "offset": 20 } }}‘
查询结果只返回指定的字段
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"_source": ["host", "message"]}‘
查询结果排序
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"_source": ["offset","host", "message"]},"sort": { "offset": { "order": "desc" } }‘
返回从10开始的10条记录
curl -H "Content-Type: application/json" -X POST ‘http://192.168.1.1:9200/swift-nginx-2018.08.31/_search?pretty‘ -d ‘{"query": { "match_all": {} },"from": 10,"size": 10,"_source": ["offset","host", "message"]},"sort": { "offset": { "order": "desc" } }‘
全部查询接口实例请参考:
https://www.cnblogs.com/pilihaotian/p/5830754.html
标签:日志 status remove method self timestamp env targe asc
原文地址:https://www.cnblogs.com/chmyee/p/9907676.html