import sqlite3
import logging
from datetime import datetime
class Database:
def __init__(self, db_file: str = "db.sqlite3") -> None:
self.init_db(db_file)
self.connection = sqlite3.connect(db_file)
self.cursor = self.connection.cursor()
def init_db(self, db_file: str) -> None:
"""Инициализация базы данных.
При инициализации базы данных создается таблица 'users' с полями 'id', 'user_id', 'username' и 'time'.
Если таблица уже существует, ничего не делается."""
with sqlite3.connect(db_file) as conn:
conn.execute("""CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
user_id BIGINT NOT NULL,
username VARCHAR(32) NOT NULL,
time TEXT NOT NULL)""")
conn.commit()
def check_user(self, user_id: int) -> bool:
"""Проверка существования пользователя в базе данных.
Проверяет, существует ли пользователь с указанным user_id в базе данных."""
with self.connection:
self.cursor.execute("SELECT user_id FROM users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def add_user(self, user_id: int, username: str) -> None:
"""Добавление нового пользователя в базу данных.
Добавляет нового пользователя с указанным user_id и username в базу данных."""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with self.connection:
self.cursor.execute("INSERT INTO users (user_id, username, time) VALUES (?, ?, ?)", (user_id, username, current_time))
self.connection.commit()
logging.info(f"Добавлен пользователь с ID = {user_id} | username = {username}")
def check_username(self, user_id: int, username: str) -> None:
"""Проверка изменения имени пользователя.
При вводе команды /start проверяет, изменилось ли имя пользователя с user_id в базе данных.
Если имя пользователя изменилось, вызывается функция update_username для обновления имени пользователя."""
with self.connection:
self.cursor.execute("SELECT username FROM users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result and result[0] != username:
self.update_username(user_id, username)
def update_username(self, user_id: int, username: str) -> None:
"""Обновление имени пользователя в базе данных.
Обновляет имя пользователя с указанным user_id в базе данных."""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with self.connection:
self.cursor.execute("UPDATE users SET username = ?, time = ? WHERE user_id = ?", (username, current_time, user_id))
self.connection.commit()
logging.info(f"Имя пользователя с ID = {user_id} был изменен на {username}")
Пример использоваения:
@dp.message_handler(commands="start")
async def cmd_start(message: types.Message) -> None:
user_id = message.from_user.id
username = message.from_user.username
db = Database()
if not db.check_user(user_id):
db.add_user(user_id, username)
else:
db.check_username(user_id, username)