Как записать результаты выполнения функций в потоках в массив?

Рейтинг: 0Ответов: 3Опубликовано: 06.03.2023

Нашел код который демонстрирует работу многопоточности в python 3. Я хочу сделать чтобы в каждом потоке выполнилась моя функция и после завершения работы всех потоков я получил массив с результатами всех потоков

import queue
import threading
import time


# The queue for tasks
q = queue.Queue()

# Worker, handles each task
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        
        #здесь должна выполниться моя функция я записать результат в массив
        result = subprocess.run(['command', 'arg1', 'arg2'], stdout=subprocess.PIPE)

        q.task_done()


def start_workers(worker_pool=1000):
    threads = []
    for i in range(worker_pool):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    return threads


def stop_workers(threads):
    # stop workers
    for i in threads:
        q.put(None)
    for t in threads:
        t.join()


def create_queue(task_items):
    for item in task_items:
        q.put(item)


if __name__ == "__main__":
    # Dummy tasks
    tasks = [item for item in range(10)]

    # Start up your workers
    workers = start_workers(worker_pool=10)
    create_queue(tasks)

    # Blocks until all tasks are complete
    result = q.join()
    print(result)

    result2 = worker.join()
    print(result2)

    stop_workers(workers)

Ответы

▲ 2Принят

Проще всего наверное сделать ещё одну очередь для хранения результатов (очереди потокобезопасны):

q = queue.Queue()
# Для хранения результатов
result = queue.Queue()

def worker():
        ...
        # Помещаем результат в очередь
        result.put(subprocess.run(['command', 'arg1', 'arg2'], stdout=subprocess.PIPE))
...
stop_workers(workers)

# Разбираем результаты
for item in result.queue:
    print(item)
▲ 3

multiprocessing.dummy - интерфейс к Thread, повторяющий api multiprocessing. Функция map очень сильно упрощают обработку массивов.

from multiprocessing.dummy import Pool

pool = Pool(1000)

def runner(item):
    ...
    result = ...
    return result

if __name__ == "__main__":
    results = pool.map(runner, range(10))

Очереди, ожидания, запуск воркеров уже реализованы.

▲ 0

Часто, удобно не ждать, пока абсолютно все задачи закончат выполнятся в пуле pool.map, и только потом обработать все результаты скопом, а начинать обрабатывать результат каждой из задач по отдельности, сразу по окончании ее выполнения, параллельно продолжающейся работе в пуле. pool.imap_unordered будет возвращать результаты не в том порядке, в котором мы добавляли задачи, а какая из задач быстрее закончит выполнятся, результат той первее и вернется. Таким образом можно приделать прогресбар, или ускорить обработку результатов.

Например ниже, в пуле потоков будет отослано 10 запросов в гугл, ответы на запросы будут сохранены в файлы. В выводе скрипта видно, что сохранение файлов(обработка результатов работы пула) происходит параллельно работе пула запросов.

import multiprocessing.pool, time, random, urllib.request

Headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0'}

def runner(word):
    print(f"\n-> {time.strftime('%H:%M:%S', time.gmtime())} search {word}")
    time.sleep(random.randint(1, 5))
    with urllib.request.urlopen(urllib.request.Request(f'https://www.google.com/search?q={word}', headers=Headers)) as http_body:
        return word, http_body.read()

if __name__ == '__main__':
    [*search_words] = range(10)
    with multiprocessing.pool.ThreadPool(processes=4) as pool:
        for word, http_body in pool.imap_unordered(runner, search_words):
            print(f"\n<- {time.strftime('%H:%M:%S', time.gmtime())} save {word}, len={len(http_body)}")
            with open(f'{word}_http_body.html', 'wb') as f:
                f.write(http_body)

out:

-> 20:55:38 search 0
-> 20:55:38 search 1
-> 20:55:38 search 2
-> 20:55:38 search 3
-> 20:55:41 search 4
<- 20:55:41 save 0, len=82176
-> 20:55:42 search 5
<- 20:55:42 save 2, len=78236
-> 20:55:43 search 6
<- 20:55:43 save 1, len=117082
-> 20:55:43 search 7
<- 20:55:43 save 3, len=147944
-> 20:55:44 search 8
<- 20:55:44 save 4, len=83972
-> 20:55:45 search 9
<- 20:55:45 save 5, len=72776
<- 20:55:46 save 8, len=138741
<- 20:55:48 save 6, len=83061
<- 20:55:49 save 7, len=114936
<- 20:55:49 save 9, len=191621