A TypeScript queue system inspired by Yii2-Queue architecture, providing a clean abstraction for job processing with multiple storage backends and event-based job handling.
- Driver-based architecture: Swap between DB, SQS, and File drivers seamlessly
- Event-based jobs: Register handlers for job types without complex classes
- Type-safe API: Full TypeScript support with driver-specific option validation
- Multiple backends: Database, Amazon SQS, and File storage drivers
- Event system: Hook into queue lifecycle events
pnpm add adapter-queue
For SQS support:
pnpm add adapter-queue @aws-sdk/client-sqs
import { FileQueue } from 'adapter-queue';
// Define your job types with TypeScript
interface MyJobs {
'send-email': { to: string; subject: string; body: string };
'resize-image': { url: string; width: number; height: number };
'generate-report': { type: string; period: string };
}
const queue = new FileQueue<MyJobs>({ path: './queue-data' });
// Register type-safe handlers
queue.setHandlers({
'send-email': async ({ payload }) => {
// payload is automatically typed as { to: string; subject: string; body: string }
console.log(`Sending email to ${payload.to}: ${payload.subject}`);
await sendEmail(payload.to, payload.subject, payload.body);
},
'resize-image': async ({ payload }) => {
// payload is automatically typed as { url: string; width: number; height: number }
console.log(`Resizing image ${payload.url} to ${payload.width}x${payload.height}`);
await resizeImage(payload.url, payload.width, payload.height);
},
'generate-report': async ({ payload }) => {
// Handle report generation
console.log(`Generating ${payload.type} report for ${payload.period}`);
}
});
// Simple job addition
await queue.addJob('send-email', {
payload: {
to: '[email protected]',
subject: 'Welcome!',
body: 'Thanks for signing up!'
}
});
// Job with options (TTR supported by all drivers)
await queue.addJob('resize-image', {
payload: {
url: 'https://example.com/image.jpg',
width: 800,
height: 600
},
ttr: 300 // 5 minute timeout
});
// Job with delay (supported by File and SQS drivers)
await queue.addJob('generate-report', {
payload: {
type: 'monthly',
period: 'December 2024'
},
delay: 60, // 1 minute delay
ttr: 600 // 10 minute timeout
});
// Start processing jobs
await queue.run(true, 3); // Run continuously, poll every 3 seconds
// Or process jobs once and exit
await queue.run(false);
A file-based queue that stores jobs as individual files with JSON index tracking. Perfect for development and single-server applications.
import { FileQueue } from 'adapter-queue';
const queue = new FileQueue<MyJobs>({
path: './queue-data', // Directory to store queue files
dirMode: 0o755, // Directory permissions (optional)
fileMode: 0o644 // File permissions (optional)
});
// Supports: TTR, Delay
// Does not support: Priority
await queue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'File queue test' },
ttr: 300,
delay: 60
});
Use any database that implements the DatabaseAdapter
interface:
import { DbQueue } from 'adapter-queue';
// You provide the database adapter implementation
const dbAdapter = new YourDatabaseAdapter(); // implements DatabaseAdapter
const queue = new DbQueue<MyJobs>(dbAdapter);
// Supports: TTR, Delay, Priority (depends on adapter implementation)
await queue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'DB queue test' },
ttr: 300,
delay: 60,
priority: 5
});
Amazon SQS integration with native delay support:
import { SQSClient } from '@aws-sdk/client-sqs';
import { SqsQueue } from 'adapter-queue';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const queue = new SqsQueue<MyJobs>(
sqsClient,
'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
);
// Supports: TTR, Delay
// Does not support: Priority (SQS FIFO queues would be needed)
await queue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'SQS test' },
ttr: 300,
delay: 60
// priority: 5 // ❌ TypeScript error - not supported by SQS driver
});
The library provides compile-time type safety for both payloads and driver-specific options:
interface MyJobs {
'send-email': { to: string; subject: string; body: string };
}
const fileQueue = new FileQueue<MyJobs>({ path: './data' });
const sqsQueue = new SqsQueue<MyJobs>(client, url);
// ✅ Payload is type-checked
await fileQueue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'Hello' }
});
// ✅ TTR and delay work with FileQueue
await fileQueue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'Hello' },
ttr: 300,
delay: 60
});
// ❌ TypeScript error - FileQueue doesn't support priority
await fileQueue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'Hello' },
priority: 5 // Error!
});
// ✅ SqsQueue supports delay but not priority
await sqsQueue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'Hello' },
delay: 30 // Works
});
// ❌ TypeScript error - SqsQueue doesn't support priority
await sqsQueue.addJob('send-email', {
payload: { to: '[email protected]', subject: 'Test', body: 'Hello' },
priority: 5 // Error!
});
// Job lifecycle events
queue.on('beforePush', (event) => {
console.log('About to add job:', event.name, event.payload);
});
queue.on('afterPush', (event) => {
console.log('Job added with ID:', event.id);
});
queue.on('beforeExec', (event) => {
console.log('Starting job:', event.id, event.name);
});
queue.on('afterExec', (event) => {
console.log('Job completed:', event.id, 'Result:', event.result);
});
queue.on('afterError', (event) => {
console.error('Job failed:', event.id, 'Error:', event.error);
});
To create your own database driver, implement the DatabaseAdapter
interface:
import { DatabaseAdapter, QueueJobRecord, JobMeta, JobStatus } from 'adapter-queue';
export class YourDatabaseAdapter implements DatabaseAdapter {
async insertJob(payload: Buffer, meta: JobMeta): Promise<string> {
// Insert job into your database
// Return unique job ID
}
async reserveJob(timeout: number): Promise<QueueJobRecord | null> {
// Find and reserve next available job
// Handle delay, priority, TTR logic
// Return job record or null
}
async completeJob(id: string): Promise<void> {
// Mark job as completed
}
async releaseJob(id: string): Promise<void> {
// Release job back to queue (for retry)
}
async failJob(id: string, error: string): Promise<void> {
// Mark job as failed
}
async getJobStatus(id: string): Promise<JobStatus | null> {
// Return 'waiting' | 'reserved' | 'done' | 'failed'
}
}
addJob<K>(name: K, request: { payload: JobMap[K], ...options }): Promise<string>
- Add job to queuesetHandlers(handlers: JobHandlers<JobMap>): void
- Register all job handlers with type safetyrun(repeat?: boolean, timeout?: number): Promise<void>
- Start processing jobsstatus(id: string): Promise<JobStatus>
- Get job status
- All drivers:
{ ttr?: number }
(time-to-run in seconds) - DbQueue:
{ ttr?, delay?, priority? }
(depends on adapter implementation) - SqsQueue:
{ ttr?, delay? }
(uses SQS DelaySeconds) - FileQueue:
{ ttr?, delay? }
(implements delay functionality)
interface JobMap {
'job-name': { /* payload type */ };
'another-job': { /* payload type */ };
}
// Jobs are defined as TypeScript interfaces, not classes
// Handlers are registered with queue.setHandlers()
The queue system supports plugins to extend functionality. Plugins can hook into the queue lifecycle to add features like task protection, metrics collection, distributed tracing, and more.
Prevents job loss during ECS container termination by automatically acquiring and releasing ECS Task Protection based on job activity.
Why ECS Task Protection?
In ECS environments, containers can be terminated during:
- Auto-scaling scale-in events
- Rolling deployments
- Spot instance interruptions
- Manual task stopping
Without protection, in-flight jobs are lost when the container terminates. ECS Task Protection prevents this by marking tasks as "protected" from termination while they're processing jobs.
How this plugin helps:
- Automatic: No manual protection management - activated only when needed
- Efficient: Protection is acquired when jobs start, released when idle
- Safe: Detects ECS draining and gracefully stops accepting new work
- Reliable: Auto-renews protection for long-running jobs
pnpm add adapter-queue
import { SQSQueue } from 'adapter-queue/sqs';
import { SQSClient } from '@aws-sdk/client-sqs';
import { EcsProtectionManager, ecsTaskProtection } from 'adapter-queue/plugins/ecs-protection-manager';
// Create protection manager (share across all queues in your app)
const protectionManager = new EcsProtectionManager();
const queue = new SQSQueue({
client: new SQSClient({ region: 'us-east-1' }),
queueUrl: process.env.SQS_QUEUE_URL!,
name: 'email-queue',
onFailure: 'delete', // or 'leaveInQueue'
plugins: [ecsTaskProtection({
manager: protectionManager,
defaultProtectionTimeout: 600 // 10 minutes default
})]
});
await queue.run(true, 3);
// Clean up when shutting down
await protectionManager.cleanup();
Features:
- Smart Protection Management: Acquires protection before polling, maintains it while jobs are active
- Reference Counting: Tracks active jobs per queue, only releases protection when all jobs complete
- TTR-Aware: Automatically extends protection for long-running jobs based on their TTR
- Draining Detection: Detects when ECS is draining and gracefully stops accepting new jobs
- Zero Dependencies: Uses built-in Node.js
fetch
API - Configurable Logging: Integrate with your existing logging system
Custom Logger Example:
import pino from 'pino';
const logger = pino();
const protectionManager = new EcsProtectionManager({
logger: {
log: (message) => logger.info(message),
warn: (message) => logger.warn(message),
error: (message, error) => logger.error({ error }, message)
}
});
Multiple Queues:
// Use the same protection manager across all queues
const protectionManager = new EcsProtectionManager();
const emailQueue = new SQSQueue({
client: new SQSClient({ region: 'us-east-1' }),
queueUrl: process.env.EMAIL_QUEUE_URL!,
name: 'email-queue',
onFailure: 'delete',
plugins: [ecsTaskProtection({ manager: protectionManager })]
});
const imageQueue = new SQSQueue({
client: new SQSClient({ region: 'us-east-1' }),
queueUrl: process.env.IMAGE_QUEUE_URL!,
name: 'image-queue',
onFailure: 'delete',
plugins: [ecsTaskProtection({ manager: protectionManager })] // Same manager, different plugin instance
});
// Both queues coordinate protection through the shared manager
await Promise.all([
emailQueue.run(true),
imageQueue.run(true)
]);
Plugins implement the QueuePlugin
interface and can hook into these lifecycle events:
init?()
- Called once when queue starts, return cleanup functionbeforePoll?()
- Called before polling for jobs, can return 'stop' to gracefully shut downbeforeJob?()
- Called after job is reserved but before executionafterJob?()
- Called after job completion (success or failure)
import { QueuePlugin } from 'adapter-queue';
function customPlugin(): QueuePlugin {
return {
async init({ queue }) {
console.log(`Plugin initialized for queue: ${queue.name}`);
return async () => console.log('Plugin cleanup');
},
async beforeJob(job) {
console.log(`Starting job ${job.id}`);
},
async afterJob(job, error) {
if (error) {
console.error(`Job ${job.id} failed:`, error);
} else {
console.log(`Job ${job.id} completed`);
}
}
};
}
Run the test suite:
pnpm test
Build the project:
pnpm run build
Type checking:
pnpm run lint
The library uses an event-based architecture:
- Abstract Queue - Common interface and job processing logic
- Drivers - Storage-specific implementations (DB, SQS, File)
- Event Handlers - Functions that process specific job types
- Type Safety - Compile-time validation of payloads and options
- Events - Lifecycle hooks for monitoring and cross-cutting concerns
Benefits:
- Swap drivers without changing job code
- Add new storage backends easily
- Type-safe job payloads and options
- Test with mock implementations
- Scale workers independently
- Monitor via events
MIT