celery worker guide abstract


Starting a worker:

e.g. celery -A proj worker -l info

     celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h


Note:

The hostname argument can expand the following variables:


%h: Hostname including domain name.

%n: Hostname only.

%d: Domain name only.


E.g. if the current hostname is george.example.com then these will expand to:


worker1.%h -> worker1.george.example.com

worker1.%n -> worker1.george

worker1.%d -> worker1.example.com



Stopping a worker:

If a worker gets stuck in an infinite loop and will not exit, you can kill it with:

ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9


To restart workers, the simplest way is to use celery multi; just remember to give each worker a dedicated pidfile and logfile:

 celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid

 celery multi restart 1 --pidfile=/var/run/celery/%n.pid

In production you should run workers as daemons, using an init script or supervisord.



Signals:

The worker’s main process overrides the following signals:


TERM: Warm shutdown, wait for tasks to complete.

QUIT: Cold shutdown, terminate ASAP.

USR1: Dump traceback for all active threads.

USR2: Remote debug, see celery.contrib.rdb.
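
As a minimal sketch (the pidfile path below is hypothetical; match whatever --pidfile the worker was started with), a warm shutdown can be triggered from Python:

import os
import signal

# Read the worker's pid from its pidfile (hypothetical path).
with open('/var/run/celery/worker1.pid') as f:
    pid = int(f.read().strip())

os.kill(pid, signal.SIGTERM)    # warm shutdown: wait for tasks to complete
# os.kill(pid, signal.SIGQUIT)  # cold shutdown: terminate ASAP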


Additional notes:

Variables in file paths


The file path arguments for --logfile, --pidfile and --statedb can contain variables that the worker will expand:


Node name replacements


%h: Hostname including domain name.

%n: Hostname only.

%d: Domain name only.

%i: Prefork pool process index or 0 if MainProcess.

%I: Prefork pool process index with separator.

E.g. if the current hostname is george.example.com then these will expand to:


--logfile=%h.log -> george.example.com.log

--logfile=%n.log -> george.log

--logfile=%d.log -> example.com.log



About the prefork pool:

Prefork pool process index


The prefork pool process index specifiers will expand into a different filename depending on the process that will eventually need to open the file.


This can be used to specify one log file per child process.


Note that the numbers will stay within the process limit even if processes exit or if autoscale/maxtasksperchild/time limits are used. I.e. the number is the process index not the process count or pid.


%i - Pool process index or 0 if MainProcess.


Where -n worker1@example.com -c2 -f %n-%i.log will result in three log files:


worker1-0.log (main process)

worker1-1.log (pool process 1)

worker1-2.log (pool process 2)

%I - Pool process index with separator.


Where -n worker1@example.com -c2 -f %n%I.log will result in three log files:


worker1.log (main process)

worker1-1.log (pool process 1)

worker1-2.log (pool process 2)



Concurrency:

Pools such as prefork and eventlet are available, though there appear to be some caveats when using eventlet; see the example below for how to select a pool.
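
A brief illustration of selecting the eventlet pool via the -P option (the concurrency value here is arbitrary):

e.g. celery -A proj worker -P eventlet -c 1000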



Remote control:

A variety of commands let you change the configuration of running workers on the fly.


broadcast()

e.g.

Start the worker:

root@workgroup0:~/celeryapp/configtest# ls
logging  proj
root@workgroup0:~/celeryapp/configtest# celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h


In another terminal (the directory layout is as follows):

root@workgroup0:~/celeryapp/configtest# ls
logging  proj
root@workgroup0:~/celeryapp/configtest# cd proj
root@workgroup0:~/celeryapp/configtest/proj# la
agent.py  agent.pyc  celery.py  celery.pyc  config.py  config.pyc  __init__.py  __init__.pyc
root@workgroup0:~/celeryapp/configtest/proj# 
root@workgroup0:~/celeryapp/configtest/proj# cd ..
root@workgroup0:~/celeryapp/configtest# python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.celery import app
>>> app.control.broadcast('rate_limit', {'task_name': 'proj.agent.add', 'rate_limit': '200/m'}, reply=True)
[{u'celery@worker1.workgroup0.hzg.com': {u'ok': u'new rate limit set successfully'}}]
>>>


As you can see, the worker's rate limit was updated successfully.

Pay attention to the module hierarchy when naming the task, or the call will fail.

You can also target specific workers:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]


Revoking one task or several tasks at once:

Examples:

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
... ])



Persisting revoked tasks:

From the documentation:

Revoking tasks works by sending a broadcast message to all the workers, the workers then keep a list of revoked tasks in memory. When a worker starts up it will synchronize revoked tasks with other workers in the cluster.


The list of revoked tasks is in-memory so if all workers restart the list of revoked ids will also vanish. If you want to preserve this list between restarts you need to specify a file for these to be stored in by using the --statedb argument to celery worker:


e.g. 

celery -A proj worker -l info --statedb=/var/run/celery/worker.state

celery multi start 2 -l info --statedb=/var/run/celery/%n.state



Setting task time limits:

From the documentation:

The time limit (--time-limit) is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new process. You can also enable a soft time limit (--soft-time-limit); this raises an exception the task can catch to clean up before the hard time limit kills it.


e.g.

Soft time limit example:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()


These limits can also be set with the CELERYD_TASK_TIME_LIMIT / CELERYD_TASK_SOFT_TIME_LIMIT settings.
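
For example, in the app's configuration module (the values here are illustrative):

CELERYD_TASK_SOFT_TIME_LIMIT = 60   # SoftTimeLimitExceeded raised after 60s
CELERYD_TASK_TIME_LIMIT = 120       # process killed and replaced after 120s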



Changing worker settings at runtime:

Changing time limits at runtime

e.g.

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]


Changing rate-limits at runtime

e.g.

>>> app.control.rate_limit('myapp.mytask', '200/m')

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

Tip: if CELERY_DISABLE_RATE_LIMITS is enabled, these changes will have no effect.



Other options and features (note that some pool implementations do not support every feature):

Max tasks per child setting:

With this option you can configure the maximum number of tasks a worker can execute before it’s replaced by a new process.

The option can be set using the worker's --maxtasksperchild argument or the CELERYD_MAX_TASKS_PER_CHILD setting.
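
For illustration, in the configuration module (the value is arbitrary):

CELERYD_MAX_TASKS_PER_CHILD = 100  # replace each pool process after it has executed 100 tasks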


Autoscaling:

The autoscaler component is used to dynamically resize the pool based on load:


The autoscaler adds more pool processes when there is work to do,

and starts removing processes when the workload is low.


It’s enabled by the --autoscale option, which needs two numbers: the maximum and minimum number of pool processes:


--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
       10 if necessary).


Tip: You can also define your own rules for the autoscaler by subclassing Autoscaler. Some ideas for metrics include load average or the amount of memory available. You can specify a custom autoscaler with the CELERYD_AUTOSCALER setting, as sketched below.
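
For instance, wiring in a custom subclass (the module path and class name below are hypothetical; the format mirrors the default 'celery.worker.autoscale:Autoscaler'):

CELERYD_AUTOSCALER = 'myapp.autoscale:MemoryAwareAutoscaler'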



Queues:

A worker instance can consume from any number of queues. By default it will consume from all queues defined in the CELERY_QUEUES setting (which if not specified defaults to the queue named celery).


e.g.

$ celery -A proj worker -l info -Q foo,bar,baz


If the queue name is defined in CELERY_QUEUES it will use that configuration, but if it’s not defined in the list of queues Celery will automatically generate a new queue for you (depending on the CELERY_CREATE_MISSING_QUEUES option).


You can also tell the worker to start and stop consuming from a queue at runtime using the remote control commands add_consumer and cancel_consumer.


The documentation also has three subsections with more detailed examples: Adding consumers, Cancelling consumers, and List of active queues.
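
A quick sketch of the two runtime commands (the queue name 'foo' is arbitrary; replies omitted):

>>> app.control.add_consumer('foo', reply=True)     # start consuming from queue 'foo'
>>> app.control.cancel_consumer('foo', reply=True)  # stop consuming from queue 'foo'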



Autoreloading (handy for testing, but not recommended in production):

Starting celery worker with the --autoreload option will enable the worker to watch for file system changes to all imported task modules (and also any non-task modules added to the CELERY_IMPORTS setting or the -I|--include option).


Warning: This is an experimental feature intended for use in development only; using auto-reload in production is discouraged, as the behavior of reloading a module in Python is undefined and may cause hard-to-diagnose bugs and crashes.



Pool Restart Command

Requires the CELERYD_POOL_RESTARTS setting to be enabled.


The remote control command pool_restart sends restart requests to the worker's child processes. It is particularly useful for forcing the worker to import new modules, or for reloading already imported modules. This command does not interrupt executing tasks.


e.g.

>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo', 'bar']})

>>> app.control.broadcast('pool_restart',
...                       arguments={'modules': ['foo'],
...                                  'reload': True})

If you don't specify any modules then all known task modules will be imported/reloaded:

>>> app.control.broadcast('pool_restart', arguments={'reload': True})



Inspecting workers:

app.control.inspect lets you inspect running workers. It uses remote control commands under the hood.


You can also use the celery command to inspect workers, and it supports the same commands as the app.control interface.


e.g.

# Inspect all nodes.
>>> i = app.control.inspect()

# Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
...                          'worker2.example.com'])

# Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')


The following dumps are available:

Dump of registered tasks

You can get a list of tasks registered in the worker using the registered()


Dump of currently executing tasks

You can get a list of active tasks using active()


Dump of scheduled (ETA) tasks

You can get a list of tasks waiting to be scheduled by using scheduled():


Dump of reserved tasks

Reserved tasks are tasks that have been received, but are still waiting to be executed.

You can get a list of these using reserved()
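
Using the inspect object created above, a quick sketch (replies omitted):

>>> i.registered()  # tasks registered in the worker
>>> i.active()      # currently executing tasks
>>> i.scheduled()   # ETA tasks waiting to be scheduled
>>> i.reserved()    # tasks received but not yet executing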



Statistics

The remote control command inspect stats (or stats()) will give you a long list of useful (or not so useful) statistics about the worker:


e.g.

$ celery -A proj inspect stats
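
The same data is available from Python via the inspect interface:

>>> app.control.inspect().stats()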


See the documentation for an explanation of the fields in its output.


Other features:

Remote shutdown

Ping

Enable/disable events

Writing your own remote control commands

(see the documentation)



Original article: http://my.oschina.net/hochikong/blog/415929
