Оцените реализацию многопоточных воркеров

Рейтинг: 0Ответов: 0Опубликовано: 04.09.2014

Это скелет, но рабочий.
Где хочу использовать - для выполнения задач в несколько потоков.
Задачи будут приходить неважно откуда, хоть из БД, хоть из очереди сообщений (rabbitmq, zeromq), хоть gearman.

Требования: необходимо выполнять задачи по мере их поступления, результаты должны возвращаться централизованно. В случае, если мы хотим послать результаты обратно в очередь задач, чтобы у нас не создавалось по одному подключению на поток, а использовался 1 общий.

Мой подход:

Сделал пул воркеров на основе классов Process.
Им передал в качестве агрументов очередь, которую они должны слушать на наличие задач, и очередь, в которую они должны слать результаты.
И сделал еще 1 поток, который слушает только очередь результатов. Соответственно, может брать оттуда данные, и пулять их в rabbitmq.

Этот подход позволяет реализовать функционал изменения количества воркеров в пулле при необходимости без перезапуска главного потока.

Код:

# -*- coding: utf8

__author__ = 'nolka'

import logging
import time
from telnetlib import Telnet
from multiprocessing import Queue, Process
import urllib2

class Worker(object):
    def __init__(self, name):
        self.file_name = None
        self.handle = None
        self.name = name
        self.listen = True

    def __del__(self):
        if self.handle:
            self.handle.close()

    def work(self, url):
        result = urllib2.urlopen(url)
        if result.code == 200:
            return result.read()

def doWork(id, jobs, results):
    w = Worker("Worker%s" % id)
    while True:
        task = jobs.get()
        if task is None:
            logging.warn("%s is exiting..." % w.name)
            break

        logging.info(task)
        r = w.work(task)
        results.put(r)

def someResultMethod(results, someObj):
    while True:
        result = results.get()
        if result is None:
            logging.warn("Received poison...")
            break
        logging.info("Received result length: %s" % len(result))
        if result:
            someObj.write(result)

def main():
    threads_count = 2
    workers = list()
    results_thread = None

    listen_files = [
        "http://stackoverflow.com",
        "http://caldina-club.com",
        "http://www.land-cruiser.ru",
        "http://habrahabr.ru",
        "http://pikabu.ru"
    ]

    someObject = open("/tmp/somelog", "w")

    logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.DEBUG)
    jobs = Queue()
    results = Queue()
    for i in xrange(threads_count):
        t = Process(target=doWork, args=(i, jobs, results))
        t.start()
        workers.append(t)

    for file in listen_files:
        jobs.put(file)

    # В этот процесс можно будет расшарить, к примеру, rabbitmq
    results_thread = Process(target=someResultMethod, args=(results, someObject ))
    results_thread.start()

    for file in listen_files:
        jobs.put(None)

    print("some heavy job")
    time.sleep(.10)
    print("heavy job is done... exiting...")

    jobs.close()
    jobs.join_thread()
    for w in workers:
        w.join()

    print("Killing results manager...")
    results.put(None)
    print("Poison sent...")
    results_thread.join()

    someObject.close()

    print("exit.")

if __name__ == "__main__":
    main()

Ответы

Ответов пока нет.