Working with Jobs using The @rlanz/bull-queue
Package
I’ll help you understand how to work with Jobs using the @rlanz/bull-queue
package in AdonisJS. This is a popular queue package for AdonisJS that provides a clean interface for background job processing.
Installation and Setup
First, install the package:
npm install @rlanz/bull-queue
node ace configure @rlanz/bull-queue
Basic Job Creation
Create a job using the Ace command:
node ace make:job SendEmail
This creates a job file in app/Jobs/SendEmail.ts
:
import { Job } from '@rlanz/bull-queue'
export default class SendEmail extends Job {
// This is the unique key for the job
static get $$key() {
return 'SendEmail'
}
// The handle method contains the job logic
public async handle(payload: { email: string; subject: string; body: string }) {
// Your job logic here
console.log('Sending email to:', payload.email)
// Example: Send email using a mail service
// await Mail.send('emails.welcome', payload, (message) => {
// message.to(payload.email).subject(payload.subject)
// })
}
// Optional: Handle job failures
public async failed(payload: any, error: Error) {
console.error('Job failed:', error.message)
// Log the error, send notifications, etc.
}
}
Dispatching Jobs
You can dispatch jobs from anywhere in your application:
// In a controller or service
import SendEmail from 'App/Jobs/SendEmail'
import Queue from '@rlanz/bull-queue/services/main'
export default class UserController {
public async register({ request }: HttpContextContract) {
const userData = request.only(['email', 'name'])
// Create user...
// Dispatch the email job
await Queue.dispatch(SendEmail, {
email: userData.email,
subject: 'Welcome!',
body: `Hello ${userData.name}, welcome to our platform!`
})
// Or dispatch with options
await Queue.dispatch(SendEmail, payload, {
delay: 5000, // Delay by 5 seconds
attempts: 3, // Retry 3 times on failure
priority: 10 // Higher priority
})
}
}
Job Options and Configuration
You can configure various job options:
// Dispatch with specific options
await Queue.dispatch(SendEmail, payload, {
// Delay execution
delay: 60000, // 1 minute delay
// Job attempts and backoff
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000,
},
// Job priority (higher number = higher priority)
priority: 10,
// Remove job after completion
removeOnComplete: 10, // Keep only last 10 completed jobs
removeOnFail: 5, // Keep only last 5 failed jobs
})
Queue Configuration
Configure your queues in config/queue.ts
:
import { QueueConfig } from '@rlanz/bull-queue'
const queueConfig: QueueConfig = {
// Default connection
connection: 'redis',
connections: {
redis: {
host: Env.get('REDIS_HOST', '127.0.0.1'),
port: Env.get('REDIS_PORT', 6379),
password: Env.get('REDIS_PASSWORD', ''),
db: 0,
}
},
// Job processing options
jobs: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 50,
removeOnFail: 50,
}
}
export default queueConfig
Advanced Job Features
Recurring Jobs (Cron-like)
import { CronJob } from '@rlanz/bull-queue'
export default class DailyReport extends CronJob {
static get $$key() {
return 'DailyReport'
}
// Run every day at midnight
static get schedule() {
return '0 0 * * *'
}
public async handle() {
// Generate and send daily reports
console.log('Generating daily report...')
}
}
Job Progress Tracking
export default class ProcessImages extends Job {
static get $$key() {
return 'ProcessImages'
}
public async handle(payload: { images: string[] }, job: any) {
const images = payload.images
for (let i = 0; i < images.length; i++) {
// Process each image
await this.processImage(images[i])
// Update job progress
const progress = Math.round(((i + 1) / images.length) * 100)
await job.progress(progress)
}
}
private async processImage(imagePath: string) {
// Image processing logic
}
}
Starting the Queue Worker
Start the queue worker to process jobs:
node ace queue:work
Or start with specific options:
# Process specific queues
node ace queue:work --queue=emails,images
# Set concurrency
node ace queue:work --concurrency=5
# Run in production mode
node ace queue:work --no-delay
Best Practices
- Keep jobs idempotent: Jobs should be safe to run multiple times
- Handle failures gracefully: Always implement the
failed
method - Use appropriate timeouts: Set reasonable timeouts for long-running jobs
- Monitor your queues: Use Bull Dashboard or similar tools for monitoring
- Use different queues for different job types: Separate high-priority from low-priority jobs
// Example of a robust job
export default class ProcessPayment extends Job {
static get $$key() {
return 'ProcessPayment'
}
public async handle(payload: { orderId: string, amount: number }) {
const { orderId, amount } = payload
try {
// Check if already processed (idempotency)
const order = await Order.find(orderId)
if (order.status === 'paid') {
return // Already processed
}
// Process payment
const result = await PaymentService.charge(amount)
// Update order
order.status = 'paid'
order.transactionId = result.id
await order.save()
} catch (error) {
// Log error for debugging
Logger.error('Payment processing failed', { orderId, error: error.message })
throw error // Re-throw to mark job as failed
}
}
public async failed(payload: any, error: Error) {
// Notify administrators
await NotificationService.sendAlert(`Payment job failed: ${error.message}`)
}
}
This should give you a solid foundation for working with jobs in AdonisJS using the @rlanz/bull-queue
package!