Appearance
BullMQ
BullMQ 文档实在是不怎么友好, 自己文档里的内容有时候也跑不起来,但是耐不住是好产品,而且在 nodejs 中特别好用
concurrency
下面的代码测试 Worker 的参数 concurrency = 1 时的执行情况,根据文档介绍,concurrency 指的是相同时间正在跑的 task 数量,这个很好理解,它和时间没有关系,不是每秒多少个并发,也不是每毫秒并发,如果你的任务执行时间特别慢,它的并发就会很慢,就是这个意思。
另外,很多情况下我们使用 docker/kubernetes 进行项目部署, 我就有一个疑问了,如果 开启多个 pods 并且连接到同一个 redis, 这些 Worker 之间会受到影响吗?
js
import { Queue, Worker } from "bullmq";
const sleep = (ms: number) => {
return new Promise((resolve) => {
setTimeout(() => {
resolve(true);
}, ms);
})
}
const main = () => {
const queue = new Queue('queueName', {
connection: {
host: 'localhost',
port: 6379,
db: 0,
},
});
new Worker('queueName', async (job) => {
console.log(new Date().toISOString());
await sleep(1000);
}, {
concurrency: 1,
});
setInterval(async () => {
await queue.add('jobName', { key: new Date().toISOString() });
}, 100);
}
main();
我让上面的代码同时在两个 terminal 中执行,结果是互不影响,也就是说如果你的 concurrency 设置 100, 开启 2 个 pod,那么这个 Queue 的并发数量是 200,开启 3 个 pods 是 300,以此类推。
pause/resume
Queue的 pause 和 resume 是全局的,不会受到 pods 个数影响, 前提是 Queue 同时连接在一个 redis的同一个 DB 上,也就是说 connection 需要一致。
Queue 一旦被 pause 了,那么必须要 resume 才能够继续执行,别无办法,除非清库 🤦♂️
js
import { Queue } from "bullmq";
const queue = new Queue("queueName", {
connection: {
host: "localhost",
port: 6379,
db: 0,
},
});
setInterval(async () => {
await queue.add("jobName", { key: new Date().toISOString() });
}, 100);
queue.pause();
// queue.pause();