Task queue mixin for Bull.
$ npm install moleculer-bull --save
// Worker service using the mixin with no connection arguments.
// Each key under `queues` is a queue name; its function handles the jobs.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "task-worker",
    mixins: [QueueService()],

    queues: {
        "mail.send"(job) {
            const { data } = job;
            this.logger.info("New job received!", data);

            // Report 10% progress before resolving the job.
            job.progress(10);

            // The resolved value is the job's result.
            const result = {
                done: true,
                id: data.id,
                worker: process.pid
            };
            return this.Promise.resolve(result);
        }
    }
});
// Same worker as the basic example, but the mixin factory is given an
// explicit Redis connection URL.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "task-worker",
    mixins: [QueueService("redis://0.0.0.0:32777")],

    queues: {
        "mail.send"(job) {
            const { data } = job;
            this.logger.info("New job received!", data);

            // Report 10% progress before resolving the job.
            job.progress(10);

            // The resolved value is the job's result.
            const result = {
                done: true,
                id: data.id,
                worker: process.pid
            };
            return this.Promise.resolve(result);
        }
    }
});
// Worker whose queue entry is an object instead of a bare function:
// `name` selects the named job, `concurrency` is set to 5, and `process`
// is the job handler.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "task-worker",
    mixins: [QueueService()],

    queues: {
        "mail.send": {
            name: "important",
            concurrency: 5,

            process(job) {
                const { data } = job;
                this.logger.info("New job received!", data);

                // Report 10% progress before resolving the job.
                job.progress(10);

                // The resolved value is the job's result.
                const result = {
                    done: true,
                    id: data.id,
                    worker: process.pid
                };
                return this.Promise.resolve(result);
            }
        }
    }
});
// Worker that registers several named processors on one queue by giving
// the queue an array of { name, process } entries.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "task-worker",
    mixins: [QueueService()],

    queues: {
        "mail.send": [
            {
                name: "vip",
                process(job) {
                    this.logger.info("New important job received!", job.data);

                    // Send the email the VIP way here.
                    const outcome = { info: "Process success." };
                    return this.Promise.resolve(outcome);
                }
            },
            {
                name: "normal",
                process(job) {
                    this.logger.info("New normal job received!", job.data);

                    const outcome = { info: "Process success." };
                    return this.Promise.resolve(outcome);
                }
            }
        ]
    }
});
This is the same as:
// Plain Bull: one queue with two named processors.
// `Bull`, `processImportant` and `processNormal` are not defined in this snippet.
const emailQueue = new Bull('mail.send');
// Worker: register one handler per job name on the same queue.
emailQueue.process('vip', processImportant);
emailQueue.process('normal', processNormal);
// Producer service: enqueues "mail.send" jobs and logs their progress and
// completion.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "job-maker",
    mixins: [QueueService()],

    methods: {
        /**
         * Add a job to the "mail.send" queue.
         *
         * @param {Object} payload - Data passed to the worker as `job.data`.
         * @returns {*} Whatever `createJob` returns, so callers can wait for
         *   the job to be enqueued and handle errors.
         */
        sendEmail(payload) {
            return this.createJob("mail.send", payload);
        }
    },

    /**
     * Attach the queue listeners once, when the service starts.
     * Registering them inside `sendEmail` would add another pair of
     * listeners on every call and log each event multiple times.
     */
    started() {
        const queue = this.getQueue("mail.send");

        queue.on("global:progress", (jobID, progress) => {
            this.logger.info(`Job #${jobID} progress is ${progress}%`);
        });

        // Global events receive the job ID, not a Job instance.
        // NOTE(review): per the Bull docs the result of a global event may
        // arrive serialized — confirm before parsing it.
        queue.on("global:completed", (jobID, res) => {
            this.logger.info(`Job #${jobID} completed!. Result:`, res);
        });
    }
});
// Producer service: enqueues the named job "important" on the "mail.send"
// queue and logs progress and completion.
const QueueService = require("moleculer-bull");

broker.createService({
    name: "job-maker",
    mixins: [QueueService()],

    methods: {
        /**
         * Add an "important" named job to the "mail.send" queue.
         *
         * @param {Object} payload - Data passed to the worker as `job.data`.
         * @returns {*} Whatever `createJob` returns, so callers can wait for
         *   the job to be enqueued and handle errors.
         */
        sendEmail(payload) {
            return this.createJob("mail.send", "important", payload);
        }
    },

    /**
     * Attach the queue listeners once, when the service starts.
     * Registering them inside `sendEmail` would add another pair of
     * listeners on every call and log each event multiple times.
     */
    started() {
        const queue = this.getQueue("mail.send");

        queue.on("global:progress", (jobID, progress) => {
            this.logger.info(`Job #${jobID} progress is ${progress}%`);
        });

        // Global events receive the job ID, not a Job instance.
        // NOTE(review): per the Bull docs the result of a global event may
        // arrive serialized — confirm before parsing it.
        queue.on("global:completed", (jobID, res) => {
            this.logger.info(`Job #${jobID} completed!. Result:`, res);
        });
    }
});
$ npm test
In development, with watch mode:
$ npm run ci
The project is available under the MIT license.
Copyright (c) 2016-2019 MoleculerJS