importRedisfrom"ioredis";import{Queue,Worker}from"groupmq";constredis=newRedis("redis://127.0.0.1:6379");constqueue=newQueue({
redis,namespace: "orders",// Will be prefixed with 'groupmq:'jobTimeoutMs: 30_000,// How long before job times outlogger: true,// Enable logging (optional)});awaitqueue.add({groupId: "user:42",data: {type: "charge",amount: 999},orderMs: Date.now(),// or event.createdAtMsmaxAttempts: 5,});constworker=newWorker({
queue,concurrency: 1,// Process 1 job at a time (can increase for parallel processing)handler: async(job)=>{console.log(`Processing:`,job.data);},});worker.run();
Per-group FIFO ordering - Jobs within the same group process in strict order, perfect for user workflows, data pipelines, and sequential operations
Parallel processing across groups - Process multiple groups simultaneously while maintaining order within each group
BullMQ-compatible API - Familiar interface with enhanced group-based capabilities
High performance - High throughput with low latency (see benchmarks)
Built-in ordering strategies - Handle out-of-order job arrivals with 'none', 'scheduler', or 'in-memory' methods
Automatic recovery - Stalled job detection and connection error handling with exponential backoff
Production ready - Atomic operations, graceful shutdown, and comprehensive logging
Zero polling - Efficient blocking operations prevent wasteful Redis calls
GroupMQ is heavily inspired by BullMQ, one of the most popular Redis-based job queue libraries for Node.js. We've taken many core concepts and design patterns from BullMQ while adapting them for our specific use case of per-group FIFO processing.
Key differences from BullMQ:
Per-group FIFO ordering - jobs within the same group are processed in strict order
Group-based concurrency - only one job per group can be active at a time
Ordered processing - built-in support for orderMs timestamp-based ordering
Cross-group parallelism - multiple groups can be processed simultaneously
No job types - simplified to a single job shape; use union-typed data instead, e.g. `{ type: 'paint', data: { ... } } | { type: 'repair', data: { ... } }` (see the sketch below)
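A minimal sketch of that union-typed pattern. `PaintJob`, `RepairJob`, and the `paint`/`repair` helpers are hypothetical; the `Worker<T>` generic follows the `WorkerOptions<T>` shape shown later:

```ts
import { Queue, Worker } from "groupmq";

// Hypothetical discriminated union standing in for named job types
type PaintJob = { type: 'paint'; data: { color: string } };
type RepairJob = { type: 'repair'; data: { partId: string } };
type CarJob = PaintJob | RepairJob;

declare const queue: Queue<CarJob>;                      // assumed generic Queue<T>
declare function paint(color: string): Promise<void>;    // hypothetical helper
declare function repair(partId: string): Promise<void>;  // hypothetical helper

const worker = new Worker<CarJob>({
  queue,
  handler: async (job) => {
    // TypeScript narrows the union by the `type` discriminant
    switch (job.data.type) {
      case 'paint':
        return paint(job.data.data.color);
      case 'repair':
        return repair(job.data.data.partId);
    }
  },
});
```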
We're grateful to the BullMQ team for their excellent work and the foundation they've provided for the Redis job queue ecosystem.
```ts
type QueueOptions = {
  redis: Redis;                       // Redis client instance (required)
  namespace: string;                  // Unique queue name, gets 'groupmq:' prefix (required)
  logger?: boolean | LoggerInterface; // Enable logging (default: false)
  jobTimeoutMs?: number;              // Job processing timeout (default: 30000ms)
  maxAttempts?: number;               // Default max retry attempts (default: 3)
  reserveScanLimit?: number;          // Groups to scan when reserving (default: 20)
  keepCompleted?: number;             // Number of completed jobs to retain (default: 0)
  keepFailed?: number;                // Number of failed jobs to retain (default: 0)
  schedulerLockTtlMs?: number;        // Scheduler lock TTL (default: 1500ms)
  orderingMethod?: OrderingMethod;    // Ordering strategy (default: 'none')
  orderingWindowMs?: number;          // Time window for ordering (required for non-'none' methods)
  orderingMaxWaitMultiplier?: number; // Max grace period multiplier for in-memory (default: 3)
  orderingGracePeriodDecay?: number;  // Grace period decay factor for in-memory (default: 1.0)
  orderingMaxBatchSize?: number;      // Max jobs to collect in batch for in-memory (default: 10)
};

type OrderingMethod = 'none' | 'scheduler' | 'in-memory';
```
Ordering Methods:
'none' - No ordering guarantees (fastest, zero overhead, no extra latency)
'scheduler' - Redis buffering for large windows (≥1000ms, requires scheduler, adds latency)
'in-memory' - Worker collection for small windows (50-500ms, no scheduler, adds latency per batch)
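For example, a queue configured for the 'in-memory' strategy might look like this (a sketch; the `telemetry` namespace and `orderingWindowMs: 200` are illustrative values within the 50-500ms range above):

```ts
const queue = new Queue({
  redis,
  namespace: 'telemetry',
  orderingMethod: 'in-memory',
  orderingWindowMs: 200, // wait up to ~200ms for out-of-order arrivals
});
```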
typeWorkerOptions<T>={queue: Queue<T>;// Queue instance to process jobs from (required)handler: (job: ReservedJob<T>)=>Promise<unknown>;// Job processing function (required)name?: string;// Worker name for logging (default: queue.name)logger?: boolean|LoggerInterface;// Enable logging (default: false)concurrency?: number;// Number of jobs to process in parallel (default: 1)heartbeatMs?: number;// Heartbeat interval (default: Math.max(1000, jobTimeoutMs/3))onError?: (err: unknown,job?: ReservedJob<T>)=>void;// Error handlermaxAttempts?: number;// Max retry attempts (default: queue.maxAttempts)backoff?: BackoffStrategy;// Retry backoff function (default: exponential with jitter)enableCleanup?: boolean;// Periodic cleanup (default: true)cleanupIntervalMs?: number;// Cleanup frequency (default: 60000ms)schedulerIntervalMs?: number;// Scheduler frequency (default: adaptive)blockingTimeoutSec?: number;// Blocking reserve timeout (default: 5s)atomicCompletion?: boolean;// Atomic completion + next reserve (default: true)stalledInterval?: number;// Check if stalled every N ms (default: 30000)maxStalledCount?: number;// Fail after N stalls (default: 1)stalledGracePeriod?: number;// Grace period before considering stalled (default: 0)};typeBackoffStrategy=(attempt: number)=>number;// returns delay in ms
When adding a job to the queue:
```ts
await queue.add({
  groupId: string;        // Required: Group ID for FIFO processing
  data: T;                // Required: Job payload data
  orderMs?: number;       // Timestamp for ordering (default: Date.now())
  maxAttempts?: number;   // Max retry attempts (default: queue.maxAttempts)
  jobId?: string;         // Custom job ID (default: auto-generated UUID)
  delay?: number;         // Delay in ms before job becomes available
  runAt?: Date | number;  // Specific time to run the job
  repeat?: RepeatOptions; // Repeating job configuration (cron or interval)
});

type RepeatOptions =
  | { every: number }    // Repeat every N milliseconds
  | { pattern: string }; // Cron pattern (standard 5-field format)
```
Example with delay:
```ts
await queue.add({
  groupId: 'user:123',
  data: { action: 'send-reminder' },
  delay: 3600000, // Run in 1 hour
});
```
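Example with a cron-pattern repeat, a sketch using the RepeatOptions shape above (the 5-field pattern below fires every Monday at 09:00):

```ts
await queue.add({
  groupId: 'reports',
  data: { type: 'weekly-digest' },
  repeat: { pattern: '0 9 * * 1' }, // minute hour day-of-month month day-of-week
});
```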
Workers support configurable concurrency to process multiple jobs in parallel from different groups:
```ts
const worker = new Worker({
  queue,
  concurrency: 8, // Process up to 8 jobs simultaneously
  handler: async (job) => {
    // Jobs from different groups can run in parallel
    // Jobs from the same group still run sequentially
  },
});
```
Benefits:
Higher throughput for multi-group workloads
Efficient resource utilization
Still maintains per-group FIFO ordering
Considerations:
Each job consumes memory and resources
Set concurrency based on job duration and system resources
Monitor Redis connection pool (ioredis default: 10 connections)
Both Queue and Worker support optional logging for debugging and monitoring:
```ts
// Enable default logger
const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: true, // Logs to console with queue name prefix
});

const worker = new Worker({
  queue,
  logger: true, // Logs to console with worker name prefix
  handler: async (job) => { /* ... */ },
});
```
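You can also pass a custom logger object. The exact LoggerInterface shape isn't reproduced in this section; the sketch below assumes conventional leveled methods and should be checked against the actual interface:

```ts
// Assumed shape - verify against the real LoggerInterface
const logger = {
  debug: (...args: unknown[]) => console.debug('[orders]', ...args),
  info:  (...args: unknown[]) => console.info('[orders]', ...args),
  warn:  (...args: unknown[]) => console.warn('[orders]', ...args),
  error: (...args: unknown[]) => console.error('[orders]', ...args),
};

const queue = new Queue({ redis, namespace: 'orders', logger });
```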
GroupMQ supports simple repeatable jobs using either a fixed interval (every) or a basic cron pattern (pattern). Repeats are materialized by a lightweight scheduler that runs as part of the worker's periodic cleanup cycle.
Add a repeating job (every N ms)
```ts
await queue.add({
  groupId: 'reports',
  data: { type: 'daily-summary' },
  repeat: { every: 5000 }, // run every 5 seconds
});

const worker = new Worker({
  queue,
  handler: async (job) => {
    // process...
  },
  // IMPORTANT: For timely repeats, run the scheduler frequently
  cleanupIntervalMs: 1000, // <= repeat.every (recommended 1-2s for 5s repeats)
});

worker.run();
```
For sub-second repeats (not recommended in production):
```ts
const queue = new Queue({
  redis,
  namespace: 'fast',
  schedulerLockTtlMs: 50, // Allow fast scheduler lock
});

const worker = new Worker({
  queue,
  schedulerIntervalMs: 10, // Check every 10ms
  cleanupIntervalMs: 100,  // Cleanup every 100ms
  handler: async (job) => { /* ... */ },
});
```
⚠️ Fast repeats (< 1s) increase Redis load and should be used sparingly.
The scheduler is idempotent: it updates the next run time before enqueueing to prevent double runs.
Each occurrence is a normal job with a fresh jobId, preserving per-group FIFO semantics.
You can monitor repeated runs via BullBoard using the provided adapter.
```ts
// Stop worker gracefully - waits for current job to finish
await worker.close(gracefulTimeoutMs);

// Wait for queue to be empty
const isEmpty = await queue.waitForEmpty(timeoutMs);

// Recover groups that might be stuck due to ordering delays
const recoveredCount = await queue.recoverDelayedGroups();
```
```ts
// Job counts and status
const counts = await queue.getJobCounts();
// { active: 5, waiting: 12, delayed: 3, total: 20, uniqueGroups: 8 }
const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
const delayedCount = await queue.getDelayedCount();
const completedCount = await queue.getCompletedCount();
const failedCount = await queue.getFailedCount();

// Get job IDs by status
const activeJobIds = await queue.getActiveJobs();
const waitingJobIds = await queue.getWaitingJobs();
const delayedJobIds = await queue.getDelayedJobs();

// Get Job instances by status
const completedJobs = await queue.getCompletedJobs(limit); // returns Job[]
const failedJobs = await queue.getFailedJobs(limit);

// Group information
const groups = await queue.getUniqueGroups(); // ['user:123', 'order:456']
const groupCount = await queue.getUniqueGroupsCount();
const jobsInGroup = await queue.getGroupJobCount('user:123');

// Get specific job
const job = await queue.getJob(jobId); // returns Job instance

// Job manipulation
await queue.remove(jobId);
await queue.retry(jobId);   // Re-enqueue a failed job
await queue.promote(jobId); // Promote delayed job to waiting
await queue.changeDelay(jobId, newDelayMs);
await queue.updateData(jobId, newData);

// Scheduler operations
await queue.runSchedulerOnce();     // Manual scheduler run
await queue.promoteDelayedJobs();   // Promote delayed jobs
await queue.recoverDelayedGroups(); // Recover stuck groups

// Cleanup and shutdown
await queue.waitForEmpty(timeoutMs);
await queue.close();
```
Jobs returned from queue.getJob(), queue.getCompletedJobs(), etc. have these methods:
```ts
const job = await queue.getJob(jobId);

// Manipulate the job
await job.remove();
await job.retry();
await job.promote();
await job.changeDelay(newDelayMs);
await job.updateData(newData);
await job.update(newData); // Alias for updateData

// Get job state
const state = await job.getState();
// 'active' | 'waiting' | 'delayed' | 'completed' | 'failed'

// Serialize job
const json = job.toJSON();
```
```ts
// Check worker status
const isProcessing = worker.isProcessing();

// Get current job(s) being processed
const currentJob = worker.getCurrentJob();
// { job: ReservedJob, processingTimeMs: 1500 } | null

// For concurrency > 1
const currentJobs = worker.getCurrentJobs();
// [{ job: ReservedJob, processingTimeMs: 1500 }, ...]

// Get worker metrics
const metrics = worker.getWorkerMetrics();
// { jobsInProgress: 2, lastJobPickupTime: 1234567890, ... }

// Graceful shutdown
await worker.close(gracefulTimeoutMs);
```
`:lock:{groupId}` - string holding the job ID that currently owns the group lock (with TTL)
`:processing` - sorted set of active job IDs, ordered by deadline
`:processing:{jobId}` - hash with processing metadata (groupId, deadlineAt)
`:delayed` - sorted set of delayed jobs, ordered by runAt timestamp
`:completed` - sorted set of completed job IDs (for retention)
`:failed` - sorted set of failed job IDs (for retention)
`:repeats` - hash of repeating job definitions (groupId → config)
Waiting - job is in `:g:{groupId}` and its group is in `:ready`
Delayed - job is in `:delayed` (scheduled for the future)
Active - job is in `:processing` and its group is locked
Completed - job is in `:completed` (retention)
Failed - job exceeded maxAttempts and moved to `:failed` (retention)
The worker runs a continuous loop optimized for both single and concurrent processing:
For concurrency = 1 (sequential):
```ts
while (!stopping) {
  // 1. Blocking reserve (waits for job, efficient)
  const job = await queue.reserveBlocking(timeoutSec);

  // 2. Process job synchronously
  if (job) {
    await processOne(job);
  }

  // 3. Periodic scheduler run (every schedulerIntervalMs)
  await queue.runSchedulerOnce(); // Promotes delayed jobs, processes repeats
}
```
For concurrency > 1 (parallel):
```ts
while (!stopping) {
  // 1. Run lightweight scheduler periodically
  await queue.runSchedulerOnce();

  // 2. Try batch reservation if we have capacity
  const capacity = concurrency - jobsInProgress.size;
  if (capacity > 0) {
    const jobs = await queue.reserveBatch(capacity);
    // Process all jobs concurrently (fire and forget)
    for (const job of jobs) {
      void processOne(job);
    }
  }

  // 3. Blocking reserve for remaining capacity
  const job = await queue.reserveBlocking(blockingTimeoutSec);
  if (job) {
    void processOne(job); // Process async
  }
}
```
Key optimizations:
Batch reservation reduces Redis round-trips for concurrent workers
Blocking operations prevent wasteful polling
Heartbeat mechanism keeps jobs alive during long processing
Atomic completion + next reservation reduces latency
Atomic Operations (Lua Scripts)
All critical operations use Lua scripts for atomicity:
`enqueue.lua` - adds a job to its group queue and the group to the ready set
`reserve.lua` - finds a ready group, pops the head job, locks the group
`reserve-batch.lua` - reserves one job each from multiple groups atomically
`complete.lua` - marks a job complete, unlocks the group, re-adds the group to ready if more jobs remain
`complete-and-reserve-next.lua` - atomic completion plus reservation of the next job from the same group
`retry.lua` - increments attempts and re-adds the job to its group with a backoff delay
`remove.lua` - removes a job from all data structures
When a worker reserves a job:
1. Find Ready Group: `ZRANGE :ready 0 0` gets the lowest-score group
2. Check Lock: `PTTL :lock:{groupId}` ensures the group isn't already locked
3. Pop Job: `ZPOPMIN :g:{groupId} 1` pops the head job atomically
4. Lock Group: `SET :lock:{groupId} {jobId} PX {timeout}`
5. Mark Processing: add the job to the `:processing` sorted set with its deadline
6. Re-add Group: if more jobs exist, `ZADD :ready {score} {groupId}`
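A non-atomic TypeScript sketch of the same sequence, for illustration only: the real reserve.lua runs these steps as one atomic script, and `reserveSketch` plus the `ns` key prefix (e.g. 'groupmq:orders') are hypothetical:

```ts
import Redis from "ioredis";

async function reserveSketch(redis: Redis, ns: string, jobTimeoutMs: number) {
  // 1. Find the ready group with the lowest score
  const [groupId] = await redis.zrange(`${ns}:ready`, 0, 0);
  if (!groupId) return null;

  // 2. Skip groups that are still locked
  if ((await redis.pttl(`${ns}:lock:${groupId}`)) > 0) return null;

  // 3. Pop the head job from the group queue
  const [jobId] = await redis.zpopmin(`${ns}:g:${groupId}`, 1);
  if (!jobId) return null;

  // 4. Lock the group for the duration of processing
  await redis.set(`${ns}:lock:${groupId}`, jobId, "PX", jobTimeoutMs);

  // 5. Track the job as processing, ordered by its deadline
  await redis.zadd(`${ns}:processing`, Date.now() + jobTimeoutMs, jobId);

  // 6. Re-add the group to ready if it still has jobs
  if ((await redis.zcard(`${ns}:g:${groupId}`)) > 0) {
    const [, score] = await redis.zrange(`${ns}:g:${groupId}`, 0, 0, "WITHSCORES");
    await redis.zadd(`${ns}:ready`, score, groupId);
  }
  return jobId;
}
```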
When a job completes successfully:
1. Remove from Processing: `DEL :processing:{jobId}`, `ZREM :processing {jobId}`
2. Mark Completed: `HSET :job:{jobId} status completed`
3. Add to Retention: `ZADD :completed {now} {jobId}`
4. Unlock Group: `DEL :lock:{groupId}` (only if this job owns the lock)
5. Check for More Jobs: `ZCARD :g:{groupId}`
6. Re-add to Ready: if jobs remain, `ZADD :ready {nextScore} {groupId}`
The critical fix in step 6 ensures that after a job completes, the group becomes available again for other workers to pick up the next job in the queue.
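Again as a non-atomic illustration (the real complete.lua is a single atomic script; `completeSketch` and the `ns` prefix are hypothetical):

```ts
import Redis from "ioredis";

async function completeSketch(redis: Redis, ns: string, jobId: string, groupId: string) {
  // 1. Remove processing bookkeeping
  await redis.del(`${ns}:processing:${jobId}`);
  await redis.zrem(`${ns}:processing`, jobId);

  // 2-3. Mark completed and add to the retention set
  await redis.hset(`${ns}:job:${jobId}`, "status", "completed");
  await redis.zadd(`${ns}:completed`, Date.now(), jobId);

  // 4. Unlock the group, but only if this job still owns the lock
  if ((await redis.get(`${ns}:lock:${groupId}`)) === jobId) {
    await redis.del(`${ns}:lock:${groupId}`);
  }

  // 5-6. Re-add the group to ready if more jobs remain (the critical step 6)
  if ((await redis.zcard(`${ns}:g:${groupId}`)) > 0) {
    const [, nextScore] = await redis.zrange(`${ns}:g:${groupId}`, 0, 0, "WITHSCORES");
    await redis.zadd(`${ns}:ready`, nextScore, groupId);
  }
}
```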
Jobs are ordered using a composite score:
```
score = (orderMs - baseEpoch) * 1000 + seq
```
`orderMs` - user-provided timestamp for event ordering
`baseEpoch` - fixed epoch timestamp (1704067200000) to keep scores manageable
`seq` - auto-incrementing sequence for tiebreaking (resets daily to prevent overflow)
This ensures:
Jobs with earlier orderMs process first
Jobs with same orderMs process in submission order
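A quick worked example with the constants above (the orderMs and seq values are illustrative):

```ts
const baseEpoch = 1704067200000; // 2024-01-01T00:00:00.000Z
const orderMs = 1704067260000;   // an event one minute after the epoch
const seq = 7;                   // tie-breaking sequence for jobs sharing this orderMs

const score = (orderMs - baseEpoch) * 1000 + seq;
// (60_000 * 1000) + 7 = 60_000_007
```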