Node.js in 2026: Build a Reliable Real-Time Notification Pipeline with uWebSockets.js, NATS JetStream, and Idempotent Consumers

Real-time features are expected in modern apps, but many teams still run into the same pain points: dropped events, overloaded workers, and hard-to-debug socket failures in production. If you are building notifications, collaborative updates, or live dashboards, you need a design that handles bursts without losing reliability.

In this guide, we will build a practical Node.js event delivery architecture using uWebSockets.js for high-throughput WebSocket connections, NATS JetStream as a durable event backbone, and idempotent consumers for safe retries. We will keep the examples framework-friendly so you can adapt this to Express, Fastify, or NestJS.

Why this architecture works

Pure in-memory pub/sub is fast but fragile. Durable queues are reliable but can add latency if used alone for fanout. Combining both gives you the best of both worlds:

  • WebSocket layer for low-latency fanout to connected clients.
  • JetStream for at-least-once durability, replay, and backpressure control.
  • Idempotent handlers so duplicate deliveries are safe.

System design at a glance

  1. API receives an event-producing action (for example, “order shipped”).
  2. Event is persisted to JetStream with a stable event ID.
  3. Notification worker consumes the stream and resolves target users.
  4. Gateway pushes to online users via WebSockets.
  5. If the user is offline, the event remains available for replay on reconnect.
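
The five steps above can be sketched with in-memory stand-ins before any infrastructure is involved. This is a toy model, not the real pipeline: createPipeline, the log array, and the sequence-based cursor are hypothetical names chosen for illustration, and the array only imitates what JetStream would persist.

```javascript
// In-memory stand-ins for the durable log and the gateway (illustrative only).
function createPipeline() {
  const log = [];            // stands in for the durable stream; seq = position in the log
  const online = new Map();  // userId -> inbox array for that user's open connection
  const seenIds = new Set(); // stands in for server-side de-duplication by event ID

  function deliver(entry) {
    const inbox = online.get(entry.event.userId);
    if (inbox) inbox.push(entry); // step 4: push only to connected users
  }

  return {
    // Steps 1-3: accept an event, persist it once, then try to deliver it.
    publish(event) {
      if (seenIds.has(event.id)) return null; // a duplicate publish is ignored
      seenIds.add(event.id);
      const entry = { seq: log.length + 1, event };
      log.push(entry);
      deliver(entry);
      return entry.seq;
    },
    // Step 5: on (re)connect, replay everything after the client's last seen sequence.
    connect(userId, lastSeenSeq = 0) {
      const inbox = log.filter((e) => e.event.userId === userId && e.seq > lastSeenSeq);
      online.set(userId, inbox);
      return inbox;
    },
    disconnect(userId) {
      online.delete(userId);
    }
  };
}

const pipeline = createPipeline();
pipeline.publish({ id: "e1", userId: "u1", type: "notification.order.shipped" });
const inbox = pipeline.connect("u1"); // u1 was offline, so e1 is replayed here
pipeline.publish({ id: "e2", userId: "u1", type: "notification.order.delivered" });
console.log(inbox.map((e) => e.event.id)); // [ 'e1', 'e2' ]
```

The point of the model is the split of responsibilities: the log decides what exists, the gateway only decides who hears about it right now.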

Event envelope (contract first)

A consistent event schema prevents future integration pain.

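// event-envelope.js
import { randomUUID } from "node:crypto";

// Build the envelope that every producer and consumer agrees on.
export function makeEvent({ type, tenantId, userId, payload }) {
  return {
    id: randomUUID(), // stable ID; reuse the same event object when retrying a publish
    type, // e.g. notification.order.shipped
    tenantId,
    userId,
    payload,
    createdAt: new Date().toISOString(),
    version: 1 // bump when the payload shape changes
  };
}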

Step 1, publish durable events to JetStream

Install dependencies:

npm i nats zod

// bus.js
import { connect, StringCodec } from "nats";
import { z } from "zod";

const EventSchema = z.object({
  id: z.string().uuid(),
  type: z.string(),
  tenantId: z.string(),
  userId: z.string(),
  payload: z.record(z.string(), z.any()), // explicit key schema works on both Zod 3 and Zod 4
  createdAt: z.string(),
  version: z.number()
});

const sc = StringCodec();

export async function createBus() {
  const nc = await connect({ servers: process.env.NATS_URL ?? "nats://127.0.0.1:4222" });
  const js = nc.jetstream();

  return {
    async publish(event) {
      const valid = EventSchema.parse(event);
      const subject = `events.${valid.tenantId}.${valid.type}`;

      await js.publish(subject, sc.encode(JSON.stringify(valid)), {
        msgID: valid.id // enables de-dup windows server-side
      });

      return valid.id;
    },
    close: () => nc.close()
  };
}

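Why msgID matters: it is sent as the Nats-Msg-Id header, so if your API retries a publish after a timeout, JetStream drops the duplicate as long as the retry arrives within the stream's deduplication window (two minutes by default). This only works if the retry reuses the same event object; calling makeEvent again would generate a new ID and the duplicate would be stored.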
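
The deduplication window is a property of the stream, so it helps to see where it would be set. The following is a minimal sketch under stated assumptions: buildStreamConfig is a hypothetical helper, the five-minute window and seven-day retention are illustrative values, and the stream name EVENTS is taken from the worker example in this guide. JetStream expresses durations in nanoseconds, which is why the helper converts from milliseconds.

```javascript
// Hypothetical helper: builds a stream configuration object.
const NANOS_PER_MS = 1_000_000;
const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;

function buildStreamConfig({ name = "EVENTS", dedupWindowMs = 120_000, maxAgeMs = SEVEN_DAYS_MS } = {}) {
  return {
    name,
    subjects: ["events.>"], // matches events.<tenant>.<type>
    storage: "file",
    retention: "limits",
    max_age: maxAgeMs * NANOS_PER_MS,               // how long events stay replayable
    duplicate_window: dedupWindowMs * NANOS_PER_MS  // how long a msgID is remembered
  };
}

const config = buildStreamConfig({ dedupWindowMs: 5 * 60 * 1000 });
console.log(config.duplicate_window); // 300000000000

// With a live connection this would be applied roughly as:
//   const jsm = await nc.jetstreamManager();
//   await jsm.streams.add(config);
```

A longer window tolerates slower retries at the cost of more server-side bookkeeping, so size it to your API's worst-case retry delay rather than picking the largest value.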

Step 2, consume safely with explicit ack + retry policy

Do not auto-ack. Acknowledge only after downstream work is successful.

// worker.js
import { connect, StringCodec } from "nats";

const sc = StringCodec();

async function startWorker() {
  const nc = await connect({ servers: process.env.NATS_URL ?? "nats://127.0.0.1:4222" });
  const js = nc.jetstream();

  const consumer = await js.consumers.get("EVENTS", "notify-worker");
  const messages = await consumer.consume({
    max_messages: 100,
    expires: 30_000
  });

  for await (const m of messages) {
    try {
      const event = JSON.parse(sc.decode(m.data));
      await deliverNotification(event); // your business logic
      m.ack();
    } catch (err) {
      console.error("delivery failed", err);
      m.nak(5_000); // retry after delay
    }
  }
}

async function deliverNotification(event) {
  // Push to websocket gateway or fallback channel
}

startWorker().catch(console.error);
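
The worker assumes a durable consumer named notify-worker already exists, and it retries every failure after a fixed delay forever. The sketch below shows one way to make both explicit. It is an assumption-laden illustration: buildConsumerConfig and onFailure are hypothetical helpers, the limits are example values, and the delivery count would come from the message metadata (the exact field name depends on your client version).

```javascript
const NANOS_PER_MS = 1_000_000;

// Hypothetical helper: durable consumer settings matching the worker above.
function buildConsumerConfig({ maxDeliver = 5, ackWaitMs = 30_000 } = {}) {
  return {
    durable_name: "notify-worker",
    ack_policy: "explicit",             // nothing is acknowledged automatically
    ack_wait: ackWaitMs * NANOS_PER_MS, // nanoseconds before an unacked message is redelivered
    max_deliver: maxDeliver,            // upper bound on delivery attempts
    filter_subject: "events.>"
  };
}

// Decide what to do with a failed message, given how many times it has been delivered.
function onFailure(deliveryCount, { maxDeliver = 5, baseDelayMs = 1_000, maxDelayMs = 60_000 } = {}) {
  if (deliveryCount >= maxDeliver) return { action: "term" }; // give up and park the event elsewhere
  const delayMs = Math.min(baseDelayMs * 2 ** (deliveryCount - 1), maxDelayMs);
  return { action: "nak", delayMs };
}

console.log(onFailure(1)); // { action: 'nak', delayMs: 1000 }
console.log(onFailure(3)); // { action: 'nak', delayMs: 4000 }
console.log(onFailure(5)); // { action: 'term' }
```

In the catch block, a "nak" result would map to m.nak(delayMs) and a "term" result to m.term(), ideally after copying the event to a dead-letter location so a malformed payload does not loop indefinitely.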

Step 3, high-throughput WebSocket gateway with room mapping

The gateway should hold no durable state. Keep connection and user-room mappings in memory, where they are rebuilt as clients reconnect, and treat JetStream as the source of truth for replay.

// gateway.js
import uWS from "uWebSockets.js";

const rooms = new Map(); // userId -> Set(ws)

function join(userId, ws) {
  if (!rooms.has(userId)) rooms.set(userId, new Set());
  rooms.get(userId).add(ws);
}

function leave(userId, ws) {
  const set = rooms.get(userId);
  if (!set) return;
  set.delete(ws);
  if (set.size === 0) rooms.delete(userId);
}

export function broadcastToUser(userId, event) {
  const sockets = rooms.get(userId);
  if (!sockets) return 0;

  const data = JSON.stringify(event);
  let sent = 0;
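  for (const ws of sockets) {
    // uWS send() returns 1 (sent), 0 (queued behind backpressure), or 2 (dropped).
    // Only immediate sends are counted here.
    if (ws.send(data, false, false) === 1) sent++;
  }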
  return sent;
}

uWS.App()
  .ws("/ws", {
    open: (ws) => {
      // In production, verify the JWT/session token in an `upgrade` handler and pass
      // { userId } to res.upgrade(); getUserData() returns that object here.
      const userId = ws.getUserData?.().userId ?? "unknown";
      ws.userId = userId;
      join(userId, ws);
    },
    close: (ws) => leave(ws.userId, ws)
  })
  .listen(9001, (token) => {
    if (token) console.log("WebSocket gateway running on :9001");
  });
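
The gateway above runs on uWebSockets.js defaults. A few behavior options control how it treats idle and slow clients, and setting them explicitly makes those limits visible in code review. The values below are illustrative assumptions, not recommendations; wsBehaviorOptions is a hypothetical name.

```javascript
// Hypothetical tuning values; adjust them to your traffic profile.
const wsBehaviorOptions = {
  idleTimeout: 60,              // seconds without traffic before the socket is closed
  sendPingsAutomatically: true, // server-sent pings keep healthy but quiet clients connected
  maxPayloadLength: 16 * 1024,  // inbound messages larger than 16 KiB are rejected
  maxBackpressure: 64 * 1024    // bytes buffered for a slow client before sends are dropped
};

// These would be spread into the behavior object from the gateway:
//   uWS.App().ws("/ws", { ...wsBehaviorOptions, open, close })
console.log(Object.keys(wsBehaviorOptions).length); // 4
```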

Step 4, make delivery idempotent

At-least-once systems can redeliver messages. Guard side effects with event IDs.

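// idempotency.js
import Redis from "ioredis";
const redis = new Redis(process.env.REDIS_URL);

const keyFor = (eventId) => `event:done:${eventId}`;

export async function shouldProcess(eventId) {
  // SET with NX and EX: resolves to "OK" only if the key did not exist, otherwise null
  const ok = await redis.set(keyFor(eventId), "1", "EX", 86400, "NX");
  return ok === "OK";
}

export async function release(eventId) {
  // Drop the marker after a failed attempt so the redelivery is not skipped
  await redis.del(keyFor(eventId));
}

Use this before sending push/email or writing audit logs. Because the marker is written before the side effect runs, release it when delivery fails; otherwise the retry would be acknowledged as a duplicate and the notification would be lost:

if (!(await shouldProcess(event.id))) {
  m.ack(); // duplicate delivery, already handled
  return;
}
try {
  await deliverNotification(event);
  m.ack();
} catch (err) {
  await release(event.id);
  m.nak(5_000);
}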
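
To unit test handlers without a Redis instance, an in-memory stand-in with the same set-if-absent-with-expiry behavior can be enough. This is a test-only sketch: createMemoryGuard and its injectable clock are hypothetical, and unlike the Redis version it is synchronous and local to one process, so it must not be used to coordinate multiple workers.

```javascript
// Hypothetical in-memory stand-in for the Redis-backed guard (tests only).
function createMemoryGuard({ ttlMs = 86_400_000, now = () => Date.now() } = {}) {
  const expiry = new Map(); // eventId -> expiry timestamp in ms

  return {
    // Mirrors SET key value NX EX: true only for the first caller within the TTL.
    shouldProcess(eventId) {
      const expiresAt = expiry.get(eventId);
      if (expiresAt !== undefined && expiresAt > now()) return false;
      expiry.set(eventId, now() + ttlMs);
      return true;
    }
  };
}

let clock = 0;
const guard = createMemoryGuard({ ttlMs: 1_000, now: () => clock });
console.log(guard.shouldProcess("e1")); // true  (first delivery)
console.log(guard.shouldProcess("e1")); // false (redelivery inside the TTL)
clock = 1_500;
console.log(guard.shouldProcess("e1")); // true  (marker expired)
```

The last line shows the trade-off behind the TTL: once the marker expires, a very late redelivery is processed again, so the TTL should comfortably exceed the longest retry horizon.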

Operational checklist for production

  • Backpressure: cap per-connection queue depth, then drop oldest low-priority events.
  • Heartbeats: send ping/pong and terminate stale sockets quickly.
  • Auth refresh: revalidate JWT claims on reconnect and periodically for long sessions.
  • Tenant isolation: subject naming like events.<tenant>.<type> plus scoped credentials.
  • Observability: trace publish-to-delivery latency (P50/P95/P99) and retry counts per event type.
  • Replay UX: on reconnect, fetch missed events since last acknowledged cursor.
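
The backpressure item in the checklist can be sketched as a small bounded queue. This is one possible policy under stated assumptions: createOutboundQueue is a hypothetical helper, priorities are plain numbers where higher means more important, and eviction scans the queue linearly, which is acceptable only because the cap keeps it short.

```javascript
// Hypothetical per-connection outbound queue with a hard cap.
function createOutboundQueue(maxDepth) {
  const items = []; // { event, priority } in arrival order

  return {
    // Returns the evicted event when the cap is exceeded, otherwise null.
    push(event, priority = 0) {
      items.push({ event, priority });
      if (items.length <= maxDepth) return null;
      // Over the cap: evict the oldest entry among those with the lowest priority.
      let victim = 0;
      for (let i = 1; i < items.length; i++) {
        if (items[i].priority < items[victim].priority) victim = i;
      }
      return items.splice(victim, 1)[0].event;
    },
    shift() {
      const next = items.shift();
      return next ? next.event : undefined;
    },
    get depth() {
      return items.length;
    }
  };
}

const queue = createOutboundQueue(2);
queue.push("typing-indicator", 0);
queue.push("order-shipped", 1);
const dropped = queue.push("presence-update", 0);
console.log(dropped);     // typing-indicator
console.log(queue.depth); // 2
```

Dropped events are not lost for good if they were persisted first: the client can still pick them up through replay from its last acknowledged cursor.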

A minimal reconnect strategy on client side

let retryMs = 500;

function connect() {
  const ws = new WebSocket("wss://api.example.com/ws");

  ws.onopen = () => {
    retryMs = 500;
    // optionally send lastSeenEventCursor
  };

  ws.onmessage = (e) => {
    const event = JSON.parse(e.data);
    // render event and persist last seen cursor
  };

  ws.onclose = () => {
    setTimeout(connect, retryMs);
    retryMs = Math.min(retryMs * 2, 10_000);
  };
}

connect();
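
If many clients lose their connection at the same moment, plain exponential backoff makes them all retry in lockstep. Adding random jitter spreads the reconnects out. The helper below is a sketch of the "full jitter" variant; nextDelayMs and its injectable random parameter are hypothetical, and the base and cap simply reuse the 500 ms and 10 s values from the snippet above.

```javascript
// Full jitter: pick a random delay between 0 and the capped exponential ceiling.
function nextDelayMs(attempt, { baseMs = 500, capMs = 10_000, random = Math.random } = {}) {
  const ceiling = Math.min(baseMs * 2 ** attempt, capMs);
  return Math.floor(random() * ceiling);
}

// Pinning random to 1 (Math.random never returns it) makes the ceilings visible:
console.log([0, 1, 2, 5].map((n) => nextDelayMs(n, { random: () => 1 }))); // [ 500, 1000, 2000, 10000 ]
```

In the client, this would mean counting attempts instead of doubling retryMs, scheduling setTimeout(connect, nextDelayMs(attempt)) in onclose, and resetting the counter in onopen.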

Final thoughts

Reliable real-time delivery is not about choosing one magical library. It is about combining low-latency transport with durable event storage, then designing consumers to be retry-safe. With Node.js, JetStream, and a stateless WebSocket gateway, you can scale from a single app feature to a platform-level event system without rewriting from scratch.

If you implement just three things this week, make them these: stable event IDs, explicit ack/nak handling, and idempotent side effects. Together they address the most common causes of lost or duplicated notifications before those problems reach your users.
