基于 Postgres 实现一个 Job Queue
今天看到一篇赞美 Postgres 的文章:Postgres is Too Good (And Why That's Actually a Problem) ,显然是非常吸引眼球的,作者用 PG 实现了需要用到的所有微服务。
做 AFFiNE 我们是用 Redis 的 pub/sub 实现的 Job Queue,所以想参考一下用 Postgres 实现对比看看。
直接问 ChatGPT 怎样实现
基于 Postgres 的 LISTEN/NOTIFY 实现任务队列服务
https://chatgpt.com/share/685527b8-724c-8010-9e5a-1aac521a7acc
表结构
init.sql
CREATE TABLE tasks (
"id" SERIAL PRIMARY KEY,
"type" TEXT NOT NULL,
"payload" JSONB,
"status" TEXT NOT NULL DEFAULT 'pending',
"created_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMPTZ(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"processed_at" TIMESTAMPTZ(3)
);
Job Queue 基本操作
添加任务并广播通知
INSERT INTO tasks (type, payload) VALUES ('send_email', '{"to": "test@example.com"}');
NOTIFY task_queue, 'new_task';
监听任务消息并拉取任务
监听任务消息
await jobListener.query('LISTEN task_queue');
jobListener.on('notification', async (msg) => {
if (msg.channel === 'task_queue') {
// 在这里拉取任务
}
});
拉取任务 SQL
UPDATE tasks
SET status = 'processing', processed_at = NOW(), updated_at = NOW()
WHERE id = (
SELECT id FROM tasks WHERE status = 'pending'
ORDER BY created_at ASC LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *;
可运行的代码
先启动一个 Postgres 测试服务
docker run --name pg-container-job-queue-demo \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=test123123 \
-e POSTGRES_DB=mydb \
-v $(pwd)/init.sql:/docker-entrypoint-initdb.d/init.sql \
-p 5432:5432 \
-d postgres:16
启动 demo.ts
代码,每 5 秒触发一个任务,并打印结果
node demo.ts
完整代码
请移步 https://github.com/fengmk2/fengmk2.github.com/tree/master/blog/2025/job-queue-on-postgres
有爱
希望本文对你有用 ^_^