Job Queue Best Practices
When working with multiple jobs, proper priority and queue setup is crucial for system performance and reliability. Here are the best practices:
1. Queue Separation Strategy
Separate queues by business domain and criticality:
// config/queue.ts
const queueConfig: QueueConfig = {
connection: 'redis',
// Define multiple queues
queues: {
critical: {
concurrency: 2,
limiter: {
max: 100,
duration: 60000
}
},
emails: {
concurrency: 5,
limiter: {
max: 200,
duration: 60000
}
},
media: {
concurrency: 3,
limiter: {
max: 50,
duration: 60000
}
},
reports: {
concurrency: 1,
limiter: {
max: 10,
duration: 60000
}
}
}
}
2. Priority System Design
Use a consistent priority schema across your application:
// app/Services/QueuePriorityService.ts
export class QueuePriority {
// Critical system operations (100-90)
static readonly SYSTEM_CRITICAL = 100
static readonly SECURITY_ALERT = 95
static readonly PAYMENT_PROCESSING = 90
// High priority user-facing (89-70)
static readonly PASSWORD_RESET = 85
static readonly ORDER_CONFIRMATION = 80
static readonly ACCOUNT_VERIFICATION = 75
static readonly USER_NOTIFICATION = 70
// Normal operations (69-30)
static readonly WELCOME_EMAIL = 50
static readonly DATA_SYNC = 40
static readonly IMAGE_PROCESSING = 35
static readonly CONTENT_MODERATION = 30
// Background tasks (29-0)
static readonly NEWSLETTER = 20
static readonly ANALYTICS = 15
static readonly CLEANUP = 10
static readonly REPORTING = 5
}
3. Queue Assignment Strategy
Organize jobs by queue based on resource requirements and business priority:
// app/Services/JobDispatchService.ts
import { QueuePriority } from './QueuePriorityService'
export class JobDispatchService {
// Critical queue - System essential operations
static async dispatchCritical(jobClass: any, payload: any, priority: number) {
return Queue.dispatch(jobClass, payload, {
queue: 'critical',
priority,
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 100,
removeOnFail: 50
})
}
// Email queue - All email operations
static async dispatchEmail(jobClass: any, payload: any, priority: number) {
return Queue.dispatch(jobClass, payload, {
queue: 'emails',
priority,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 200,
removeOnFail: 100
})
}
// Media queue - Resource-intensive operations
static async dispatchMedia(jobClass: any, payload: any, priority: number) {
return Queue.dispatch(jobClass, payload, {
queue: 'media',
priority,
attempts: 2,
backoff: { type: 'fixed', delay: 5000 },
removeOnComplete: 50,
removeOnFail: 25
})
}
// Reports queue - Long-running, low priority
static async dispatchReport(jobClass: any, payload: any, priority: number) {
return Queue.dispatch(jobClass, payload, {
queue: 'reports',
priority,
attempts: 1,
removeOnComplete: 20,
removeOnFail: 10
})
}
}
4. Job Implementation Examples
// Critical Jobs
export default class ProcessPayment extends Job {
static get $$key() { return 'ProcessPayment' }
// Dispatch method
static async dispatch(payload: PaymentPayload) {
return JobDispatchService.dispatchCritical(
ProcessPayment,
payload,
QueuePriority.PAYMENT_PROCESSING
)
}
}
// Email Jobs
export default class SendPasswordReset extends Job {
static get $$key() { return 'SendPasswordReset' }
static async dispatch(payload: EmailPayload) {
return JobDispatchService.dispatchEmail(
SendPasswordReset,
payload,
QueuePriority.PASSWORD_RESET
)
}
}
// Media Jobs
export default class ProcessVideo extends Job {
static get $$key() { return 'ProcessVideo' }
static async dispatch(payload: VideoPayload) {
return JobDispatchService.dispatchMedia(
ProcessVideo,
payload,
QueuePriority.IMAGE_PROCESSING
)
}
}
5. Worker Configuration
Run different workers for different queues:
# Production setup with multiple workers
# Critical operations - dedicated workers
node ace queue:work --queue=critical --concurrency=2
# Email processing - high throughput
node ace queue:work --queue=emails --concurrency=5
# Media processing - resource intensive
node ace queue:work --queue=media --concurrency=2
# Background reports - single worker
node ace queue:work --queue=reports --concurrency=1
6. Advanced Queue Management
// app/Services/QueueManagerService.ts
export class QueueManagerService {
// Smart dispatching based on system load
static async smartDispatch(jobClass: any, payload: any, options: {
category: 'critical' | 'email' | 'media' | 'report'
priority: number
userFacing?: boolean
}) {
const { category, priority, userFacing } = options
// Adjust priority based on system load
const adjustedPriority = await this.adjustPriorityForLoad(priority, category)
// Add user-facing boost
const finalPriority = userFacing ? adjustedPriority + 10 : adjustedPriority
switch (category) {
case 'critical':
return JobDispatchService.dispatchCritical(jobClass, payload, finalPriority)
case 'email':
return JobDispatchService.dispatchEmail(jobClass, payload, finalPriority)
case 'media':
return JobDispatchService.dispatchMedia(jobClass, payload, finalPriority)
case 'report':
return JobDispatchService.dispatchReport(jobClass, payload, finalPriority)
}
}
private static async adjustPriorityForLoad(basePriority: number, category: string): Promise<number> {
// Get queue stats
const stats = await Queue.getQueueStats(category)
// If queue is backed up, boost priority for critical items
if (stats.waiting > 100 && basePriority >= QueuePriority.PASSWORD_RESET) {
return basePriority + 5
}
return basePriority
}
}
7. Monitoring and Alerting
// app/Services/QueueMonitorService.ts
export class QueueMonitorService {
static async checkQueueHealth() {
const queues = ['critical', 'emails', 'media', 'reports']
for (const queueName of queues) {
const stats = await Queue.getQueueStats(queueName)
// Alert if critical queue is backed up
if (queueName === 'critical' && stats.waiting > 10) {
await this.sendAlert(`Critical queue backed up: ${stats.waiting} jobs waiting`)
}
// Alert if any queue has too many failures
if (stats.failed > 50) {
await this.sendAlert(`High failure rate in ${queueName}: ${stats.failed} failures`)
}
}
}
}
8. Complete Usage Example
// In your controller or service
export default class UserController {
public async register({ request }: HttpContextContract) {
const userData = request.all()
// Create user
const user = await User.create(userData)
// Dispatch jobs with proper prioritization
// High priority - user waiting for email
await SendPasswordReset.dispatch({
email: user.email,
token: 'reset-token'
})
// Normal priority - welcome flow
await SendWelcomeEmail.dispatch({
email: user.email,
name: user.name
})
// Low priority - analytics
await TrackUserRegistration.dispatch({
userId: user.id,
source: 'web'
})
}
public async processOrder({ request }: HttpContextContract) {
const orderData = request.all()
// Critical - payment processing
await ProcessPayment.dispatch({
orderId: orderData.id,
amount: orderData.total
})
// High priority - order confirmation
await SendOrderConfirmation.dispatch({
email: orderData.customerEmail,
orderId: orderData.id
})
}
}
Key Best Practices Summary:
- Separate queues by resource requirements and business criticality
- Use consistent priority numbering across your application
- Keep critical operations in dedicated queues with higher concurrency
- Monitor queue health and set up alerts
- Use job-specific retry strategies based on importance
- Consider user-facing vs background operations when setting priorities
- Implement circuit breakers for external service calls
- Keep queue processing idempotent and fault-tolerant
This approach ensures your most important jobs get processed first while maintaining system stability and performance.