Проблема с SSE потоком

Рейтинг: 0Ответов: 0Опубликовано: 21.07.2025

Реакт постоянно пересоздает поток при открытии новой вкладки и тем самым создает максимальное количество потоков (то есть 6).При этом реализовано мастер вкладка через Broadcast Channel но он ситуацию не спасает

import { useEffect, useRef } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { startStream, stopStream, addNotification } from '../../store/actions';
import { API_URL } from '../../api';

const MASTER_TTL = 60 * 1000; // 60 секунд
const HEARTBEAT_INTERVAL = 20 * 1000; // 20 секунд

const useStream = () => {
  const dispatch = useDispatch();
  const isStreaming = useSelector((state) => state.stream.isStreaming);
  const isMasterRef = useRef(false);
  const channelRef = useRef(null);
  const eventSourceRef = useRef(null);
  const heartbeatRef = useRef(null);
  const tabId = useRef(`${Date.now()}_${Math.random()}`);

  // Получаем userId для уникальности ключей
  const userId = localStorage.getItem('current_user_id') || 'nouser';
  const CHANNEL_NAME = `notifications_channel_${userId}`;
  const MASTER_KEY = `notifications_stream_master_${userId}`;
  const LAST_NOTIFICATION_KEY = `last_notification_${userId}`;

  useEffect(() => {
    const token = localStorage.getItem('access_token');
    if (!token) return;

    const channel = new BroadcastChannel(CHANNEL_NAME);
    channelRef.current = channel;

    // --- Reconnect SSE ---
    let reconnectAttempts = 0;
    const maxReconnectDelay = 2000; // 2 секунды максимум
    let reconnectTimeout = null;
    let isReconnecting = false;
    const reconnectSSE = () => {
      if (isReconnecting) return;
      isReconnecting = true;
      const delay = Math.min(500 * 2 ** reconnectAttempts, maxReconnectDelay);
      if (reconnectTimeout) clearTimeout(reconnectTimeout);
      reconnectTimeout = setTimeout(() => {
        if (isMasterRef.current) {
          startSSE();
          reconnectAttempts++;
        }
        isReconnecting = false;
      }, delay);
    };

    // Логирование для отладки
    const logStatus = (msg) => {
      console.log(`[useStream][tabId=${tabId.current}] ${msg}`);
    };

    // Проверка TTL мастера
    const canBecomeMaster = () => {
      const master = JSON.parse(localStorage.getItem(MASTER_KEY));
      if (!master) return true;
      return Date.now() - master.timestamp > MASTER_TTL;
    };

    // Стать мастером с задержкой и двойной проверкой
    const becomeMaster = async () => {
      logStatus('Пробую стать мастером...');
      // Случайная задержка 300-800мс для уменьшения гонок
      await new Promise(res => setTimeout(res, 300 + Math.random() * 500));
      // Повторная проверка
      const master = JSON.parse(localStorage.getItem(MASTER_KEY));
      if (master && Date.now() - master.timestamp <= MASTER_TTL && master.tabId !== tabId.current) {
        logStatus('Обнаружен новый мастер, не становлюсь мастером.');
        return;
      }
      localStorage.setItem(MASTER_KEY, JSON.stringify({ timestamp: Date.now(), tabId: tabId.current }));
      isMasterRef.current = true;
      logStatus('Стал мастером!');
      startSSE();
      heartbeatRef.current = setInterval(() => {
        localStorage.setItem(MASTER_KEY, JSON.stringify({ timestamp: Date.now(), tabId: tabId.current }));
      }, HEARTBEAT_INTERVAL);
    };

    // Сброс мастерства
    const releaseMaster = () => {
      logStatus('Освобождаю мастерство');
      isMasterRef.current = false;
      if (heartbeatRef.current) clearInterval(heartbeatRef.current);
      localStorage.removeItem(MASTER_KEY);
      dispatch(stopStream());
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
        eventSourceRef.current = null;
      }
    };
    

    // Запуск SSE
    const startSSE = () => {
      logStatus(`Запуск SSE (tabId=${tabId.current})`);
      dispatch(startStream());
      // Перед созданием нового EventSource всегда закрываем старый
      if (eventSourceRef.current) {
        // Если EventSource уже открыт, не создаём новый
        if (eventSourceRef.current.readyState === 1) {
          logStatus('EventSource уже открыт, не создаём новый');
          return;
        }
        eventSourceRef.current.close();
        eventSourceRef.current = null;
      }
      const eventSource = new EventSource(`${API_URL}/notifications/stream?token=${token}`);
      eventSourceRef.current = eventSource;
      eventSource.onmessage = (event) => {
        reconnectAttempts = 0; // сбросить счётчик при успешном сообщении
        const data = JSON.parse(event.data);
        if (data.type === 'update') {
          dispatch(addNotification({
            tasks: {
              ids: data.tasks.ids || [],
              count: data.tasks.ids?.length || 0
            },
            comments: {
              ids: data.comments.ids || [],
              count: data.comments.ids?.length || 0
            },
            alert_expired: data.alert_expired || { ids: [], count: 0 },
            reminders: data.reminders || []
          }));
          channel.postMessage({
            type: 'notification',
            payload: {
              tasks: {
                ids: data.tasks.ids || [],
                count: data.tasks.ids?.length || 0
              },
              comments: {
                ids: data.comments.ids || [],
                count: data.comments.ids?.length || 0
              },
              alert_expired: data.alert_expired || { ids: [], count: 0 },
              reminders: data.reminders || []
            }
          });
          localStorage.setItem(LAST_NOTIFICATION_KEY, JSON.stringify({
            tasks: {
              ids: data.tasks.ids || [],
              count: data.tasks.ids?.length || 0
            },
            comments: {
              ids: data.comments.ids || [],
              count: data.comments.ids?.length || 0
            },
            alert_expired: data.alert_expired || { ids: [], count: 0 },
            reminders: data.reminders || []
          }));
          // Сохраняем reminders, если есть
          if (data.reminders) {
            localStorage.setItem(`reminders_${userId}`, JSON.stringify(data.reminders));
          }
        }
      };
      eventSource.onerror = (error) => {
        logStatus('Ошибка SSE, закрываю и пробую переподключиться');
        if (eventSourceRef.current) {
          eventSourceRef.current.close();
          eventSourceRef.current = null;
        }
        reconnectSSE();
      };
    };

    // Слушаем канал
    channel.onmessage = (event) => {
      if (event.data?.type === 'notification') {
        dispatch(addNotification(event.data.payload));
      } else if (event.data?.type === 'master-check') {
        if (isMasterRef.current) {
          const lastNotification = localStorage.getItem(LAST_NOTIFICATION_KEY);
          if (lastNotification) {
            channel.postMessage({ type: 'notification', payload: JSON.parse(lastNotification) });
          }
          channel.postMessage({ type: 'master-alive', tabId: tabId.current });
        }
      }
    };

    // Попытка стать мастером
    const tryBecomeMaster = async () => {
      const master = localStorage.getItem(MASTER_KEY);
      if (!master) {
        await becomeMaster();
        return true;
      }
      // Проверяем TTL
      const masterObj = JSON.parse(master);
      if (Date.now() - masterObj.timestamp > MASTER_TTL) {
        await becomeMaster();
        return true;
      }
      return masterObj.tabId === tabId.current;
    };

Ответы

Ответов пока нет.