码迷,mamicode.com
首页 > 编程语言 > 详细

python3检测ossfs可用性+钉钉通知

时间:2019-12-05 14:48:52      阅读:96      评论:0      收藏:0      [点我收藏+]

标签:post   pre   art   ==   run   __name__   running   pos   cat   

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019-12-02 15:16
# @Author  : Anthony
# @Email   : ianghont7@163.com
# @File    : check-ossfs.py

# 检测ossfs进程是否存在
# 检测/xxxx/file是否挂载成功
# ossfs可用性检测

import psutil
import requests
import json
import subprocess
import threading

class DingTalk_Base:
    def __init__(self):
        self.__headers = {Content-Type: application/json;charset=utf-8}
        self.url = https://oapi.dingtalk.com/robot/send?access_token=xxxxx
    def send_msg(self,text):
        json_text = {
            "msgtype": "text",
            "text": {
                "content": text
            },
            "at": {
                "atMobiles": [
                    ""
                ],
                "isAtAll": False
            }
        }
        return requests.post(self.url, json.dumps(json_text), headers=self.__headers).content
class DingTalk_Disaster(DingTalk_Base):
    def __init__(self):
        super().__init__()
        # 填写机器人的url
        self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"

def check_oss_pid():
    pids = psutil.pids()
    for pid in pids:
        p = psutil.Process(pid)
        if "ossfs" == p.name().strip():
            return True
    else:
        return ding.send_msg("""ossfs报警通知:
机器:机器名称
消息内容:ossfs进程不存在,请及时处理...""")



def get_ossfs_name():
    try:
        args = "ls -lh /alidata/|grep file|awk ‘{print $3}‘"
        running_shell = subprocess.Popen(args,
                                         shell=True,
                                         stdin=subprocess.PIPE,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE,)
        out,err = running_shell.communicate()
        for line in out.splitlines():
            s1 = bytes.decode(line).strip()
            if s1.strip() != "root":
                return ding.send_msg("""ossfs报警通知:
机器:语文课堂
消息内容:/xxxx/file/挂载失败,请及时处理...""")
    except Exception as e:
        print(e)

if __name__ == "__main__":
    ding = DingTalk_Disaster()
    threads = [threading.Thread(target=get_ossfs_name),
               threading.Thread(target=check_oss_pid)]
    for t in threads:
        t.start()

 

python3检测ossfs可用性+钉钉通知

标签:post   pre   art   ==   run   __name__   running   pos   cat   

原文地址:https://www.cnblogs.com/ipyanthony/p/11989214.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!