标签:cci src info file __name__ ssd host bsp 验证码
from celery import Celery import time #测试 app = Celery(‘TASK‘, broker=‘redis://localhost‘, backend=‘redis://localhost‘) @app.task def add(x, y): print("running..add.", x, y) return x + y @app.task def minus(x, y): time.sleep(60) print("running..minus.", x, y) return x - y
celery -A tasks worker --loglevel=info # tasks是tasks.py文件:必须在tasks.py所在目录下执行
# -*- coding: utf-8 -*- # celery.py from celery import Celery import os,sys import django # 1.添加django项目根路径 CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(CELERY_BASE_DIR, ‘../opwf‘)) # 2.添加django环境 os.environ.setdefault("DJANGO_SETTINGS_MODULE","opwf.settings") django.setup() # 读取配置 # 3.celery基本配置 app = Celery(‘proj‘, broker=‘redis://localhost:6379/14‘, backend=‘redis://localhost:6379/15‘, include=[‘celery_task.tasks‘, ]) # 4.实例化时可以添加下面这个属性 app.conf.update( result_expires=3600, #执行结果放到redis里,一个小时没人取就丢弃 ) # 5.配置定时任务:每5秒钟执行 调用一次celery_pro下tasks.py文件中的add函数 app.conf.beat_schedule = { ‘add-every-5-seconds‘: { ‘task‘: ‘celery_task.tasks.test_task_crontab‘, ‘schedule‘: 5.0, ‘args‘: (16, 16) }, } # 6.添加时区配置 app.conf.timezone = ‘Asia/Shanghai‘ print(dir(app)) if __name__ == ‘__main__‘: app.start()
from .celery import app import os,sys from .celery import CELERY_BASE_DIR ‘‘‘测试定时任务‘‘‘ @app.task() def test_task_crontab(x,y): ‘‘‘添加django项目路径‘‘‘ sys.path.insert(0, os.path.join(CELERY_BASE_DIR, ‘../loonflow‘)) from utils.rl_sms import test_crontab res = test_crontab(x,y) return x + y @app.task(bind=True) def send_sms_code(self, mobile, datas): sys.path.insert(0, os.path.join(CELERY_BASE_DIR, ‘../loonflow‘)) # 在方法中导包 from utils.rl_sms import send_message # time.sleep(5) try: # 用 res 接收发送结果, 成功是:0, 失败是:-1 res = send_message(mobile, datas) except Exception as e: res = ‘-1‘ if res == ‘-1‘: # 如果发送结果是 -1 就重试. self.retry(countdown=5, max_retries=3, exc=Exception(‘短信发送失败‘))
from ronglian_sms_sdk import SmsSDK from user.models import User accId = ‘8aaf070875774c6d01758d48effb0a36‘ accToken = ‘10ccc33de038428d864cb5eb122649f6‘ appId = ‘8aaf070875774c6d01758d48f0cd0a3d‘ def send_message(mobile, datas): user = User.objects.all()[0] print(user.username, ‘%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%‘) sdk = SmsSDK(accId, accToken, appId) tid = ‘1‘ # 测试模板id为: 1. 内容为: 【云通讯】您的验证码是{1},请于{2}分钟内正确输入。 # mobile = ‘13303479527‘ # datas = (‘666777‘, ‘5‘) # 模板中的参数按照位置传递 resp = sdk.sendMessage(tid, mobile, datas) print("##########################################") print(‘执行了这个方法 send_message‘) return resp def test_crontab(x,y): print(‘############### 执行test_crontab测试任务 #############‘) print(‘############### 邮件审批超时提醒 #############‘)
标签:cci src info file __name__ ssd host bsp 验证码
原文地址:https://www.cnblogs.com/shensy/p/14103495.html