Оцените реализацию многопоточных воркеров
Это скелет, но рабочий.
Где хочу использовать - для выполнения задач в несколько потоков.
Задачи будут приходить неважно откуда, хоть из БД, хоть из очереди сообщений (rabbitmq, zeromq), хоть gearman.
Требования: необходимо выполнять задачи по мере их поступления, результаты должны возвращаться централизованно. В случае, если мы хотим послать результаты обратно в очередь задач, чтобы у нас не создавалось по одному подключению на поток, а использовался 1 общий.
Мой подход:
Сделал пул воркеров на основе классов Process.
Им передал в качестве агрументов очередь, которую они должны слушать на наличие задач, и очередь, в которую они должны слать результаты.
И сделал еще 1 поток, который слушает только очередь результатов. Соответственно, может брать оттуда данные, и пулять их в rabbitmq.
Этот подход позволяет реализовать функционал изменения количества воркеров в пулле при необходимости без перезапуска главного потока.
Код:
# -*- coding: utf8
__author__ = 'nolka'
import logging
import time
from telnetlib import Telnet
from multiprocessing import Queue, Process
import urllib2
class Worker(object):
def __init__(self, name):
self.file_name = None
self.handle = None
self.name = name
self.listen = True
def __del__(self):
if self.handle:
self.handle.close()
def work(self, url):
result = urllib2.urlopen(url)
if result.code == 200:
return result.read()
def doWork(id, jobs, results):
w = Worker("Worker%s" % id)
while True:
task = jobs.get()
if task is None:
logging.warn("%s is exiting..." % w.name)
break
logging.info(task)
r = w.work(task)
results.put(r)
def someResultMethod(results, someObj):
while True:
result = results.get()
if result is None:
logging.warn("Received poison...")
break
logging.info("Received result length: %s" % len(result))
if result:
someObj.write(result)
def main():
threads_count = 2
workers = list()
results_thread = None
listen_files = [
"http://stackoverflow.com",
"http://caldina-club.com",
"http://www.land-cruiser.ru",
"http://habrahabr.ru",
"http://pikabu.ru"
]
someObject = open("/tmp/somelog", "w")
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.DEBUG)
jobs = Queue()
results = Queue()
for i in xrange(threads_count):
t = Process(target=doWork, args=(i, jobs, results))
t.start()
workers.append(t)
for file in listen_files:
jobs.put(file)
# В этот процесс можно будет расшарить, к примеру, rabbitmq
results_thread = Process(target=someResultMethod, args=(results, someObject ))
results_thread.start()
for file in listen_files:
jobs.put(None)
print("some heavy job")
time.sleep(.10)
print("heavy job is done... exiting...")
jobs.close()
jobs.join_thread()
for w in workers:
w.join()
print("Killing results manager...")
results.put(None)
print("Poison sent...")
results_thread.join()
someObject.close()
print("exit.")
if __name__ == "__main__":
main()