# Tags: one data groups pen kafka das strftime inf column
#!/usr/local/python37/bin/python
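# Read the LAG values printed by the Kafka consumer-groups command to see how
# many messages in the queue are still unconsumed; per the original note, a lag
# above 5000 may indicate a delay (the code below alerts at a summed lag above 100).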
import datetime
import os
import re
import subprocess
import time
import urllib.parse

import numpy as np
import pandas as pd
# Shell pipeline that prints "<column 1> <column 5>" for every row of the
# consumer-group description whose fifth column compares greater than 20.
# The awk program has to be wrapped in plain ASCII single quotes: typographic
# quotes are not quoting characters for the shell.
KAFKA_LAG_COMMAND = (
    "sh /opt/elk/kafka_node1/bin/kafka-consumer-groups.sh --describe "
    "--bootstrap-server 192.168.10.100:9092 --group logstash "
    "| awk '{if($5>20){print $1,$5}}'"
)

# Topics whose summed lag is above this value are reported by SMS.
LAG_ALERT_THRESHOLD = 100

# SMS gateway endpoint; the URL-encoded message text is appended to it.
# NOTE(review): the gateway credentials are embedded in this URL -- consider
# moving them into configuration.
SMS_URL_PREFIX = (
    "http://sms.domain.com/Smsweb/sms"
    "?pid=smsPid&pwd=Mjdfadklfae&phone=1111111111&msg="
)


def read_lag_report(command=KAFKA_LAG_COMMAND):
    """Run *command* through the shell and return its standard output.

    The command is a fixed pipeline (it needs the shell for ``|``); it never
    contains external input.
    """
    completed = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    return completed.stdout


def parse_lag_report(report):
    """Convert ``"<topic> <lag>"`` lines into ``[topic, lag]`` rows.

    Lines that are not a name followed by an integer are skipped instead of
    raising.  This covers a column-header line (the previous code dropped the
    first line unconditionally) as well as blank or truncated lines.

    Args:
        report: Text with one whitespace-separated record per line.

    Returns:
        A list of ``[topic, lag]`` lists, with ``lag`` as ``int``.
    """
    rows = []
    for line in report.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue
        try:
            lag = int(fields[1])
        except ValueError:
            continue
        rows.append([fields[0], lag])
    return rows


def topics_over_threshold(rows, threshold=LAG_ALERT_THRESHOLD):
    """Sum the lag per topic and keep the topics whose total exceeds *threshold*.

    Args:
        rows: ``[topic, lag]`` rows as produced by :func:`parse_lag_report`.
        threshold: Summed lag a topic must exceed to be returned.

    Returns:
        A DataFrame with the columns ``topics`` and ``LAG``; it is empty when
        nothing exceeds the threshold.
    """
    frame = pd.DataFrame(rows, columns=["topics", "LAG"])
    if frame.empty:
        # Nothing to aggregate; skip the group-by on an untyped empty frame.
        return frame
    totals = frame.groupby("topics", as_index=False).sum()
    return totals.loc[totals["LAG"] > threshold, ["topics", "LAG"]]


def build_sms_url(message):
    """Return the SMS gateway URL with *message* percent-encoded into it.

    Every reserved character is escaped (not only spaces and newlines), so
    characters such as ``&`` or ``#`` in the message cannot break the query
    string.
    """
    return SMS_URL_PREFIX + urllib.parse.quote(message, safe="")


def main():
    """Check the consumer lag and send an SMS for topics that are too far behind.

    Prints ``OK`` when no topic exceeds the threshold; otherwise prints the
    request URL and the output of the ``curl`` call that delivers the message.
    """
    lagging = topics_over_threshold(parse_lag_report(read_lag_report()))
    if lagging.empty:
        print("OK")
        return

    url = build_sms_url(str(lagging))
    print(url)
    # Pass the URL as a single argument without a shell, so that "&" and other
    # metacharacters in it are never interpreted.
    completed = subprocess.run(
        ["curl", url],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    print(completed.stdout.rstrip("\n"))


if __name__ == "__main__":
    main()
# Tags: one data groups pen kafka das strftime inf column
# Original article: https://blog.51cto.com/happyting/2536191