Проблемы с Redis/socker.io в Веб-приложении Node.js
Захотел реализовать фичу онлайн-уведомлений пользователям. По итогу, сюдя из логов, подключение есть как на бэке, так и на фронте, сообщения есть, но фронт не принимает сообщения!
Подробнее:
- Я запустил сервер Redis у себя на локальной машине.
- Создал файл notifications.ts на бэке, где:
/* eslint-disable @typescript-eslint/no-explicit-any */
import Redis from 'ioredis'
import { Server } from 'socket.io'
import { env } from './env'
import { logger } from './logger'
const redis = new Redis(env.REDIS_URL)
export const initNotificationSystem = (httpServer: any) => {
const io = new Server(httpServer, {
cors: {
origin: env.WEBAPP_URL,
methods: ['GET', 'POST'],
credentials: true,
},
})
// Подключение пользователей
io.on('connection', async (socket) => {
const userId = (socket.handshake.headers['user-id'] as string) || (socket.handshake.auth.userId as string)
logger.info('notifications', `User ${userId} connected, joining room user:${userId}`)
if (!userId) {
logger.error('notifications', `Invalid userId: ${userId}`)
return socket.disconnect()
}
const keys = await redis.keys(`pending:notification:${userId}:*`)
for (const key of keys) {
const rawNotification = await redis.get(key)
if (rawNotification) {
const notification = JSON.parse(rawNotification)
socket.emit('notification', { type: 'notification', data: notification })
await redis.del(key)
}
}
await redis.sadd('online:users', userId)
socket.on('disconnect', async () => {
await redis.srem('online:users', userId)
logger.info('socket', `User ${userId} disconnected`)
})
// Подписываемся на канал пользователя
socket.join(`user:${userId}`)
// Обработка прочтения уведомлений
socket.on('markAsRead', async (notificationId) => {
// Сохраняем в БД и публикуем событие
await markNotificationAsRead(notificationId, userId)
})
})
return io
}
export const publishNotification = async (
userId: string,
dbNotification: {
link: string
id: string
createdAt: Date
title: string
userId: string
message: string
isRead: boolean
expiresAt: Date
}
) => {
logger.info('notifications', `Publishing to channel user: ${userId}`)
// Публикуем в Redis Stream для гарантированной доставки
await redis.xadd('notifications:stream', '*', 'userId', userId, 'notification', JSON.stringify(dbNotification))
// Публикуем в Redis Pub/Sub для мгновенной доставки онлайн пользователям
await redis.publish(
`user:${userId}`,
JSON.stringify({ type: 'notification', data: dbNotification, event: 'notification' })
)
await redis.expire(`user:${userId}`, 86400)
logger.info('notifications', `Notification published to Redis`)
}
export const markNotificationAsRead = async (notificationId: string, userId: string) => {
// Публикуем событие обновления
await redis.publish(
`user:${userId}`,
JSON.stringify({
type: 'notification:read',
data: { notificationId },
})
)
}
3. Создал файл notificationWorket.ts на бэке:
import Redis from 'ioredis'
import { env } from '../lib/env'
import { logger } from '../lib/logger'
const redis = new Redis(env.REDIS_URL)
async function processNotifications() {
logger.info('worker', 'Worker started')
while (true) {
try {
const result = await redis.xread('BLOCK', '0', 'STREAMS', 'notifications:stream', '$')
if (!result) {
continue
}
logger.info('worker', 'Processing notifications from stream')
const [key, messages] = result[0]
for (const [id, fields] of messages) {
const userId = fields[1]
const notification = JSON.parse(fields[3])
logger.info('worker', `Processing notification for user: ${userId}`)
// Проверяем онлайн ли пользователь
const isOnline = await redis.sismember('online:users', userId)
if (!isOnline) {
logger.info('worker', `User ${userId} is offline, sending alternative notification`)
await redis.setex(
`pending:notification:${userId}:${notification.id}`,
86400, // TTL = 24 часа
JSON.stringify(notification)
)
}
await redis.xdel('notifications:stream', id)
}
} catch (err) {
logger.error('worker', `Error processing notifications: ${err}`)
await new Promise((resolve) => setTimeout(resolve, 5000))
}
}
}
processNotifications()
4. socket.ts на фронте:
import { io, type Socket } from 'socket.io-client'
import { env } from './env'
// Типы для событий уведомлений
type NotificationEvent = {
type: 'notification'
data: {
id: string
title: string
message: string
link: string
isRead: boolean
createdAt: string
}
}
type MarkAsReadEvent = {
type: 'notification:read'
data: {
notificationId: string
}
}
type SocketEvent = NotificationEvent | MarkAsReadEvent
// Создаем и экспортируем socket
export const socket: Socket = io(env.VITE_SOCKET_SERVER_URL, {
transports: ['websocket'],
withCredentials: true,
})
socket.on('connect', () => {
console.log('Subscribed to notifications for user:', socket.auth) // <-- Логируем подключение
})
socket.on('notification', (data) => {
console.log('Received notification:', data) // <-- Логируем получение
})
// Типизированные обработчики событий
export const setupNotificationListeners = (
onNewNotification: (data: NotificationEvent['data']) => void,
onMarkAsRead: (notificationId: string) => void
) => {
socket.on('notification', (event: SocketEvent) => {
console.log('Raw notification received:', event) // Добавьте этот лог
if (event && event.type === 'notification') {
console.log('Processed notification:', event.data)
}
if (event.type === 'notification') {
onNewNotification(event.data)
} else if (event.type === 'notification:read') {
onMarkAsRead(event.data.notificationId)
}
})
socket.on('connect_error', (err) => {
console.error('Socket error:', err)
})
}
// Утилита для аутентификации сокета
export const authenticateSocket = (userId: string) => {
if (socket.connected) {
socket.disconnect()
}
// Явно задаем параметры подключения
socket.io.opts.extraHeaders = {
'user-id': userId, // Передаем ID в заголовках
}
socket.auth = { userId } // Дублируем в auth
socket.connect()
}
// Отключение всех слушателей при unmount компонента
export const cleanupSocketListeners = () => {
socket.off('notification')
}
Логи на бэке при создании (вызове publishNotification) уведомления:
info trpc 2025-08-11 12:15:43
Creating notification for user: c547eff0-7d3b-46d4-8a59-a2413a64fbca
info notifications 2025-08-11 12:15:43
Publishing to channel user: c547eff0-7d3b-46d4-8a59-a2413a64fbca
info notifications 2025-08-11 12:15:43
Notification published to Redis
Логи сервера Redis:
1) "pmessage"
2) "user:*"
3) "user:c547eff0-7d3b-46d4-8a59-a2413a64fbca"
4) "{\"type\":\"notification\",\"data\":{\"id\":\"32d27833-9127-4559-9044-8313edf148e9\",\"userId\":\"c547eff0-7d3b-46d4-8a59-a2413a64fbca\",\"title\":\"\xd0\x9e\xd0\xbf\xd0\xbb\xd0\xb0\xd1\x82\xd0\xb0 \xd0\xbe\xd1\x82\xd0\xba\xd0\xbb\xd0\xbe\xd0\xbd\xd0\xb5\xd0\xbd\xd0\xb0 \xd0\xb0\xd0\xb4\xd0\xbc\xd0\xb8\xd0\xbd\xd0\xbe\xd0\xbc!\",\"message\":\"\xd0\x9c\xd0\xbe\xd0\xb6\xd0\xb5\xd1\x82\xd0\xb5 \xd0\xbd\xd0\xb0\xd0\xbf\xd0\xb8\xd1\x81\xd0\xb0\xd1\x82\xd1\x8c \xd0\xb2 \xd0\xbf\xd0\xbe\xd0\xb4\xd0\xb4\xd0\xb5\xd1\x80\xd0\xb6\xd0\xba\xd1\x83!\",\"link\":\"/support\",\"isRead\":false,\"createdAt\":\"2025-08-11T09:15:43.644Z\",\"expiresAt\":\"2025-09-10T12:15:43.644Z\"},\"event\":\"notification\"}"
Логи фронта:
Subscribed to notifications for user: {userId: 'c547eff0-7d3b-46d4-8a59-a2413a64fbca'}
Суть в том, что подключение к socker.io с обеих сторон есть, а подписки на событий нет. Помогите!
Я ожидал онлайн-получения, или отложенной отправки, уведомлений.
Источник: Stack Overflow на русском