Потоки Python: заставить основной поток сообщать о прогрессе

Я запускаю некоторые задания параллельно, что иногда может занять много времени, поэтому я хочу, чтобы основной поток сообщал о ходе выполнения. Например, каждый час.

Ниже приведена упрощенная версия того, что я придумал. Код будет выполняться test_function в 2 потоках с аргументами из input_arguments. Каждые 5 секунд он будет печатать % выполненных заданий.

import threading
import queue
import time


def test_function(x):
    time.sleep(4)
    print("Finished ", x)


num_processes = 2
input_arguments = range(10)

# Define a worker which will continuously execute function taking input parameters from the queue
def worker():
    while True:
        x = q.get()
        if x is None:
            break
        test_function(x)
        q.task_done()

# Initialize queue and the threads
q = queue.Queue()
threads = []
for i in range(num_processes):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Create a queue of input parameters for function
for item in input_arguments:
    q.put(item)

# Report progress every 5 seconds
report_progress(q)

# stop workers
for i in range(num_processes):
    q.put(None)
for t in threads:
    t.join()

Где report_progress определяется следующим образом

def report_progress(q):
    qsize_init = q.qsize()
    while not q.empty():
        time.sleep(5)
        portion_finished = 1 - q.qsize() / qsize_init
        print("run_parallel: {:.1%} jobs are finished".format(portion_finished))

Однако я хочу сообщать о прогрессе каждый час, а не 5 секунд, и если все задания будут завершены, программа может просто простаивать в течение многих минут.

Другая возможность состоит в том, чтобы определить report_progress по-другому:

def report_progress(q):
    qsize_init = q.qsize()
    time_start = time.time()
    while not q.empty():
        current_time = time.time()
        if current_time - time_start > 5:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            time_start = time.time()

Я беспокоюсь, что постоянная проверка этого условия будет истощать ресурсы ЦП, небольшая часть, но в масштабе часов это может быть много.

Есть ли стандартный способ справиться с этим?

Питон: 3.6


person icemtel    schedule 28.11.2018    source источник
comment
Возможно, вместо опроса вы должны зависеть от событий. В этом случае вам нужно будет установить время ожидания события на 1 час, и если событие произойдет, вы продолжите. связанный ответ   -  person Yakup Türkan    schedule 28.11.2018
comment
стандартный способ одновременного опроса многих событий (например, ожидание 1-часового тайм-аута и смерти процесса) — это мультиплексирование ввода-вывода. В python наиболее удобным интерфейсом для этого является модуль asyncio.   -  person Andrii Maletskyi    schedule 28.11.2018
comment
Но использование стандартного способа может быть слишком сложным для вашего довольно простого случая. Вы можете просто: (1) - спать 10 секунд, (2) - проверить состояние потока, выйти из программы, если он мертв, (3) если прошел час с момента последнего отчета о ходе выполнения, рассчитать и распечатать новый, (4) гото (1)   -  person Andrii Maletskyi    schedule 28.11.2018
comment
@AndriyMaletsky У меня уже есть код, основанный на модуле threading, и я хотел добавить к нему небольшое дополнение. Если я использую asyncio, мне придется все это переписать, это правильно? Идея из вашего второго комментария выглядит как хороший компромисс   -  person icemtel    schedule 28.11.2018
comment
Вы правы, asyncio трудно интегрировать с существующим кодом. Довольно распространенным шаблоном является процесс менеджера, запускающий asyncio, он планирует работу, проверяет прогресс, проверяет, были ли все задания отменены пользователем, другими словами, делает много вещей одновременно. А рабочие процессы, которые запускаются как подпроцессы, тем самым изолируются от asyncio. Такой подход позволяет не переписывать рабочий код   -  person Andrii Maletskyi    schedule 28.11.2018
comment
Это звучит как хорошие функции, я рассмотрю этот пакет, если мне понадобится что-то более продвинутое в будущем.   -  person icemtel    schedule 28.11.2018


Ответы (1)


Пока я буду использовать простое решение, предложенное в комментариях @Andriy Maletsky.

Основной поток будет проверять каждые несколько секунд, не пуст ли еще q, и будет печатать сообщение о ходе выполнения, если с момента последнего отчета прошло более 1 часа.

time_between_reports = 3600
time_between_checks = 5
def report_progress_until_finished(q):
    qsize_init = q.qsize()
    last_report_time = time.time()
    while not q.empty():
        time_elapsed = time.time() - last_report_time
        if time_elapsed > time_between_reports:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            last_report_time = time.time()
        time.sleep(time_between_checks)
person icemtel    schedule 28.11.2018