Как правильно остановить Thread, когда он уже не нужен?

Рейтинг: 1Ответов: 1Опубликовано: 06.07.2023

Есть класс, который комментирует новые посты в тг каналах аккаунта. Аккаунты берутся из третьего источника. Мне нужно, чтобы после удаления аккаунта из этого источника, комментирование тоже останавливалось. Я подумал, что это можно сделать, просто остановив поток. (для каждого аккаунта у меня отдельный поток) Когда аккаунт удаляется, я вызываю метод stop() Я нашёл два варианта, как можно остановить поток, но, к сожалению, ни один не рабочий(или я так думаю..).

  1. В классе определяю поле event = threading.Event(). При вызове метода stop() вызываю event.set(). Параллельно этому я проверяю в бесконечном цикле, изменилось ли значение event(по крайней мере, я пытаюсь это проверять, но исходя из того, что ничего не получается, я делаю что-то не так) Код:
class UserChecking:
    def __init__(self,session, app_id, app_hash):
        self.session = session
        self.event = threading.Event()
        self.client : TelegramClient = None
        self.app_id = app_id
        self.app_hash = app_hash
        self.thread = threading.Thread(target=self.startParsing, args=(self.session, self.app_id, self.app_hash),
                                       name=f"user_{self.session}-Thread")
        self.thread.start()
    def work(self):
        loop1 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop1)
        loop1.run_until_complete(self.startChecking())

        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)
        loop2.run_until_complete(self.startCommenting())
    def startCommenting(self):
        self.thread.start()
    async def startChecking(self):
        await self.checkForStop()
    def startParsing(self, session, app_id, app_hash):
        self.setClient(session, app_id, app_hash)
        self.client.start()
        self.client.run_until_disconnected()

    async def getChannels(self):
        async with self.client:
            for dialog in await self.client.get_dialogs():
                if dialog.is_channel:
                    await self.checkPosts(dialog)

    async def checkPosts(self, channel):
        @self.client.on(events.NewMessage(chats=channel.id))
        async def handler(event):
            message = await req(event.text)
            if message:
                try:
                    #await asyncio.sleep(random.randint(120, 300))
                    await self.client.send_message(entity=channel, message=message, comment_to=event)
                except:
                    pass

    def setClient(self,session, app_id, app_hash):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.client = TelegramClient(session=f"sessions/{session}", api_id=int(app_id), api_hash=app_hash, loop=loop)
        self.client.loop.run_until_complete(self.getChannels())
    async def checkForStop(self):
        while True:
            if self.event.is_set():
                self.client.disconnect()
                return
    def stop(self):
        self.event.set()
  1. Т.к. здесь все держится на telethon и на client, то я просто в stop() вызываю self.client.disconnect(). Это не работает. Есть предположение, почему это не работает: потому что работа с клиентом происходит в отдельном потоке, а останавливаю я в другом. Если это так, то как это решается? Код:
class UserChecking:
    def __init__(self,session, app_id, app_hash):
        self.session = session
        self.client : TelegramClient = None
        self.app_id = app_id
        self.app_hash = app_hash
        self.thread = threading.Thread(target=self.startParsing, args=(self.session, self.app_id, self.app_hash),
                                       name=f"user_{self.session}-Thread")
        self.thread.start()
    def startParsing(self, session, app_id, app_hash):
        self.setClient(session, app_id, app_hash)
        self.client.start()
        self.client.run_until_disconnected()

    async def getChannels(self):
        async with self.client:
            for dialog in await self.client.get_dialogs():
                if dialog.is_channel:
                    await self.checkPosts(dialog)

    async def checkPosts(self, channel):
        @self.client.on(events.NewMessage(chats=channel.id))
        async def handler(event):
            message = await req(event.text)
            if message:
                try:
                    #await asyncio.sleep(random.randint(120, 300))
                    await self.client.send_message(entity=channel, message=message, comment_to=event)
                except:
                    pass

    def setClient(self,session, app_id, app_hash):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.client = TelegramClient(session=f"sessions/{session}", api_id=int(app_id), api_hash=app_hash, loop=loop)
        self.client.loop.run_until_complete(self.getChannels())
    def stop(self):
        self.client.disconnect()

Правильно ли я мыслю? Что нужно исправить? Или же нужен совершенно другой подход?

Если нужна информация, как я создаю объекты этого класса(все происходит в синхронной функции main()):

for row in cur.execute("SELECT phoneNumber,app_id,app_hash FROM users"):
        usersToCheck.append(UserChecking(row[0],row[1],row[2])) 

Ответы

▲ 0

Спустя миллионы попыток решил проблему:

При удаления пользователя так же удалил файлы сессии, теперь при попытке оставить комментарий к посту будет выбрасываться исключение и останавливаться поток. Да, не идеально, но хоть как-то. Если кто-то знает, как остановить поток сразу же - напишите, пожалуйста

Обернул этот момент в try, catch, чтобы в консоли ошибок не было

        try:
            self.client.run_until_disconnected()
        except sqlite3.OperationalError:
            return