第29章:コンシューマ設計(少なくとも1回届く世界の作法)📬🛡️
この章でできるようになること🎯✨
- 「同じイベントが2回来ても壊れない」コンシューマを作れる(冪等コンシューマ)🛡️
- 重複・順不同・再配達が“ふつうに起きる”前提で設計できる📨📨🔀
- リトライと**DLQ(配信不能キュー)**を「運用できる形」で組み込める🧯🗑️
- CampusCafeのイベント(例:
OrderPaid/StockReserved)を安全に処理できる☕📦💳
まず結論:分散メッセージは「だいたい少なくとも1回」届く📬
メッセージングでは、ネットワークの都合で「受け取ったのにACKが返せなかった」「処理中に落ちた」みたいなことが起きるよね…😵💫 だから多くのブローカーは at-least-once(少なくとも1回) の世界になりがち。
- RabbitMQは ACK(確認応答)を使うと at-least-once になる(ACKしない限り再配達されうる)📨🔁 (rabbitmq.com)
- Azure Service Busも「ロック&確定(Complete)/放棄(Abandon)/DLQ」みたいに、処理結果に応じて明示的に扱う設計が基本になるよ📦✅ (Microsoft Learn)
CampusCafeで「壊れやすい」代表例💥
たとえば OrderPaid(決済完了イベント)を受けたコンシューマが、
- 注文ステータスをPaidに更新する
- 通知を送る(「決済できたよ〜🔔」)
- ポイントを付与する
…をやるとして、同じイベントが2回来たらどうなる?😇 そのままだと「ポイント2倍」「通知2回」「二重処理」が起きるかも💦
だから必要なのが 冪等コンシューマ(Idempotent Consumer) だよ🛡️✨
冪等コンシューマの王道パターンは2つ🧩

パターンA:処理そのものを“自然に冪等”にする(Upsert/一意制約)🌱
- 例:注文ステータス更新を「
Paid以外ならPaidにする」みたいにする - 例:ポイント付与を「(OrderId, Rule)に一意制約」して二重付与をDBで止める
DBの一意制約で守るのは強いよ💪 ただし「通知送信」みたいな外部副作用は、DBだけで完結しないから注意⚠️
パターンB:Inbox(処理済みテーブル)で“重複排除”する📥🗃️
イベント(メッセージ)には ユニークなID(MessageId / EventId)を持たせて、
コンシューマ側で「このID、もう処理した?」をDBに記録する方式だよ✨
Inbox(またはprocessed_messages)にEventIdを保存- ユニークインデックスで二重登録を防ぐ
- すでに存在したら「重複だからスキップ」📨➡️🗑️
この考え方が「Idempotent Consumer / Inbox」パターンとしてよく使われるよ📌 (event-driven.io)
今日のミニ実装:Inbox方式で「2回来ても壊れない」コンシューマを作る🧪✅
つくるもの🛠️
OrderPaidイベントを処理するコンシューマ- SQLiteに
InboxとOrdersを作る - 同じ
EventIdを2回流しても結果が1回分になるのを確認🎉
DB設計(最小構成)🗃️
テーブルイメージ📌
-
inbox_messagesEventId(ユニーク!)ProcessedAt
-
ordersOrderId(ユニーク)Status(例:Created / Paid / Cancelled)
ポイント:「Inbox登録」と「Orders更新」を同じトランザクションでやること! これで「Inboxだけ登録された/Ordersだけ更新された」みたいな事故を減らせるよ🛡️✨
実装(SQLite + ADO.NETでシンプル版)🧁
以下は「メッセージを受けた」想定で HandleOrderPaidAsync を呼ぶミニ構成だよ📨
using Microsoft.Data.Sqlite;
public sealed record OrderPaidEvent(string EventId, string OrderId, DateTimeOffset PaidAt);
public static class Consumer
{
public static async Task HandleOrderPaidAsync(SqliteConnection conn, OrderPaidEvent ev)
{
// 1) トランザクション開始(Inboxと業務更新を同じトランザクションで!)
await using var tx = await conn.BeginTransactionAsync();
// 2) InboxにEventIdを登録(重複ならユニーク制約で弾かれる)
// → ここが「重複排除の栓」🛡️
var insertInbox = conn.CreateCommand();
insertInbox.Transaction = tx;
insertInbox.CommandText = """
INSERT INTO inbox_messages (event_id, processed_at)
VALUES ($event_id, $processed_at);
""";
insertInbox.Parameters.AddWithValue("$event_id", ev.EventId);
insertInbox.Parameters.AddWithValue("$processed_at", DateTimeOffset.UtcNow.ToString("O"));
try
{
await insertInbox.ExecuteNonQueryAsync();
}
catch (SqliteException ex) when (ex.SqliteErrorCode == 19) // SQLITE_CONSTRAINT
{
// もう処理済み=重複イベント📨📨
await tx.RollbackAsync();
return;
}
// 3) 業務更新:注文をPaidにする(すでにPaidでも結果が同じになるように)
var updateOrder = conn.CreateCommand();
updateOrder.Transaction = tx;
updateOrder.CommandText = """
UPDATE orders
SET status = 'Paid'
WHERE order_id = $order_id
AND status <> 'Paid';
""";
updateOrder.Parameters.AddWithValue("$order_id", ev.OrderId);
await updateOrder.ExecuteNonQueryAsync();
// 4) コミット
await tx.CommitAsync();
}
}
事前に作るDDL(SQLite)📄
CREATE TABLE IF NOT EXISTS inbox_messages (
event_id TEXT PRIMARY KEY,
processed_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
status TEXT NOT NULL
);
ミニ演習:同じイベントを2回流して確認しよう📨📨✅
手順🪄
ordersにorder_id='O-100' , status='Created'を入れるEventId='E-1'のOrderPaidを2回処理する- 結果:
orders.statusはPaidのまま、inbox_messagesはE-1が1行だけ✨
期待する“壊れない挙動”💯
- 2回目は
inbox_messagesの一意制約で止まって 何も起きない - だから「ポイント2倍」みたいな事故が消える🛡️
“順番”が必要なときだけ、専用の仕組みを使う🔀➡️📌
順不同は普通に起きるよ〜😵💫 でも全部を「順番保証」にするとコストが高い(遅くなる/詰まりやすい)💦
どうしても順番が必要な例📌
OrderCreated → OrderPaid → OrderCompletedを必ず順に処理したい- “同じOrderIdのイベントだけ”順番が守れればOK
この手のケースでは、Azure Service Busの Sessions(FIFO) が使えるよ✨
「同じ SessionId=OrderId のイベントは順に処理」みたいな感じ! (Microsoft Learn)
DLQ(配信不能キュー)は「失敗の墓場」じゃなく「調査の保管庫」🗑️🔎
何度やっても失敗する“毒メッセージ”はあるある…🥲 そこでDLQに逃がして、あとで調査・修正・再投入できるようにするのが大事!
- Azure Service BusのDLQは「処理できない/配信できないメッセージを保管して検査できる」目的だよ📦🔍 (Microsoft Learn)
DLQ運用のミニルール(おすすめ)🧭
- DLQに入ったら「原因タグ(理由)」を必ず記録する(例:ValidationError / SchemaMismatch)🏷️
- DLQの滞留件数を監視する(増えたら異常)📈
- 「直せる失敗」は修正して再投入、「直せない」は隔離してアーカイブ🧯
Azure Service Busでの“確定/放棄”のイメージ(超ざっくり)✅↩️🕰️
Service Busの .NET クライアントでは、メッセージに対して代表的にこんな操作があるよ👇
Complete:処理成功 → キューから削除✅Abandon:今は失敗 → ロック解除して再配達へ↩️Defer:いったん保留(あとでシーケンス番号で回収)🕰️
(用語の整理に便利!) (Microsoft Learn)
重要アップデート(2026視点)📣
Azure Service Busの古いSDK(WindowsAzure.ServiceBus / Microsoft.Azure.ServiceBus など)は 2026-09-30 に退役予定で、移行が推奨されてるよ⚠️
今から書くなら “最新のAzure SDK” 側で進めるのが安心! (Microsoft Learn)
フレームワークの助けも借りてOK(MassTransit例)🚌✨
「コンシューマ内で処理して、その後にイベントを発行したい」みたいなとき、 MassTransitの In-Memory Outbox を使うと「処理が成功するまで発行を保留」してくれるよ📤🧠 (masstransit.io)
ただし!それでもコンシューマが冪等であることは前提になるよ🛡️ (masstransit.io)
実装チェックリスト(そのままコピペで使える)✅✅✅
- イベントに
EventId(ユニーク)を持たせた?🔑 -
Inbox(処理済み記録)をDBに持った?📥 -
Inbox登録と業務更新を同一トランザクションにした?🧱 - 業務更新は「すでに適用済みでも結果が同じ」になってる?(冪等)🛡️
- 外部副作用(通知/他API呼び出し)は“二重でも平気”にできてる?🔔
- リトライしていい失敗/ダメな失敗の線引きがある?🚥
- 毒メッセージはDLQに逃がして調査できる?🗑️🔎
- 監視:失敗率、リトライ回数、DLQ滞留を見れる?📈
AI活用(Copilot / Codex向けプロンプト例)🤖✨
① Inboxパターンの実装レビューをしてもらう🔍
- 「この
HandleOrderPaidAsyncが冪等になってるか、重複時の挙動とトランザクション境界をチェックして。改善点も出して」
② “副作用”の洗い出しをさせる🧨
- 「このコンシューマ内の副作用(通知、外部API、イベント発行)を列挙して、二重実行でも安全にする方法を提案して」
③ DLQの運用ルールを作らせる🧹
- 「DLQに落ちたメッセージを調査・復旧する運用手順(原因分類、再投入条件、監視項目)をCampusCafe向けに作って」
章末まとめ🌸
- “少なくとも1回届く”世界では、重複はバグじゃなく仕様📨📨
- だからコンシューマは 冪等 にするのが正義🛡️✨
- 王道は Inbox(処理済みテーブル+一意制約) 📥
- 順序が本当に必要なときだけ、Sessions(FIFO) など専用機能を使う🔀➡️ (Microsoft Learn)
- 失敗はDLQで“見える化”して、運用で勝つ🗑️🔎 (Microsoft Learn)