Как сделать многопроцессорность?

Рейтинг: 3Ответов: 3Опубликовано: 08.04.2023

Функция принимает список чисел и для каждого делает список из чисел на которые оно делится без остатка. Я пытаюсь реализовать передачу нескольких чисел, но получаю очень много ошибок. Объясните как можно в моем случае реализовать многопроцессорность, что я делаю неправильно и нужно ли использовать RLock если я уже использую Manager.list()?

import time
from multiprocessing import cpu_count, Process, Manager


def factorize(factors: list, shared_value: list, *number):
    for num in number:
        for i in range(1, num+1):
            if num % i == 0:
                factors.append(i)
        shared_value.append(factors)
    return shared_value


if __name__ == '__main__':
    cpu_cores = cpu_count()
    start_time = time.time()

    with Manager() as manager:
        processes = []
        shared_list = manager.list()
        shared_factors = manager.list()
        for _ in range(cpu_cores+1):
            pr = Process(target=factorize, args=(shared_factors, shared_list, 128, 255, 99999, 10651060, ))
            pr.start()
            processes.append(pr)

    [el.join() for el in processes]
    end_time = time.time()
    running_time = end_time - start_time

    print(list(shared_list))
    print('Time:', running_time)

    assert shared_list[0] == [1, 2, 4, 8, 16, 32, 64, 128]
    assert shared_list[1] == [1, 3, 5, 15, 17, 51, 85, 255]
    assert shared_list[2] == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
    assert shared_list[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
                 1065106, 1521580, 2130212, 2662765, 5325530, 10651060]

Ответы

▲ 6Принят

Ошибка в том что вы обращаетесь к shared_list когда manager уже уничтожен:

if __name__ == '__main__':
    ...
    with Manager() as manager:
        ...
        shared_list = manager.list()
        ...

    [el.join() for el in processes] # дочерние процессы ещё работают
                                    # а manager уже уничтожен
    ...
    print(list(shared_list))        # переменная зависит от нерабочего manager
    ...

Код можно исправить так:

if __name__ == '__main__':
    with Manager() as manager:
        cpu_cores = cpu_count()
        
        # весь код должен быть внутри контекста manager
        ...

        assert shared_list[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
                 1065106, 1521580, 2130212, 2662765, 5325530, 10651060]

P.S. RLock не нужен. Ваш код вполне работоспособен без него.

P.P.S. Я говорил только про техническую ошибку. Код не разделяет работу между процессами - все делают одно и то же. Если поставить цель доработать ваш код, нужно ввести очередь из которой процессы будут выбирать задачи и решать их. Это не лучший способ, но рабочий:

import time
import queue
from multiprocessing import cpu_count, Process, Manager


def factorize(queue_, numbers, factorizations):
    while True:
        try:
            k = queue_.get_nowait()
        except queue.Empty:
            break

        num = numbers[k]
        factors = factorizations[k]
        for i in range(1, num + 1):
            if num % i == 0:
                factors.append(i)


if __name__ == '__main__':
    with Manager() as manager:
        start_time = time.time()

        numbers = manager.list([128, 255, 99999, 10651060])
        factorizations = manager.list([manager.list() for _ in numbers])
        queue_ = manager.Queue()
        for k in range(len(numbers)):
            queue_.put(k)

        processes = [
            Process(target=factorize, args=(queue_, numbers, factorizations))
            for _ in range(cpu_count() + 1)
        ]

        for p in processes:
            p.start()

        for p in processes:
            p.join() 

        end_time = time.time()
        running_time = end_time - start_time

        print(*map(list, factorizations))
        print('Time:', running_time)

        assert list(factorizations[0]) == [1, 2, 4, 8, 16, 32, 64, 128]
        assert list(factorizations[1]) == [1, 3, 5, 15, 17, 51, 85, 255]
        assert list(factorizations[2]) == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
        assert list(factorizations[3]) == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
                     1065106, 1521580, 2130212, 2662765, 5325530, 10651060]

P.P.P.S. Pool.map - готовое решение для вашей задачи. Он сам запустит процессы по числу процессоров, распределит числа между процессами, подождёт окончания вычислений, соберёт результаты в общий список:

import time
from multiprocessing import Pool


def factorize(num):
    factors = []
    for i in range(1, num + 1):
        if num % i == 0:
            factors.append(i)
    return factors


if __name__ == '__main__':
    start_time = time.time()

    with Pool() as pool:
        factorizations = pool.map(factorize, [128, 255, 99999, 10651060])

    end_time = time.time()
    running_time = end_time - start_time

    print(*factorizations)
    print('Time:', running_time)

    assert factorizations[0] == [1, 2, 4, 8, 16, 32, 64, 128]
    assert factorizations[1] == [1, 3, 5, 15, 17, 51, 85, 255]
    assert factorizations[2] == [1, 3, 9, 41, 123, 271, 369, 813, 2439, 11111, 33333, 99999]
    assert factorizations[3] == [1, 2, 4, 5, 7, 10, 14, 20, 28, 35, 70, 140, 76079, 152158, 304316, 380395, 532553, 760790,
                 1065106, 1521580, 2130212, 2662765, 5325530, 10651060]
▲ 4

1 ошибка - процессы запускаются с одинаковыми начальными условиями. Работает полезно один, остальные греют воздух.

2 ошибка - менеджер закрыт.

Если хочешь чистый код для обработки последовательностей и списков - используй multiprocessing.Pool().imap(), в этом случае с чанками

Внешний цикл оставь в главном процессе, перенос его воркера не даёт преимуществ.

▲ 3

Проблема в том, что вы передаете один и тот же список factors в качестве аргумента для каждого процесса. Это означает, что каждый процесс будет пытаться добавить элементы в один и тот же список, что приведет к ошибке доступа к разделяемой памяти. Чтобы решить эту проблему, вы можете создать отдельный список factors для каждого процесса внутри функции factorize(). Это можно сделать, например, путем изменения функции factorize() следующим образом:

def factorize(shared_factors: list, shared_list: list, *number):
    for num in number:
        factors = []
        for i in range(1, num+1):
            if num % i == 0:
                factors.append(i)
        shared_factors.append(factors)
    shared_list.extend(shared_factors)
    return shared_list

Здесь мы создаем список factors внутри функции factorize() для каждого процесса, а затем добавляем его в общий список shared_factors. Затем мы добавляем все элементы из shared_factors в общий список shared_list с помощью метода extend(). Возвращаемым значением функции является общий список shared_list. Чтобы ответить на ваш вопрос о RLock, если вы используете Manager.list(), то вам не нужно использовать RLock, потому что Manager.list() уже обеспечивает безопасный доступ к разделяемому списку между процессами. Однако, если вы вместо этого используете обычный список, то вам нужно будет использовать RLock для синхронизации доступа к списку между процессами. Наконец, если вы хотите передавать несколько чисел, то вы можете просто передать их в виде аргументов после shared_list. Например, чтобы передать числа 128, 255, 99999 и 10651060, вы можете вызвать функцию factorize() следующим образом:

factorize(shared_factors, shared_list, 128, 255, 99999, 10651060)