Помогите разобраться почему не работает асинхронность и как это исправить?

Рейтинг: 0Ответов: 0Опубликовано: 06.05.2023

check_dns_no_class - не работает асинхронно, выполняется поочередно каждый таск во futures и и только после выполнения всех тасков берутся(как я понял) данные из первого выполненного таска. Как это можно исправить? есть догадка что все блокируется subprocess - изначально была идея все сделать через ThreadPoolExecutor+ asyncio.gather - но тогда не понятно как брать валидный результат не дожидаясь выполнения остальных потоков, а лучше уничтожать не завершившиеся при этом потоки.

def run(proxy_attr, log, connection):
    proxy = Proxy(proxy_attr[0], proxy_attr[1], proxy_attr[2], proxy_attr[3])
    log.info(f'Check proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password}')
    id_proxy = add_proxy_to_database(connection, proxy)
    log.info(f'Proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password} add database')
    asyncio.run(proxy.check_dns(log))
async def check_dns(self, log):
    dns = ['https://google.com', 'https://208.67.222.222']
    protocols = ['http', 'socks5']

    futures = [asyncio.create_task(check_dns_no_class(self.user, self.password, self.ip, self.port, url, protocol,
                                                      log)) for url in dns for protocol in protocols]
    data = await wait_first_completed_not_none(futures)
    if data is not None:
        log.debug(f'[{self.ip}:{self.port}:{self.user}:{self.password}][{data["dns"]}][{data["protocol"]}] '
                  f'Response: PROXY CHECKED on dns')
        self.status = 1
        self.protocol = data['protocol']
        self.dns = data['dns']

async def check_dns_no_class(user, password, ip, port, url, protocol, log):
    w = "%{http_code}"
    curl_url = f'curl -x "{protocol}://{user}:{password}@{ip}:{port}" -w {w} {url}'
    log.info(f'[{ip}:{port}:{user}:{password}][{url}] Сurl request: {protocol}')
    process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
                                                    stderr=asyncio.subprocess.PIPE)
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=3)
        data = codecs.decode(stdout)
        error = re.findall(r"curl:[^\r\n]*", codecs.decode(stderr))
    except asyncio.TimeoutError:
        log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: Time Out')
        return
    code = data
    if code == '000':
        log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: {error}')
        return
    else:
        return {'protocol': protocol, 'dns': url}
async def wait_first_completed_not_none(futures):
    while True:
        # Если получили пустой список,
        # сразу возвращаем None, иначе будет бесконечный цикл
        if not futures:
            return None

        done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
        # done - список завершившихся тасков
        # pending - список незавершившихся тасков

        # Циклом идем по списку завершившихся тасков, проверяем результат
        for item in done:
            result = item.result()
            if result is not None:
                # Останавливаем незавершившиеся таски
                for pending_task in pending:
                    pending_task.cancel()
                return result

        # Если среди выполненных не было результатов не None, то идем на второй круг
        # Список незавершившихся тасков становится списком тасков, из которых мы будем ожидать результата
        futures = pending

Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x00000235E9C071C0>
Traceback (most recent call last):
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 126, in __del__
    self.close()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 104, in close
    proto.pipe.close()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 109, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 753, in call_soon
    self._check_closed()
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000235E9C1CD30>
Traceback (most recent call last):
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
    info.append(f'fd={self._sock.fileno()}')
  File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
    raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe

Ответы

Ответов пока нет.