码迷,mamicode.com
首页 > 其他好文 > 详细

CELERY 内存中的变量串数据

时间:2020-06-15 12:22:53      阅读:58      评论:0      收藏:0      [点我收藏+]

标签:应该   access   lin   direct   src   mit   roc   任务   recv   

CELERY 内存中的变量串数据

0X01 背景

周四周五,花了两天的时间,思考着celery内存泄露的问题。
情境如下:
在使用内存变量的时候(跨步骤调用变量),突然想到一个问题,celery 多进程会不会共用一个变量,导致多个进程在运行的时候变量错乱了呢?
比如在进程1中把target 设置为A,在进程2中把target设置成2,两个进程的target共用一个,导致进程1拿到的target其实是A。

再具体点,插件扫描入口有集群名,经过层层调用最后在发包函数上需要这东西,最好通过内存方式的变量来传递,但是如果进程间会相互覆盖,就凉了。

0X02 DEMO与疑问

于是写了个demo作测试

1) celery任务生产端? celery_test.py

from tasks import task_main
for i in range(100):
    task_main.apply_async(args=[str(i)], queue=‘Aefault‘, routing_key=‘aefault‘)

2) 变量文件? data_type.py

import copy
import types

my_data = dict()

class AttribDict(dict):
    """
    This class defines the dictionary with added capability to access members as attributes

    >>> foo = AttribDict()
    >>> foo.bar = 1
    >>> foo.bar
    1
    """

    def __init__(self, indict=None, attribute=None):
        if indict is None:
            indict = {}

        # Set any attributes here - before initialisation
        # these remain as normal attributes
        self.attribute = attribute
        dict.__init__(self, indict)
        self.__initialised = True

        # After initialisation, setting attributes
        # is the same as setting an item

    def __getattr__(self, item):
        """
        Maps values to attributes
        Only called if there *is NOT* an attribute with this name
        """

        try:
            return self.__getitem__(item)
        except KeyError:
            return ""
            # raise AttributeError("unable to access item ‘%s‘" % item)

    def __setattr__(self, item, value):
        """
        Maps attributes to values
        Only if we are initialised
        """

        # This test allows attributes to be set in the __init__ method
        if "_AttribDict__initialised" not in self.__dict__:
            return dict.__setattr__(self, item, value)

        # Any normal attributes are handled normally
        elif item in self.__dict__:
            dict.__setattr__(self, item, value)

        else:
            self.__setitem__(item, value)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, dict):
        self.__dict__ = dict

    def __deepcopy__(self, memo):
        retVal = self.__class__()
        memo[id(self)] = retVal

        for attr in dir(self):
            if not attr.startswith(‘_‘):
                value = getattr(self, attr)
                if not isinstance(value, (types.BuiltinFunctionType, types.FunctionType, types.MethodType)):
                    setattr(retVal, attr, copy.deepcopy(value, memo))

        for key, value in self.items():
            retVal.__setitem__(key, copy.deepcopy(value, memo))

        return retVal


data_info = AttribDict()
print(id(data_info))

def change_add():
    b = AttribDict()
    data_info = b

change_add()
print(id(data_info))

3) celery任务消费与子任务再生产 tasks.py

from celery import Task
from celery_app import app
from data_type import data_info, change_add, my_data
import random

@app.task(name=‘tasks.main‘, base=MyTask, bind=True, max_retries=10, default_retry_delay=10)
def task_main(self, param):

    # change_add()
    # print(str(os.getpid()) + "   " + str(threading.currentThread().ident))
    data_info["A_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    my_data["A_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    print("--- MAIN " + param + "  " + str(id(data_info)) + "  " + str(data_info))
    print("=== MAIN " + param + "  " + str(id(my_data)) + "  " + str(my_data))
    task_node.apply_async(args=[str(param)], queue=‘Aefault‘, routing_key=‘aefault‘)

4) 子任务消费 recv.py

from celery_app import app
from data_type import data_info, change_add, my_data
from recv2 import recv2
import random


@app.task()
def task_node(param):
    print("--- NODE " + param + "  " + str(id(data_info)) + "  " + str(data_info))
    print("=== NODE " + param + "  " + str(id(my_data)) + "  " + str(my_data))
    data_info["B_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    my_data["B_DATA " + param + " " + str(random.randint(0, 1000))] = 0
    recv2(param)

5) 子任务调用 recv2.py

from data_type import data_info, change_add, my_data

def recv2(param):
    print("--- recv " + param + "  " + str(id(data_info)) + "  " + str(data_info))
    print("=== recv " + param + "  " + str(id(my_data)) + "  " + str(my_data))

6) 日志记录

[2020-06-12 17:39:11,535: WARNING/ForkPoolWorker-10] --- MAIN 0  140072813068960  {‘A_DATA 0 243‘: 0}
[2020-06-12 17:39:11,535: WARNING/ForkPoolWorker-10] === MAIN 0  140072813219184  {‘A_DATA 0 711‘: 0}
[2020-06-12 17:39:11,542: INFO/MainProcess] Received task: tasks.main[281a82a5-57e3-4442-80ea-6b5218871b98]  
[2020-06-12 17:39:11,544: INFO/MainProcess] Received task: tasks.main[ef098266-0cab-438e-ba6a-db3ee486c0da]  
[2020-06-12 17:39:11,546: WARNING/ForkPoolWorker-3] --- MAIN 1  140072813068960  {‘A_DATA 1 61‘: 0}
[2020-06-12 17:39:11,546: WARNING/ForkPoolWorker-3] === MAIN 1  140072813219184  {‘A_DATA 1 85‘: 0}
[2020-06-12 17:39:11,548: INFO/MainProcess] Received task: tasks.main[152afd1e-92c1-4738-8602-bde7d29f8ce4]  
[2020-06-12 17:39:11,548: WARNING/ForkPoolWorker-2] --- MAIN 2  140072813068960  {‘A_DATA 2 521‘: 0}
[2020-06-12 17:39:11,549: WARNING/ForkPoolWorker-2] === MAIN 2  140072813219184  {‘A_DATA 2 802‘: 0}
[2020-06-12 17:39:11,550: WARNING/ForkPoolWorker-6] --- MAIN 3  140072813068960  {‘A_DATA 3 574‘: 0}
[2020-06-12 17:39:11,551: WARNING/ForkPoolWorker-6] === MAIN 3  140072813219184  {‘A_DATA 3 208‘: 0}
[2020-06-12 17:39:11,560: INFO/MainProcess] Received task: tasks.main[de16ad35-bde8-41ed-a910-8eb8e9dff486]  
[2020-06-12 17:39:11,562: INFO/MainProcess] Received task: tasks.main[444cbb4e-7af2-4123-bf59-b69b1d3ed06c]  
[2020-06-12 17:39:11,563: WARNING/ForkPoolWorker-8] --- MAIN 4  140072813068960  {‘A_DATA 4 241‘: 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-1] --- MAIN 5  140072813068960  {‘A_DATA 5 108‘: 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-8] === MAIN 4  140072813219184  {‘A_DATA 4 101‘: 0}
[2020-06-12 17:39:11,564: WARNING/ForkPoolWorker-1] === MAIN 5  140072813219184  {‘A_DATA 5 710‘: 0}
[2020-06-12 17:39:11,577: INFO/MainProcess] Received task: tasks.main[9251870c-67e6-45e8-bedb-9e18a7adee61]  
[2020-06-12 17:39:11,579: WARNING/ForkPoolWorker-7] --- MAIN 6  140072813068960  {‘A_DATA 6 250‘: 0}
[2020-06-12 17:39:11,579: WARNING/ForkPoolWorker-7] === MAIN 6  140072813219184  {‘A_DATA 6 558‘: 0}
[2020-06-12 17:39:11,581: INFO/MainProcess] Received task: tasks.main[8fb18613-5475-4ba8-877f-99ec7b34ae0e]  
[2020-06-12 17:39:11,583: WARNING/ForkPoolWorker-5] --- MAIN 7  140072813068960  {‘A_DATA 7 766‘: 0}
[2020-06-12 17:39:11,583: WARNING/ForkPoolWorker-5] === MAIN 7  140072813219184  {‘A_DATA 7 392‘: 0}
[2020-06-12 17:39:11,593: INFO/MainProcess] Received task: tasks.main[87d5ad4d-3e08-4f37-a0a8-f41737ac16e3]  
[2020-06-12 17:39:11,600: INFO/MainProcess] Received task: tasks.main[01263f6c-1840-4270-9265-9351454ff410]  
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-4] --- MAIN 8  140072813068960  {‘A_DATA 8 204‘: 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-9] --- MAIN 9  140072813068960  {‘A_DATA 9 350‘: 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-4] === MAIN 8  140072813219184  {‘A_DATA 8 676‘: 0}
[2020-06-12 17:39:11,602: WARNING/ForkPoolWorker-9] === MAIN 9  140072813219184  {‘A_DATA 9 996‘: 0}
[2020-06-12 17:39:11,680: INFO/MainProcess] Received task: recv.task_node[520f917f-2692-43de-ac72-e0a350980ff0]  
[2020-06-12 17:39:11,681: INFO/ForkPoolWorker-3] Task tasks.main[281a82a5-57e3-4442-80ea-6b5218871b98] succeeded in 0.135809466243s: None
[2020-06-12 17:39:11,683: INFO/MainProcess] Received task: recv.task_node[c22c353d-abfe-48eb-b023-50211f35854f]  
[2020-06-12 17:39:11,685: INFO/ForkPoolWorker-2] Task tasks.main[ef098266-0cab-438e-ba6a-db3ee486c0da] succeeded in 0.137533180416s: None
[2020-06-12 17:39:11,686: INFO/MainProcess] Received task: recv.task_node[d9fd63ff-3058-43e4-ba6e-c1fa92e50bbe]  
[2020-06-12 17:39:11,690: INFO/ForkPoolWorker-6] Task tasks.main[152afd1e-92c1-4738-8602-bde7d29f8ce4] succeeded in 0.140443377197s: None
[2020-06-12 17:39:11,691: INFO/ForkPoolWorker-10] Task tasks.main[f15c3b32-5a70-479d-a622-addd0f03330c] succeeded in 0.156630814075s: None
[2020-06-12 17:39:11,691: WARNING/ForkPoolWorker-3] --- NODE 1  140072813068960  {‘A_DATA 1 61‘: 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] === NODE 1  140072813219184  {‘A_DATA 1 85‘: 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] --- recv 1  140072813068960  {‘A_DATA 1 61‘: 0, ‘B_DATA 1 706‘: 0}
[2020-06-12 17:39:11,692: WARNING/ForkPoolWorker-3] === recv 1  140072813219184  {‘A_DATA 1 85‘: 0, ‘B_DATA 1 214‘: 0}
[2020-06-12 17:39:11,692: INFO/MainProcess] Received task: recv.task_node[7fd20514-f293-493c-9d67-0526cc38e85a]  
[2020-06-12 17:39:11,692: INFO/ForkPoolWorker-3] Task recv.task_node[520f917f-2692-43de-ac72-e0a350980ff0] succeeded in 0.000800415873528s: None
[2020-06-12 17:39:11,696: WARNING/ForkPoolWorker-2] --- NODE 2  140072813068960  {‘A_DATA 2 521‘: 0}
[2020-06-12 17:39:11,696: WARNING/ForkPoolWorker-2] === NODE 2  140072813219184  {‘A_DATA 2 802‘: 0}
[2020-06-12 17:39:11,697: WARNING/ForkPoolWorker-2] --- recv 2  140072813068960  {‘B_DATA 2 81‘: 0, ‘A_DATA 2 521‘: 0}
[2020-06-12 17:39:11,697: WARNING/ForkPoolWorker-2] === recv 2  140072813219184  {‘A_DATA 2 802‘: 0, ‘B_DATA 2 714‘: 0}
[2020-06-12 17:39:11,697: INFO/ForkPoolWorker-2] Task recv.task_node[c22c353d-abfe-48eb-b023-50211f35854f] succeeded in 0.000871725380421s: None
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] --- NODE 3  140072813068960  {‘A_DATA 1 61‘: 0, ‘B_DATA 1 706‘: 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] === NODE 3  140072813219184  {‘A_DATA 1 85‘: 0, ‘B_DATA 1 214‘: 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-3] --- recv 3  140072813068960  {‘A_DATA 1 61‘: 0, ‘B_DATA 1 706‘: 0, ‘B_DATA 3 7‘: 0}
[2020-06-12 17:39:11,699: WARNING/ForkPoolWorker-10] --- NODE 0  140072813068960  {‘A_DATA 0 243‘: 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-3] === recv 3  140072813219184  {‘A_DATA 1 85‘: 0, ‘B_DATA 1 214‘: 0, ‘B_DATA 3 383‘: 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] === NODE 0  140072813219184  {‘A_DATA 0 711‘: 0}
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-3] Task recv.task_node[7fd20514-f293-493c-9d67-0526cc38e85a] succeeded in 0.000573091208935s: None
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] --- recv 0  140072813068960  {‘A_DATA 0 243‘: 0, ‘B_DATA 0 345‘: 0}
[2020-06-12 17:39:11,700: WARNING/ForkPoolWorker-10] === recv 0  140072813219184  {‘B_DATA 0 392‘: 0, ‘A_DATA 0 711‘: 0}
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-10] Task recv.task_node[d9fd63ff-3058-43e4-ba6e-c1fa92e50bbe] succeeded in 0.000787496566772s: None
[2020-06-12 17:39:11,700: INFO/ForkPoolWorker-1] Task tasks.main[444cbb4e-7af2-4123-bf59-b69b1d3ed06c] succeeded in 0.1372801736s: None
[2020-06-12 17:39:11,702: INFO/MainProcess] Received task: recv.task_node[adf4ab3e-0930-43a7-a1f9-2b5a2d45556c]  
[2020-06-12 17:39:11,704: WARNING/ForkPoolWorker-1] --- NODE 5  140072813068960  {‘A_DATA 5 108‘: 0}
[2020-06-12 17:39:11,704: WARNING/ForkPoolWorker-1] === NODE 5  140072813219184  {‘A_DATA 5 710‘: 0}
[2020-06-12 17:39:11,705: WARNING/ForkPoolWorker-1] --- recv 5  140072813068960  {‘A_DATA 5 108‘: 0, ‘B_DATA 5 46‘: 0}
[2020-06-12 17:39:11,705: WARNING/ForkPoolWorker-1] === recv 5  140072813219184  {‘B_DATA 5 134‘: 0, ‘A_DATA 5 710‘: 0}
[2020-06-12 17:39:11,705: INFO/ForkPoolWorker-1] Task recv.task_node[adf4ab3e-0930-43a7-a1f9-2b5a2d45556c] succeeded in 0.00115340203047s: None
[2020-06-12 17:39:11,706: INFO/ForkPoolWorker-5] Task tasks.main[8fb18613-5475-4ba8-877f-99ec7b34ae0e] succeeded in 0.123670294881s: None
[2020-06-12 17:39:11,706: INFO/ForkPoolWorker-8] Task tasks.main[de16ad35-bde8-41ed-a910-8eb8e9dff486] succeeded in 0.143696308136s: None
[2020-06-12 17:39:11,708: INFO/MainProcess] Received task: recv.task_node[4f58e5a2-da57-40d9-8630-dbad91fe9247]  
[2020-06-12 17:39:11,711: INFO/MainProcess] Received task: recv.task_node[5a5fb2be-6c0d-46aa-bd99-2a22270b89ee]  
[2020-06-12 17:39:11,711: WARNING/ForkPoolWorker-3] --- NODE 7  140072813068960  {‘A_DATA 1 61‘: 0, ‘B_DATA 1 706‘: 0, ‘B_DATA 3 7‘: 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] === NODE 7  140072813219184  {‘A_DATA 1 85‘: 0, ‘B_DATA 1 214‘: 0, ‘B_DATA 3 383‘: 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] --- recv 7  140072813068960  {‘B_DATA 7 399‘: 0, ‘A_DATA 1 61‘: 0, ‘B_DATA 1 706‘: 0, ‘B_DATA 3 7‘: 0}
[2020-06-12 17:39:11,712: WARNING/ForkPoolWorker-3] === recv 7  140072813219184  {‘A_DATA 1 85‘: 0, ‘B_DATA 1 214‘: 0, ‘B_DATA 3 383‘: 0, ‘B_DATA 7 142‘: 0}
[2020-06-12 17:39:11,712: INFO/ForkPoolWorker-3] Task recv.task_node[4f58e5a2-da57-40d9-8630-dbad91fe9247] succeeded in 0.000611014664173s: None
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] --- NODE 4  140072813068960  {‘B_DATA 2 81‘: 0, ‘A_DATA 2 521‘: 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] === NODE 4  140072813219184  {‘A_DATA 2 802‘: 0, ‘B_DATA 2 714‘: 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] --- recv 4  140072813068960  {‘B_DATA 2 81‘: 0, ‘A_DATA 2 521‘: 0, ‘B_DATA 4 25‘: 0}
[2020-06-12 17:39:11,713: WARNING/ForkPoolWorker-2] === recv 4  140072813219184  {‘A_DATA 2 802‘: 0, ‘B_DATA 4 744‘: 0, ‘B_DATA 2 714‘: 0}
[2020-06-12 17:39:11,713: INFO/ForkPoolWorker-9] Task tasks.main[01263f6c-1840-4270-9265-9351454ff410] succeeded in 0.112046726048s: None
[2020-06-12 17:39:11,713: INFO/ForkPoolWorker-2] Task recv.task_node[5a5fb2be-6c0d-46aa-bd99-2a22270b89ee] succeeded in 0.000574573874474s: None
[2020-06-12 17:39:11,716: INFO/ForkPoolWorker-7] Task tasks.main[9251870c-67e6-45e8-bedb-9e18a7adee61] succeeded in 0.137739785016s: None
[2020-06-12 17:39:11,716: INFO/MainProcess] Received task: recv.task_node[c5c172f5-bc96-4ef1-83ec-a054fb461076]  
[2020-06-12 17:39:11,717: INFO/ForkPoolWorker-4] Task tasks.main[87d5ad4d-3e08-4f37-a0a8-f41737ac16e3] succeeded in 0.11572509259s: None
[2020-06-12 17:39:11,717: WARNING/ForkPoolWorker-5] --- NODE 9  140072813068960  {‘A_DATA 7 766‘: 0}
[2020-06-12 17:39:11,717: WARNING/ForkPoolWorker-5] === NODE 9  140072813219184  {‘A_DATA 7 392‘: 0}
[2020-06-12 17:39:11,718: WARNING/ForkPoolWorker-5] --- recv 9  140072813068960  {‘A_DATA 7 766‘: 0, ‘B_DATA 9 5‘: 0}
[2020-06-12 17:39:11,718: WARNING/ForkPoolWorker-5] === recv 9  140072813219184  {‘B_DATA 9 30‘: 0, ‘A_DATA 7 392‘: 0}
[2020-06-12 17:39:11,718: INFO/ForkPoolWorker-5] Task recv.task_node[c5c172f5-bc96-4ef1-83ec-a054fb461076] succeeded in 0.000654295086861s: None
[2020-06-12 17:39:11,719: INFO/MainProcess] Received task: recv.task_node[26820268-5d73-4127-abb6-5acf4e1cd516]  
[2020-06-12 17:39:11,721: INFO/MainProcess] Received task: recv.task_node[121cb307-fbcd-44d5-a826-9244a253b468]  
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] --- NODE 6  140072813068960  {‘A_DATA 5 108‘: 0, ‘B_DATA 5 46‘: 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] === NODE 6  140072813219184  {‘B_DATA 5 134‘: 0, ‘A_DATA 5 710‘: 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] --- recv 6  140072813068960  {‘B_DATA 6 220‘: 0, ‘A_DATA 5 108‘: 0, ‘B_DATA 5 46‘: 0}
[2020-06-12 17:39:11,722: WARNING/ForkPoolWorker-1] === recv 6  140072813219184  {‘B_DATA 6 540‘: 0, ‘B_DATA 5 134‘: 0, ‘A_DATA 5 710‘: 0}
[2020-06-12 17:39:11,722: INFO/ForkPoolWorker-1] Task recv.task_node[26820268-5d73-4127-abb6-5acf4e1cd516] succeeded in 0.00059811770916s: None
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] --- NODE 8  140072813068960  {‘A_DATA 9 350‘: 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] === NODE 8  140072813219184  {‘A_DATA 9 996‘: 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] --- recv 8  140072813068960  {‘B_DATA 8 166‘: 0, ‘A_DATA 9 350‘: 0}
[2020-06-12 17:39:11,723: WARNING/ForkPoolWorker-9] === recv 8  140072813219184  {‘A_DATA 9 996‘: 0, ‘B_DATA 8 273‘: 0}
[2020-06-12 17:39:11,724: INFO/ForkPoolWorker-9] Task recv.task_node[121cb307-fbcd-44d5-a826-9244a253b468] succeeded in 0.000640526413918s: None

从数据上来说,在开启proform的模式下(多进程),c =1(数量为1),第一个进程赋予变量?my_data的值在第二个进程中还能使用。
测试数据的环境是python2 + celery 3.2.1,会在3-5秒的时间内重置内存里的变量,但是地址是不变的。
在python3和celery4的情况,干脆就不重置了,在设置 my_data[random.randint(0, 1000)]=0? 的情况下,完成一个任务,my_data多一个键值,意味着内存里的变量是不重置的。

0X03 画个图

技术图片
A生产数据,B消费再生产,C消费的时候调用了D。
B改动了E,再发送任务给broker。C再读取E,能拿到B之前写入E的数据(不是通过broker,而是存在内存中的)。
所以,这个现象说明了,多个进程之间的变量用的是同一个内存,会互相重置导致错乱么?

0X03 多进程串数据?

设置中CELERYD_MAX_TASKS_PER_CHILD=7? # 每个worker执行了多少任务
启动时preform -c 3

其实运行的模型长这样
技术图片
进程1 2 3之间的数据是不互相影响的。
但是在一个进程中的任务1-7使用的是相同的内存。所以在任务1中,B改了数据E;任务2中,C再读取E,就能读到数据。
虽然从理论上来说,B生产任务发送给C,并没有E的数据,E不应该拿到。但是因为worker未销毁,变量的内存不变,所以就拿到了,同一个进程中的所有任务共享了。

会引起串数据的问题么?不会。因为任务1-7其实是串联执行的。
一个进程在执行任务运行C的时候,并不会执行其他任务,也不会有一个B或者一个C来修改掉E的数据。

所以多进程不会串数据。

0X04 协程/多线程串数据?

1) 协程与多线程是会串数据的。

以多线程为例,执行一个进程,进程中有n个线程,共用一处内存,存储在内存中的变量随时会被改来改去的。
技术图片

想要在C设置一个数据,由调用的末端D拿到,如果只是放在一个变量中,随时可能会被其他线程拿到,也不能用锁吧。
这种情况,设置
dict = {
"一个任务的标识": "一个任务想要的数据",
"一个任务的标识": "一个任务想要的数据"
}

2) 标识上

os.getpid() 进程ID大家都是一样的;
但是线程ID是不一样的,任务的标识可以用线程ID;

print(threading.currentThread().name)
print(threading.currentThread().ident)

也可以用task.id,任务id在多进程、多线程、多协程的情况下都是可以使用的,不用考虑太多。

3) task id的调用:

recv2.py(步骤D)

from data_type import data_info, change_add, my_data


def recv2(param):
    from recv import task_node
    print(task_node.request)
    print(task_node.request.id)
    print("--- recv " + param + "  " + str(id(data_info)) + "  " + str(data_info))
    print("=== recv " + param + "  " + str(id(my_data)) + "  " + str(my_data))

<Context: {
‘lang‘: ‘py‘,
‘task‘: ‘recv.task_node‘,
‘id‘: ‘1f4c9e66-3673-4163-a8f7-2a5a8418f742‘,
‘shadow‘: None,
‘eta‘: None,
‘expires‘: None,
‘group‘: None,
‘retries‘: 0,
‘timelimit‘: [None, None],
‘root_id‘: ‘f804e984-6e76-4b9d-98fb-6fe3f6bb935f‘,
‘parent_id‘: ‘f804e984-6e76-4b9d-98fb-6fe3f6bb935f‘,
‘argsrepr‘: "[‘2‘]",
‘kwargsrepr‘: ‘{}‘,
‘origin‘: ‘gen579559@127.0.0.1‘,
‘reply_to‘: ‘7c0a805f-de11-36bc-b5f4-f6bd1f5ea5e4‘,
‘correlation_id‘: ‘1f4c9e66-3673-4163-a8f7-2a5a8418f742‘,
‘hostname‘: ‘celery@127.0.0.1‘,
‘delivery_info‘: {
?‘exchange‘: ‘‘,
?‘routing_key‘: ‘Aefault‘,
?‘priority‘: 0,
?‘redelivered‘: None
},
‘args‘: [‘2‘],
‘kwargs‘: {},
‘is_eager‘: False,
‘callbacks‘: None,
‘errbacks‘: None,
‘chain‘: None,
‘chord‘: None,
‘called_directly‘: False,
‘_protected‘: 1
}>

4)内存泄露

因为多线程用task.id或者线程id做标识,变量会越变越大,所以理论上应该设置?CELERYD_MAX_TASKS_PER_CHILD = 100 / 1000,执行固定数量的任务后销毁重启。

CELERY 内存中的变量串数据

标签:应该   access   lin   direct   src   mit   roc   任务   recv   

原文地址:https://www.cnblogs.com/huim/p/13129943.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!