Проблема с SSE потоком
Реакт постоянно пересоздает поток при открытии новой вкладки и тем самым создает максимальное количество потоков (то есть 6).При этом реализовано мастер вкладка через Broadcast Channel но он ситуацию не спасает
import { useEffect, useRef } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { startStream, stopStream, addNotification } from '../../store/actions';
import { API_URL } from '../../api';
const MASTER_TTL = 60 * 1000; // 60 секунд
const HEARTBEAT_INTERVAL = 20 * 1000; // 20 секунд
const useStream = () => {
const dispatch = useDispatch();
const isStreaming = useSelector((state) => state.stream.isStreaming);
const isMasterRef = useRef(false);
const channelRef = useRef(null);
const eventSourceRef = useRef(null);
const heartbeatRef = useRef(null);
const tabId = useRef(`${Date.now()}_${Math.random()}`);
// Получаем userId для уникальности ключей
const userId = localStorage.getItem('current_user_id') || 'nouser';
const CHANNEL_NAME = `notifications_channel_${userId}`;
const MASTER_KEY = `notifications_stream_master_${userId}`;
const LAST_NOTIFICATION_KEY = `last_notification_${userId}`;
useEffect(() => {
const token = localStorage.getItem('access_token');
if (!token) return;
const channel = new BroadcastChannel(CHANNEL_NAME);
channelRef.current = channel;
// --- Reconnect SSE ---
let reconnectAttempts = 0;
const maxReconnectDelay = 2000; // 2 секунды максимум
let reconnectTimeout = null;
let isReconnecting = false;
const reconnectSSE = () => {
if (isReconnecting) return;
isReconnecting = true;
const delay = Math.min(500 * 2 ** reconnectAttempts, maxReconnectDelay);
if (reconnectTimeout) clearTimeout(reconnectTimeout);
reconnectTimeout = setTimeout(() => {
if (isMasterRef.current) {
startSSE();
reconnectAttempts++;
}
isReconnecting = false;
}, delay);
};
// Логирование для отладки
const logStatus = (msg) => {
console.log(`[useStream][tabId=${tabId.current}] ${msg}`);
};
// Проверка TTL мастера
const canBecomeMaster = () => {
const master = JSON.parse(localStorage.getItem(MASTER_KEY));
if (!master) return true;
return Date.now() - master.timestamp > MASTER_TTL;
};
// Стать мастером с задержкой и двойной проверкой
const becomeMaster = async () => {
logStatus('Пробую стать мастером...');
// Случайная задержка 300-800мс для уменьшения гонок
await new Promise(res => setTimeout(res, 300 + Math.random() * 500));
// Повторная проверка
const master = JSON.parse(localStorage.getItem(MASTER_KEY));
if (master && Date.now() - master.timestamp <= MASTER_TTL && master.tabId !== tabId.current) {
logStatus('Обнаружен новый мастер, не становлюсь мастером.');
return;
}
localStorage.setItem(MASTER_KEY, JSON.stringify({ timestamp: Date.now(), tabId: tabId.current }));
isMasterRef.current = true;
logStatus('Стал мастером!');
startSSE();
heartbeatRef.current = setInterval(() => {
localStorage.setItem(MASTER_KEY, JSON.stringify({ timestamp: Date.now(), tabId: tabId.current }));
}, HEARTBEAT_INTERVAL);
};
// Сброс мастерства
const releaseMaster = () => {
logStatus('Освобождаю мастерство');
isMasterRef.current = false;
if (heartbeatRef.current) clearInterval(heartbeatRef.current);
localStorage.removeItem(MASTER_KEY);
dispatch(stopStream());
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
};
// Запуск SSE
const startSSE = () => {
logStatus(`Запуск SSE (tabId=${tabId.current})`);
dispatch(startStream());
// Перед созданием нового EventSource всегда закрываем старый
if (eventSourceRef.current) {
// Если EventSource уже открыт, не создаём новый
if (eventSourceRef.current.readyState === 1) {
logStatus('EventSource уже открыт, не создаём новый');
return;
}
eventSourceRef.current.close();
eventSourceRef.current = null;
}
const eventSource = new EventSource(`${API_URL}/notifications/stream?token=${token}`);
eventSourceRef.current = eventSource;
eventSource.onmessage = (event) => {
reconnectAttempts = 0; // сбросить счётчик при успешном сообщении
const data = JSON.parse(event.data);
if (data.type === 'update') {
dispatch(addNotification({
tasks: {
ids: data.tasks.ids || [],
count: data.tasks.ids?.length || 0
},
comments: {
ids: data.comments.ids || [],
count: data.comments.ids?.length || 0
},
alert_expired: data.alert_expired || { ids: [], count: 0 },
reminders: data.reminders || []
}));
channel.postMessage({
type: 'notification',
payload: {
tasks: {
ids: data.tasks.ids || [],
count: data.tasks.ids?.length || 0
},
comments: {
ids: data.comments.ids || [],
count: data.comments.ids?.length || 0
},
alert_expired: data.alert_expired || { ids: [], count: 0 },
reminders: data.reminders || []
}
});
localStorage.setItem(LAST_NOTIFICATION_KEY, JSON.stringify({
tasks: {
ids: data.tasks.ids || [],
count: data.tasks.ids?.length || 0
},
comments: {
ids: data.comments.ids || [],
count: data.comments.ids?.length || 0
},
alert_expired: data.alert_expired || { ids: [], count: 0 },
reminders: data.reminders || []
}));
// Сохраняем reminders, если есть
if (data.reminders) {
localStorage.setItem(`reminders_${userId}`, JSON.stringify(data.reminders));
}
}
};
eventSource.onerror = (error) => {
logStatus('Ошибка SSE, закрываю и пробую переподключиться');
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
reconnectSSE();
};
};
// Слушаем канал
channel.onmessage = (event) => {
if (event.data?.type === 'notification') {
dispatch(addNotification(event.data.payload));
} else if (event.data?.type === 'master-check') {
if (isMasterRef.current) {
const lastNotification = localStorage.getItem(LAST_NOTIFICATION_KEY);
if (lastNotification) {
channel.postMessage({ type: 'notification', payload: JSON.parse(lastNotification) });
}
channel.postMessage({ type: 'master-alive', tabId: tabId.current });
}
}
};
// Попытка стать мастером
const tryBecomeMaster = async () => {
const master = localStorage.getItem(MASTER_KEY);
if (!master) {
await becomeMaster();
return true;
}
// Проверяем TTL
const masterObj = JSON.parse(master);
if (Date.now() - masterObj.timestamp > MASTER_TTL) {
await becomeMaster();
return true;
}
return masterObj.tabId === tabId.current;
};
Источник: Stack Overflow на русском