Next.js Discord

Discord Forum

SSE Does not close correctly.

Unanswered
Dreamgineer posted this in #help-forum
Open in Discord
I have multiple SSEs routes that uses the same manager class. It works like Publish-Subscribe system in WebSocket.
My expectations:
- Opens connection correctly ✅
- Message reaches the client correctly ✅
- When tab is closed, server closes connection and cleanup correctly. ❌
Actual Behavior:
The connection remains open and ping interval continues to stack up over time.
What I have tried:
- Polling controller.desiredSize for null, always 1
- Using cancel() to trigger removal, doesn't trigger ❌
- Using request.signal to abort it, doesn't trigger ❌
- Putting logs in every corners and looking at them, no clue ❌

Additional Context:
Attached below is going to be the original class code and how it's being called from routes.
It works fine in next dev but breaks when ran in production.
Production:
- Docker version 29.0.0, build 3d4129b
- Official latest Bun image: oven/bun:latest
- Next.js v16.0.0

2 Replies

export class EventSourceManager {
  private list: ServerEventSource[] = [];
  private id = 0;
  new(
    topic = "_global",
    {
      onDisconnect,
      signal,
    }: { onDisconnect?: () => void; signal?: AbortSignal } = {},
  ): Response {
    let timeout: NodeJS.Timeout | undefined;
    this.id++;
    if (this.id > 100000) this.id = 0;
    const id = this.id;
    console.log(` SUB ${topic}#${id} (C${this.list.length})`);

    const push = (s: ServerEventSource) => {
      this.list.push(s);
    };
    const remove = (id: number) => {
      if (this.list.findIndex((e) => e.id === id) < 0) return;
      console.log(` DSC ${topic}#${id}`);
      this.list = this.list.toSpliced(
        this.list.findIndex((e) => e.id === id),
        1,
      );
      onDisconnect?.();
    };

    signal?.addEventListener("abort", () => {
      clearInterval(timeout);
      remove(id);
    });

    return new Response(
      new ReadableStream({
        start(controller) {
          const encoder = new TextEncoder();
          // Register this client
          const res = {
            send: (data: unknown, event?: string) =>
              res.write(
                `${event ? `event: ${event}\n` : ""}data: ${JSON.stringify(data)}\n\n`,
              ),
            write: (msg: string) => controller.enqueue(encoder.encode(msg)),
            close: () => {
              try {
                controller.close();
              } catch {}
              clearTimeout(timeout);
              remove(res.id);
            },
            topic,
            id,
          };
          timeout = setInterval(() => {
            if (controller.desiredSize === null) return res.close();
            try {
              res.write(`:ping\n\n`);
            } catch {
              res.close();
            }
          }, 5000);
          push(res);
        },
        cancel() {
          clearTimeout(timeout);
          remove(id);
        },
      }),
      {
        headers: {
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
          Connection: "keep-alive",
          "X-Accel-Buffering": "no",
        },
      },
    );
  }

  publish(
    data: unknown,
    { event, topic = "_global" }: { event?: string; topic?: string },
  ) {
    console.log(` PUB #${topic}`);
    setImmediate(() => {
      for (const s of this.list) {
        if (s.topic !== topic) continue;
        try {
          s.send(data, event);
        } catch {}
      }
    });
  }

  count(topic = "_global") {
    return this.list.reduce((c, s) => c + (s.topic === topic ? 1 : 0), 0);
  }
}

export const sse = new EventSourceManager();

// @/app/api/ev/route.ts
import { sse } from "@/lib/utils";

export async function GET() {
  return sse.new("artifact-ev");
}