Как сделать update таблицы в БД массивом?

Рейтинг: 2Ответов: 1Опубликовано: 16.05.2023

Подскажите пожалуйста, как можно сделать update таблицы используя массивы pd.DataFrame?

Сохранился фрагмент старой функции которая сравнивала значения таблицы из БД и таблицы которая приходит по API:

with self.connection.cursor() as cursor:
    # Values - значения датафрейма полученные по API. 
    for i in values:
        cursor.execute(f"""SELECT *
                           FROM {schema}.{tname}
                           WHERE article='{i[1]}';""")
        row = cursor.fetchall()
        # Проверка на идентичность. Проверялись только необходимые столбцы.
        if not functools.reduce(lambda x, y: x and y,
                                map(lambda p, q: p == q,
                                    (row[0][:3] + row[0][4:]),
                                    tuple(i[:3]) + tuple(i[4:])), True):
            # Если имелись расхождения, то циклом проверялось идентичность каждой ячейки и если значение отличалось, заменялось.
            for cell, new_cell, column_name in \
                zip(row[0][:3] + row[0][4:], tuple(i[:3]) + tuple(i[4:]),
                    tuple(column_name_list[:3] + column_name_list[4:])):
                if cell != new_cell:
                    cursor.execute(f"""UPDATE {schema}.{tname}
                                       SET {column_name[0]}='{new_cell}'
                                       WHERE article='{i[1]}'""")

Такой способ обновления данных крайне неэффективен, особенно когда дело доходит до таблиц с большим количеством записей. Помимо этого нужно подобную логику адаптировать под каждую таблицу, что я считаю некорректно, наверняка можно сделать унифицировано.

Сейчас я пытаюсь реализовать следующую функцию:

def update_table(self, dataframe: pd.DataFrame, dataframe_db: pd.DataFrame, tname: str, schema: str = None, pk: list or str = "*"):

    dataframe_comp = dataframe.compare(dataframe_db)

    # UPDATE ACTION HERE

Но в голову приходит только пройти циклом по датафрейму сравнения, что в целом, то же самое что было ранее, но уже можно использовать для разных таблиц. Подскажите пожалуйста, какие есть инструменты в psycopg2, SQLAlchemy которые смогут обновить данные, возможно без цикла, либо, если циклом, то сделали бы это более эффективно.

Ответы

▲ 2Принят

Попробую набросать логику без SQLAlchemy

Подготовим тестовую схему

create table public.my_test_data
(
    id            integer           not null
        primary key,
    name          varchar,
    some_property varchar,
    price         numeric default 0 not null
);

create table public.test_data_price_history
(
    item_id   integer
        references public.my_test_data
            -- Так себе отсылка, но пришлось как-то обыгрывать ваше желание удалять записи
            -- при условии сохранения истории цены
            on update cascade on delete set null,
    old_value numeric,
    new_value numeric,
    datetime  timestamp default (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'::text)
);

create function price_history_update() returns trigger
as
$$
begin
    if tg_op = 'INSERT' then
        insert into test_data_price_history (item_id, new_value) values (new.id, new.price);
    elseif tg_op = 'UPDATE' then
        insert into test_data_price_history (item_id, old_value, new_value) values (new.id, old.price, new.price);
    end if;
    return new;
end;
$$ language plpgsql;


create trigger price_history_new_item
    after insert
    on public.my_test_data
    for each row
execute procedure public.price_history_update();

create trigger price_history_upd_item
    after update
    on public.my_test_data
    for each row
    when (new.price IS DISTINCT FROM old.price)
execute procedure public.price_history_update();

create type incoming_json as
(
    id            int,
    name          varchar,
    some_property varchar,
    price         numeric
);

Теперь собственно Python

import pandas as pd
import psycopg2 as pg

df1 = pd.DataFrame(
    {
        'id': [1, 2, 3, 4, 5],
        'name': ['N1', 'N2', 'N3', 'N4', 'N5'],
        'some_property': ['A', 'B', 'C', 'D', 'A\'E'],
        'price': [45.62, 41.13, 125.04, 16.08, 11.16]
     }
)


def upsert(df: pd.DataFrame):
    payload = df.to_json(orient='records')

    with pg.connect('user=me dbname=me host=192.168.150.1') as conn:
        with conn.cursor() as cur:
            try:
                cur.execute(
                    """
    with recursive data as (select * from jsonb_populate_recordset(null::incoming_json, %s::jsonb)),
                   _ as (
                       insert into my_test_data select * from data on conflict (id)
                           do update set name = excluded.name,
                               some_property = excluded.some_property,
                               price = excluded.price)
    delete
    from my_test_data mtd
    where not exists(select 1
                     from data d
                     where d.id = mtd.id)

                    """,
                    [payload]
                )
            except Exception:
                cur.connection.rollback()
                raise
            else:
                cur.connection.commit()

upsert(df1)

my_test_data

id name some_property price
1 N1 A 45.62
2 N2 B 41.13
3 N3 C 125.04
4 N4 D 16.08
5 N5 A'E 11.16

test_data_price_history

item_id old_value new_value datetime
1 null 45.62 2023-05-16 23:52:31.508051
2 null 41.13 2023-05-16 23:52:31.508051
3 null 125.04 2023-05-16 23:52:31.508051
4 null 16.08 2023-05-16 23:52:31.508051
5 null 11.16 2023-05-16 23:52:31.508051
df2 = pd.DataFrame(
    {
        'id': [1, 2, 3, 4, 6],
        'name': ['N1', 'N2', 'N3', 'N4', 'N6'],
        'some_property': ['A', 'B', 'C', 'D', 'B\'E'],
        'price': [45.62, 41.13, 113.03, 16.08, 88.0]
     }
)
upsert(df2)

my_test_data

id name some_property price
1 N1 A 45.62
2 N2 B 41.13
3 N3 C 113.03
4 N4 D 16.08
6 N6 B'E 88

test_data_price_history

item_id old_value new_value datetime
1 null 45.62 2023-05-16 23:52:31.508051
2 null 41.13 2023-05-16 23:52:31.508051
3 null 125.04 2023-05-16 23:52:31.508051
4 null 16.08 2023-05-16 23:52:31.508051
null null 11.16 2023-05-16 23:52:31.508051
3 125.04 113.03 2023-05-16 23:55:09.474256
6 null 88 2023-05-16 23:55:09.474256