标签:auto cts pytho models for ace 定义 ring mic
引入django-celery-beat包
INSTALLED_APPS = [
...
'django_celery_beat'
...
]
定义celery app
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_celery_demo.settings')
app = Celery('my_celery')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
定义配置文件:
# celery相关配置
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# CELERY_RESULT_BACKEND = 'django-db'
INSTALLED_APPS = [
...
'django_celery_results'
...
]
#使用database作为结果存储 向installed_apps中添加django_celery_results
定义任务:
from logging import info
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from celery import shared_task
import json
def my_task(string):
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(
interval=schedule,
name='add device %s' % string,
task='celery_app.task.test_func',
args=json.dumps([string])
)
@shared_task
def test_func(string):
info(string)
相关Model:
class IntervalSchedule(models.Model):
"""Schedule executing on a regular interval.
Example: execute every 2 days
every=2, period=DAYS
"""
DAYS = DAYS
HOURS = HOURS
MINUTES = MINUTES
SECONDS = SECONDS
MICROSECONDS = MICROSECONDS
...
class CrontabSchedule(models.Model):
"""Timezone Aware Crontab-like schedule.
Example: Run every hour at 0 minutes for days of month 10-15
minute="0", hour="*", day_of_week="*",
day_of_month="10-15", month_of_year="*"
"""
#
minute = models.CharField(...)
hour = models.CharField(...)
day_of_week = models.CharField(...)
day_of_month = models.CharField(...)
month_of_year = models.CharField(...)
timezone = timezone_field.TimeZoneField(...)
class ClockedSchedule(models.Model):
"""clocked schedule."""
clocked_time = models.DateTimeField(
verbose_name=_('Clock Time'),
help_text=_('Run the task at clocked time'),
)
enabled = models.BooleanField(...)
python -m celery -A celery_app worker -l info -B -c 5
标签:auto cts pytho models for ace 定义 ring mic
原文地址:https://www.cnblogs.com/donghaoblogs/p/12411850.html