Как остановить потоки при получения данных отличных от None хотя бы из 1 потока?

Рейтинг: 0Ответов: 1Опубликовано: 29.04.2023

Есть кусок функции. Сейчас в data_list попадают в пересмешку и данные и None. Нужно сделать так, чтобы при попадании первых данных (не None) в data_list мы не дожидались выполнения остальных потоков.

def check_site(self, log):
    site = ['https://yandex.ru/internet', 'https://wtfismyip.com', 'https://whoer.net', 'https://2ip.ru']
    with ThreadPoolExecutor(max_workers=50) as pool:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        futures = [loop.run_in_executor(pool, check_site, self.user, self.password, self.ip, self.port,
                                        self.protocol, url, log) for url in site]
        data_list = loop.run_until_complete(asyncio.gather(*futures))

Необходимо также отметить, что запросы отправляются через curl в формате:

curl -x http://usbW1q:fE3U35@193.111.5.203:8000 https://8.8.8.8 -w %{time_connect}

Может я что то не так делаю(((

Вот функция где я делаю futures и вызывают функцию wait_first_completed_not_none

async def check_dns(self, log):
    dns = ['https://google.com', 'https://208.67.222.222']
    protocols = ['http', 'socks5']
    futures = [asyncio.create_task(check_dns_no_class(self.user, self.password, self.ip, self.port, url, protocol,
                                                      log)) for url in dns for protocol in protocols]
    data = await wait_first_completed_not_none(futures)

и в data попадают данные которые были первыми получены только при выполнении всех futures и все futures выполняются по очереди

а тут я вызываю функцию proxy.check_dns

def run(proxy_attr, log, connection):
    proxy = Proxy(proxy_attr[0], proxy_attr[1], proxy_attr[2], proxy_attr[3])
    log.info(f'Check proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password}')
    id_proxy = add_proxy_to_database(connection, proxy)
    log.info(f'Proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password} add database')
    asyncio.run(proxy.check_dns(log))

Ответы

▲ 1Принят

Чтобы остановиться при завершении первого же таска, можно использовать asyncio.wait с параметром return_when=FIRST_COMPLETED. Если нужно проверять чтобы результат был не None, можно проверять результат в done, и повторять asynco.wait(pending), пока не вернется что-то кроме None.

Пример реализации ожидания первого таска, вернувшего не None, для асинхронных http запросов использую aiohttp:

import aiohttp
import asyncio
import traceback


async def fetch(url, session):
    async with session.get(url) as response:
        try:
            return await response.text()
        except Exception:
            # Просто печатаем ошибку и возвращаем None
            traceback.print_exc()  
            return None


async def wait_first_completed_not_none(futures):
    while True:
        # Если получили пустой список,
        # сразу возвращаем None, иначе будет бесконечный цикл
        if not futures:
            return None

        done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
        # done - список завершившихся тасков
        # pending - список незавершившихся тасков

        # Циклом идем по списку завершившихся тасков, проверяем результат
        for item in done:
            result = item.result()
            if result is not None:
                # Останавливаем незавершившиеся таски
                for pending_task in pending:
                    pending_task.cancel()
                
                return result

        # Если среди выполненных не было результатов не None, то идем на второй круг
        # Список незавершившихся тасков становится списком тасков, из которых мы будем ожидать результата
        futures = pending


async def check_site():
    site = ['https://yandex.ru/internet', 'https://wtfismyip.com', 'https://whoer.net', 'https://2ip.ru']

    async with aiohttp.ClientSession() as session:
        futures = []
        for link in site:
            futures.append(asyncio.create_task(fetch(link, session)))

        result = await wait_first_completed_not_none(futures)
        if result is None:
            print(result)
        else:
            print(result[:1000])



def main():
    asyncio.run(check_site())


if __name__ == "__main__":
    main()

Пример результата:

<!doctype html><html lang="ru"><head><title data-react-helmet="true">Яндекс.Интернетометр — проверка скорости интернета</title><meta data-react-helmet="true" name="description" content="Тест скорости интернета. IP-адрес, операционная система, версия браузера, cookies, разрешение экрана и другие параметры системы."/><meta data-react-helmet="true" name="keywords" content="скорость, интернет, тест, измерить, интернетометр, проверить, интернет соединение, узнать, замерить, cookies, cookie, версия браузера, разрешение экрана"/><meta data-react-helmet="true" name="og:type" content="website"/><meta data-react-helmet="true" name="og:title" content="Яндекс.Интернетометр — ваша скорость интернета"/><meta data-react-helmet="true" name="og:description" content="Проверка скорости интернета. IP-адрес, операционная система, версия браузера, cookies, разрешение экрана и другие параметры системы."/><meta data-react-helmet="true" name="og:url" content="https://yandex.ru/internet/"/><meta data-react-helm

В зависимости от того, какой сайт ответит раньше, результат может быть другим.