Server-Sent Events plugin for Fastify. Provides first-class SSE support with clean API integration, session management, and streaming capabilities.
- 🚀 Clean API: Route-level SSE support with
{ sse: true }
- 📡 Streaming: Native support for Node.js streams and async iterators
- 🔄 Reconnection: Built-in message replay with
Last-Event-ID
- 💓 Heartbeat: Configurable keep-alive mechanism
- 🎯 Lifecycle: Full integration with Fastify hooks and error handling
- 📝 TypeScript: Complete type definitions included
- ⚡ Performance: Efficient backpressure handling
npm i @fastify/sse
const fastify = require('fastify')({ logger: true })
// Register the plugin
await fastify.register(require('@fastify/sse'))
// Create an SSE endpoint
fastify.get('/events', { sse: true }, async (request, reply) => {
// Send a message
await reply.sse.send({ data: 'Hello SSE!' })
// Send with full options
await reply.sse.send({
id: '123',
event: 'update',
data: { message: 'Hello World' },
retry: 1000
})
})
await fastify.listen({ port: 3000 })
await fastify.register(require('@fastify/sse'), {
// Optional: heartbeat interval in milliseconds (default: 30000)
heartbeatInterval: 30000,
// Optional: default serializer (default: JSON.stringify)
serializer: (data) => JSON.stringify(data)
})
// Enable SSE for a route
fastify.get('/events', { sse: true }, handler)
// With options
fastify.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: customSerializer // Custom serializer for this route
}
}, handler)
Send SSE messages. Accepts various source types:
// Simple data
await reply.sse.send({ data: 'hello' })
// Full SSE message
await reply.sse.send({
id: '123',
event: 'update',
data: { message: 'Hello' },
retry: 1000
})
// Plain string
await reply.sse.send('plain text message')
// Async generator
async function* generateEvents() {
for (let i = 0; i < 10; i++) {
yield { id: i, data: `Event ${i}` }
await sleep(1000)
}
}
await reply.sse.send(generateEvents())
// Node.js Readable stream
const stream = fs.createReadStream('data.jsonl')
await reply.sse.send(stream)
// Transform existing stream
const transformStream = new Transform({
transform(chunk, encoding, callback) {
callback(null, { data: chunk.toString() })
}
})
someSource.pipe(transformStream)
await reply.sse.send(transformStream)
Create a transform stream for use in pipeline operations:
// Use with pipeline for efficient streaming
const { pipeline } = require('stream/promises')
const fs = require('fs')
fastify.get('/file-stream', { sse: true }, async (request, reply) => {
const fileStream = fs.createReadStream('data.jsonl')
// Parse each line as JSON and convert to SSE format
const parseTransform = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n').filter(Boolean)
for (const line of lines) {
try {
const data = JSON.parse(line)
this.push({ id: data.id, data })
} catch (err) {
// Skip invalid JSON lines
}
}
callback()
}
})
// Stream file data through SSE
await pipeline(
fileStream,
parseTransform,
reply.sse.stream(),
reply.raw,
{ end: false }
)
})
fastify.get('/live', { sse: true }, async (request, reply) => {
// Keep connection alive (prevents automatic close)
reply.sse.keepAlive()
// Send initial message
await reply.sse.send({ data: 'Connected' })
// Set up periodic updates
const interval = setInterval(async () => {
if (reply.sse.isConnected) {
await reply.sse.send({ data: 'ping' })
} else {
clearInterval(interval)
}
}, 1000)
// Clean up when connection closes
reply.sse.onClose(() => {
clearInterval(interval)
console.log('Connection closed')
})
})
Handle client reconnections with Last-Event-ID
:
const messageHistory = []
fastify.get('/events', { sse: true }, async (request, reply) => {
// Handle replay on reconnection
await reply.sse.replay(async (lastEventId) => {
// Find messages after the last received ID
const startIndex = messageHistory.findIndex(msg => msg.id === lastEventId)
const messagesToReplay = startIndex !== -1
? messageHistory.slice(startIndex + 1)
: messageHistory
// Send missed messages
for (const message of messagesToReplay) {
await reply.sse.send(message)
}
})
// Send new message
const newMessage = { id: Date.now(), data: 'New event' }
messageHistory.push(newMessage)
await reply.sse.send(newMessage)
})
reply.sse.lastEventId
: Client's last received event IDreply.sse.isConnected
: Connection status (boolean)reply.sse.keepAlive()
: Prevent connection from auto-closingreply.sse.close()
: Manually close the connectionreply.sse.replay(callback)
: Handle message replayreply.sse.onClose(callback)
: Register close callback
Routes with { sse: true }
automatically fall back to regular handlers when the client doesn't request SSE:
fastify.get('/data', { sse: true }, async (request, reply) => {
const data = await getData()
// Check if this is an SSE request
if (request.headers.accept?.includes('text/event-stream')) {
// SSE client - stream the data
await reply.sse.send({ data })
} else {
// Regular client - return JSON
return { data }
}
})
fastify.get('/stream', { sse: true }, async (request, reply) => {
try {
async function* riskyGenerator() {
yield { data: 'before error' }
throw new Error('Something went wrong')
}
await reply.sse.send(riskyGenerator())
} catch (error) {
// Handle errors gracefully
await reply.sse.send({
event: 'error',
data: { message: 'Stream error occurred' }
})
}
})
// Plugin-level serializer
await fastify.register(require('@fastify/sse'), {
serializer: (data) => {
// Custom serialization logic
return typeof data === 'string' ? data : JSON.stringify(data)
}
})
// Route-level serializer
fastify.get('/custom', {
sse: {
serializer: (data) => `CUSTOM:${JSON.stringify(data)}`
}
}, async (request, reply) => {
await reply.sse.send({ data: 'test' }) // Outputs: "CUSTOM:\"test\""
})
Testing SSE endpoints is simplified with standard Fastify injection:
const response = await fastify.inject({
method: 'GET',
url: '/events',
headers: {
accept: 'text/event-stream'
}
})
assert.strictEqual(response.statusCode, 200)
assert.strictEqual(response.headers['content-type'], 'text/event-stream')
assert.ok(response.body.includes('data: "Hello SSE!"'))
<!DOCTYPE html>
<html>
<head>
<title>SSE Client</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('/events')
const messagesDiv = document.getElementById('messages')
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data)
messagesDiv.innerHTML += '<div>' + JSON.stringify(data) + '</div>'
}
eventSource.addEventListener('update', function(event) {
console.log('Update event:', JSON.parse(event.data))
})
eventSource.onerror = function(event) {
console.error('SSE error:', event)
}
</script>
</body>
</html>
Full TypeScript support included:
import fastify from 'fastify'
import fastifySSE, { SSEMessage } from '@fastify/sse'
const app = fastify()
await app.register(fastifySSE)
app.get('/events', { sse: true }, async (request, reply) => {
const message: SSEMessage = {
id: '123',
event: 'test',
data: { hello: 'world' }
}
await reply.sse.send(message)
// TypeScript knows about SSE properties
console.log(reply.sse.isConnected) // boolean
console.log(reply.sse.lastEventId) // string | null
})
See the examples directory for complete working examples:
- Basic Usage - Simple SSE endpoints
- More examples coming soon...
Feature | fastify-sse-v2 | @fastify/sse |
---|---|---|
Basic SSE | ✅ | ✅ |
Async Iterators | ✅ | ✅ |
Stream Support | ✅ | ✅ Enhanced |
Session Management | ❌ | ✅ |
Last-Event-ID | ❌ | ✅ |
Connection Health | ❌ | ✅ |
Fastify Integration | ✅ Full | |
Testing Support | ❌ | ✅ |