Для postgres есть несколько вариантов.
NOTIFY/LISTEN.
Из плюсов - работает из коробки.
Минусы:
- негарантированная доставка уведомлений
- необходимость вручную создавать триггеры
Тут можно посмотреть рабочий пример на python.
Логическая репликация
Это гораздо более надежный способ. В этом способе лог транзакций декодируется и используется приложением. Доставка гарантированна, т.к. потребитель уведомляет сервер после получения и обработки сообщения.
Сначала нужно запустить postgres с включенной логической репликацией и с установленным плагином для декодирования WAL в json. Я использую готовый образ cо сконфигурированным postgres и установленным wal2json для простоты:
docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 -d debezium/postgres:14
Пример кода, который слушает изменения:
import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection
my_connection = psycopg2.connect(
"dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)
def consume(msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)
cur.consume_stream(consume)
Теперь если выполнить операцию типа insert into table1 values (1, 'hello')
получим:
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table1",
"columnnames": ["i", "t"],
"columntypes": ["integer", "text"],
"columnvalues": [1, "hello"]
}
]
}