Максимально быстрая фильтрация динамических данных по диапазону временного ряда

Рейтинг: 3Ответов: 3Опубликовано: 27.02.2025

Примечание

В этой задаче можно использовать любые типы данных и модули, в том числе и сторонних разработчиков (numpy, pandas, sorted containers и т.д.). Главный критерий - скорость выполнения.

Дано:

На вход нашей логики последовательно поступают одинаковые по структуре словари, содержащие информацию (параметры) некоего события. В этих словарях несколько значений являются типами float (назовем ключи param_1, param_2...) и одно из значений является временной меткой этого события (назовем ключ dt). Мы разбираем этот словарь по отдельным спискам. Каждый параметр - в отдельный список, временной ряд - в отдельный список. Таким образом под одним и тем же индексом в разных списках содержится информация об одном событии. О временных метках нам известно, что они неубывающие, то есть последующая метка может быть либо равна либо больше предыдущей. Таймдельта между рядом стоящими метками - условно-случайна, может быть микросекунды, может быть минуты, может вообще условно не быть.

Так же нам задан временной интервал (назовем duration)

Задача

Для последующей обработки нам необходимо, чтобы в вышеуказанных списках содержалась информация о событиях, которые попадают в duration относительно последнего пришедшего события. Все события старше - удаляются.

Мое решение

Берем pandas.Dataframe. Приходит событие - раскладываем словарь по столбцам датафрейма. Фильтруем по столбцу dt по маске dt > время последнего события - duration. Разбирем датафрейм на списки.

Написал для наглядности симулятор процесса:

import time
import datetime
import random
import pandas as pd

random.seed(123)
duration = 30
df = pd.DataFrame(columns=['Param_1', 'Param_2', 'Param_3', 'dt'])

for _ in range(10000):
    # получили словарь
    input_dict = {'Param_1': random.random(),
                  'Param_2': random.random(),
                  'Param_3': random.random(),
                  'dt': datetime.datetime.now()}

    # добавили в датафрейм
    df.loc[0 if df.empty else df.index[-1]+1] = input_dict.values()

    # отфильтровали
    df = df.loc[df['dt'].gt(df.dt.iloc[-1] - datetime.timedelta(seconds=duration))]

    # разобрали по спискам
    param_1 = df.Param_1.to_list()
    param_2 = df.Param_2.to_list()
    param_3 = df.Param_3.to_list()
    dt = df.dt.to_list()

    # это рандомная задержка, чтобы эмулировать случайную разницу между событиями
    time.sleep(random.random() * 3)

Вопрос

Есть варианты быстрее? Буду благодарен за любые комментарии по оптимизации кода.

Ответы

▲ 4Принят

Библиотеки вроде Pandas наверняка будут быстрее, а алгоритмически для этого может применяться двунаправленная очередь collections.deque

При добавлении в конец удаляются (вы сами удаляете) из начала записи, пока время не станет больше lasttime-duration

from collections import deque
from time import time, sleep
from random import randint

duration = 1

d = deque()
for i in range(100):
    sleep(randint(1,10)/100)
    t = time()
    while d and t-d[0]>duration:
        d.popleft()
    d.append(t)
    print(len(d))
▲ 2

Примечание: Очередь deque из ответа MBo работает в 10 раз быстрее чем Pandas в этой задаче, лучше используйте её. По сути это и есть кольцевой буфер, про который я в конце ответа пишу.


Я немного погонял ваш код (выключив time.sleep и уменьшив duration). Довольно много времени (которое сильно зависит от того, какая часть данных попадает в интервал) у вас съедает вот этот кусок кода:

    # разобрали по спискам
    param_1 = df.Param_1.to_list()
    param_2 = df.Param_2.to_list()
    param_3 = df.Param_3.to_list()
    dt = df.dt.to_list()

Так что сам то код достаточно быстрый, а вот преобразование столбцов в list съедает тучу времени. Подумайте, нужны ли вам вообще именно списки на выходе. Если брать из Pandas прямо numpy.array, которые там внутри лежат, то на это время вообще не тратится:

    # разобрали по спискам
    param_1 = df.Param_1.values
    param_2 = df.Param_2.values
    param_3 = df.Param_3.values
    dt = df.dt.values

Кстати, вот это преобразование просто лишнее:

df.loc[len(df)] = [val for val in input_dict.values()]

Можно прямо взять и присвоить:

df.loc[len(df)] = input_dict.values()

Из дальнейшего, я бы попробовал перейти от дат к timestamp, может быть будет быстрее. В остальном не думаю, что код такой уж сильно неоптимальный и особо нуждается в переработке.

Ещё можно попробовать использовать какую-то in memory базу данных. Но будет ли быстрее - не знаю.

Так то если совсем алгоритмически оптимально подходить, то, наверное, можно завести нужное число кольцевых буферов (по числу нужных списков) и гонять данные по ним, просто сдвигая указатели начала и конца. Но опять же получение из них списков не будет тривиальным. )

▲ 1

Коллеги, я нашел решение этой задачи, пока оно самое быстрое из всех предложенных (с оговорками, см. примечание в конце теста). Особая благодарность выражается @MBo за наводку на collections.deque и @CrazyElf за ценные советы и размышления. Вы натолкнули меня на текущее решение. Публикую тестовые замеры и свой вариант.

Тестовые данные

Так как данная логика работает с уже заполненным датасетом, я, для чистоты эксперимента, создал стартовые тестовые списки с временным рядом.

import time
import datetime
import random
import pandas as pd
from collections import deque
import numpy as np
from bisect import bisect

# Количество повторов в тестах
iterations = 60

# Частота значений в тестовом временном ряде
data_test_frequency = 1

# Частота значений в поступающих данных
stream_frequency = 5

# Длинна временного ряда, под который обрезаем датасет
duration = 32 * 60

# Здесь храним время тестов
total_time_first = total_time_second = total_time_third = total_time_fourth = total_time_fifth = 0

# Функция, генерирующая тестовый таймлайн и списки
def test_data(dur=duration, ds=data_test_frequency):

    random.seed(123)
    np.random.seed(123)
    start_timestamp = timestamp = datetime.datetime.now()
    timeline = [timestamp]

    while start_timestamp - timestamp <= datetime.timedelta(seconds=dur):
        timeline.append(timestamp := timestamp - datetime.timedelta(seconds=random.random()*ds))

    return (timeline[::-1],
            np.random.rand(len(timeline)).tolist(),
            np.random.rand(len(timeline)).tolist(),
            np.random.rand(len(timeline)).tolist()
            )

Первое решение

Через pandas, как и было в вопросе, код не повторяю.

Среднее время одной итерации: 0.0057396014531453455

Второе решение

Создаем четыре объекта deque. Как указано в ответе @MBo удаляем из них начальные элементы до тех пор пока начальный элемент не войдет в наше условие. Конвертируем объекты deque в списки.

ldt, l1, l2, l3 = test_data()

deq_dt = deque(ldt)
deq_par1 = deque(l1)
deq_par2 = deque(l2)
deq_par3 = deque(l3)
for _ in range(iterations):
    # получили словарь
    input_dict = {'Param_1': random.random(),
                  'Param_2': random.random(),
                  'Param_3': random.random(),
                  'dt': datetime.datetime.now()}

    
    start_time = time.time()

    # Начало измеряемого блока
    # В дальнейших примерах буду приводить только этот блок

    deq_dt.append(input_dict['dt'])
    deq_par1.append(input_dict['Param_1'])
    deq_par2.append(input_dict['Param_2'])
    deq_par3.append(input_dict['Param_3'])

    while deq_dt and input_dict['dt'] - deq_dt[0] > datetime.timedelta(seconds=duration):
        deq_dt.popleft()
        deq_par1.popleft()
        deq_par2.popleft()
        deq_par3.popleft()

    lst_dt = list(deq_dt)
    lst_par1 = list(deq_par1)
    lst_par2 = list(deq_par2)
    lst_par3 = list(deq_par3)
    #------Конец измеряемого блока

    print(len(lst_dt), second:=time.time() - start_time)
    total_time_second += second

    time.sleep(random.random() * stream_frequency)
print(f'All deque mean time: {total_time_second/iterations}')
print('df/all_deque', total_time_first/total_time_second)

Среднее время одной итерации: 0.00020250876744588216

Отношение времени варианта с Pandas к варианту с deque: 28.342483762729824

На разных настройках тестов профит колеблется от 25 до 65 раз

Третий вариант

Создаем один объект deque и четыре базовых списка. В том же цикле, в котором мы удаляем элементы из deque мы заводим счетчик, который считает количество удаленных объектов. После окончания цикла этот счетчик будет содержать индекс списков, от которых надо взять срез, чтобы отрезать временные метки, не входящие в условие. Берем четыре среза.

    counter = 0
    deq_dt.append(input_dict['dt'])
    lst_dt_c.append(input_dict['dt'])
    lst_par1_c.append(input_dict['Param_1'])
    lst_par2_c.append(input_dict['Param_2'])
    lst_par3_c.append(input_dict['Param_3'])

    while deq_dt and input_dict['dt'] - deq_dt[0] > datetime.timedelta(seconds=duration):
        counter += 1
        deq_dt.popleft()

    lst_dt_c = lst_dt_c[counter:]
    lst_par1_c = lst_par1_c[counter:]
    lst_par2_c = lst_par2_c[counter:]
    lst_par3_c = lst_par3_c[counter:]

Среднее время одной итерации: 0.00015288591384887695

Отношение времени варианта со всеми deque к варианту со счетчиком через deque: 1.3245743989603638

На разных настройках тестов профит колеблется от 20 до 40 процентов

*Я еще пробовал не вводить список на таймлайн, а конвертировать объект deque в список - этот вариант оказался медленнее, но, см. примечание в конце теста

Четвертый вариант

Как видно из предыдущего варианта, эта задача свелась к как можно более быстрому поиску индекса первого элемента, который удовлетворит попаданию в заданный duration. Я предположил, что это типовая задача и точно, оказалось, что для numpy.array существует функция np.searchsorted (не прошла тесты из-за непомерной затратности добавления элементов в numpy.array и последующей конвертации в ванильные списки), а для списков существует стандартный модуль bisect и в нем одноименная функция, которая принимает отсортированный список и какой-то элемент, для которого она возвращает индекс во входном списке, по которому можно поместить данный элемент с сохранением сортировки. Это нам подойдет.

    lst_dt_c.append(input_dict['dt'])
    lst_par1_c.append(input_dict['Param_1'])
    lst_par2_c.append(input_dict['Param_2'])
    lst_par3_c.append(input_dict['Param_3'])

    counter = bisect(lst_dt_c, input_dict['dt']-datetime.timedelta(seconds=duration))

    lst_dt_c = lst_dt_c[counter:]
    lst_par1_c = lst_par1_c[counter:]
    lst_par2_c = lst_par2_c[counter:]
    lst_par3_c = lst_par3_c[counter:]

Среднее время одной итерации: 0.00014625787734985353

Отношение времени предыдущего варианта к варианту с bisect: 1.045317466786209

На разных настройках тестов профит колеблется от 3 до 10 процентов

Примечания

Результаты сильно зависят от структуры таймлайна, от его частоты и количества объектов. Например, чем больше объектов, тем сильнее проигрывает конвертация из deque в list против срезов, при малом количестве объектов разница уже не столь заметна. Также если в какой-то момент частота входных данных становится велика и количество обрезаемых объектов становится мало (или вообще на многих итерациях ничего не обрезается), то метод со счетчиком начинает выигрывать у bisect, я думаю, потому, что bisect в любом случае ищет индекс, а счетчик сразу отвалится на первой же проверке. Так что, несмотря на то, что я старался подбирать параметры теста под свои рабочие данные, но все же какой из вариантов будет лучшим станет понятно только когда я эту логику прикручу к рабочим процессам.

Большое спасибо, что дочитали! :)