SSE Does not close correctly.
Unanswered
Dreamgineer posted this in #help-forum
I have multiple SSEs routes that uses the same manager class. It works like Publish-Subscribe system in WebSocket.
My expectations:
- Opens connection correctly ✅
- Message reaches the client correctly ✅
- When tab is closed, server closes connection and cleanup correctly. ❌
Actual Behavior:
The connection remains open and ping interval continues to stack up over time.
What I have tried:
- Polling
- Using
- Using
- Putting logs in every corners and looking at them, no clue ❌
Additional Context:
Attached below is going to be the original class code and how it's being called from routes.
It works fine in
Production:
- Docker version 29.0.0, build 3d4129b
- Official latest Bun image:
- Next.js v16.0.0
My expectations:
- Opens connection correctly ✅
- Message reaches the client correctly ✅
- When tab is closed, server closes connection and cleanup correctly. ❌
Actual Behavior:
The connection remains open and ping interval continues to stack up over time.
What I have tried:
- Polling
controller.desiredSize for null, always 1 ❌- Using
cancel() to trigger removal, doesn't trigger ❌- Using
request.signal to abort it, doesn't trigger ❌- Putting logs in every corners and looking at them, no clue ❌
Additional Context:
Attached below is going to be the original class code and how it's being called from routes.
It works fine in
next dev but breaks when ran in production.Production:
- Docker version 29.0.0, build 3d4129b
- Official latest Bun image:
oven/bun:latest- Next.js v16.0.0
2 Replies
export class EventSourceManager {
private list: ServerEventSource[] = [];
private id = 0;
new(
topic = "_global",
{
onDisconnect,
signal,
}: { onDisconnect?: () => void; signal?: AbortSignal } = {},
): Response {
let timeout: NodeJS.Timeout | undefined;
this.id++;
if (this.id > 100000) this.id = 0;
const id = this.id;
console.log(` SUB ${topic}#${id} (C${this.list.length})`);
const push = (s: ServerEventSource) => {
this.list.push(s);
};
const remove = (id: number) => {
if (this.list.findIndex((e) => e.id === id) < 0) return;
console.log(` DSC ${topic}#${id}`);
this.list = this.list.toSpliced(
this.list.findIndex((e) => e.id === id),
1,
);
onDisconnect?.();
};
signal?.addEventListener("abort", () => {
clearInterval(timeout);
remove(id);
});
return new Response(
new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
// Register this client
const res = {
send: (data: unknown, event?: string) =>
res.write(
`${event ? `event: ${event}\n` : ""}data: ${JSON.stringify(data)}\n\n`,
),
write: (msg: string) => controller.enqueue(encoder.encode(msg)),
close: () => {
try {
controller.close();
} catch {}
clearTimeout(timeout);
remove(res.id);
},
topic,
id,
};
timeout = setInterval(() => {
if (controller.desiredSize === null) return res.close();
try {
res.write(`:ping\n\n`);
} catch {
res.close();
}
}, 5000);
push(res);
},
cancel() {
clearTimeout(timeout);
remove(id);
},
}),
{
headers: { "Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
},
);
}
publish(
data: unknown,
{ event, topic = "_global" }: { event?: string; topic?: string },
) {
console.log(` PUB #${topic}`);
setImmediate(() => {
for (const s of this.list) {
if (s.topic !== topic) continue;
try {
s.send(data, event);
} catch {}
}
});
}
count(topic = "_global") {
return this.list.reduce((c, s) => c + (s.topic === topic ? 1 : 0), 0);
}
}
export const sse = new EventSourceManager();
// @/app/api/ev/route.ts
import { sse } from "@/lib/utils";
export async function GET() {
return sse.new("artifact-ev");
}