Celery Workers Guide: summary notes
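Start a worker, e.g.:
celery -A proj worker -l info
To run several workers on the same machine, give each one a unique node name with -n (--hostname):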
celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
The hostname argument can expand the following variables:
%h: Hostname including domain name.
%n: Hostname only.
%d: Domain name only.
E.g. if the current hostname is george.example.com then these will expand to:
worker1.%h -> worker1.george.example.com
worker1.%n -> worker1.george
worker1.%d -> worker1.example.com
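To make the three substitutions concrete, here is a minimal pure-Python sketch. The function expand_hostname is hypothetical. It only mimics %h, %n and %d as listed above, and it assumes the short hostname is everything before the first dot. It is not Celery's own implementation.

```python
import re


def expand_hostname(template: str, hostname: str) -> str:
    """Expand %h, %n and %d in a node-name template (hypothetical helper)."""
    # Split "george.example.com" into "george" and "example.com".
    name, _, domain = hostname.partition(".")
    values = {"h": hostname, "n": name, "d": domain}
    # Single pass, so an expanded value is never re-scanned for tokens.
    return re.sub(r"%([hnd])", lambda m: values[m.group(1)], template)


if __name__ == "__main__":
    for tpl in ("worker1.%h", "worker1.%n", "worker1.%d"):
        print(tpl, "->", expand_hostname(tpl, "george.example.com"))
```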
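To force-terminate all worker processes, for example when tasks are stuck in an infinite loop, send KILL. Tasks that are currently executing will be lost. Because KILL cannot be caught, the worker cannot reap its child processes:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9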
The easiest way to restart workers is celery multi, but make sure to specify a pidfile and a logfile for each worker:
celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
celery multi restart 1 --pidfile=/var/run/celery/%n.pid
The worker’s main process overrides the following signals:
TERM Warm shutdown, wait for tasks to complete.
QUIT Cold shutdown, terminate ASAP
USR1 Dump traceback for all active threads.
USR2 Remote debug, see celery.contrib.rdb.
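A warm shutdown means "stop taking new work, but let the job in hand finish." The plain-Python analogy below illustrates that idea. It assumes a Unix system and must run in the main thread. WarmShutdownLoop is a hypothetical name, and this is not how Celery's worker is implemented.

```python
import os
import signal


class WarmShutdownLoop:
    """Hypothetical job loop in which TERM requests a warm shutdown."""

    def __init__(self):
        self.stop_requested = False
        self.completed = []
        # Catch TERM so it sets a flag instead of killing the process.
        signal.signal(signal.SIGTERM, self._on_term)

    def _on_term(self, signum, frame):
        self.stop_requested = True

    def run(self, jobs):
        for job in jobs:
            if self.stop_requested:
                break  # warm shutdown: take no new jobs
            if job == "b":
                # Simulate an operator sending TERM while job "b" is running.
                os.kill(os.getpid(), signal.SIGTERM)
            self.completed.append(job)  # the job in hand still completes
        return self.completed


if __name__ == "__main__":
    print(WarmShutdownLoop().run(["a", "b", "c", "d"]))
```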
Variables in file paths
The file path arguments for --logfile, --pidfile and --statedb can contain variables that the worker will expand:
Node name replacements
%h: Hostname including domain name.
%n: Hostname only.
%d: Domain name only.
%i: Prefork pool process index or 0 if MainProcess.
%I: Prefork pool process index with separator.
E.g. if the current hostname is george.example.com then these will expand to:
--logfile=%h.log -> george.example.com.log
--logfile=%n.log -> george.log
--logfile=%d.log -> example.com.log
Prefork pool process index
The prefork pool process index specifiers will expand into a different filename depending on the process that will eventually need to open the file.
This can be used to specify one log file per child process.
Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. I.e. the number is the process index not the process count or pid.
%i - Pool process index or 0 if MainProcess.
Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:
worker1-0.log (main process)
worker1-1.log (pool process 1)
worker1-2.log (pool process 2)
%I - Pool process index with separator.
Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:
worker1.log (main process)
worker1-1.log (pool process 1)
worker1-2.log (pool process 2)
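The %i/%I rules above can be sketched as a small pure-Python function. expand_log_path is hypothetical. Following the examples, it assumes %n maps to the part of the node name before "@" and that index 0 is the main process. It is not Celery's own code.

```python
import re


def expand_log_path(template: str, nodename: str, index: int) -> str:
    """Expand %n, %i and %I in a log-file template (hypothetical helper).

    index is 0 for the main process and 1..N for pool processes.
    """
    name = nodename.partition("@")[0]  # "worker1@example.com" -> "worker1"
    values = {
        "n": name,
        "i": str(index),                    # always a number, 0 for main
        "I": f"-{index}" if index else "",  # empty for the main process
    }
    return re.sub(r"%([niI])", lambda m: values[m.group(1)], template)


if __name__ == "__main__":
    # Mirrors "-n worker1@example.com -c2": main process plus two children.
    for idx in range(3):
        print(expand_log_path("%n-%i.log", "worker1@example.com", idx),
              expand_log_path("%n%I.log", "worker1@example.com", idx))
```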
root@workgroup0:~/celeryapp/configtest# ls
logging proj
root@workgroup0:~/celeryapp/configtest# celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
root@workgroup0:~/celeryapp/configtest# ls
logging proj
root@workgroup0:~/celeryapp/configtest# cd proj
root@workgroup0:~/celeryapp/configtest/proj# la
agent.py agent.pyc celery.py celery.pyc config.py config.pyc __init__.py __init__.pyc
root@workgroup0:~/celeryapp/configtest/proj# cd ..
root@workgroup0:~/celeryapp/configtest# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.celery import app
>>> app.control.broadcast('rate_limit', {'task_name': 'proj.agent.add', 'rate_limit': '200/m'}, reply=True)
[{u'celery@worker1.workgroup0.hzg.com': {u'ok': u'new rate limit set successfully'}}]
>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...     destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]
# Revoke via a result object, or by task id.
>>> result.revoke()
>>> AsyncResult(id).revoke()
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')
# Also terminate the process executing the task (TERM by default).
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)
# Terminate with a different signal.
>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')
# Revoke several tasks at once.
>>> app.control.revoke([
...     '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...     'f565793e-b041-4b2b-9ca4-dca22762a55d',
...     'd9d35e03-2997-42d0-a13e-64a66b88a618',
... ])
Revoking tasks works by sending a broadcast message to all the workers. Each worker then keeps a list of revoked tasks in memory. When a worker starts up, it synchronizes revoked tasks with the other workers in the cluster.
The list of revoked tasks is kept in memory, so if all workers restart, the list of revoked ids vanishes too. To preserve this list between restarts, specify a file to store it in with the --statedb argument to celery worker:
celery -A proj worker -l info --statedb=/var/run/celery/worker.state
celery multi start 2 -l info --statedb=/var/run/celery/%n.state
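The idea behind --statedb can be sketched in plain Python. Revoked ids are kept in an in-memory set and written to a file, so they survive a restart. RevokedStore and its JSON file format are hypothetical. Celery's real state file uses its own format, so this code should not be used to read or write it.

```python
import json
import os
import tempfile


class RevokedStore:
    """Hypothetical set of revoked task ids, optionally persisted to a file."""

    def __init__(self, path=None):
        self.path = path
        self.revoked = set()
        if path and os.path.exists(path):
            with open(path) as fh:
                self.revoked = set(json.load(fh))  # restore after a restart

    def revoke(self, task_id):
        self.revoked.add(task_id)

    def is_revoked(self, task_id):
        return task_id in self.revoked

    def save(self):
        # Without a path the ids live only in memory and vanish on exit.
        if self.path:
            with open(self.path, "w") as fh:
                json.dump(sorted(self.revoked), fh)


if __name__ == "__main__":
    state_file = os.path.join(tempfile.mkdtemp(), "worker.state")
    before = RevokedStore(state_file)
    before.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed")
    before.save()
    after = RevokedStore(state_file)  # simulates a restarted worker
    print(after.is_revoked("d9078da5-9915-40a0-bfa1-392c7bde42ed"))
```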
The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit). It raises an exception that the task can catch to clean up before the hard time limit kills it:
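from myapp import app
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
    try:
        do_work()  # placeholder for the task's real work (hypothetical)
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()  # placeholder cleanup before the hard limit (hypothetical)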
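The soft limit gives a task a chance to clean up before it is stopped. That mechanism can be imitated in plain Python with a timer signal. This is only an analogy, assuming a Unix system and the main thread. SoftLimit and run_with_soft_limit are hypothetical names. Celery raises its own SoftTimeLimitExceeded, not this exception.

```python
import signal
import time


class SoftLimit(Exception):
    """Hypothetical stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    raise SoftLimit()


def run_with_soft_limit(seconds: float) -> str:
    # Arm a one-shot timer that raises SoftLimit inside the running code.
    previous = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        while True:  # a "task" that would otherwise run forever
            time.sleep(0.01)
    except SoftLimit:
        return "cleaned up"  # the task catches the exception and tidies up
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
        signal.signal(signal.SIGALRM,
                      previous if previous is not None else signal.SIG_DFL)


if __name__ == "__main__":
    print(run_with_soft_limit(0.05))
```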
Changing time limits at runtime
>>> app.control.time_limit('tasks.crawl_the_web',
...                        soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]
Changing rate-limits at runtime
>>> app.control.rate_limit('myapp.mytask', '200/m')
>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])
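A rate limit such as '200/m' means at most 200 tasks of that type per minute. Celery enforces it per worker instance, not globally. The hypothetical helper below converts such strings into tasks per second. It is a sketch that assumes only the 's', 'm' and 'h' suffixes (a bare number meaning per second), not Celery's own parser.

```python
def parse_rate_limit(rate: str) -> float:
    """Convert '200/m', '2/h' or '0.5/s' into tasks per second (hypothetical)."""
    seconds_per_unit = {"s": 1.0, "m": 60.0, "h": 3600.0}
    count, _, unit = rate.partition("/")
    return float(count) / seconds_per_unit[unit or "s"]


if __name__ == "__main__":
    per_second = parse_rate_limit("200/m")
    print(per_second, "tasks/s, one task every", 1 / per_second, "s")
```

With '200/m' this gives about 3.33 tasks per second, i.e. roughly one task every 0.3 seconds.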
Max tasks per child setting:
With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.
The option can be set with the worker's --maxtasksperchild argument or with the CELERYD_MAX_TASKS_PER_CHILD setting.
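The effect of this setting can be simulated without Celery. Each child process runs at most N tasks and is then replaced by a fresh one. simulate_pool is a hypothetical single-child model that only tracks replacements. It ignores concurrency and real process management.

```python
def simulate_pool(num_tasks: int, max_tasks_per_child: int) -> list:
    """Return how many tasks each successive child ran (hypothetical model)."""
    children = []  # tasks executed by each child, in spawn order
    current = 0
    for _ in range(num_tasks):
        if current == max_tasks_per_child:
            children.append(current)  # retire the exhausted child ...
            current = 0               # ... and start a fresh one
        current += 1                  # run one task in the current child
    if current:
        children.append(current)
    return children


if __name__ == "__main__":
    print(simulate_pool(7, 3))
```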
The autoscaler component is used to dynamically resize the pool based on load:
The autoscaler adds more pool processes when there is work to do,
and starts removing processes when the workload is low.
It’s enabled by the --autoscale option, which takes two numbers: the maximum and the minimum number of pool processes (max_concurrency,min_concurrency). For example:
--autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
Tip: You can also define your own rules for the autoscaler by subclassing Autoscaler. Possible metrics include the load average or the amount of available memory. Specify a custom autoscaler with the CELERYD_AUTOSCALER setting.
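The grow/shrink rule can be sketched as a pure function. The policy is simplified and hypothetical: one process per waiting task, clamped to the --autoscale bounds. It is not the algorithm used by Celery's Autoscaler class. parse_autoscale shows that the value is given as max,min.

```python
def parse_autoscale(value: str) -> tuple:
    """Split an --autoscale value "max,min" into (max_procs, min_procs)."""
    max_text, min_text = value.split(",")
    return int(max_text), int(min_text)


def target_pool_size(waiting_tasks: int, min_procs: int, max_procs: int) -> int:
    """Hypothetical rule: one process per waiting task, kept within bounds."""
    if not 0 <= min_procs <= max_procs:
        raise ValueError("expected 0 <= min_procs <= max_procs")
    return max(min_procs, min(max_procs, waiting_tasks))


if __name__ == "__main__":
    max_procs, min_procs = parse_autoscale("10,3")
    for waiting in (0, 7, 50):
        print(waiting, "->", target_pool_size(waiting, min_procs, max_procs))
```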
A worker instance can consume from any number of queues. By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).
$ celery -A proj worker -l info -Q foo,bar,baz
If the queue name is defined in CELERY_QUEUES it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the CELERY_CREATE_MISSING_QUEUES option).
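The lookup can be modelled in a few lines. A queue named with -Q uses its CELERY_QUEUES entry if one exists. Otherwise it is created on the fly when CELERY_CREATE_MISSING_QUEUES allows it. resolve_queues and the plain-dict layout are hypothetical, since Celery's real queue declarations are kombu Queue objects. In this model, an auto-created queue simply reuses its name as exchange and routing key.

```python
def resolve_queues(requested, configured, create_missing=True):
    """Map queue names from -Q to queue settings (hypothetical model)."""
    resolved = {}
    for name in requested:
        if name in configured:
            resolved[name] = configured[name]  # use the declared settings
        elif create_missing:
            # Auto-created queue: reuse the name as exchange and routing key.
            resolved[name] = {"exchange": name, "routing_key": name}
        else:
            raise KeyError(f"queue {name!r} is not defined in CELERY_QUEUES")
    return resolved


if __name__ == "__main__":
    declared = {"foo": {"exchange": "media", "routing_key": "media.foo"}}
    print(resolve_queues(["foo", "bar", "baz"], declared))
```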
You can also tell the worker to start and stop consuming from a queue at runtime using the remote control commands add_consumer and cancel_consumer.
The documentation also has three subsections with more detailed examples: Adding consumers, Cancelling consumers and List of active queues.
Starting celery worker with the --autoreload option enables the worker to watch for file system changes to all imported task modules. It also watches any non-task modules added to the CELERY_IMPORTS setting or the -I|--include option.
Warning: This is an experimental feature intended for use in development only. Using auto-reload in production is discouraged, because the behavior of reloading a module in Python is undefined and may cause hard-to-diagnose bugs and crashes.
Pool Restart Command
Requires the CELERYD_POOL_RESTARTS setting to be enabled.
The remote control command pool_restart sends restart requests to the worker's child processes. It is particularly useful for forcing the worker to import new modules or to reload already imported modules. This command does not interrupt executing tasks.
>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo', 'bar']})
>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo'],
...                                  'reload': True})
If you don’t specify any modules, all known task modules will be imported/reloaded:
>>> app.control.broadcast('pool_restart', arguments={'reload': True})
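The standard importlib module shows what "imported/reloaded" means within a single Python process. import_or_reload is a hypothetical helper that mirrors the modules and reload arguments above. With pool_restart, the worker itself does this work and also restarts its child processes.

```python
import importlib
import sys


def import_or_reload(modules, reload=False):
    """Import modules not yet loaded; optionally reload loaded ones.

    Hypothetical helper. Returns (imported, reloaded) lists of module names.
    """
    imported, reloaded = [], []
    for name in modules:
        if name not in sys.modules:
            importlib.import_module(name)       # first import
            imported.append(name)
        elif reload:
            importlib.reload(sys.modules[name])  # re-execute the module code
            reloaded.append(name)
    return imported, reloaded


if __name__ == "__main__":
    print(import_or_reload(["colorsys"]))
    print(import_or_reload(["colorsys"], reload=True))
```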
app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.
You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.
# Inspect all nodes.
>>> i = app.control.inspect()
# Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
...                           'worker2.example.com'])
# Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')
Dump of registered tasks
You can get a list of tasks registered in the worker using registered().
Dump of currently executing tasks
You can get a list of active tasks using active().
Dump of scheduled (ETA) tasks
You can get a list of tasks waiting to be scheduled by using scheduled().
Dump of reserved tasks
Reserved tasks are tasks that have been received but are still waiting to be executed.
You can get a list of these using reserved().
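Inspect methods such as active() or reserved() return a dictionary keyed by worker node name, or None when no worker replies. The hypothetical helper below turns such a reply into per-worker task counts. The sample reply is invented for illustration and only reproduces the general {node_name: [task_info, ...]} shape.

```python
def count_tasks_per_worker(reply):
    """Count tasks per node in an inspect reply (hypothetical helper)."""
    if reply is None:  # no worker answered within the timeout
        return {}
    return {node: len(tasks) for node, tasks in reply.items()}


if __name__ == "__main__":
    # Invented example reply, shaped like {node_name: [task_info, ...]}.
    sample = {
        "worker1.example.com": [{"id": "a1", "name": "tasks.add"}],
        "worker2.example.com": [],
    }
    print(count_tasks_per_worker(sample))
```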
The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker:
$ celery -A proj inspect stats
Remote shutdown
Enable/disable events
Writing your own remote control commands
