Show HN: GroupMQ – A FIFO job queue for Node and Redis


A fast, reliable Redis-backed per-group FIFO queue for Node + TypeScript with guaranteed job ordering and parallel processing across groups.

Website · Created by OpenPanel.dev

```ts
import Redis from "ioredis";
import { Queue, Worker } from "groupmq";

const redis = new Redis("redis://127.0.0.1:6379");

const queue = new Queue({
  redis,
  namespace: "orders",  // Will be prefixed with 'groupmq:'
  jobTimeoutMs: 30_000, // How long before a job times out
  logger: true,         // Enable logging (optional)
});

await queue.add({
  groupId: "user:42",
  data: { type: "charge", amount: 999 },
  orderMs: Date.now(), // or event.createdAtMs
  maxAttempts: 5,
});

const worker = new Worker({
  queue,
  concurrency: 1, // Process 1 job at a time (increase for parallel processing)
  handler: async (job) => {
    console.log("Processing:", job.data);
  },
});

worker.run();
```
  • Per-group FIFO ordering - Jobs within the same group process in strict order, perfect for user workflows, data pipelines, and sequential operations
  • Parallel processing across groups - Process multiple groups simultaneously while maintaining order within each group
  • BullMQ-compatible API - Familiar interface with enhanced group-based capabilities
  • High performance - high throughput with low latency (see benchmarks)
  • Built-in ordering strategies - Handle out-of-order job arrivals with 'none', 'scheduler', or 'in-memory' methods
  • Automatic recovery - Stalled job detection and connection error handling with exponential backoff
  • Production ready - Atomic operations, graceful shutdown, and comprehensive logging
  • Zero polling - Efficient blocking operations prevent wasteful Redis calls

GroupMQ is heavily inspired by BullMQ, one of the most popular Redis-based job queue libraries for Node.js. We've taken many core concepts and design patterns from BullMQ while adapting them for our specific use case of per-group FIFO processing.

Key differences from BullMQ:

  • Per-group FIFO ordering - jobs within the same group are processed in strict order
  • Group-based concurrency - only one job per group can be active at a time
  • Ordered processing - built-in support for orderMs timestamp-based ordering
  • Cross-group parallelism - multiple groups can be processed simultaneously
  • No named job types - simplified to a single job shape; use a union-typed payload instead, e.g. { type: 'paint', data: { ... } } | { type: 'repair', data: { ... } } (see the sketch below)
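For example, a minimal sketch of a union-typed payload (the job shapes are hypothetical, and this assumes Queue and Worker accept the payload type parameter shown in WorkerOptions<T>):

```ts
type PaintJob = { type: 'paint'; data: { color: string } };
type RepairJob = { type: 'repair'; data: { partId: string } };
type CarJob = PaintJob | RepairJob;

const queue = new Queue<CarJob>({ redis, namespace: 'cars' });

const worker = new Worker<CarJob>({
  queue,
  handler: async (job) => {
    // Narrow on the discriminant instead of relying on named job types
    if (job.data.type === 'paint') {
      console.log('painting', job.data.data.color);
    } else {
      console.log('repairing', job.data.data.partId);
    }
  },
});
```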

We're grateful to the BullMQ team for their excellent work and the foundation they've provided for the Redis job queue ecosystem.

```ts
type QueueOptions = {
  redis: Redis;                       // Redis client instance (required)
  namespace: string;                  // Unique queue name, gets 'groupmq:' prefix (required)
  logger?: boolean | LoggerInterface; // Enable logging (default: false)
  jobTimeoutMs?: number;              // Job processing timeout (default: 30000ms)
  maxAttempts?: number;               // Default max retry attempts (default: 3)
  reserveScanLimit?: number;          // Groups to scan when reserving (default: 20)
  keepCompleted?: number;             // Number of completed jobs to retain (default: 0)
  keepFailed?: number;                // Number of failed jobs to retain (default: 0)
  schedulerLockTtlMs?: number;        // Scheduler lock TTL (default: 1500ms)
  orderingMethod?: OrderingMethod;    // Ordering strategy (default: 'none')
  orderingWindowMs?: number;          // Time window for ordering (required for non-'none' methods)
  orderingMaxWaitMultiplier?: number; // Max grace period multiplier for in-memory (default: 3)
  orderingGracePeriodDecay?: number;  // Grace period decay factor for in-memory (default: 1.0)
  orderingMaxBatchSize?: number;      // Max jobs to collect in batch for in-memory (default: 10)
};

type OrderingMethod = 'none' | 'scheduler' | 'in-memory';
```

Ordering Methods:

  • 'none' - No ordering guarantees (fastest, zero overhead, no extra latency)
  • 'scheduler' - Redis buffering for large windows (≥1000ms, requires scheduler, adds latency)
  • 'in-memory' - Worker collection for small windows (50-500ms, no scheduler, adds latency per batch)

See Ordering Methods for a detailed comparison.
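For example, a minimal queue configured for in-memory ordering (the window value is illustrative):

```ts
const queue = new Queue({
  redis,
  namespace: 'events',
  orderingMethod: 'in-memory', // Collect jobs in the worker before processing
  orderingWindowMs: 200,       // Small window, within the 50-500ms range above
});
```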

```ts
type WorkerOptions<T> = {
  queue: Queue<T>;                    // Queue instance to process jobs from (required)
  handler: (job: ReservedJob<T>) => Promise<unknown>; // Job processing function (required)
  name?: string;                      // Worker name for logging (default: queue.name)
  logger?: boolean | LoggerInterface; // Enable logging (default: false)
  concurrency?: number;               // Number of jobs to process in parallel (default: 1)
  heartbeatMs?: number;               // Heartbeat interval (default: Math.max(1000, jobTimeoutMs / 3))
  onError?: (err: unknown, job?: ReservedJob<T>) => void; // Error handler
  maxAttempts?: number;               // Max retry attempts (default: queue.maxAttempts)
  backoff?: BackoffStrategy;          // Retry backoff function (default: exponential with jitter)
  enableCleanup?: boolean;            // Periodic cleanup (default: true)
  cleanupIntervalMs?: number;         // Cleanup frequency (default: 60000ms)
  schedulerIntervalMs?: number;       // Scheduler frequency (default: adaptive)
  blockingTimeoutSec?: number;        // Blocking reserve timeout (default: 5s)
  atomicCompletion?: boolean;         // Atomic completion + next reserve (default: true)
  stalledInterval?: number;           // Check if stalled every N ms (default: 30000)
  maxStalledCount?: number;           // Fail after N stalls (default: 1)
  stalledGracePeriod?: number;        // Grace period before considering stalled (default: 0)
};

type BackoffStrategy = (attempt: number) => number; // Returns delay in ms
```

When adding a job to the queue:

```ts
await queue.add({
  groupId: string;        // Required: Group ID for FIFO processing
  data: T;                // Required: Job payload data
  orderMs?: number;       // Timestamp for ordering (default: Date.now())
  maxAttempts?: number;   // Max retry attempts (default: queue.maxAttempts)
  jobId?: string;         // Custom job ID (default: auto-generated UUID)
  delay?: number;         // Delay in ms before job becomes available
  runAt?: Date | number;  // Specific time to run the job
  repeat?: RepeatOptions; // Repeating job configuration (cron or interval)
});

type RepeatOptions =
  | { every: number }    // Repeat every N milliseconds
  | { pattern: string }; // Cron pattern (standard 5-field format)
```

Example with delay:

```ts
await queue.add({
  groupId: 'user:123',
  data: { action: 'send-reminder' },
  delay: 3600000, // Run in 1 hour
});
```

Example with specific time:

```ts
await queue.add({
  groupId: 'user:123',
  data: { action: 'scheduled-report' },
  runAt: new Date('2025-12-31T23:59:59Z'),
});
```

Workers support configurable concurrency to process multiple jobs in parallel from different groups:

```ts
const worker = new Worker({
  queue,
  concurrency: 8, // Process up to 8 jobs simultaneously
  handler: async (job) => {
    // Jobs from different groups can run in parallel
    // Jobs from the same group still run sequentially
  },
});
```

Benefits:

  • Higher throughput for multi-group workloads
  • Efficient resource utilization
  • Still maintains per-group FIFO ordering

Considerations:

  • Each job consumes memory and resources
  • Set concurrency based on job duration and system resources
  • Monitor Redis connection pool (ioredis default: 10 connections)

Both Queue and Worker support optional logging for debugging and monitoring:

```ts
// Enable default logger
const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: true, // Logs to console with queue name prefix
});

const worker = new Worker({
  queue,
  logger: true, // Logs to console with worker name prefix
  handler: async (job) => { /* ... */ },
});
```

Custom logger:

It works out of the box with both pino and winston:

```ts
import type { LoggerInterface } from 'groupmq';

const customLogger: LoggerInterface = {
  debug: (msg: string, ...args: any[]) => { /* custom logging */ },
  info: (msg: string, ...args: any[]) => { /* custom logging */ },
  warn: (msg: string, ...args: any[]) => { /* custom logging */ },
  error: (msg: string, ...args: any[]) => { /* custom logging */ },
};

const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: customLogger,
});
```
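For instance, a pino instance can be passed directly (a sketch; this assumes pino's debug/info/warn/error methods line up with LoggerInterface):

```ts
import pino from 'pino';

// Assumption: pino's log methods satisfy LoggerInterface
const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: pino({ level: 'debug' }),
});
```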

What gets logged:

  • Job reservation and completion
  • Error handling and retries
  • Scheduler runs and delayed job promotions
  • Group locking and unlocking
  • Redis connection events
  • Performance warnings

Repeatable jobs (cron/interval)

GroupMQ supports simple repeatable jobs using either a fixed interval (every) or a basic cron pattern (pattern). Repeats are materialized by a lightweight scheduler that runs as part of the worker's periodic cleanup cycle.

Add a repeating job (every N ms)

```ts
await queue.add({
  groupId: 'reports',
  data: { type: 'daily-summary' },
  repeat: { every: 5000 }, // run every 5 seconds
});

const worker = new Worker({
  queue,
  handler: async (job) => {
    // process...
  },
  // IMPORTANT: For timely repeats, run the scheduler frequently
  cleanupIntervalMs: 1000, // <= repeat.every (1–2s recommended for 5s repeats)
});

worker.run();
```

Add a repeating job (cron pattern)

```ts
await queue.add({
  groupId: 'emails',
  data: { type: 'weekly-digest' },
  repeat: { pattern: '0 9 * * 1-5' }, // 09:00 Mon–Fri
});
```

Remove a repeating job

```ts
await queue.removeRepeatingJob('reports', { every: 5000 });
// or
await queue.removeRepeatingJob('emails', { pattern: '0 9 * * 1-5' });
```

Scheduler behavior and best practices

  • The worker's periodic cycle runs: cleanup(), promoteDelayedJobs(), and processRepeatingJobs().
  • Repeating jobs are enqueued during this cycle via a distributed scheduler with lock coordination.
  • Minimum practical repeat interval: ~1.5-2 seconds (controlled by schedulerLockTtlMs, default: 1500ms)
  • For sub-second repeats (not recommended in production):
    ```ts
    const queue = new Queue({
      redis,
      namespace: 'fast',
      schedulerLockTtlMs: 50, // Allow a fast scheduler lock
    });

    const worker = new Worker({
      queue,
      schedulerIntervalMs: 10, // Check every 10ms
      cleanupIntervalMs: 100,  // Cleanup every 100ms
      handler: async (job) => { /* ... */ },
    });
    ```
    ⚠️ Fast repeats (< 1s) increase Redis load and should be used sparingly.
  • The scheduler is idempotent: it updates the next run time before enqueueing to prevent double runs.
  • Each occurrence is a normal job with a fresh jobId, preserving per-group FIFO semantics.
  • You can monitor repeated runs via BullBoard using the provided adapter.
Graceful shutdown and recovery:

```ts
// Stop worker gracefully - waits for the current job to finish
await worker.close(gracefulTimeoutMs);

// Wait for the queue to be empty
const isEmpty = await queue.waitForEmpty(timeoutMs);

// Recover groups that might be stuck due to ordering delays
const recoveredCount = await queue.recoverDelayedGroups();
```

Queue inspection and management:

```ts
// Job counts and status
const counts = await queue.getJobCounts();
// { active: 5, waiting: 12, delayed: 3, total: 20, uniqueGroups: 8 }
const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
const delayedCount = await queue.getDelayedCount();
const completedCount = await queue.getCompletedCount();
const failedCount = await queue.getFailedCount();

// Get job IDs by status
const activeJobIds = await queue.getActiveJobs();
const waitingJobIds = await queue.getWaitingJobs();
const delayedJobIds = await queue.getDelayedJobs();

// Get Job instances by status
const completedJobs = await queue.getCompletedJobs(limit); // returns Job[]
const failedJobs = await queue.getFailedJobs(limit);

// Group information
const groups = await queue.getUniqueGroups(); // ['user:123', 'order:456']
const groupCount = await queue.getUniqueGroupsCount();
const jobsInGroup = await queue.getGroupJobCount('user:123');

// Get a specific job
const job = await queue.getJob(jobId); // returns a Job instance

// Job manipulation
await queue.remove(jobId);
await queue.retry(jobId);   // Re-enqueue a failed job
await queue.promote(jobId); // Promote a delayed job to waiting
await queue.changeDelay(jobId, newDelayMs);
await queue.updateData(jobId, newData);

// Scheduler operations
await queue.runSchedulerOnce();     // Manual scheduler run
await queue.promoteDelayedJobs();   // Promote delayed jobs
await queue.recoverDelayedGroups(); // Recover stuck groups

// Cleanup and shutdown
await queue.waitForEmpty(timeoutMs);
await queue.close();
```

Jobs returned from queue.getJob(), queue.getCompletedJobs(), etc. have these methods:

```ts
const job = await queue.getJob(jobId);

// Manipulate the job
await job.remove();
await job.retry();
await job.promote();
await job.changeDelay(newDelayMs);
await job.updateData(newData);
await job.update(newData); // Alias for updateData

// Get job state
const state = await job.getState();
// 'active' | 'waiting' | 'delayed' | 'completed' | 'failed'

// Serialize the job
const json = job.toJSON();
```
Worker status and metrics:

```ts
// Check worker status
const isProcessing = worker.isProcessing();

// Get the current job(s) being processed
const currentJob = worker.getCurrentJob();
// { job: ReservedJob, processingTimeMs: 1500 } | null

// For concurrency > 1
const currentJobs = worker.getCurrentJobs();
// [{ job: ReservedJob, processingTimeMs: 1500 }, ...]

// Get worker metrics
const metrics = worker.getWorkerMetrics();
// { jobsInProgress: 2, lastJobPickupTime: 1234567890, ... }

// Graceful shutdown
await worker.close(gracefulTimeoutMs);
```

Workers emit events that you can listen to:

```ts
worker.on('ready', () => {
  console.log('Worker is ready');
});

worker.on('completed', (job: Job) => {
  console.log('Job completed:', job.id);
});

worker.on('failed', (job: Job) => {
  console.log('Job failed:', job.id, job.failedReason);
});

worker.on('error', (error: Error) => {
  console.error('Worker error:', error);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

worker.on('graceful-timeout', (job: Job) => {
  console.log('Job exceeded graceful timeout:', job.id);
});

// Remove event listeners
worker.off('completed', handler);
worker.removeAllListeners();
```

GroupMQ provides a BullBoard adapter for visual monitoring and management:

```ts
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardGroupMQAdapter } from 'groupmq';
import express from 'express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullBoardGroupMQAdapter(queue, {
      displayName: 'Order Processing',
      description: 'Processes customer orders',
      readOnlyMode: false, // Allow job manipulation through UI
    }),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => {
  console.log('BullBoard running at http://localhost:3000/admin/queues');
});
```

GroupMQ uses these Redis keys (all prefixed with groupmq:{namespace}:):

  • :g:{groupId} - sorted set of job IDs in a group, ordered by score (derived from orderMs and seq)
  • :ready - sorted set of group IDs that have jobs available, ordered by lowest job score
  • :job:{jobId} - hash containing job data (id, groupId, data, attempts, status, etc.)
  • :lock:{groupId} - string holding the job ID that currently owns the group lock (with TTL)
  • :processing - sorted set of active job IDs, ordered by deadline
  • :processing:{jobId} - hash with processing metadata (groupId, deadlineAt)
  • :delayed - sorted set of delayed jobs, ordered by runAt timestamp
  • :completed - sorted set of completed job IDs (for retention)
  • :failed - sorted set of failed job IDs (for retention)
  • :repeats - hash of repeating job definitions (groupId → config)
A job moves through these states:

  1. Waiting - the job is in :g:{groupId} and its group is in :ready
  2. Delayed - the job is in :delayed (scheduled for the future)
  3. Active - the job is in :processing and its group is locked
  4. Completed - the job is in :completed (retention)
  5. Failed - the job exceeded maxAttempts and moved to :failed (retention)
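A minimal sketch of inspecting a job as it moves through these states, using the Job API described below:

```ts
const job = await queue.getJob(jobId);
if (job) {
  const state = await job.getState();
  // One of: 'waiting' | 'delayed' | 'active' | 'completed' | 'failed'
  if (state === 'delayed') {
    await job.promote(); // Move it to waiting immediately
  }
}
```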

The worker runs a continuous loop optimized for both single and concurrent processing:

For concurrency = 1 (sequential):

```ts
while (!stopping) {
  // 1. Blocking reserve (waits for a job; efficient)
  const job = await queue.reserveBlocking(timeoutSec);

  // 2. Process the job synchronously
  if (job) {
    await processOne(job);
  }

  // 3. Periodic scheduler run (every schedulerIntervalMs)
  await queue.runSchedulerOnce(); // Promotes delayed jobs, processes repeats
}
```

For concurrency > 1 (parallel):

```ts
while (!stopping) {
  // 1. Run the lightweight scheduler periodically
  await queue.runSchedulerOnce();

  // 2. Try batch reservation if we have capacity
  const capacity = concurrency - jobsInProgress.size;
  if (capacity > 0) {
    const jobs = await queue.reserveBatch(capacity);
    // Process all jobs concurrently (fire and forget)
    for (const job of jobs) {
      void processOne(job);
    }
  }

  // 3. Blocking reserve for remaining capacity
  const job = await queue.reserveBlocking(blockingTimeoutSec);
  if (job) {
    void processOne(job); // Process async
  }
}
```

Key optimizations:

  • Batch reservation reduces Redis round-trips for concurrent workers
  • Blocking operations prevent wasteful polling
  • Heartbeat mechanism keeps jobs alive during long processing
  • Atomic completion + next reservation reduces latency

Atomic Operations (Lua Scripts)

All critical operations use Lua scripts for atomicity:

  • enqueue.lua - adds a job to its group queue and the group to the ready set
  • reserve.lua - finds a ready group, pops the head job, and locks the group
  • reserve-batch.lua - reserves one job from multiple groups atomically
  • complete.lua - marks a job complete, unlocks the group, and re-adds the group to ready if more jobs remain
  • complete-and-reserve-next.lua - atomic completion + reservation from the same group
  • retry.lua - increments attempts and re-adds the job to its group with a backoff delay
  • remove.lua - removes a job from all data structures

When a worker reserves a job:

  1. Find Ready Group: ZRANGE :ready 0 0 gets lowest-score group
  2. Check Lock: PTTL :lock:{groupId} ensures group isn't locked
  3. Pop Job: ZPOPMIN :g:{groupId} 1 gets head job atomically
  4. Lock Group: SET :lock:{groupId} {jobId} PX {timeout}
  5. Mark Processing: Add to :processing sorted set with deadline
  6. Re-add Group: If more jobs exist, ZADD :ready {score} {groupId}
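In TypeScript terms, the flow looks roughly like this (a non-atomic sketch using ioredis; the real reserve.lua performs all of these steps in a single script):

```ts
import Redis from 'ioredis';

async function reserveSketch(redis: Redis, ns: string, timeoutMs: number) {
  // 1. Find the ready group with the lowest score
  const [groupId] = await redis.zrange(`${ns}:ready`, 0, 0);
  if (!groupId) return null;

  // 2. Skip the group if it is still locked
  if ((await redis.pttl(`${ns}:lock:${groupId}`)) > 0) return null;

  // 3. Pop the head job of the group
  const [jobId] = await redis.zpopmin(`${ns}:g:${groupId}`, 1);
  if (!jobId) return null;

  // 4. Lock the group for this job
  await redis.set(`${ns}:lock:${groupId}`, jobId, 'PX', timeoutMs);

  // 5. Mark the job as processing, ordered by deadline
  await redis.zadd(`${ns}:processing`, Date.now() + timeoutMs, jobId);

  // 6. Re-add the group to :ready if it still has jobs
  const next = await redis.zrange(`${ns}:g:${groupId}`, 0, 0, 'WITHSCORES');
  if (next.length) await redis.zadd(`${ns}:ready`, Number(next[1]), groupId);

  return jobId;
}
```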

When a job completes successfully:

  1. Remove from Processing: DEL :processing:{jobId}, ZREM :processing {jobId}
  2. Mark Completed: HSET :job:{jobId} status completed
  3. Add to Retention: ZADD :completed {now} {jobId}
  4. Unlock Group: DEL :lock:{groupId} (only if this job owns the lock)
  5. Check for More Jobs: ZCARD :g:{groupId}
  6. Re-add to Ready: If jobs remain, ZADD :ready {nextScore} {groupId}

Step 6 is the critical one: it ensures that after a job completes, the group becomes available again so other workers can pick up the next job in the queue.
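Sketched the same way (non-atomic; the real complete.lua does all of this in one script):

```ts
import Redis from 'ioredis';

async function completeSketch(redis: Redis, ns: string, groupId: string, jobId: string) {
  // 1-3. Remove processing state, mark completed, add to retention
  await redis.del(`${ns}:processing:${jobId}`);
  await redis.zrem(`${ns}:processing`, jobId);
  await redis.hset(`${ns}:job:${jobId}`, 'status', 'completed');
  await redis.zadd(`${ns}:completed`, Date.now(), jobId);

  // 4. Unlock the group only if this job owns the lock
  if ((await redis.get(`${ns}:lock:${groupId}`)) === jobId) {
    await redis.del(`${ns}:lock:${groupId}`);
  }

  // 5-6. Re-add the group to :ready if more jobs remain
  const next = await redis.zrange(`${ns}:g:${groupId}`, 0, 0, 'WITHSCORES');
  if (next.length) await redis.zadd(`${ns}:ready`, Number(next[1]), groupId);
}
```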

Jobs are ordered using a composite score:

score = (orderMs - baseEpoch) * 1000 + seq
  • orderMs - user-provided timestamp for event ordering
  • baseEpoch - fixed epoch timestamp (1704067200000) that keeps scores manageable
  • seq - auto-incrementing sequence for tiebreaking (resets daily to prevent overflow)
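A quick worked version of the formula (baseEpoch is 2024-01-01T00:00:00Z):

```ts
const BASE_EPOCH = 1704067200000; // 2024-01-01T00:00:00Z

function jobScore(orderMs: number, seq: number): number {
  // Earlier orderMs sorts first; seq breaks ties for jobs sharing the same orderMs
  return (orderMs - BASE_EPOCH) * 1000 + seq;
}

jobScore(1704067201000, 0); // 1_000_000: one second after the epoch, first job
jobScore(1704067201000, 1); // 1_000_001: same millisecond, submitted next
```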

This ensures:

  • Jobs with earlier orderMs process first
  • Jobs with same orderMs process in submission order
  • Score is stable and sortable
  • Daily sequence reset prevents integer overflow

concurrency = 1 (Sequential):

  • Worker processes one job at a time
  • Uses blocking reserve with synchronous processing
  • Simplest mode, lowest memory, lowest Redis overhead
  • Best for: CPU-intensive jobs, resource-constrained environments

concurrency > 1 (Parallel):

  • Worker attempts batch reservation first (lower latency)
  • Processes multiple jobs concurrently (from different groups only)
  • Each job runs in parallel with its own heartbeat
  • Falls back to blocking reserve when batch is empty
  • Higher throughput, efficient for I/O-bound workloads
  • Best for: Network calls, database operations, API requests

Important: Per-group FIFO ordering is maintained regardless of concurrency level. Multiple jobs from the same group never run in parallel.

Error Handling and Retries

When a job fails:

  1. Increment Attempts: HINCRBY :job:{jobId} attempts 1
  2. Check Max Attempts: If attempts >= maxAttempts, mark as failed
  3. Calculate Backoff: Use an exponential backoff strategy (see the sketch after this list)
  4. Re-enqueue: Add job back to :g:{groupId} with delay
  5. Unlock Group: Release lock so next job can process
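For example, a custom BackoffStrategy with exponential growth and jitter might look like this (a sketch; the constants are illustrative, and the built-in default may use different ones):

```ts
// From WorkerOptions above: returns the retry delay in ms
type BackoffStrategy = (attempt: number) => number;

const backoff: BackoffStrategy = (attempt) => {
  const base = 1000 * 2 ** (attempt - 1);           // 1s, 2s, 4s, ...
  const capped = Math.min(base, 30_000);            // cap at 30s
  return capped / 2 + Math.random() * (capped / 2); // add jitter to avoid thundering herds
};

const worker = new Worker({
  queue,
  backoff,
  handler: async (job) => { /* ... */ },
});
```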

If a job times out (visibility timeout expires):

  • Heartbeat mechanism extends the lock: SET :lock:{groupId} {jobId} PX {timeout}
  • If heartbeat fails, job remains locked until TTL expires
  • Cleanup cycle detects expired locks and recovers jobs
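Conceptually, each heartbeat just re-issues the lock with a fresh TTL (a simplified sketch of what the worker does internally):

```ts
import Redis from 'ioredis';

// Re-assert the group lock so a long-running job is not considered stalled
async function heartbeat(redis: Redis, ns: string, groupId: string, jobId: string, ttlMs: number) {
  await redis.set(`${ns}:lock:${groupId}`, jobId, 'PX', ttlMs);
}
```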

Periodic cleanup runs:

  1. Promote Delayed Jobs: Move jobs from :delayed to waiting when runAt arrives
  2. Process Repeats: Enqueue next occurrence of repeating jobs
  3. Recover Stale Locks: Find expired locks in :processing and unlock groups
  4. Recover Delayed Groups: Handle groups stuck due to ordering delays
  5. Trim Completed/Failed: Remove old completed and failed jobs per retention policy

Performance Characteristics

Latest benchmarks (MacBook M2, 4 workers, multi-process):

  • Throughput: 68-73 jobs/sec (500 jobs), 80-86 jobs/sec (5000 jobs)
  • Latency: P95 pickup ~5-5.5s, P95 processing ~45-50ms
  • Memory: ~120-145 MB per worker process
  • CPU: <1% average, <70% peak

GroupMQ maintains competitive performance while adding per-group FIFO ordering guarantees:

  • Similar throughput for group-based workloads
  • Better job ordering with guaranteed per-group FIFO processing
  • Atomic operations reduce race conditions and improve reliability

For detailed benchmark results and comparisons over time, see our Performance Benchmarks page.

Optimizations:

  • Batch Operations: reserveBatch reduces round-trips for concurrent workers
  • Blocking Operations: Efficient Redis BLPOP-style blocking prevents wasteful polling
  • Lua Scripts: All critical paths are atomic, avoiding race conditions
  • Atomic Completion: Complete job + reserve next in single operation
  • Minimal Data: Jobs store only essential fields, keeps memory low
  • Score-Based Ordering: O(log N) insertions and retrievals via sorted sets
  • Adaptive Behavior: Scheduler intervals adjust based on ordering configuration

Contributions are welcome! When making changes:

  1. Run tests and benchmarks before and after your changes to verify everything works correctly
  2. Add tests for any new features

The test suite requires a local Redis at 127.0.0.1:6379 (no auth).

```sh
npm i
npm run build
npm test
```

Optionally:

```sh
docker run --rm -p 6379:6379 redis:7
```