FastAPI хранение Pydantic model в redis

Рейтинг: 0Ответов: 2Опубликовано: 10.08.2023

Пишу приложение на FastAPI. Есть Pydantic model:

class Status(str, Enum):
    inactive = 'inactive'
    active = 'active'
    done = 'done'
    deprecated = 'deprecated'

class ShowTask(BaseModel):

    tsk_id: UUID
    title: constr(max_length=255)
    describe: str
    date_of_create: date
    status: Status

    class Config:
        json_encoders = {
            UUID: lambda v: str(v)  # конвертация UUID в str при сериализации в JSON
        }

        from_attributes = True

Хочу хранить данные в кеше redis. Использую библиотеку aioredis. Вот коннект:

redis = aioredis.from_url(
    settings.redis_url,
    encoding="utf-8",
    decode_responses=True,
    password=settings.redis_password)

И вот endpoint, через который получаем данные из базы и потом кешим

@task_router.get("/get-task-list", response_model=list[ShowTask])
async def get_task_list(
        skip: int | None = None,
        limit: int | None = None,
        db: AsyncSession = Depends(get_db)
):
    try:
        # Данные из кэша
        cache = await redis.get(f"task-list.{skip}.{limit}")
        if cache is not None:
            try:
                data = json.loads(cache)
                print(data)
                show_tasks = [ShowTask(**item) for item in data]
                return {
                    "status": 200,
                    "from": "redis",
                    "data": show_tasks
                }
            except json.JSONDecodeError as e:
                print("Ошибка при десериализации данных:", e)

        # Получение данных из бд
        task_list: list[ShowTask] = await service.get_multi(skip=skip, limit=limit, 
        session=db)

        # Сохранение сериализованных данных в Redis
        await redis.set(f"task-list.{skip}.{limit}", json.dumps(task_list))

        return task_list
    except UniqueConstraintError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except DatabaseError as err:
        raise HTTPException(status_code=500, detail=str(err))
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))

Не могу получить обратно из кеша объект Pydantic model. В редис сохраняется строка с экранами. Пробовал использовать pickle, но при обратной распаковке выходит ошибка, что формат не соответствует utf-8. Как сохранять в redis питоновские объекты и получать их обратно?

Ответы

▲ 1Принят

Минимальный пример кодирования списка объектов в json строку и обратного декодирования:

# Код для pydantic >= 2.*
import json
from pydantic import BaseModel, TypeAdapter
from pydantic.json import pydantic_encoder


class User(BaseModel):
    name: str
    age: int


response_list = [User(name="name", age=123)]
type_adapter = TypeAdapter(list[User])

print(f"Исходный список {response_list!r}")

encoded = type_adapter.dump_json(response_list).decode("utf-8")
print(f"JSON строка: {encoded!r}")

decoded = type_adapter.validate_json(encoded)
print(f"Декодированный список: {decoded!r}")

Вывод:

Исходный список [User(name='name', age=123)]
JSON строка: '[{"name": "name", "age": 123}]'
Декодированный список: [User(name='name', age=123)]

То что получается в encoded кладете в redis. После получения строки из redis декодируете ее через type adapter.


Полный пример на вашем классе. json_encoders убрал, т.к. на Pydantic 2 выдается предупреждение, что этот ключ конфигурации удален:

UserWarning: Valid config keys have changed in V2:
* 'json_encoders' has been removed

но и без него все прекрасно работает.

from datetime import date
from enum import Enum
from uuid import UUID, uuid4

from pydantic import BaseModel, TypeAdapter, constr


class Status(str, Enum):
    inactive = "inactive"
    active = "active"
    done = "done"
    deprecated = "deprecated"


class ShowTask(BaseModel):
    tsk_id: UUID
    title: constr(max_length=255)
    describe: str
    date_of_create: date
    status: Status


response_list = [
    ShowTask(
        tsk_id=uuid4(),
        title="Some title",
        describe="description",
        date_of_create="2023-09-10",
        status=Status("active"),
    ),
]
type_adapter = TypeAdapter(list[ShowTask])

print(f"Исходный список {response_list!r}")

encoded = type_adapter.dump_json(response_list).decode("utf-8")
print(f"JSON строка: {encoded!r}")

decoded = type_adapter.validate_json(encoded)
print(f"Декодированный список: {decoded!r}")

Вывод:

Исходный список [ShowTask(tsk_id=UUID('c1f45979-7b2c-4751-9698-950c929a7aed'), title='Some title', describe='description', date_of_create=datetime.date(2023, 9, 10), status=<Status.active: 'active'>)]
JSON строка: '[{"tsk_id":"c1f45979-7b2c-4751-9698-950c929a7aed","title":"Some title","describe":"description","date_of_create":"2023-09-10","status":"active"}]'
Декодированный список: [ShowTask(tsk_id=UUID('c1f45979-7b2c-4751-9698-950c929a7aed'), title='Some title', describe='description', date_of_create=datetime.date(2023, 9, 10), status=<Status.active: 'active'>)]
▲ 0

Короткий ответ. Могу предложить более универсальный способ. Для этого нужно использовать pickle вместо json.

 expire, key = 60, "key"
 old_serialized_response = await redis.get(key)
 if old_serialized_response is not None:
     return pickle.loads(old_serialized_response)
 new_response = await func(*args, **kwargs)
 serialized_response = pickle.dumps(new_response)
 await redis.set(key, serialized_response, ex=expire)
 return new_response

Продвинутый ответ. Также вы можете написать универсальный декоратор.

from typing import Callable, Dict, Tuple
from functools import wraps
class CacheService:
    def __init__(
        self,
        prefix_key: str = "",
        expire: int = 60,
        build_key: Callable[[Callable, Tuple, Dict], str] = build_key_default,
    ):
        self.prefix_key = prefix_key
        self.expire = expire
        self.build_key = build_key

    def caching(
        self,
        expire: int | None = None,
        prefix_key: str | None = None,
        build_key: Callable[[Callable, Tuple, Dict], str] = None,
    ):
        def wrapper(func):
            @wraps(func)
            @with_redis_client
            async def inner(
                *args,
                redis_client: Redis,
                **kwargs,
            ):
                nonlocal expire, prefix_key, build_key
                expire = expire or self.expire
                prefix_key = prefix_key or self.prefix_key
                build_key = build_key or self.build_key

                key = f"{prefix_key}{build_key(func, *args, **kwargs)}"

                old_serialized_response = await redis_client.get(key)
                if old_serialized_response is not None:
                    return pickle.loads(old_serialized_response)
                new_response = await func(*args, **kwargs)
                serialized_response = pickle.dumps(new_response)
                await redis_client.set(key, serialized_response, ex=expire)

                return new_response

            return inner

        return wrapper

У вспомогательных функций может быть другая реализация. Я напишу свою, и готов получить критику.

from typing import Callable
delimiter = ":"
def build_key_default(func: Callable, *args, **kwargs) -> str:
    key = f"{delimiter}".join([func.__name__, repr(args), repr(sorted(kwargs))])
    return key

import redis.asyncio as aioredis
from app.config.main import settings
def with_redis_client(func):
    """
    @with_redis_client
    async def func(arg, redis_client, kwarg=None):
        pass

    await func(1, kwarg=2)
    """

    async def wrapper(*args, **kwargs):
        redis_client = aioredis.Redis.from_url(url=settings.REDIS_URL)
        result = await func(*args, redis_client=redis_client, **kwargs)
        await redis_client.aclose()
        return result

    return wrapper

Наконец, самое интересное. Применяем следующим образом:

cache = CacheService()


@api_router.get("/lala")
@cache.caching(prefix_key="prefix:", expire=120)
async def lala():
    await asyncio.sleep(3)  # Первый раз: `sleep`. Второй раз: данные из кеша
    return {1: 1}

Благодаря pickle не нужно думать о том, какой тип данных должен возвращать endpoint.