python, структура данных для distance matrix

Рейтинг: 0Ответов: 1Опубликовано: 08.06.2023

Необходимо выстроить матрицу расстояний между большим набором кластеров. Потом объединить ближайшие два кластера и проделывать то же самое пока не останется N кластеров (иерархическая кластеризация, пишу сам по приколу, не хочу использовать sklearn и scipy). То есть фич этой матрицы в процессе работы- она динамическая. Поэтому не хочется использовать стандартную структуру данных типа array- много потрачу на перезаписях. Есть класс Node:

class Node:
    def __init__(self, link, value):
        self.link = link
        self.value = value

    def reset(self, new_link, new_value):
        self.link = new_link
        self.value = new_value

Node олицетворяет одну ячейку матрицы расстояний, атрибуты link- это set, который показывает между какими кластерами (вернее их индексами) измерено расстояние, само расстояние записано в атрибут value. Теперь нужно придумать структуру данных для самой матрицы. Необходимо, что бы поиск наименьшего value по классам Node было быстрым и объединение классов Node (например выяснится, что кратчайшее расстояние у нас от кластера 2 до 5, значит мы выкидываем кластеры 2 и 5 и делаем новый кластер m, вносим его в матрицу расстояний) было быстрым

Ответы

▲ 0

Конечно,вот пример реализации структуры данных для матрицы расстояний и операций поиска минимального значения и объединения классов Node:

class DistanceMatrix:
    def __init__(self, num_clusters):
        self.matrix = [[Node(set(), float('inf')) for _ in range(num_clusters)] for _ in range(num_clusters)]
        self.heap = BinaryHeap()
        self.num_clusters = num_clusters

    def update_distance(self, cluster1, cluster2, distance):
        self.matrix[cluster1][cluster2].reset(set([cluster1, cluster2]), distance)
        self.heap.insert(self.matrix[cluster1][cluster2])

    def merge_clusters(self, cluster1, cluster2):
        new_cluster = self.num_clusters
        self.num_clusters += 1

        for i in range(self.num_clusters):
            if i != cluster1 and i != cluster2:
                dist1 = self.matrix[i][cluster1].value
                dist2 = self.matrix[i][cluster2].value
                min_distance = min(dist1, dist2)

                self.update_distance(i, new_cluster, min_distance)

        self.heap = BinaryHeap()

        for i in range(self.num_clusters):
            if i != new_cluster:
                self.heap.insert(self.matrix[i][new_cluster])

        return new_cluster

    def find_closest_clusters(self):
        min_distance_node = self.heap.extract_min()
        return min_distance_node.link

    def perform_clustering(self, desired_num_clusters):
        while self.num_clusters > desired_num_clusters:
            closest_clusters = self.find_closest_clusters()
            new_cluster = self.merge_clusters(closest_clusters[0], closest_clusters[1])
            print(f"Merged clusters {closest_clusters[0]} and {closest_clusters[1]} into new cluster {new_cluster}")

    def print_matrix(self):
        for row in self.matrix:
            print([node.value for node in row])

class Node:
    def __init__(self, link, value):
        self.link = link
        self.value = value

    def reset(self, new_link, new_value):
        self.link = new_link
        self.value = new_value

class BinaryHeap:
    def __init__(self):
        self.heap = []

    def insert(self, node):
        self.heap.append(node)
        self._sift_up(len(self.heap) - 1)

    def extract_min(self):
        if len(self.heap) == 0:
            return None
        min_node = self.heap[0]
        last_node = self.heap.pop()
        if len(self.heap) > 0:
            self.heap[0] = last_node
            self._sift_down(0)
        return min_node

    def _sift_up(self, index):
        parent_idx = (index - 1) // 2
        while index > 0 and self.heap[parent_idx].value > self.heap[index].value:
            self.heap[parent_idx], self.heap[index] = self.heap[index], self.heap[parent_idx]
            index = parent_idx
            parent_idx = (index - 1) // 2

    def _sift_down(self, index):
        left_child_idx = 2 * index + 1
        right_child_idx = 2 * index + 2
        smallest = index
        if left_child_idx < len(self.heap) and self.heap[left_child_idx].value < self.heap[smallest].value:
            smallest = left_child_idx
        if right_child_idx < len(self.heap) and self.heap[right_child_idx].value < self.heap[smallest].value:
            smallest = right_child_idx
        if smallest != index:
            self.heap[index], self.heap[smallest] = self.heap[smallest], self.heap[index]
            self._sift_down(smallest)

Теперь вы можете использовать класс DistanceMatrix для построения матрицы расстояний, нахождения ближайших кластеров и выполнения иерархической кластеризации. Пример использования:

# Создание объекта DistanceMatrix с заданным количеством кластеров
distance_matrix = DistanceMatrix(num_clusters)

# Обновление расстояния между кластерами
distance_matrix.update_distance(cluster1, cluster2, distance)

# Выполнение иерархической кластеризации
distance_matrix.perform_clustering(desired_num_clusters)

# Вывод матрицы расстояний
distance_matrix.print_matrix()

Пожалуйста, обратите внимание, что это только пример реализации, и вам может потребоваться внести некоторые изменения или дополнения в соответствии с вашими потребностями.