Skip to content

BullMQ

BullMQ 文档实在是不怎么友好, 自己文档里的内容有时候也跑不起来,但是耐不住是好产品,而且在 nodejs 中特别好用

concurrency

下面的代码测试 Worker 的参数 concurrency = 1 时的执行情况,根据文档介绍,concurrency 指的是相同时间正在跑的 task 数量,这个很好理解,它和时间没有关系,不是每秒多少个并发,也不是每毫秒并发,如果你的任务执行时间特别慢,它的并发就会很慢,就是这个意思。
另外,很多情况下我们使用 docker/kubernetes 进行项目部署, 我就有一个疑问了,如果 开启多个 pods 并且连接到同一个 redis, 这些 Worker 之间会受到影响吗?

js
import { Queue, Worker } from "bullmq";

const sleep = (ms: number) => {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(true);
        }, ms);
    })
}

const main = () => {
    const queue = new Queue('queueName', {
        connection: {
            host: 'localhost',
            port: 6379,
            db: 0,
        },
    });

    new Worker('queueName', async (job) => {
        console.log(new Date().toISOString());
        await sleep(1000);
    }, {
        concurrency: 1,
    });

    setInterval(async () => {
        await queue.add('jobName', { key: new Date().toISOString() });
    }, 100);
}

main();

我让上面的代码同时在两个 terminal 中执行,结果是互不影响,也就是说如果你的 concurrency 设置 100, 开启 2 个 pod,那么这个 Queue 的并发数量是 200,开启 3 个 pods 是 300,以此类推。

pause/resume

Queue的 pause 和 resume 是全局的,不会受到 pods 个数影响, 前提是 Queue 同时连接在一个 redis的同一个 DB 上,也就是说 connection 需要一致。
Queue 一旦被 pause 了,那么必须要 resume 才能够继续执行,别无办法,除非清库 🤦‍♂️

js
import { Queue } from "bullmq";

const queue = new Queue("queueName", {
  connection: {
    host: "localhost",
    port: 6379,
    db: 0,
  },
});

setInterval(async () => {
  await queue.add("jobName", { key: new Date().toISOString() });
}, 100);

queue.pause();
// queue.pause();