Skip to main content

第18章:Workerサービスを分離して動かす👷‍♂️⚙️

この章は「API」と「Worker(非同期処理担当)」を 別サービスとしてComposeに同居させて、キュー処理が流れるのをログで見てニヤける回です😎✨ (ここができると、急に“開発スタック感”が完成します🚀)


1) まずイメージだけ掴もう🧠💡

ざっくり、こういう流れを作ります👇

  • API 🧑‍💻 → Queueに仕事を積む📨
  • Worker 👷‍♂️ → Queueから仕事を取って処理⚙️
  • Redis 🟥 → Queueの土台(仕事の待ち行列)
  • (必要なら)Postgres 🐘 → 結果を保存する先

BullMQだと「Queueに add() した仕事を、Workerが拾って処理する」っていう動きになります📬✨ (docs.bullmq.io)


2) なぜ“Workerを分離”するの?🏗️

分けると、いいことが多いです😊

  • APIが軽くなる:重い処理でAPIのレスポンスが遅くなりにくい⚡
  • 落ち方がキレイ:WorkerがコケてもAPIは生き残れる🧯
  • スケールしやすい:Workerだけ増やせる(後で --scale worker=3 とか)📈
  • 責務が分かれる:「受付係(API)」と「作業員(Worker)」で役割が明確👮‍♂️👷‍♂️

3) この章のゴール🎯✨

  • docker compose upAPI / Worker / Redis が一緒に起動する🐳
  • APIにリクエストすると ジョブがRedisに積まれる📨
  • Workerのログに “処理したよ” が出る👀✨
  • ついでに、起動順の待ち合わせも最低限いれる⏳ (Composeは“起動=ready”までは待たないので、必要なら healthcheckdepends_on: condition を使います) (Docker Documentation)

4) 実装:まずはコード側(APIとWorker)🧑‍💻🛠️

ここでは例として BullMQ を使います📦 (Redisがあるだけで始めやすいので、この教材の流れと相性◎)

4-1) Queue共通設定(src/queue.ts)🧩

// src/queue.ts
import { Queue } from "bullmq";

export const queueName = "demo";

export const connection = {
host: process.env.REDIS_HOST ?? "redis",
port: Number(process.env.REDIS_PORT ?? 6379),
};

export const demoQueue = new Queue(queueName, { connection });

ポイント👉 host: "redis"Composeのサービス名です🕸️ コンテナ同士は localhost じゃなくて サービス名で通信するのが基本👍


4-2) API:ジョブを積む(src/server.ts)📨

// src/server.ts
import express from "express";
import { demoQueue } from "./queue";

const app = express();
app.use(express.json());

app.get("/health", (_req, res) => res.status(200).send("ok"));

app.post("/jobs/hello", async (req, res) => {
const name = (req.body?.name ?? "world") as string;

const job = await demoQueue.add("hello", { name });
res.json({ ok: true, jobId: job.id });
});

app.listen(3000, "0.0.0.0", () => {
console.log("API listening on http://localhost:3000");
});

4-3) Worker:ジョブを処理する(src/worker.ts)👷‍♂️⚙️

// src/worker.ts
import { Worker } from "bullmq";
import { connection, queueName } from "./queue";

const concurrency = Number(process.env.WORKER_CONCURRENCY ?? 2);

const worker = new Worker(
queueName,
async (job) => {
if (job.name === "hello") {
const { name } = job.data as { name: string };
// ここが「重い処理」のつもり😎(例なのでログだけ)
console.log(`👷‍♂️ job=${job.id} hello, ${name} 🌟`);
return { greeted: name };
}
console.log(`👷‍♂️ unknown job name: ${job.name}`);
return null;
},
{ connection, concurrency }
);

worker.on("completed", (job) => {
console.log(`✅ completed job=${job.id}`);
});

worker.on("failed", (job, err) => {
console.log(`❌ failed job=${job?.id} err=${err.message}`);
});

// ちゃんと止まれるように(Compose停止時に大事)🧯
process.on("SIGTERM", async () => {
console.log("🛑 SIGTERM received. Closing worker...");
await worker.close(); // BullMQ推奨の優雅な停止方法✨
process.exit(0);
});

BullMQのWorkerは「Queueに積まれた仕事を受け取って処理する“受信者”」という位置づけです📬 (docs.bullmq.io) また、停止時は worker.close()新規受付を止めて、処理中ジョブを待つのが基本です🧯 (docs.bullmq.io)


4-4) package.json にスクリプトを用意🧪

(例:TypeScriptは tsx で直実行する想定)

{
"scripts": {
"dev:api": "tsx watch src/server.ts",
"dev:worker": "tsx watch src/worker.ts"
}
}

5) Compose側:Workerサービスを追加する🐳➕

5-1) compose.yaml(全体例)📄

ここでは **API と Worker は同じビルド成果物(同じDockerfile)**を使って、command だけ変えます👍 「同じコードベースで、起動プロセスだけ分ける」って感じ✨

services:
redis:
image: redis:7
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 20

api:
build: .
command: npm run dev:api
ports:
- "3000:3000"
environment:
REDIS_HOST: redis
REDIS_PORT: "6379"
depends_on:
redis:
condition: service_healthy
init: true
stop_grace_period: 30s

worker:
build: .
command: npm run dev:worker
environment:
REDIS_HOST: redis
REDIS_PORT: "6379"
WORKER_CONCURRENCY: "2"
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
init: true
stop_grace_period: 30s

ここ、超大事ポイント💥

  • depends_on: condition: service_healthy を使うと、Redisのhealthcheckが通ってからAPI/Workerを起動できます⏳ (Docker Documentation)
  • init: trueシグナル転送と子プロセス回収(地味に大事)をしてくれます👍 (Docker Documentation)
  • stop_grace_periodSIGTERM後に待つ猶予。Workerを“優雅に止める”のに相性◎🧯 (Docker Documentation)
  • restart: unless-stopped は Workerみたいな常駐プロセスに便利です🔁 (Docker Documentation)

ちなみに、Composeの推奨は「Compose Specification」ベースです(古い2.x/3.xの流派を統合したやつ)📘 (Docker Documentation)


6) 起動して“流れ”を確認しよう👀✨

6-1) 起動🚀

docker compose up --build

別ターミナルで状況確認👀

docker compose ps
docker compose logs -f worker

6-2) APIにリクエストしてジョブ投入📨

PowerShellでも curl でOK👌

curl -X POST http://localhost:3000/jobs/hello ^
-H "Content-Type: application/json" ^
-d "{\"name\":\"komiyamma\"}"

すると…Workerログにこういうのが出たら勝ち🎉

  • 👷‍♂️ job=... hello, komiyamma 🌟
  • ✅ completed job=...

BullMQは「Workerが動いてなくてもジョブはRedisに積まれて、Workerが繋がった瞬間に拾われる」設計です📦 (docs.bullmq.io)


7) “分離できた”を確信する小ワザ😎🔬

7-1) Workerだけ止めてみる🛑

docker compose stop worker

その状態でAPIにジョブを投げると、Workerログは流れない(当然) でもRedisにはジョブが積まれているので…

7-2) Workerを復活させると、溜まった分が流れる🔥

docker compose start worker
docker compose logs -f worker

「うおお、溜まってたのを処理した!」ってなれば成功です🎉


8) 設計の超入門メモ📝✨(ここが成長ポイント)

  • APIは“受付”:できるだけ短く終わる(レスポンス命)⚡
  • Workerは“作業”:時間がかかってOK(ただし落ちても復旧できる形に)🧯
  • Queueは“緩衝材”:ピークを吸収して、壊れにくくする📦

これが分かると、次の章(リトライ・DLQっぽい考え方)に自然に繋がります🔁✨


9) よくある詰まりポイント集🪤😵‍💫

  • Redisに繋がらない

    • コンテナ内で localhost を使ってない? → redis にする(サービス名)🕸️
    • depends_on を “起動順だけ”と思って油断 → ready待ちは healthcheck + condition が必要⏳ (Docker Documentation)
  • 止めたときにWorkerが変に残る

    • worker.close() で優雅に止めるのが基本🧯 (docs.bullmq.io)
    • stop_grace_period を短くしすぎると、処理中にブチ切られがち⚠️ (Docker Documentation)
  • Workerが落ちたら復旧しない


10) AI(Copilot/Codex)に頼むならこう言う🤖🪄

コピペで使える指示例👇

  • 「BullMQで QueueWorker を分離した最小構成を作って。APIは /jobs/helloqueue.add()、Workerはログ出力、SIGTERMで worker.close() して」
  • 「Composeで worker サービスを追加して、depends_on: condition: service_healthyhealthcheck も入れて」

AIは便利だけど、“サービス名で接続”(redis)と “止め方”(close + grace)だけは人間がチェックすると事故が減ります👍🧯


まとめ🎉

この章でやったことはこれ👇

  • Workerを APIと別プロセス(別サービス) に分離👷‍♂️
  • Composeで 同居させて一発起動🐳
  • ジョブ投入 → Worker処理を ログで目視👀✨
  • healthcheck / depends_on condition / stop_grace_period / init で運用の土台も少しだけ固めた🧱

次(第19章)は、ここに **「失敗時のリトライ」「無限リトライ地獄を避ける」**を足して、急に実戦レベルに近づけます🧯🔁✨