Проблемы с Redis/socker.io в Веб-приложении Node.js

Рейтинг: 0Ответов: 0Опубликовано: 11.08.2025

Захотел реализовать фичу онлайн-уведомлений пользователям. По итогу, сюдя из логов, подключение есть как на бэке, так и на фронте, сообщения есть, но фронт не принимает сообщения!

Подробнее:

  1. Я запустил сервер Redis у себя на локальной машине.
  2. Создал файл notifications.ts на бэке, где:
/* eslint-disable @typescript-eslint/no-explicit-any */
import Redis from 'ioredis'
import { Server } from 'socket.io'
import { env } from './env'
import { logger } from './logger'

const redis = new Redis(env.REDIS_URL)

export const initNotificationSystem = (httpServer: any) => {
  const io = new Server(httpServer, {
    cors: {
      origin: env.WEBAPP_URL,
      methods: ['GET', 'POST'],
      credentials: true,
    },
  })

  // Подключение пользователей
  io.on('connection', async (socket) => {
    const userId = (socket.handshake.headers['user-id'] as string) || (socket.handshake.auth.userId as string)
    logger.info('notifications', `User ${userId} connected, joining room user:${userId}`)
    if (!userId) {
      logger.error('notifications', `Invalid userId: ${userId}`)
      return socket.disconnect()
    }

    const keys = await redis.keys(`pending:notification:${userId}:*`)
    for (const key of keys) {
      const rawNotification = await redis.get(key)
      if (rawNotification) {
        const notification = JSON.parse(rawNotification)
        socket.emit('notification', { type: 'notification', data: notification })
        await redis.del(key)
      }
    }

    await redis.sadd('online:users', userId)

    socket.on('disconnect', async () => {
      await redis.srem('online:users', userId)
      logger.info('socket', `User ${userId} disconnected`)
    })

    // Подписываемся на канал пользователя
    socket.join(`user:${userId}`)

    // Обработка прочтения уведомлений
    socket.on('markAsRead', async (notificationId) => {
      // Сохраняем в БД и публикуем событие
      await markNotificationAsRead(notificationId, userId)
    })
  })

  return io
}

export const publishNotification = async (
  userId: string,
  dbNotification: {
    link: string
    id: string
    createdAt: Date
    title: string
    userId: string
    message: string
    isRead: boolean
    expiresAt: Date
  }
) => {
  logger.info('notifications', `Publishing to channel user: ${userId}`)
  // Публикуем в Redis Stream для гарантированной доставки
  await redis.xadd('notifications:stream', '*', 'userId', userId, 'notification', JSON.stringify(dbNotification))

  // Публикуем в Redis Pub/Sub для мгновенной доставки онлайн пользователям
  await redis.publish(
    `user:${userId}`,
    JSON.stringify({ type: 'notification', data: dbNotification, event: 'notification' })
  )

  await redis.expire(`user:${userId}`, 86400)
  logger.info('notifications', `Notification published to Redis`)
}

export const markNotificationAsRead = async (notificationId: string, userId: string) => {
  // Публикуем событие обновления
  await redis.publish(
    `user:${userId}`,
    JSON.stringify({
      type: 'notification:read',
      data: { notificationId },
    })
  )
}

3. Создал файл notificationWorket.ts на бэке:

import Redis from 'ioredis'
import { env } from '../lib/env'
import { logger } from '../lib/logger'

const redis = new Redis(env.REDIS_URL)

async function processNotifications() {
  logger.info('worker', 'Worker started')
  while (true) {
    try {
      const result = await redis.xread('BLOCK', '0', 'STREAMS', 'notifications:stream', '$')

      if (!result) {
        continue
      }

      logger.info('worker', 'Processing notifications from stream')
      const [key, messages] = result[0]
      for (const [id, fields] of messages) {
        const userId = fields[1]
        const notification = JSON.parse(fields[3])
        logger.info('worker', `Processing notification for user: ${userId}`)

        // Проверяем онлайн ли пользователь
        const isOnline = await redis.sismember('online:users', userId)

        if (!isOnline) {
          logger.info('worker', `User ${userId} is offline, sending alternative notification`)
          await redis.setex(
            `pending:notification:${userId}:${notification.id}`,
            86400, // TTL = 24 часа
            JSON.stringify(notification)
          )
        }

        await redis.xdel('notifications:stream', id)
      }
    } catch (err) {
      logger.error('worker', `Error processing notifications: ${err}`)
      await new Promise((resolve) => setTimeout(resolve, 5000))
    }
  }
}

processNotifications()

4. socket.ts на фронте:

import { io, type Socket } from 'socket.io-client'
import { env } from './env'

// Типы для событий уведомлений
type NotificationEvent = {
  type: 'notification'
  data: {
    id: string
    title: string
    message: string
    link: string
    isRead: boolean
    createdAt: string
  }
}

type MarkAsReadEvent = {
  type: 'notification:read'
  data: {
    notificationId: string
  }
}

type SocketEvent = NotificationEvent | MarkAsReadEvent

// Создаем и экспортируем socket
export const socket: Socket = io(env.VITE_SOCKET_SERVER_URL, {
  transports: ['websocket'],
  withCredentials: true,
})

socket.on('connect', () => {
  console.log('Subscribed to notifications for user:', socket.auth) // <-- Логируем подключение
})

socket.on('notification', (data) => {
  console.log('Received notification:', data) // <-- Логируем получение
})

// Типизированные обработчики событий
export const setupNotificationListeners = (
  onNewNotification: (data: NotificationEvent['data']) => void,
  onMarkAsRead: (notificationId: string) => void
) => {
  socket.on('notification', (event: SocketEvent) => {
    console.log('Raw notification received:', event) // Добавьте этот лог
    if (event && event.type === 'notification') {
      console.log('Processed notification:', event.data)
    }
    if (event.type === 'notification') {
      onNewNotification(event.data)
    } else if (event.type === 'notification:read') {
      onMarkAsRead(event.data.notificationId)
    }
  })
  socket.on('connect_error', (err) => {
    console.error('Socket error:', err)
  })
}

// Утилита для аутентификации сокета
export const authenticateSocket = (userId: string) => {
  if (socket.connected) {
    socket.disconnect()
  }
  // Явно задаем параметры подключения
  socket.io.opts.extraHeaders = {
    'user-id': userId, // Передаем ID в заголовках
  }
  socket.auth = { userId } // Дублируем в auth
  socket.connect()
}

// Отключение всех слушателей при unmount компонента
export const cleanupSocketListeners = () => {
  socket.off('notification')
}

Логи на бэке при создании (вызове publishNotification) уведомления:

info trpc 2025-08-11 12:15:43
Creating notification for user: c547eff0-7d3b-46d4-8a59-a2413a64fbca
info notifications 2025-08-11 12:15:43
Publishing to channel user: c547eff0-7d3b-46d4-8a59-a2413a64fbca
info notifications 2025-08-11 12:15:43
Notification published to Redis

Логи сервера Redis:

1) "pmessage"
2) "user:*"
3) "user:c547eff0-7d3b-46d4-8a59-a2413a64fbca"
4) "{\"type\":\"notification\",\"data\":{\"id\":\"32d27833-9127-4559-9044-8313edf148e9\",\"userId\":\"c547eff0-7d3b-46d4-8a59-a2413a64fbca\",\"title\":\"\xd0\x9e\xd0\xbf\xd0\xbb\xd0\xb0\xd1\x82\xd0\xb0 \xd0\xbe\xd1\x82\xd0\xba\xd0\xbb\xd0\xbe\xd0\xbd\xd0\xb5\xd0\xbd\xd0\xb0 \xd0\xb0\xd0\xb4\xd0\xbc\xd0\xb8\xd0\xbd\xd0\xbe\xd0\xbc!\",\"message\":\"\xd0\x9c\xd0\xbe\xd0\xb6\xd0\xb5\xd1\x82\xd0\xb5 \xd0\xbd\xd0\xb0\xd0\xbf\xd0\xb8\xd1\x81\xd0\xb0\xd1\x82\xd1\x8c \xd0\xb2 \xd0\xbf\xd0\xbe\xd0\xb4\xd0\xb4\xd0\xb5\xd1\x80\xd0\xb6\xd0\xba\xd1\x83!\",\"link\":\"/support\",\"isRead\":false,\"createdAt\":\"2025-08-11T09:15:43.644Z\",\"expiresAt\":\"2025-09-10T12:15:43.644Z\"},\"event\":\"notification\"}"

Логи фронта:

Subscribed to notifications for user: {userId: 'c547eff0-7d3b-46d4-8a59-a2413a64fbca'}

Суть в том, что подключение к socker.io с обеих сторон есть, а подписки на событий нет. Помогите!

Я ожидал онлайн-получения, или отложенной отправки, уведомлений.

Ответы

Ответов пока нет.