Одна и та же задача Celery/Django/Redis выполняется несколько раз

В моем текущем проекте мне нужно получить данные с более чем 700 конечных точек, а затем отправить эти данные еще на более чем 700 конечных точек. Мой подход состоит в том, чтобы использовать Django Celery Redis, поставить более 70 конечных точек на каждого рабочего, чтобы было около 10 рабочих, которые будут извлекать данные, а затем публиковать данные.

Для этого я использую Chord для выполнения параллельной задачи, а затем рассчитываю время, необходимое для этого.

Проблема в том, что Celery выполняет одну и ту же задачу несколько раз. task_get_data — это основной метод, который сначала получает список веб-сайтов, затем разбивает его на группы по 70 в каждой, а затем вызывает task_post_data с помощью Chord.

В выводе ниже вы можете увидеть website_A, website_B и т. д. несколько раз, я вручную проверил свои данные и все остальное, и нет повторения веб-сайтов, но когда задача сельдерея отправляется, создается несколько записей.

Кроме того, есть ли способ контролировать количество рабочих и что они обрабатывают?

Ниже приведен код

os.environ.setdefault('DJANGO_SETTINGS_MODULE','django_backend.settings')

app = Celery('django_backend', backend='redis://localhost:6379', broker='redis://localhost:6379')

app.config_from_object('django.conf:settings', namespace='CELERY')
# app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

def post_data(json_obj, website):
    for items in json_obj:
        md = md + items['data']
    n = 50
    list_of_objects = [md[i:i+n] for i in range(0, len(md), n)]
    
    print("Starting to post data using post__data")
    for items in list_of_objects:
        data_details = PostDataDetails()
        data_details.data = items
        post_data_response = ""
        try:
            post_data_response = client.post_data(post_data_details = data_details)
            
            print("Successfully posted data with status code " + str(post_data_response.status) + " for " + website)
        except Exception as e:
           
    return post_data_response.status

def read_from_file():
        with open('./django_backend/data.json') as json_file:
            data = json.load(json_file)
            # print(data)
        return data

def split_list(l,n):
    website_list = [l[i:i+n] for i in range(0, len(l), n)]
    return website_list


#-----------this is the main method---------
@app.task(bind=True, name="task_get_data")
def task_get_data(self):
    start_time = datetime.datetime.now()
    try:
        website = task_helper.get_data()
        task_helper.write_logs("info", "Successfully read from cache")
    except Exception as e:
        task_helper.write_logs("error", "Error in reading from cache. Error" + str(e))
  
    website_list_chunks = split_list(list(website.keys()),70)
    callback = get_time_difference.s(start_time)

    try:
        task_helper.write_logs("info", "Starting the task_post_poller_data task to post  data")
        header = [task_post_data.s(website_list) for website_list in website_list_chunks]
        result = chord(header)(callback)
        print(result)
        task_helper.write_logs("info", "Successfully completed the task to post  data")
    except Exception as e:
        task_helper.write_logs("error", "Error in creating task_post_data task. Error" + str(e))


@app.task(bind=True, name="task_post_data")
def task_post_data(self,website_list=None) -> dict:
    json_object_response = True
    post_data_response = None
    for website in website_list:
        if json_object_response:
            file_data = read_from_file()
            try:
                post_data_response = post_data(file_data, website)
                # pass
            except Exception as e:
                print("error", "Error in creating task_post_poller_data task. Error" + str(e))
    return post_data_response    

Я запускаю код с помощью команды celery -A django_backend worker -l debug --purge и отправляю задачу с помощью этой команды.

python manage.py shell
>>>from django_backend.tasks import task_get_data
>>>task_get_data.delay()

Ниже вывод из консоли

[2021-07-20 19:54:54,789: INFO/ForkPoolWorker-3] Successfully posted  data with status code 200 for  website_D
[2021-07-20 19:54:54,835: INFO/ForkPoolWorker-5] Successfully posted  data with status code 200 for  website_E
[2021-07-20 19:54:54,840: INFO/ForkPoolWorker-2] Successfully posted  data with status code 200 for  website_B
[2021-07-20 19:54:54,843: INFO/ForkPoolWorker-1] Successfully posted  data with status code 200 for  website_A
[2021-07-20 19:54:54,882: INFO/ForkPoolWorker-6] Successfully posted  data with status code 200 for  website_P
[2021-07-20 19:54:54,891: INFO/ForkPoolWorker-8] Successfully posted  data with status code 200 for  website_I
[2021-07-20 19:54:54,895: INFO/ForkPoolWorker-4] Successfully posted  data with status code 200 for  website_R
[2021-07-20 19:54:55,021: INFO/ForkPoolWorker-3] Successfully posted  data with status code 200 for  website_D
[2021-07-20 19:54:55,025: INFO/ForkPoolWorker-7] Successfully posted  data with status code 200 for  website_C
[2021-07-20 19:54:55,073: INFO/ForkPoolWorker-2] Successfully posted  data with status code 200 for  website_B
[2021-07-20 19:54:55,086: INFO/ForkPoolWorker-1] Successfully posted  data with status code 200 for  website_A

person Samvid Kulkarni    schedule 20.07.2021    source источник


Ответы (1)


Это одна из известных проблем с Celery и Redis. В одном из моих проектов я назначал уникальный идентификатор в кеше для каждой задачи, а затем в начале задачи просто проверял, существует ли уже ключ или нет. Вы можете написать диспетчер контекста для такой вещи. Что-то вроде этого

@contextmanager
def task_lock(lock_id, oid, lock_expire_seconds=600, unlock_after_finish=False):
    """
    Be sure that task runs only once
    :param lock_id: unique id of task
    :param oid: unique id of current job (needs for debug only)
    :param lock_expire_seconds: task will be unlocked in x seconds
    :param unlock_after_finish: bool, allow run next task after finish of current one
    """
    timeout_at = datetime.utcnow() + timedelta(seconds=lock_expire_seconds)
    oid = "{}-{}".format(os.environ.get("HOSTNAME", ""), oid)
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, lock_expire_seconds)
    try:
        yield status
    finally:
        # cache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if unlock_after_finish and datetime.utcnow() < timeout_at:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else.
            cache.delete(lock_id)

И тогда в вашем коде задачи вы можете сделать

def some_task():
    with task_lock("task-lcok", current_time, lock_expire_seconds=10) as acquired:
        if acquired:
            # do something

В противном случае в конфигурации есть еще кое-что, с чем можно поиграться. Например, проверьте эти ответы.

  1. Celery снова и снова запускает длительные выполненные задачи
  2. проблема с Github
  3. одна и та же задача Celery/Redis выполняется несколько раз параллельно
person Sadan A.    schedule 20.07.2021
comment
Большое спасибо за ваш код, не могли бы вы показать мне, как я могу использовать это с Chord или Group? - person Samvid Kulkarni; 21.07.2021
comment
Я не уверен, как использовать его с группой или аккордом, но я отредактировал свой ответ, указав, как я его использовал. Я надеюсь, что это поможет или даст вам представление. - person Sadan A.; 21.07.2021
comment
Большое тебе спасибо. Ваш код помог. Что я сделал, так это создал еще одну задачу, в которой я бы делал только часть публикации на веб-сайтах, и я вызывал эту функцию внутри цикла with. - person Samvid Kulkarni; 23.07.2021