Помогите разобраться почему не работает асинхронность и как это исправить?
check_dns_no_class
- не работает асинхронно, выполняется поочередно каждый таск во futures
и и только после выполнения всех тасков берутся(как я понял) данные из первого выполненного таска.
Как это можно исправить? есть догадка что все блокируется subprocess
- изначально была идея все сделать через ThreadPoolExecutor
+ asyncio.gather
- но тогда не понятно как брать валидный результат не дожидаясь выполнения остальных потоков, а лучше уничтожать не завершившиеся при этом потоки.
def run(proxy_attr, log, connection):
proxy = Proxy(proxy_attr[0], proxy_attr[1], proxy_attr[2], proxy_attr[3])
log.info(f'Check proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password}')
id_proxy = add_proxy_to_database(connection, proxy)
log.info(f'Proxy: {proxy.ip}:{proxy.port}:{proxy.user}:{proxy.password} add database')
asyncio.run(proxy.check_dns(log))
async def check_dns(self, log):
dns = ['https://google.com', 'https://208.67.222.222']
protocols = ['http', 'socks5']
futures = [asyncio.create_task(check_dns_no_class(self.user, self.password, self.ip, self.port, url, protocol,
log)) for url in dns for protocol in protocols]
data = await wait_first_completed_not_none(futures)
if data is not None:
log.debug(f'[{self.ip}:{self.port}:{self.user}:{self.password}][{data["dns"]}][{data["protocol"]}] '
f'Response: PROXY CHECKED on dns')
self.status = 1
self.protocol = data['protocol']
self.dns = data['dns']
async def check_dns_no_class(user, password, ip, port, url, protocol, log):
w = "%{http_code}"
curl_url = f'curl -x "{protocol}://{user}:{password}@{ip}:{port}" -w {w} {url}'
log.info(f'[{ip}:{port}:{user}:{password}][{url}] Сurl request: {protocol}')
process = await asyncio.create_subprocess_shell(curl_url, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=3)
data = codecs.decode(stdout)
error = re.findall(r"curl:[^\r\n]*", codecs.decode(stderr))
except asyncio.TimeoutError:
log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: Time Out')
return
code = data
if code == '000':
log.warning(f'[{ip}:{port}:{user}:{password}][{url}][{protocol}] Response: {error}')
return
else:
return {'protocol': protocol, 'dns': url}
async def wait_first_completed_not_none(futures):
while True:
# Если получили пустой список,
# сразу возвращаем None, иначе будет бесконечный цикл
if not futures:
return None
done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
# done - список завершившихся тасков
# pending - список незавершившихся тасков
# Циклом идем по списку завершившихся тасков, проверяем результат
for item in done:
result = item.result()
if result is not None:
# Останавливаем незавершившиеся таски
for pending_task in pending:
pending_task.cancel()
return result
# Если среди выполненных не было результатов не None, то идем на второй круг
# Список незавершившихся тасков становится списком тасков, из которых мы будем ожидать результата
futures = pending
Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x00000235E9C071C0>
Traceback (most recent call last):
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 126, in __del__
self.close()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_subprocess.py", line 104, in close
proto.pipe.close()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 109, in close
self._loop.call_soon(self._call_connection_lost, None)
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 753, in call_soon
self._check_closed()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 515, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000235E9C1CD30>
Traceback (most recent call last):
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 116, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
Источник: Stack Overflow на русском