Wire the three call sites
Every button you wired up across the last two lessons fired the dispatcher from one place: the inspector, calling dispatch() directly with a hand-built payload. That proved the engine. It did not connect the engine to anything real. This lesson fires the dispatcher from the three product surfaces it was built for — sending an invitation, changing a member’s role, and the Stripe past-due webhook — and it fires it under one rule that holds at all three: only after the transaction commits.
By the end, the flow runs end-to-end the way it will in production. sendInvitation to someone who already has an account writes the invitation, commits, then drops an inbox row plus an email for the invitee. changeMemberRole writes its auditLogs row and a notifications row, so the compliance trail and the user’s inbox both record the change. And a customer.subscription.updated event that flips an org to past_due lands, commits, then notifies every owner of that org. Open /inbox after any of these and you are reading real rows, not a fixture.
Your mission
This lesson moves the dispatcher off the inspector’s direct-fire demo and onto the three call sites it exists to serve: sendInvitation and changeMemberRole (the invitation actions in lib/invitations/) and the past-due branch of the Stripe webhook. The dispatcher itself is finished; you are not changing it. You are deciding where and when each call site invokes it, and the “when” is the entire lesson. These three edits are one capability, not three; they are verified as a set, because each is the same move applied to a different shape.
The move is this: do the action’s transactional work with await withTenant(...) (or db.transaction(...) for the webhook), let it commit, then await dispatch(...). Never the other way around. A notification fired inside the transaction is a notification you cannot take back when the transaction rolls back — you would tell a user they were invited to an org that, a millisecond later, never got the invitation row. That exact failure is the reason the seam insists on fire-after-commit, and it is the one discipline tying the three call sites together. Two of them make it almost free: sendInvitation and changeMemberRole already structure their writes inside a withTenant block that returns before any notification code, so the dispatch call simply goes after it.
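The ordering can be seen in miniature with an in-memory stand-in for the database. This is a sketch under stated assumptions, not the project's code: withTransaction, dispatchSketch, sendInvitationSketch, and the two arrays are hypothetical names invented here, and the real withTenant and dispatch are not reproduced.

```typescript
// Hypothetical in-memory stand-ins; not the project's withTenant or dispatch.
type Invitation = { id: string; email: string };

const invitations: Invitation[] = []; // committed rows
const notified: string[] = [];        // ids the fake dispatcher was called with

// Runs `work` against a staging buffer and copies it into `invitations`
// only if `work` resolves; a throw discards the buffer (a rollback).
async function withTransaction<T>(
  work: (staged: Invitation[]) => Promise<T>,
): Promise<T> {
  const staged: Invitation[] = [];
  const result = await work(staged);
  invitations.push(...staged); // commit
  return result;
}

async function dispatchSketch(invitationId: string): Promise<void> {
  notified.push(invitationId);
}

async function sendInvitationSketch(email: string, fail: boolean): Promise<string> {
  const id = await withTransaction(async (staged) => {
    const newId = `inv_${email}`;
    staged.push({ id: newId, email });
    if (fail) throw new Error('constraint violation'); // forces a rollback
    return newId;
  });
  // Reached only when the transaction committed.
  await dispatchSketch(id);
  return id;
}
```

Because the dispatch call sits after the awaited transaction, a throw inside the transaction skips it entirely; no guard or flag is needed.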
The webhook is the awkward one, and it is the reason this lesson exists rather than being a footnote. The owner list you notify has to be read inside the transaction — the owners must reflect the transition you are committing, not whatever the table held a moment earlier — but you cannot dispatch there, because the transaction has not committed yet. So the handler splits the two halves: inside tx it reads the owners and pushes an org.billing.past_due descriptor onto a pendingDispatches: NotificationEvent[] array that the route captures in a closure; the route’s POST drains that array with the dispatcher only after db.transaction resolves. Read inside, dispatch after. The heavier-duty version of this pattern is the transactional outbox — a durable pending_dispatches table that a worker drains, surviving a crash between commit and dispatch — and it is the right next step at scale; an in-memory array is the honest v1, and you will name the upgrade rather than build it.
A few constraints shape the rest. The dispatcher trusts its caller completely: it does no authorization. The admin check lives at the action boundary, in authedAction('admin', ...), so the gating is the call site’s job and the seam’s job is only to send — which also means a direct dispatch() call from some non-action path would bypass that check entirely, and that is by design, not an oversight. The two dedup layers compose cleanly without knowing about each other: processed_events at the webhook handler catches a redelivered Stripe event, while notificationDedup inside the dispatcher catches a duplicate notification even when two distinct event ids would otherwise produce one. And there is one wart you are accepting on purpose: an invite to an existing user sends two emails — chapter 065’s invitation email (the one named sendEmail call that lives outside the seam, already sent after commit in sendInvitation) plus the dispatcher’s org.invitation.sent. Merging them is a real improvement and a real next step; v1 keeps both because that invitation-email path predates the seam, and pretending otherwise would mean rewriting working code this lesson does not touch.
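How the two dedup layers compose without knowing about each other can be sketched with two sets. Everything here is a hypothetical in-memory model: the set names mirror the lesson's table names, but the key shape (type, subjectId, and user id joined together) is an assumption made for illustration, not the dispatcher's actual key.

```typescript
// Hypothetical in-memory versions of the two layers; key shapes are assumptions.
const processedEvents = new Set<string>();   // layer 1: webhook event ids
const notificationDedup = new Set<string>(); // layer 2: notification keys
const delivered: string[] = [];

type SketchEvent = { type: string; subjectId: string; recipientUserIds: string[] };

// Layer 2 lives in the dispatcher and knows nothing about webhook event ids.
function dispatchSketch(event: SketchEvent): { sent: number; deduped: number } {
  let sent = 0;
  let deduped = 0;
  for (const userId of event.recipientUserIds) {
    const key = `${event.type}:${event.subjectId}:${userId}`;
    if (notificationDedup.has(key)) {
      deduped += 1;
      continue;
    }
    notificationDedup.add(key);
    delivered.push(key);
    sent += 1;
  }
  return { sent, deduped };
}

// Layer 1 lives in the webhook handler and knows nothing about notifications.
function handleWebhookSketch(
  eventId: string,
  subscriptionId: string,
): 'duplicate' | 'handled' {
  if (processedEvents.has(eventId)) return 'duplicate';
  processedEvents.add(eventId);
  dispatchSketch({
    type: 'org.billing.past_due',
    subjectId: subscriptionId,
    recipientUserIds: ['user_alice'],
  });
  return 'handled';
}
```

A redelivered event id stops at the first layer. Two distinct event ids about the same subscription both pass the first layer, and the second layer collapses them into one notification.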
Out of scope, explicitly: changing lib/email.ts, merging those duplicate invite emails, and short-circuiting a no-op role change.
- sendInvitation to an existing user writes the invitation, commits, then produces one inbox row plus one email increment for the invitee.
- changeMemberRole writes both an auditLogs row and a notifications row, with one email increment for the affected member.
- A search for sendEmail( and db.insert(notifications) outside lib/notifications/ returns only the one named exception, chapter 065’s invitation email in sendInvitation: the seam holds.
Coding time
Add the dispatch() call after commit in src/lib/invitations/send.ts and src/lib/invitations/manage.ts, then wire the past-due path across src/lib/webhooks/stripe.ts and src/app/api/webhooks/stripe/route.ts. Three short edits to existing code, built against the brief and the Lesson 4 tests. Each of these files already carries a // TODO(L4) marking exactly where its edit goes.
Reference solution and walkthrough
The invitation: dispatch after the commit
Start with send.ts, the simplest of the three because the structure is already in your favor. The invitation row and its audit entry co-transact inside withTenant, which returns invitationId once that transaction commits. Chapter 065’s invitation email then sends through its own sendEmail call. Your edit is the block after it: one dispatch call, sitting where the // TODO(L4) was.

    const org = await db.query.organization.findFirst({
      where: eq(organization.id, ctx.orgId),
    });
    const orgName = org?.name ?? 'your organization';
    const acceptUrl = await signedInviteUrl(invitationId, rawToken);

    const sent = await sendEmail({
      to: email,
      subject: `You're invited to ${orgName}`,
      react: createElement(InviteEmail, {
        orgName,
        inviterName: ctx.user.name,
        role,
        acceptUrl,
        expiresAt: new Date(Date.now() + INVITATION_TTL_SECONDS * 1000),
      }),
      idempotencyKey: `invite:${invitationId}`,
    });

    await dispatch({
      type: 'org.invitation.sent',
      recipientUserIds: existingUser ? [existingUser.id] : [],
      subjectId: invitationId,
      payload: {
        invitedEmail: email,
        role,
        orgName,
        inviterName: ctx.user.name,
        acceptUrl,
      },
    });

    revalidatePath('/inspector');
    return ok({ invitationId, emailSent: sent.ok });

- The dispatch call runs after withTenant has already committed invitationId, so the notification can never fire for an invitation that rolled back. Fire-after-commit, made structural.
- recipientUserIds is the empty-array no-op. existingUser was resolved at the top of the action via db.query.user.findFirst({ where: eq(user.email, email) }), so a non-user invitee dispatches to zero recipients and the dispatcher loops cleanly over nothing.
- subjectId is the dedup key for this event: the dispatcher’s notificationDedup window keys off subjectId, so this invitation can’t double-notify.
- payload carries exactly the fields the registry’s invite-sent templates consume: invitedEmail, role, orgName, inviterName, acceptUrl. Note NotificationEvent carries no orgId field; the org name travels in the payload as copy.
- The sendEmail block is the chapter 065 invitation email, the one named non-seam sendEmail call. It predates the dispatcher and still fires on its own after commit, sending the invitee’s accept-link email directly.
The recipientUserIds: existingUser ? [existingUser.id] : [] ternary is the detail worth slowing down on. An invitation can go to an email that has no account yet — that is the normal case for inviting someone new — and for that invitee there is no user to notify in-app. The instinct is to guard: if (existingUser) dispatch(...). Resist it. The dispatcher already loops over its recipient list, and a loop over zero recipients does nothing and returns { sent: 0, deduped: 0, suppressedByPrefs: 0 } — a clean no-op, not an error. Handing it an empty array keeps the call site uniform: there is one dispatch call, always, and the data decides whether anything happens. That is the better seam. existingUser itself was resolved at the very top of the action, by the lookup that also rejects an invite to someone who is already a member:

    const existingUser = await db.query.user.findFirst({
      where: eq(user.email, email),
    });

The new import { dispatch } from '@/lib/notifications' is the only import this file gains. Everything else (withTenant, the audit write, the invitation email) was already here from chapter 065. The single await sendEmail(...) block above your dispatch is the one named exception to the seam: it predates the dispatcher, it sends the invitee’s accept-link email directly, and merging it into the dispatcher is a deferred improvement, not this lesson’s work.
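The empty-recipient no-op can be checked in isolation. The sketch below assumes a much-simplified dispatcher: only the three DispatchResult field names come from the lesson, while dispatchSketch, notifyInvitee, and the inbox array are hypothetical and skip dedup and preferences entirely.

```typescript
// Hypothetical dispatcher loop; only the DispatchResult field names come from the lesson.
type DispatchResult = { sent: number; deduped: number; suppressedByPrefs: number };
type SketchEvent = { type: string; recipientUserIds: string[]; subjectId: string };

const inbox: Array<{ userId: string; type: string }> = [];

async function dispatchSketch(event: SketchEvent): Promise<DispatchResult> {
  const result: DispatchResult = { sent: 0, deduped: 0, suppressedByPrefs: 0 };
  // A loop over zero recipients runs zero times and returns the zeroed result.
  for (const userId of event.recipientUserIds) {
    inbox.push({ userId, type: event.type });
    result.sent += 1;
  }
  return result;
}

// One call site, no guard: the data decides whether anything happens.
async function notifyInvitee(
  existingUser: { id: string } | undefined,
  invitationId: string,
): Promise<DispatchResult> {
  return dispatchSketch({
    type: 'org.invitation.sent',
    recipientUserIds: existingUser ? [existingUser.id] : [],
    subjectId: invitationId,
  });
}
```

Calling notifyInvitee(undefined, ...) resolves to all-zero counters and leaves inbox untouched, which is the uniform-call-site property the ternary buys.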
The role change: keep the audit write, add the notification
manage.ts is the same shape, with one thing to be careful about: do not disturb the existing audit write. The role update and its logAudit(tx, ...) entry co-transact inside withTenant. That pairing is deliberate, because a role that changed with no audit row is exactly the gap a compliance table exists to close. Leave it alone. Read the org name after the commit, then dispatch.

    await withTenant(ctx.orgId, async (tx) => {
      await tx
        .update(member)
        .set({ role: newRole })
        .where(
          and(eq(member.id, memberId), eq(member.organizationId, ctx.orgId)),
        );
      await logAudit(tx, {
        action: 'member.role-changed',
        subjectType: 'member',
        subjectId: memberId,
        payload: { before: target.role, after: newRole },
      });
    });

    const org = await db.query.organization.findFirst({
      where: eq(organization.id, ctx.orgId),
    });
    const orgName = org?.name ?? 'your organization';

    await dispatch({
      type: 'org.member.role_changed',
      recipientUserIds: [target.userId],
      subjectId: memberId,
      payload: {
        newRole,
        before: target.role,
        orgName,
        actorName: ctx.user.name,
      },
    });

    revalidatePath('/inspector');
    return ok({ memberId, role: newRole });

The result is a deliberate dual write: logAudit records the change for compliance, and dispatch tells the affected member their role moved. Two tables, two audiences, one action. The test for this requirement checks both: that logAudit(tx, ...) still sits inside withTenant and that dispatch(...) fires after it. The recipient is [target.userId], the single member whose role changed, and subjectId is the memberId so the dedup window keys on this specific membership. Note that org is read from the global db after the transaction, not from tx. By then the transaction has committed and the org name is just a label for the notification copy; it does not need to be transactionally consistent with the role change. This file also gains the organization import alongside dispatch, since the chapter 065 version did not read the org row.
The webhook: read inside the transaction, dispatch after it
This is the load-bearing edit, and it is split across two files because the read and the dispatch happen at two different moments. Walk through both files.

    // src/lib/webhooks/stripe.ts
    export const dispatch = async (
      tx: Transaction,
      event: Stripe.Event,
      pendingDispatches: NotificationEvent[],
    ): Promise<void> => {
      switch (event.type) {
        case 'checkout.session.completed':
          await onCheckoutCompleted(tx, event);
          break;
        case 'customer.subscription.updated':
          await onSubscriptionUpdated(tx, event, pendingDispatches);
          break;
        case 'customer.subscription.deleted':
          await onSubscriptionDeleted(tx, event);
          break;
        default:
          log.info({ eventId: event.id, eventType: event.type }, 'unhandled');
          return;
      }
      log.info({ eventId: event.id, eventType: event.type }, 'dispatched');
    };

    export const onSubscriptionUpdated = async (
      tx: Transaction,
      event: Stripe.Event,
      pendingDispatches: NotificationEvent[],
    ): Promise<void> => {
      const sub = event.data.object as Stripe.Subscription;
      const patch = subscriptionToEntitlement(sub, loadCatalog());
      const eventAt = new Date(event.created * 1000);

      // Apply the patch only if this event is newer than the last one recorded.
      const updated = await tx
        .update(planEntitlements)
        .set({ ...patch, lastEventAt: eventAt })
        .where(
          and(
            eq(planEntitlements.subscriptionId, sub.id),
            or(
              isNull(planEntitlements.lastEventAt),
              lt(planEntitlements.lastEventAt, eventAt),
            ),
          ),
        )
        .returning({ organizationId: planEntitlements.organizationId });

      const row = updated[0];
      if (!row) {
        log.info({ eventId: event.id, subscriptionId: sub.id }, 'stale_ordering');
        return;
      }

      await logAudit(tx, {
        organizationId: row.organizationId,
        actorUserId: null,
        action: 'billing.subscription.updated',
        subjectType: 'subscription',
        subjectId: sub.id,
        payload: { plan: patch.plan, status: patch.status },
      });
      log.info(
        { eventId: event.id, orgId: row.organizationId, plan: patch.plan },
        'subscription_updated',
      );

      // Collect only: read the recipients inside tx, leave the send to the route.
      if (patch.status === 'past_due') {
        const org = await tx.query.organization.findFirst({
          where: eq(organization.id, row.organizationId),
        });
        const owners = await tx.query.member.findMany({
          where: and(
            eq(member.organizationId, row.organizationId),
            eq(member.role, 'owner'),
          ),
        });
        pendingDispatches.push({
          type: 'org.billing.past_due',
          recipientUserIds: owners.map((owner) => owner.userId),
          subjectId: sub.id,
          payload: {
            orgName: org?.name ?? 'your organization',
            plan: patch.plan,
          },
        });
      }
    };

Collect-only: the handler never calls the notification dispatcher. Both dispatch and onSubscriptionUpdated gain a pendingDispatches: NotificationEvent[] param. On patch.status === 'past_due' (after the non-stale UPDATE returned a row), the handler reads the org row and its owner members inside tx, then pushes one org.billing.past_due descriptor (recipientUserIds: owners.map(o => o.userId), subjectId: sub.id) onto the array and returns.

    // src/app/api/webhooks/stripe/route.ts
    import type { NotificationEvent } from '@/lib/notifications';
    import { dispatch as dispatchNotification } from '@/lib/notifications';

    // Inside the route's POST handler:
    const pendingDispatches: NotificationEvent[] = [];
    let duplicate = false;
    await db.transaction(async (tx) => {
      const claimed = await claimEvent(tx, 'stripe', event.id, event.type);
      if (!claimed) {
        duplicate = true;
        log.info({ eventId: event.id }, 'duplicate');
        return;
      }
      log.info({ eventId: event.id }, 'claimed');
      await dispatch(tx, event, pendingDispatches);
    });

    // Runs only after the transaction has resolved, that is, after commit.
    for (const e of pendingDispatches) {
      await dispatchNotification(e);
    }

    return Response.json({ received: true, duplicate }, { status: 200 });

The route owns the array and the drain. pendingDispatches is declared before db.transaction so the closure can fill it; the Stripe-router dispatch(tx, event, pendingDispatches) runs inside the tx. After the transaction resolves, a for...of loop drains the array with dispatchNotification(e). The notification dispatcher is imported under the alias dispatchNotification because the Stripe router's dispatch(tx, event, pendingDispatches) already owns the name dispatch in this file, and the two have different arities.
The shape to hold onto: the handler collects, the route dispatches. onSubscriptionUpdated reads the owner ids inside tx, so they reflect the transition this transaction is committing — read them from the global db and a concurrent change could give you owners that disagree with the past-due state you just wrote. But it cannot fire the notification there, so it pushes a descriptor onto the array and returns. The array lives in route.ts, declared before db.transaction so the closure can fill it, drained by a plain for...of loop once the transaction resolves. If the transaction throws, that loop never runs, and that is the fire-after-commit guarantee in one line of control flow: rolled-back state has no descriptor to drain because the drain is downstream of the commit.
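The collect-then-drain control flow is small enough to model without a database. In this sketch transactionSketch, dispatchNotificationSketch, and handlePastDue are hypothetical stand-ins, and the failAfterCollect flag exists only to simulate a transaction that throws after the descriptor has been pushed.

```typescript
type SketchEvent = { type: string; recipientUserIds: string[]; subjectId: string };

const dispatched: SketchEvent[] = [];

async function dispatchNotificationSketch(event: SketchEvent): Promise<void> {
  dispatched.push(event);
}

// Stand-in for db.transaction: resolves on commit, rejects on rollback.
async function transactionSketch(work: () => Promise<void>): Promise<void> {
  await work();
}

async function handlePastDue(
  ownerIds: string[],
  failAfterCollect: boolean,
): Promise<number> {
  // Declared outside the transaction so the closure can fill it.
  const pendingDispatches: SketchEvent[] = [];

  await transactionSketch(async () => {
    // Collect inside: the descriptor is built from data read in the transaction.
    pendingDispatches.push({
      type: 'org.billing.past_due',
      recipientUserIds: ownerIds,
      subjectId: 'sub_123',
    });
    if (failAfterCollect) throw new Error('rollback');
  });

  // Dispatch after: unreachable if the awaited transaction rejected.
  for (const e of pendingDispatches) {
    await dispatchNotificationSketch(e);
  }
  return pendingDispatches.length;
}
```

Even though the failing run pushed a descriptor, nothing is dispatched, because the rejection propagates past the loop before it can start.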
Two small things that look odd at a glance:
- dispatch means two different functions in this code. route.ts already imports the Stripe router’s dispatch(tx, event, pendingDispatches), so it brings in the notification dispatcher under an alias, import { dispatch as dispatchNotification } from '@/lib/notifications', and drains the array with dispatchNotification(e). They have different arities and different jobs; the alias keeps them from colliding. The notification dispatcher takes only the event, exactly as it did from the inspector.
- The handler reads org and owners separately. That is two queries inside the transaction, both scoped to row.organizationId (the org the UPDATE returned). The owner read filters on member.role === 'owner', and owners.map((owner) => owner.userId) becomes the recipient list, so every owner gets the past-due notice. For the seeded Acme, that is just Alice, so you will see exactly one row and one email; an org with three owners would fan out to three.
The transactional-outbox alternative belongs in your head as the next reach. If the process crashes after the transaction commits but before the drain loop runs, those notifications are lost — the in-memory array died with the process. The durable version writes the descriptors to a pending_dispatches table inside the same transaction (so they commit atomically with the state change) and a background worker drains that table, retrying on failure. That is more machinery than a v1 needs, and the contract — collect inside, dispatch after — is identical, so you can swap the array for a table later without touching a single call site. Naming it and deferring it is the right call here.
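For orientation only, here is a rough in-memory model of the outbox idea. It is not a durable implementation: the Map and array stand in for database tables, commitPastDue and drainOutbox are hypothetical names, and the atomicity a real transaction would provide is simply assumed.

```typescript
type OutboxEvent = { type: string; subjectId: string };
type OutboxRow = { id: number; event: OutboxEvent; done: boolean };

// Hypothetical "tables"; a real outbox would be a pending_dispatches table.
const orgStatus = new Map<string, string>();
const outbox: OutboxRow[] = [];
let nextId = 1;

// Writes the state change and the outbox row together. In this sketch the two
// writes simply happen back to back; a database transaction would make them atomic.
function commitPastDue(orgId: string, subscriptionId: string): void {
  orgStatus.set(orgId, 'past_due');
  outbox.push({
    id: nextId,
    event: { type: 'org.billing.past_due', subjectId: subscriptionId },
    done: false,
  });
  nextId += 1;
}

// One worker pass: send every pending row. A failed send stays pending,
// so a later pass retries it.
async function drainOutbox(
  send: (event: OutboxEvent) => Promise<void>,
): Promise<number> {
  let sentCount = 0;
  for (const row of outbox) {
    if (row.done) continue;
    try {
      await send(row.event);
      row.done = true;
      sentCount += 1;
    } catch {
      // Leave row.done false so the next pass picks it up again.
    }
  }
  return sentCount;
}
```

The difference from the in-memory array is where the pending work lives: a row that survives a failed or interrupted pass is still there for the next one.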
The seam check (requirement 6)
The sixth requirement is the one the test suite cannot assert at runtime, because it is a property of the source, not of any single execution: outside lib/notifications/, the only sendEmail( call and the only db.insert(notifications) should be the one named exception. Run the search yourself:

    grep -rn "db.insert(notifications)" src --include="*.ts" | grep -v "lib/notifications/"
    grep -rn "sendEmail(" src --include="*.ts" | grep -v "lib/notifications/"

The first should return nothing: writeInboxChannel is the only writer of the notifications table, and it lives inside the seam. The second returns exactly one hit: the chapter 065 invitation email in send.ts. That single result is the seam holding. Anything else the search turns up is a call site that learned to notify on its own instead of going through the dispatcher, which is the precise drift the seam exists to prevent: every channel decision, every dedup, every preference check would silently route around the one place that owns them.
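If you would rather keep the same check in code, a minimal sketch follows. It scans file contents passed in as strings, so it touches no filesystem; findSeamLeaks and the sample file map are hypothetical, and the path under lib/notifications/ is an invented example, not a file the project is known to contain.

```typescript
type Hit = { path: string; line: number; text: string };

// Returns every line containing `needle` in files outside lib/notifications/.
function findSeamLeaks(files: Record<string, string>, needle: string): Hit[] {
  const hits: Hit[] = [];
  for (const path of Object.keys(files)) {
    if (path.includes('lib/notifications/')) continue; // inside the seam
    const lines = (files[path] ?? '').split('\n');
    lines.forEach((text, index) => {
      if (text.includes(needle)) {
        hits.push({ path, line: index + 1, text: text.trim() });
      }
    });
  }
  return hits;
}

// Hypothetical file contents, just enough to exercise the check.
const sampleFiles: Record<string, string> = {
  'src/lib/notifications/channels/email.ts': 'await sendEmail({ to });',
  'src/lib/invitations/send.ts': 'const sent = await sendEmail({ to: email });',
  'src/lib/invitations/manage.ts': 'await dispatch({ type });',
};

const emailLeaks = findSeamLeaks(sampleFiles, 'sendEmail(');
const insertLeaks = findSeamLeaks(sampleFiles, 'db.insert(notifications)');
```

With the sample map, emailLeaks holds the single send.ts hit and insertLeaks is empty, mirroring the two results the searches above should produce.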
For the pieces these edits lean on but do not re-teach: authedAction and withTenant were built in The authedAction wrapper and the audit write in The append-only audit log; the webhook’s verify-then-claim-then-commit shape is Claim once, mutate once and the project version in Claim the event inside one transaction; and the dispatcher contract you are calling — NotificationEvent in, DispatchResult out — is One seam, many channels and this chapter’s Registry, dispatcher, and dedup.
Further reading: Chris Richardson's canonical writeup of the transactional outbox pattern, the durable table this lesson's in-memory pendingDispatches array stands in for.
Moment of truth
Run the lesson’s gate:

    pnpm test:lesson 4

The suite drives all three call sites against the same local Postgres and email mock the app uses. The two action call sites run behind authedAction, which resolves a Better Auth session through next/headers, a path vitest’s node environment cannot enter. For those, the suite drives the dispatcher with the exact event each call site builds and reads the wiring back out of the source: that dispatch sits after the commit, that the recipient list is the existing-user-or-empty ternary, that the audit write stays put. The webhook handler is a plain function, so it gets driven directly with a real transaction and a fixture past-due subscription, then the collected descriptors are drained exactly as the route drains them. It needs this chapter’s migration applied and pnpm db:seed run: Acme’s lone owner, Alice, is the past-due fan-out fixture. A green run looks like this:

    ✓ tests/lessons/Lesson 4.test.ts (8)
      ✓ sendInvitation to an existing user notifies that user (2)
        ✓ produces one inbox row and one email increment for the invitee
        ✓ fires the dispatcher after the withTenant transaction commits
      ✓ inviting a non-user address no-ops the dispatcher (2)
        ✓ writes no inbox row and leaves the email counter flat on an empty recipient list
        ✓ builds the recipient list as the existing-user-or-empty ternary
      ✓ changeMemberRole writes both an audit row and a notification (2)
        ✓ produces one inbox row and one email increment for the affected member
        ✓ keeps the audit-log write inside the tx and dispatches after commit
      ✓ the past-due webhook fans out to every org owner after commit (1)
        ✓ collects an owner-targeted descriptor and dispatching it lands one inbox row + email per owner
      ✓ a rolled-back action notifies nobody (1)
        ✓ writes no inbox row and bumps no email counter when the transaction rolls back

    Test Files  1 passed (1)
         Tests  8 passed (8)

The suite reads rows and the source; it does not click through the real surfaces. Walk them once by hand to confirm the production path does what the gate proves the wiring does:
- auditLogs row and a notifications row are written, with one email increment.
- customer.subscription.updated with past_due status (via the stripe CLI or the inspector’s Fire billing-past-due): processed_events, the entitlement, the audit row, and one notification per owner all land; replay the same event and the handler’s claim blocks it, with the dispatcher’s dedup as the second layer.
- Wrap invite in rollback control’s note, and confirm through the tests that a rolled-back action writes no rows and bumps no counter.
- Searches for sendEmail( and db.insert(notifications) outside lib/notifications/ return only the named chapter 065 invitation-email exception.
- (userId, createdAt desc) index and the partial unread index back the inbox feed.
That is the project. Three real call sites now dispatch after commit, the invitation actions and the Stripe webhook produce real notifications and real emails, and /inbox shows real rows: the production-shaped flow, end to end.
Because this is where the build stops, it is worth naming what the dispatcher hands off to later chapters — each a clean extension of the same contract, none of it work for now:
- Caching the inbox feed. When inbox volume justifies it, the later chapter on tag-driven cache invalidation tags the feed read so a new notification busts exactly that cache; this project deliberately reads the table live and caches nothing.
- Redis-backed dedup. When throughput crosses the threshold where a database write per dedup check stops being free, the rate-limiting chapter moves the dedup window into Upstash Redis. The dispatcher’s contract does not change, only where isDuplicate looks.
- A durable channel queue. The background-work chapter moves channel sends behind a Trigger.dev worker, so a slow email provider no longer blocks the action. Again the contract holds: the call site still just await dispatch(...).
- Error and audit discipline. A later security chapter formalizes the channel-failure log line and the audit trail you have been writing.
- Integration tests. The testing chapter writes the integration suite for preferences-respected, default-on, dedup, and channel-independence against a real database.
- Observability. The observability chapter treats DispatchResult as the structured-log shape, with dashboards on dedup rate, suppression rate, and channel-failure rate built on it.