Node.js 22 in 2026: Build a Production-Ready Real-Time Notification API with WebSockets, Queue Backpressure, and Structured Logging

Real-time product features are no longer optional in 2026. Users expect instant notifications for comments, payments, deployments, alerts, and workflow updates. The challenge is not opening a WebSocket; it is running one safely at scale with retries, backpressure, observability, and graceful recovery. In this practical guide, you will build a production-ready notification API using Node.js 22, PostgreSQL, Redis, and WebSockets, with patterns you can ship this week.

Architecture you can run in production

We will use a simple but robust setup:

  • Node.js 22 API for auth, publish endpoints, and WebSocket fan-out
  • PostgreSQL as source of truth for notifications
  • Redis Streams as an event bus with consumer groups
  • Worker to deliver events from stream to connected users
  • Structured logs + request IDs for debugging and tracing

This design avoids the classic anti-pattern of pushing directly from your REST handler to sockets only. Instead, your API writes durable state first, then emits events asynchronously.

Project setup

mkdir realtime-notify-api && cd realtime-notify-api
npm init -y
npm pkg set type=module
npm i fastify @fastify/websocket pg ioredis pino pino-pretty zod

The type=module step matters: server.js uses ES module imports and top-level await, which need ESM enabled in package.json.

Create server.js:

import Fastify from 'fastify';
import ws from '@fastify/websocket';
import { Pool } from 'pg';
import Redis from 'ioredis';
import { z } from 'zod';

const app = Fastify({ logger: { level: 'info' } });
await app.register(ws);

const pg = new Pool({ connectionString: process.env.DATABASE_URL });
const redis = new Redis(process.env.REDIS_URL);

const clients = new Map(); // userId -> Set<socket>

function addClient(userId, socket) {
  if (!clients.has(userId)) clients.set(userId, new Set());
  clients.get(userId).add(socket);
  socket.on('close', () => clients.get(userId)?.delete(socket));
}

function broadcastToUser(userId, payload) {
  const sockets = clients.get(userId);
  if (!sockets) return 0;
  let sent = 0;
  for (const s of sockets) {
    if (s.readyState === 1) {
      s.send(JSON.stringify(payload));
      sent++;
    }
  }
  return sent;
}

Create the notification schema

CREATE TABLE notifications (
  id BIGSERIAL PRIMARY KEY,
  user_id UUID NOT NULL,
  event_type TEXT NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  delivered_at TIMESTAMPTZ
);

CREATE INDEX idx_notifications_user_created
  ON notifications(user_id, created_at DESC);

Auth + WebSocket connection

Keep auth simple here; you can replace the header check with JWT verification later. Never trust a client-supplied x-user-id in production.

app.get('/ws', { websocket: true }, (socket, req) => {
  const userId = req.headers['x-user-id'];
  if (!userId) {
    socket.send(JSON.stringify({ error: 'missing user id' }));
    socket.close();
    return;
  }

  addClient(userId, socket);
  socket.send(JSON.stringify({ type: 'connected', userId }));
});
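On the client side, connections drop constantly. Below is a minimal reconnect sketch with exponential backoff; the URL and wiring are assumptions, not part of the server above. Node.js 22 ships a global WebSocket client, so the same code runs in browsers and Node. Note that a browser cannot set the x-user-id header, so real auth would ride a cookie or token query parameter instead.

```javascript
// Exponential backoff, capped at 30 s. Kept deterministic so it is easy to
// test; add random jitter in production to avoid reconnect stampedes.
function backoffDelay(attempt) {
  return Math.min(500 * 2 ** attempt, 30_000);
}

// Hypothetical client: connects, forwards events, reconnects on close.
function connectNotifications(url, onEvent) {
  let attempt = 0;
  function open() {
    const ws = new WebSocket(url); // global in Node.js 22 and browsers
    ws.onopen = () => { attempt = 0; }; // reset backoff on a good connection
    ws.onmessage = (msg) => onEvent(JSON.parse(msg.data));
    ws.onclose = () => setTimeout(open, backoffDelay(attempt++));
  }
  open();
}

// Usage sketch (assumes the server from this article is running locally):
// connectNotifications('ws://localhost:3000/ws', (evt) => console.log(evt));
```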

Publish endpoint with durable write + stream emit

This is the critical path. We validate input, commit the row to Postgres first, then append to the Redis Stream. If the stream or every socket is down, the event is still stored and recoverable.

const PublishSchema = z.object({
  userId: z.string().uuid(),
  eventType: z.string().min(2),
  payload: z.record(z.any())
});

app.post('/notifications', async (req, reply) => {
  const parsed = PublishSchema.safeParse(req.body);
  if (!parsed.success) return reply.code(400).send(parsed.error.flatten());

  const { userId, eventType, payload } = parsed.data;

  // Single-statement insert: the row is committed as soon as the query
  // returns, so no explicit BEGIN/COMMIT is needed.
  let event;
  try {
    const ins = await pg.query(
      `INSERT INTO notifications (user_id, event_type, payload)
       VALUES ($1, $2, $3) RETURNING id, created_at`,
      [userId, eventType, payload]
    );

    event = {
      id: ins.rows[0].id,
      userId,
      eventType,
      payload,
      createdAt: ins.rows[0].created_at
    };
  } catch (err) {
    req.log.error({ err }, 'publish failed');
    return reply.code(500).send({ ok: false });
  }

  // Emit only after the durable write. If the stream append fails, the
  // notification is still stored and reachable through the pull endpoint.
  try {
    await redis.xadd('notify:stream', '*', 'data', JSON.stringify(event));
  } catch (err) {
    req.log.warn({ err, eventId: event.id }, 'stream append failed; event stored but not pushed');
  }

  return reply.code(202).send({ ok: true, eventId: event.id });
});
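Clients retry failed publishes, which is how duplicates sneak in (item 1 of the hardening checklist below). A hedged sketch of deriving a deterministic idempotency key with node:crypto; the key shape, the Idempotency-Key header, and the idea of a UNIQUE idempotency_key column are all assumptions layered on top of the handler above:

```javascript
import { createHash } from 'node:crypto';

// Derive a stable key from the logical content of a publish request.
// The same userId + eventType + payload always hashes to the same key,
// so a UNIQUE constraint on a hypothetical idempotency_key column would
// reject the duplicate insert instead of double-notifying the user.
function idempotencyKey({ userId, eventType, payload }) {
  const canonical = JSON.stringify({
    userId,
    eventType,
    // Sort top-level payload keys so key order does not change the hash.
    payload: Object.fromEntries(
      Object.entries(payload).sort(([a], [b]) => a.localeCompare(b))
    )
  });
  return createHash('sha256').update(canonical).digest('hex');
}
```

A client would send this value as a header with each retry; the handler stores it with the row and treats a unique-violation error as a successful replay, returning the original eventId.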

Worker for fan-out with backpressure

The worker consumes from a Redis consumer group over its own connection, because blocking XREADGROUP calls would stall every other command on the shared one. If a user is disconnected, do not drop the event; leave it durable for pull-based fetch later.

async function ensureGroup() {
  try {
    await redis.xgroup('CREATE', 'notify:stream', 'notify-workers', '$', 'MKSTREAM');
  } catch (e) {
    if (!String(e.message).includes('BUSYGROUP')) throw e;
  }
}

async function runWorker() {
  await ensureGroup();
  const consumer = `c-${process.pid}`;
  // Dedicated connection for blocking reads, so XADD and XACK on the
  // shared connection are never stuck behind a 5-second BLOCK.
  const stream = redis.duplicate();

  while (true) {
    const res = await stream.xreadgroup(
      'GROUP', 'notify-workers', consumer,
      'COUNT', 50, 'BLOCK', 5000,
      'STREAMS', 'notify:stream', '>'
    );

    if (!res) continue;

    for (const [, entries] of res) {
      for (const [streamId, fields] of entries) {
        const raw = fields[1]; // entry fields arrive as ['data', '<json>']
        const event = JSON.parse(raw);
        const sent = broadcastToUser(event.userId, { type: event.eventType, ...event.payload });

        if (sent > 0) {
          await pg.query('UPDATE notifications SET delivered_at = now() WHERE id = $1', [event.id]);
        }

        await redis.xack('notify:stream', 'notify-workers', streamId);
      }
    }
  }
}

runWorker().catch((err) => {
  app.log.error({ err }, 'worker crashed');
  process.exit(1);
});
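The nested reply shape from XREADGROUP is easy to mis-index, which is why the worker reaches into fields[1]. A small pure helper that flattens one reply into events; the sample reply in the usage comment is illustrative, not real stream data:

```javascript
// XREADGROUP replies look like:
//   [[streamName, [[entryId, [field1, value1, ...]], ...]], ...]
// This flattens one reply into { streamId, event } pairs, assuming each
// entry carries a single 'data' field holding JSON, as the publisher writes.
function parseStreamReply(res) {
  const out = [];
  if (!res) return out; // BLOCK timed out with nothing to read
  for (const [, entries] of res) {
    for (const [streamId, fields] of entries) {
      const i = fields.indexOf('data');
      if (i === -1 || i + 1 >= fields.length) continue; // skip malformed entries
      out.push({ streamId, event: JSON.parse(fields[i + 1]) });
    }
  }
  return out;
}

// Usage sketch inside the worker loop:
// for (const { streamId, event } of parseStreamReply(res)) { ... }
```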

Fallback endpoint for missed notifications

Mobile users disconnect all the time. Add a pull API to fetch recent history; the delivered_at column lets clients spot items that were never pushed over a socket.

app.get('/notifications/:userId', async (req) => {
  const { userId } = req.params;
  const { rows } = await pg.query(
    `SELECT id, event_type, payload, created_at, delivered_at
     FROM notifications
     WHERE user_id = $1
     ORDER BY created_at DESC
     LIMIT 100`,
    [userId]
  );
  return rows;
});
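LIMIT 100 holds up until an inbox grows past it. A keyset-pagination sketch that builds the history query for an optional before cursor; the helper name and cursor parameter are assumptions, not part of the endpoint above:

```javascript
// Build a keyset-paginated history query. Paging on the BIGSERIAL id keeps
// ordering stable even when two rows share a created_at timestamp, which
// OFFSET-based paging cannot guarantee.
function buildHistoryQuery(userId, { beforeId = null, limit = 100 } = {}) {
  const values = [userId];
  let where = 'user_id = $1';
  if (beforeId !== null) {
    values.push(beforeId);
    where += ` AND id < $${values.length}`; // everything older than the cursor
  }
  values.push(limit);
  return {
    text: `SELECT id, event_type, payload, created_at, delivered_at
           FROM notifications
           WHERE ${where}
           ORDER BY id DESC
           LIMIT $${values.length}`,
    values
  };
}

// Usage sketch in the handler, with the cursor read from ?before=<id>:
// const { text, values } = buildHistoryQuery(userId, { beforeId: req.query.before });
// const { rows } = await pg.query(text, values);
```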

await app.listen({ port: 3000, host: '0.0.0.0' });

Production hardening checklist

  1. Idempotency keys on publish endpoint to avoid duplicate sends during retries.
  2. Rate limiting per tenant and per user for burst control.
  3. Heartbeat + stale socket cleanup every 30 to 60 seconds.
  4. Dead letter queue for poison events that fail repeatedly.
  5. Metrics: stream lag, queue depth, delivery latency, reconnect rate.
  6. Horizontal scaling: run multiple workers, shared Redis group.
  7. Security: JWT auth, tenant isolation, payload schema constraints.
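Item 3 can be sketched against the clients map from server.js. The isAlive flag convention comes from the ws library, which @fastify/websocket wraps; the 30-second interval and helper name are assumptions:

```javascript
// Ping every tracked socket; terminate any that never answered the last ping.
// Relies on the ws convention: ping() elicits a 'pong', whose handler should
// set isAlive back to true before the next sweep runs.
function sweepStaleSockets(clients) {
  let terminated = 0;
  for (const [userId, sockets] of clients) {
    for (const socket of sockets) {
      if (socket.isAlive === false) {
        socket.terminate(); // no graceful close; the peer is already gone
        sockets.delete(socket);
        terminated++;
        continue;
      }
      socket.isAlive = false; // must be reset by a pong before the next sweep
      socket.ping();
    }
    if (sockets.size === 0) clients.delete(userId); // drop empty user buckets
  }
  return terminated;
}

// Wire-up sketch: when registering a socket in addClient, also do
//   socket.isAlive = true;
//   socket.on('pong', () => { socket.isAlive = true; });
// then run the sweep on an interval:
//   setInterval(() => sweepStaleSockets(clients), 30_000);
```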

Why this pattern works in 2026

In 2026, reliability and observability are baseline expectations. This architecture gives you both without heavy complexity. You get durable notification history, scalable fan-out, and clean recovery when clients drop. Most importantly, your API remains predictable under load because writes and delivery are decoupled.

If you are modernizing an older Node.js monolith, start with this notification slice first. It is a high-impact upgrade that improves user experience immediately while establishing strong event-driven foundations for future features like in-app inboxes, audit trails, and workflow automations.
