Может ли СУБД быть источником события для внешней программы?

Рейтинг: 1Ответов: 2Опубликовано: 08.02.2023

При работе с БД обычно источником события является клиентское приложение. Например - клиент подключился и вставил запись в БД. СУБД ответило ошибкой или приняло данные.

Но возможен ли в какой СУБД обратный сценарий - отправка клиентскому ПО уведомления, что БД что-то произошло?

Да, например в Mysql(MariaDB) есть механизм планировщика событий - но там тоже работа идёт внутри БД.

Известные мне решения: клиент просто бесконечно опрашивает некую таблицу с определённым интервалом и, при появлении в ней данных что-то делает.

Ответы

▲ 1

Для postgres есть несколько вариантов.

NOTIFY/LISTEN.

Из плюсов - работает из коробки. Минусы:

  • негарантированная доставка уведомлений
  • необходимость вручную создавать триггеры

Тут можно посмотреть рабочий пример на python.

Логическая репликация

Это гораздо более надежный способ. В этом способе лог транзакций декодируется и используется приложением. Доставка гарантированна, т.к. потребитель уведомляет сервер после получения и обработки сообщения.

Сначала нужно запустить postgres с включенной логической репликацией и с установленным плагином для декодирования WAL в json. Я использую готовый образ cо сконфигурированным postgres и установленным wal2json для простоты:

docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 -d debezium/postgres:14

Пример кода, который слушает изменения:

import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection

my_connection = psycopg2.connect(
    "dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
    connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
    cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
    pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
    slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)


def consume(msg):
    print(msg.payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)

Теперь если выполнить операцию типа insert into table1 values (1, 'hello') получим:

{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table1",
            "columnnames": ["i", "t"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "hello"]
        }
    ]
}
▲ 0

PostgreSQL может генерировать уведомления Notify, отправляя их всем подписанным на канал Listen.