码迷,mamicode.com
首页 > 编程语言 > 详细

Python模拟Linux的Crontab, 写个任务计划需求

时间:2018-08-21 16:00:50      阅读:169      评论:0      收藏:0      [点我收藏+]

标签:stop   分享图片   ESS   tst   float   proc   计划执行   current   etc   

Python模拟Linux的Crontab, 写个任务计划需求

来具体点

  

需求:
    执行一个程序, 程序一直是运行状态, 这里假设是一个函数

    当程序运行30s的时候, 需要终止程序, 可以用python, c, c++语言实现

    扩展需求:
        当1分钟以后, 需要重新启动程序

def process_2():
    # 时间几点执行
    # 执行process_1
    # 开始监听--计时
    # 当时间超过多少s以后, 强制结束
        #(注意进程里面的任务执行进程是否还存在, 举个例子, 进程的任务是在创建一个进程, 那强制结束的是任务的进程, 而你创建的进程呢?)
    pass

 

技术分享图片
#!/usr/bin/env python
# -*- coding=utf-8 -*-


import sys, time, multiprocessing, datetime, os

# -----<自定义Error>----- #
class MyCustomError(Exception):
    def __init__(self, msg=None, retcode=4999):
        self.retcode = int(retcode)
        try:
            if not msg:
                msg = "Setting time is less than the current time Error. "
        except:
            msg = "Unknown Error!!!"
        Exception.__init__(self, self.retcode, msg)

# -----<格式化时间变成时间戳>----- #
class FormatDatetime():
    ‘‘‘
    Use method:
        FormatDatetime.become_timestamp("2018-08-21 13:19:00")
    ‘‘‘

    @staticmethod
    def become_timestamp(dtdt):
        # 将时间类型转换成时间戳
        if isinstance(dtdt, datetime.datetime):
            timestamp = time.mktime(dtdt.timetuple())
            return timestamp

        elif isinstance(dtdt, str):
            if dtdt.split(" ")[1:]:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d  %H:%M:%S")
                timestamp = time.mktime(a_datetime.timetuple())
            else:
                a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")
                timestamp = time.mktime(a_datetime.timetuple())
            return timestamp

        elif isinstance(dtdt, float):
            return dtdt


# -----<计时器>----- #
def timer(arg):
    ‘‘‘
    use method:
        timer(14)

    :param arg: 倒计时时间
    :return: True
    ‘‘‘
    # 倒计时时间
    back_time = arg
    while back_time != 0:
        sys.stdout.write(\r)  ##意思是打印在清空
        back_time -= 1
        sys.stdout.write("%s" % (int(back_time)))
        sys.stdout.flush()
        time.sleep(1)
    return True

# -----<自定义任务函数>----- #
class Tasks:
    @staticmethod
    def start():
        ip1 = "42.93.48.16"
        ip2 = "192.168.2.141"
        adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)
        os.system(adjure)

    @staticmethod
    def stop():
        adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" ‘{print $7}‘ | xargs -i -t kill -9 {}"""
        os.system(adjure)


# -----<任务函数>----- #
def slow_worker():
    ‘‘‘
    你的自定义任务, 需要执行的代码
    :return:
    ‘‘‘
    print(Starting worker)
    time.sleep(0.1)
    print(Finished worker)
    Tasks.start()


# -----<任务计划执行时间>----- #
def crontab_time(custom_time):
    # custom_time = "2018-08-21 13:44:00"
    date_time = FormatDatetime.become_timestamp(custom_time)
    now_time = time.time()
    # 余数, 有小数
    remainder_data = int(date_time - now_time)

    if remainder_data < 0:
        raise MyCustomError
    else:
        while remainder_data > 0:
            remainder_data -= 1
            time.sleep(1)
        return True


# -----<执行函数>----- #
def my_crontab(custom_time=‘‘, frequency=1, countdown=0):
    ‘‘‘
    几点几分执行任务
    此函数属于业务逻辑, 可以考虑再次封装
    custom_time = "2018-08-21 13:19:00" 时间格式必须是这样
    frequency = 1 # 频次
    countdown = 0 # 倒计时多少s
    :return:
    ‘‘‘
    if custom_time:
        crontab_time_status = crontab_time(custom_time)
        if crontab_time_status:
            for i in range(frequency):
                p = multiprocessing.Process(target=slow_worker)
                p.start()
                if countdown:
                    status = timer(countdown)
                    if status:
                        p.terminate()
                        Tasks.stop()
    else:
        for i in range(frequency):
            p = multiprocessing.Process(target=slow_worker)
            p.start()
            if countdown:
                status = timer(countdown)
                if status:
                    p.terminate()
                    Tasks.stop()


if __name__ == __main__:
    my_crontab(frequency=1, countdown=50)
View Code

 

Python模拟Linux的Crontab, 写个任务计划需求

标签:stop   分享图片   ESS   tst   float   proc   计划执行   current   etc   

原文地址:https://www.cnblogs.com/renfanzi/p/9511705.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!