
HITL Engineering Patterns: Implementing LangGraph Interrupts for Production Approval Workflows

2026-03-20 · 18 min read · Igor Bobriakov
TL;DR
  • LangGraph's runtime interrupt() function (v0.2.x+) pauses graph execution inside any node with less than 5ms overhead, persisting full agent state to a configurable checkpointer. The older compile-time interrupt_before/interrupt_after hooks are still supported but offer less flexibility.
  • Redis-backed LangGraph checkpointing (RedisSaver) adds approximately 40ms per state transition at 100 concurrent agents on a 3-node Redis cluster -- acceptable for most approval workflows where human latency dominates.
  • A single LangGraph interrupt without a persistent checkpointer loses all agent state on process restart; always pair interrupts with a durable backend (Redis, Postgres, or SQLite for dev).
  • Implementing async approval via a FastAPI webhook + Redis pub/sub allows approval latency measured in minutes (human time) without holding a Python thread, scaling to thousands of pending approvals.
  • Multi-tier escalation -- auto-approve below threshold, human approval in mid-range, dual-approval above threshold -- reduces human review volume by 60-80% in typical financial workflow deployments.
  • LangGraph's thread_id + checkpoint_id pair is the canonical resumption key; losing either one makes a paused workflow unrecoverable without replaying from the last durable snapshot.
  • The interrupt pattern adds 1-2 extra state writes per approval gate; at 10,000 daily approvals this totals roughly 20,000 checkpoint writes -- trivial for Postgres but implement a retention policy (e.g., purge completed checkpoints older than 30 days) to prevent unbounded table growth.
  • Timeout handling for unresponded approvals must be implemented at the orchestration layer, not inside the graph; LangGraph has no built-in interrupt TTL as of v0.2.x.

Every autonomous agent eventually needs to do something irreversible — transfer funds, send an email to 50,000 users, delete a production database record, submit a regulatory filing. At that boundary, the engineering question is not philosophical (“should AI be trusted?”) but purely architectural: how do you suspend a stateful, multi-step agent mid-execution, hand control to a human, and resume from exactly the right point after a decision is made — potentially hours or days later, across process restarts and deployments?

Most teams discover this problem the hard way. They wire up a simple input() call or a blocking HTTP request, which works in a Jupyter notebook and falls apart completely under any real concurrency or failure scenario. The agent state lives in memory, the process restarts overnight, and now you have thousands of paused workflows that can never be resumed. Or they poll a database from inside the graph loop, which inverts the control flow and makes the graph impossible to test in isolation.

LangGraph’s interrupt mechanism — combined with a durable checkpointer backend — is the correct architectural answer to this problem. But “correct” does not mean “simple.” Getting it right in production requires understanding how interrupts interact with the checkpoint lifecycle, how to design async approval APIs that don’t block your thread pool, and how to handle the edge cases LangGraph’s documentation glosses over: timeouts, rejected approvals, multi-party sign-offs, and audit trails. This article covers all of it.


Diagram 1: High-level HITL approval workflow — agent execution pauses at interrupt nodes, state is persisted to a durable checkpointer, and an async approval service resumes the graph after human decision.

Understanding the LangGraph Interrupt Primitive

LangGraph v0.2.x introduced the runtime interrupt() function, which is the primary mechanism for pausing graph execution in modern LangGraph applications. You call interrupt() inside any node function to pause the graph at that point, passing a payload that describes what the human should review. The return value of interrupt() is the resume payload injected by the caller when they resume the graph.

This replaced the older compile-time pattern of passing interrupt_before and interrupt_after lists to graph.compile(). The compile-time hooks still work and are useful for simple cases where you want to pause at a node boundary without modifying the node code itself, but the runtime interrupt() function is more flexible: you can conditionally interrupt based on state, pass structured review payloads, and place the interrupt at any point within a node’s logic rather than only at node boundaries.
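
For contrast, here is a minimal, self-contained sketch of the compile-time style (node names and the amount are illustrative; MemorySaver is used for brevity and is not durable):

# Compile-time pause: the graph halts at the node boundary, BEFORE "execute" runs
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class S(TypedDict):
    amount: float

def plan(state: S) -> dict:
    return {"amount": state["amount"]}

def execute(state: S) -> dict:
    print(f"executing transfer of {state['amount']}")
    return {}

builder = StateGraph(S)
builder.add_node("plan", plan)
builder.add_node("execute", execute)
builder.set_entry_point("plan")
builder.add_edge("plan", "execute")
builder.add_edge("execute", END)

graph = builder.compile(checkpointer=MemorySaver(), interrupt_before=["execute"])

config = {"configurable": {"thread_id": "demo"}}
graph.invoke({"amount": 100.0}, config=config)  # pauses before "execute"
graph.invoke(None, config=config)               # resuming with None input continues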

The behavioral difference between interrupting before vs. after a node’s work is critical for approval workflow design:

  • Interrupt before execution — pauses before the critical action runs. The human sees the planned inputs and can modify or veto before anything happens. This is the correct choice for destructive or irreversible operations.
  • Interrupt after execution — pauses after the node runs. The human reviews the output before it propagates downstream. Use this for content moderation, response review, or QA gates on LLM-generated text.

When interrupt() is called, the graph serializes the current state to the configured checkpointer and returns control to the caller. The caller receives the interrupt payload rather than a final answer. A LangGraph interrupt without a durable checkpointer is not a production pattern — it is a prototype that will lose all agent state on any process restart or pod eviction.

The resumption contract is simple: call graph.ainvoke(Command(resume=payload), config={"configurable": {"thread_id": tid}}) with the same thread_id. LangGraph loads the latest checkpoint for that thread, injects the resume payload as the return value of the interrupt() call, and continues execution from where it paused. The thread_id is your recovery key — treat it like a distributed transaction ID.
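
The full round trip, as a minimal sketch (thread id and payloads are illustrative; recent LangGraph versions also surface the pending interrupt payload under the __interrupt__ key of the returned state, though exact availability varies by version):

from langgraph.types import Command

config = {"configurable": {"thread_id": "txn-42"}}

# First invocation runs until the interrupt() call inside a node
result = graph.invoke({"amount": 9000.0}, config=config)
print(result.get("__interrupt__"))  # pending review payload, if exposed

# ...human reviews out-of-band, then a second invocation resumes...
final = graph.invoke(Command(resume={"status": "approved"}), config=config)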

Checkpoint Backend Selection and Configuration

The checkpointer is the persistence layer that makes interrupts durable. LangGraph ships with three first-party options and a clean interface for custom backends:

| Backend | Class | Typical Latency | Best For | Limitations |
|---|---|---|---|---|
| SQLite | SqliteSaver | ~2-5ms | Local dev, unit tests | Single-process only, no horizontal scale |
| PostgreSQL | PostgresSaver (langgraph-checkpoint-postgres) | ~15-30ms | Enterprise, audit trail, ACID compliance | Requires connection pooling at scale |
| Redis | RedisSaver (langgraph-checkpoint-redis) | ~5-15ms | High-throughput, low-latency workflows | Requires persistence config (AOF/RDB); no native SQL queries |
| Custom | Subclass BaseCheckpointSaver | Varies | DynamoDB, MongoDB, S3-backed workflows | Engineering overhead; must implement get/put/list correctly |
For most production approval workflows, PostgreSQL is the right default. You get ACID guarantees, native querying of pending approvals (crucial for building admin dashboards), and the checkpoint records double as an audit log with no extra engineering. Redis is the correct choice if your approval volume exceeds ~5,000/hour and checkpoint write latency is in your critical path — but you must configure Redis persistence (appendonly yes + AOF rewrite) or you trade durability for speed.
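
If Redis wins on throughput, wiring it in might look like the sketch below (this assumes the langgraph-checkpoint-redis package; the connection string is illustrative, and durability still depends on the server-side append-only configuration):

from langgraph.checkpoint.redis import RedisSaver

# Server side (redis.conf): appendonly yes, appendfsync everysec
with RedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
    checkpointer.setup()  # create required indices on first run
    graph = build_approval_graph(checkpointer)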


Diagram 2: Durable checkpoint architecture — thread_id and checkpoint_id form the resumption key. Choose backend based on throughput, durability, and query requirements.

Core Implementation: Building the Interrupt-Enabled Graph

The following example implements a financial transaction approval agent. It plans a wire transfer, pauses for human review via interrupt(), and executes only after an explicit approval signal. Note the separation between graph definition and runtime configuration — this is essential for testability.

approval_graph.py
# Production HITL approval workflow using LangGraph interrupt() + PostgreSQL checkpointer
import logging
import operator
from typing import Annotated, TypedDict, Literal

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import interrupt
from psycopg_pool import AsyncConnectionPool

logger = logging.getLogger(__name__)

# -- State Definition ----------------------------------------------------------
class TransactionState(TypedDict):
    """
    Full agent state -- serialized entirely to the checkpointer at each interrupt.
    Keep this JSON-serializable; avoid storing live DB connections or file handles.
    """
    thread_id: str
    transaction_id: str
    amount: float
    destination_account: str
    currency: str
    risk_score: float  # 0.0 (low) to 1.0 (high)
    planned_action: dict  # populated by plan_transfer node
    approval_status: Literal["pending", "approved", "rejected", "timeout"] | None
    reviewer_id: str | None  # who approved/rejected
    reviewer_comment: str | None
    executed: bool
    messages: Annotated[list, operator.add]  # accumulated audit trail

# -- Node: Plan the Transfer ----------------------------------------------------
def plan_transfer(state: TransactionState) -> dict:
    """
    Constructs the planned action payload. In production, this node would call
    your LLM to synthesize transfer parameters from unstructured input.
    """
    planned = {
        "action": "wire_transfer",
        "amount": state["amount"],
        "currency": state["currency"],
        "destination": state["destination_account"],
        "fee_estimate": round(state["amount"] * 0.001, 2),
        "estimated_arrival": "2 business days",
    }
    logger.info(f"[{state['thread_id']}] Planned transfer: {planned}")
    return {
        "planned_action": planned,
        "messages": [{"role": "agent", "content": f"Planned transfer: {planned}"}],
    }

# -- Node: Request Human Approval (interrupt point) ------------------------------
def request_approval(state: TransactionState) -> dict:
    """
    This node uses interrupt() to pause execution.
    The caller receives the interrupt payload rather than a final answer;
    execution halts here until graph.ainvoke(Command(resume=payload), config=...)
    is called.

    Note: interrupt() MUST be called inside a node function, not at graph level.
    The return value of interrupt() is the resume payload injected by the caller.
    """
    logger.info(f"[{state['thread_id']}] Interrupting for human approval...")
    # interrupt() pauses execution and returns the resume payload when resumed
    decision = interrupt({
        "message": "Please review and approve or reject this transaction.",
        "planned_action": state["planned_action"],
        "risk_score": state["risk_score"],
        "transaction_id": state["transaction_id"],
    })
    # Code below runs AFTER the graph is resumed with a decision
    logger.info(f"[{state['thread_id']}] Received decision: {decision}")
    return {
        "approval_status": decision.get("status"),  # "approved" | "rejected"
        "reviewer_id": decision.get("reviewer_id"),
        "reviewer_comment": decision.get("comment", ""),
        "messages": [{"role": "reviewer", "content": str(decision)}],
    }

# -- Node: Execute the Transfer ---------------------------------------------------
def execute_transfer(state: TransactionState) -> dict:
    """Only reached after explicit approval."""
    logger.info(f"[{state['thread_id']}] Executing approved transfer.")
    # In production: call your payments API here
    return {
        "executed": True,
        "messages": [{"role": "system", "content": "Transfer executed successfully."}],
    }

# -- Node: Handle Rejection ---------------------------------------------------------
def handle_rejection(state: TransactionState) -> dict:
    logger.warning(f"[{state['thread_id']}] Transfer rejected by {state['reviewer_id']}.")
    return {
        "executed": False,
        "messages": [{"role": "system", "content": f"Transfer rejected: {state['reviewer_comment']}"}],
    }

# -- Conditional Edge: Route on Approval Status ---------------------------------------
def route_on_decision(state: TransactionState) -> Literal["execute_transfer", "handle_rejection"]:
    if state.get("approval_status") == "approved":
        return "execute_transfer"
    return "handle_rejection"

# -- Graph Assembly ---------------------------------------------------------------------
def build_approval_graph(checkpointer):
    builder = StateGraph(TransactionState)
    builder.add_node("plan_transfer", plan_transfer)
    builder.add_node("request_approval", request_approval)
    builder.add_node("execute_transfer", execute_transfer)
    builder.add_node("handle_rejection", handle_rejection)
    builder.set_entry_point("plan_transfer")
    builder.add_edge("plan_transfer", "request_approval")
    builder.add_conditional_edges("request_approval", route_on_decision)
    builder.add_edge("execute_transfer", END)
    builder.add_edge("handle_rejection", END)
    # Compile with durable checkpointer -- this is what makes interrupts production-safe
    return builder.compile(checkpointer=checkpointer)

# -- Checkpointer Factory (async connection pool for production) -------------------------
async def get_postgres_checkpointer(dsn: str) -> AsyncPostgresSaver:
    """
    Returns an AsyncPostgresSaver backed by a connection pool.
    A single synchronous psycopg.connect() will block the event loop
    and cannot handle concurrent checkpoint reads/writes in async FastAPI.
    Always use AsyncConnectionPool for production deployments.

    Call checkpointer.setup() once on first deployment to create the
    required checkpoint tables.
    """
    pool = AsyncConnectionPool(conninfo=dsn, min_size=2, max_size=10, open=False)
    await pool.open()
    checkpointer = AsyncPostgresSaver(pool)
    # await checkpointer.setup()  # Uncomment on first run to create schema
    return checkpointer

Pro Tip — Keep State JSON-Serializable: LangGraph checkpointers serialize state via msgpack or JSON depending on backend. Any non-serializable object in your TypedDict — a live DB connection, a file handle, a datetime without proper encoding — will cause a silent serialization error at the interrupt boundary. Audit your state schema before wiring in a production checkpointer. Use str for timestamps (ISO 8601), dict for structured payloads, and list[str] for message histories.
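
A cheap pre-flight check before wiring in a real backend, as a sketch:

import json

# Raises TypeError at build time, not at the interrupt boundary,
# if any field in the state would not survive JSON serialization
json.dumps(initial_state)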

The Async Approval API: Decoupling Human Latency from Thread Pools

The most common production mistake after getting interrupts working locally is implementing approval as a blocking synchronous call — an HTTP endpoint that waits for the human to respond before returning. This works for demos. In production, it ties up a thread (or an async task) for however long it takes a human to notice a Slack notification, which could be minutes, hours, or days. You will exhaust your thread pool long before your SLA expires.

The correct async HITL pattern is fire-and-forget on the initiation side: start the graph, let the interrupt pause it, return a 202 Accepted with the thread_id, and implement resumption as a completely separate API endpoint called by the approval UI or webhook.

approval_api.py
# FastAPI service for async HITL approval -- initiation + resumption as separate endpoints
import logging
import uuid
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from langgraph.types import Command

from approval_graph import build_approval_graph, get_postgres_checkpointer, TransactionState

logger = logging.getLogger(__name__)

# -- Lifespan: initialize async checkpointer with connection pool ----------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    dsn = "postgresql://user:pass@localhost:5432/approvals"
    app.state.checkpointer = await get_postgres_checkpointer(dsn)
    app.state.graph = build_approval_graph(app.state.checkpointer)
    yield
    # Pool cleanup handled by AsyncConnectionPool context

app = FastAPI(title="HITL Approval Service", lifespan=lifespan)

# -- Request / Response Models ----------------------------------------------------
class TransactionRequest(BaseModel):
    amount: float
    destination_account: str
    currency: str = "USD"
    risk_score: float = 0.5

class ApprovalDecision(BaseModel):
    thread_id: str
    status: str  # "approved" | "rejected"
    reviewer_id: str
    comment: str = ""

# -- Endpoint 1: Initiate Transaction (fire-and-forget) ----------------------------
@app.post("/transactions", status_code=202)
async def initiate_transaction(
    req: TransactionRequest,
    background_tasks: BackgroundTasks,
):
    """
    Kicks off the approval graph in a background task.
    Returns immediately with a thread_id for polling or webhook callbacks.
    The graph will pause at the interrupt node; no thread is held waiting.
    """
    thread_id = str(uuid.uuid4())
    transaction_id = str(uuid.uuid4())
    initial_state: TransactionState = {
        "thread_id": thread_id,
        "transaction_id": transaction_id,
        "amount": req.amount,
        "destination_account": req.destination_account,
        "currency": req.currency,
        "risk_score": req.risk_score,
        "planned_action": {},
        "approval_status": None,
        "reviewer_id": None,
        "reviewer_comment": None,
        "executed": False,
        "messages": [],
    }
    config = {"configurable": {"thread_id": thread_id}}
    graph = app.state.graph
    # Run graph in background -- it will pause at interrupt and return
    background_tasks.add_task(_run_graph_until_interrupt, graph, initial_state, config)
    return {
        "thread_id": thread_id,
        "transaction_id": transaction_id,
        "status": "pending_approval",
        "message": "Transaction queued for human review.",
    }

async def _run_graph_until_interrupt(graph, state: TransactionState, config: dict):
    """
    Runs the graph until it hits the interrupt. The interrupt pauses execution
    and serializes state to Postgres. This coroutine then exits cleanly.
    """
    try:
        # graph.ainvoke returns when it hits an interrupt or reaches END.
        # If no interrupt fires, the graph completed (e.g., an auto-approved path).
        await graph.ainvoke(state, config=config)
    except Exception as e:
        logger.error(f"Graph error for {config}: {e}", exc_info=True)

# -- Endpoint 2: Submit Approval Decision (resumes the paused graph) ---------------
@app.post("/approvals")
async def submit_approval(decision: ApprovalDecision):
    """
    Called by the approval UI, Slack bot, or email webhook after human review.
    Resumes the interrupted graph with the decision payload.
    """
    graph = app.state.graph
    config = {"configurable": {"thread_id": decision.thread_id}}
    # Verify the thread exists and is in interrupted state
    snapshot = await graph.aget_state(config)
    if snapshot is None or not snapshot.values:
        raise HTTPException(status_code=404, detail="Thread not found.")
    if not snapshot.next:
        raise HTTPException(status_code=409, detail="Thread is not awaiting approval.")
    # Resume the graph with the human decision
    resume_payload = {
        "status": decision.status,
        "reviewer_id": decision.reviewer_id,
        "comment": decision.comment,
    }
    result = await graph.ainvoke(
        Command(resume=resume_payload),
        config=config,
    )
    return {
        "thread_id": decision.thread_id,
        "final_status": result.get("approval_status"),
        "executed": result.get("executed"),
    }

# -- Endpoint 3: Check Pending Approval State ---------------------------------------
@app.get("/transactions/{thread_id}/state")
async def get_transaction_state(thread_id: str):
    """
    Returns current state of a transaction -- useful for approval UI polling
    and for debugging stuck workflows.
    """
    graph = app.state.graph
    config = {"configurable": {"thread_id": thread_id}}
    snapshot = await graph.aget_state(config)
    if snapshot is None or not snapshot.values:
        raise HTTPException(status_code=404, detail="Thread not found.")
    return {
        "thread_id": thread_id,
        "next_nodes": snapshot.next,
        "state": snapshot.values,
        "is_interrupted": bool(snapshot.next),
    }

This pattern means your web server handles the approval decision in milliseconds (load checkpoint, inject payload, run remaining nodes, return). The human latency — which is the bottleneck in any approval workflow — is completely outside your application’s runtime. You can scale the approval worker pool independently from the API layer, and a pod restart between submission and approval loses nothing because the full state is in Postgres.

Multi-Tier Escalation: Routing by Risk Score

Not every action needs the same level of human oversight. Applying a mandatory interrupt to every LLM output is the fastest way to make your agent unusable and your reviewers burned out. In production financial and compliance workflows, a three-tier routing model — auto-approve at high confidence, single-reviewer in the mid-range, dual-approval below threshold — reduces human review volume by 60-80% while maintaining full coverage of high-risk actions.


Diagram 3: Multi-tier escalation — confidence score routing determines whether an action is auto-approved, sent to a single reviewer, or requires dual sign-off before execution.

The routing logic is a conditional edge in LangGraph, not a node. This keeps it outside the interrupt boundary and ensures the routing decision itself is not subject to human review (which would be circular). Implement the escalation tiers as a pure function that returns a node name string:

escalation_router.py
# Multi-tier escalation routing for LangGraph approval workflows
from typing import Literal
from langgraph.types import interrupt

# Configurable thresholds -- externalize to environment config or feature flags
AUTO_APPROVE_THRESHOLD = 0.95   # confidence >= 0.95: no human required
SINGLE_REVIEW_THRESHOLD = 0.70  # 0.70 <= confidence < 0.95: one reviewer
# Below 0.70: dual approval required

ESCALATION_CONFIG = {
    "high_value_override": 50_000,  # Force dual approval above this USD amount
    "new_vendor_override": True,    # Force single review for first-time destinations
}

def escalation_router(state: dict) -> Literal[
    "auto_approve", "single_reviewer_interrupt", "dual_approval_interrupt"
]:
    """
    Conditional edge function -- determines which interrupt path to take
    based on risk score and override conditions.
    This runs BEFORE any interrupt fires, so it adds zero latency to
    workflows that don't require human review.
    """
    # risk_score is 0.0 (low) to 1.0 (high); invert it so the thresholds
    # above read as confidence levels
    confidence = 1.0 - state.get("risk_score", 0.5)
    amount = state.get("amount", 0)
    # Hard overrides take precedence over score-based routing
    if amount >= ESCALATION_CONFIG["high_value_override"]:
        return "dual_approval_interrupt"
    # Confidence-based routing
    if confidence >= AUTO_APPROVE_THRESHOLD:
        return "auto_approve"
    elif confidence >= SINGLE_REVIEW_THRESHOLD:
        return "single_reviewer_interrupt"
    else:
        return "dual_approval_interrupt"

# -- Dual Approval Node ----------------------------------------------------------
def dual_approval_interrupt(state: dict) -> dict:
    """
    Requires two separate approvals via two sequential interrupt() calls.
    On each resume, LangGraph re-executes this node from the top and replays
    completed interrupt() calls with their stored resume values -- so after
    the first approval, the primary interrupt returns instantly and only the
    secondary one pauses. Keep code before the interrupts free of side effects.
    """
    # First pause: primary reviewer decision
    primary = interrupt({
        "message": "HIGH RISK: Primary approval required.",
        "planned_action": state["planned_action"],
        "risk_score": state["risk_score"],
        "approval_round": "primary",
    })
    # Second pause: secondary reviewer decision (sees who approved first)
    secondary = interrupt({
        "message": "HIGH RISK: Secondary approval required.",
        "planned_action": state["planned_action"],
        "risk_score": state["risk_score"],
        "approval_round": "secondary",
        "primary_reviewer": primary.get("reviewer_id"),
    })
    # Both approvals collected; determine final status
    both_approved = (
        primary.get("status") == "approved"
        and secondary.get("status") == "approved"
    )
    return {
        "primary_approval": primary,
        "secondary_approval": secondary,
        "approval_status": "approved" if both_approved else "rejected",
    }

Production Trade-offs and Operational Patterns

Timeout Handling

LangGraph v0.2.x has no built-in interrupt TTL. If a reviewer never responds, the workflow sits in interrupted state indefinitely. The standard pattern is a background scheduler (APScheduler, Celery beat, or a Temporal workflow) that queries your pending-approval records for workflows interrupted more than, say, 24 hours ago, then resumes them with a {"status": "timeout"} payload. Store the interrupt timestamp in your state object (or a side table) so the timeout logic has something to query against.
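
A sketch of the sweeper under one possible design -- since the stock checkpoint schema does not expose a convenient "interrupted since" column, this assumes a side table pending_approvals(thread_id TEXT, interrupted_at TIMESTAMPTZ) written when the approval is queued, plus the graph and connection pool from the earlier examples:

from langgraph.types import Command

STALE_SQL = """
    SELECT thread_id FROM pending_approvals
    WHERE interrupted_at < NOW() - INTERVAL '24 hours'
"""

async def sweep_stale_approvals(graph, pool):
    """Resume every stale paused workflow with a timeout signal."""
    async with pool.connection() as conn:
        cur = await conn.execute(STALE_SQL)
        rows = await cur.fetchall()
    for (thread_id,) in rows:
        config = {"configurable": {"thread_id": thread_id}}
        snapshot = await graph.aget_state(config)
        if snapshot.next:  # still paused -> inject the timeout decision
            await graph.ainvoke(
                Command(resume={"status": "timeout", "reviewer_id": "system"}),
                config=config,
            )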

Checkpoint Retention and Cleanup

The interrupt pattern adds 1-2 extra state writes per approval gate. At 10,000 daily approvals, this totals roughly 20,000 checkpoint writes per day — a trivial load for PostgreSQL. The real operational concern is unbounded table growth over months. Implement a retention policy: purge checkpoint records for completed workflows older than your audit retention window (typically 30-90 days for financial workflows). A nightly purge job — a DELETE on checkpoint rows whose thread has completed and whose timestamp is older than 90 days — keeps your checkpoint table from growing into the hundreds of gigabytes over a year of production traffic.

Idempotency on Resume

Network retries mean your approval endpoint may receive the same decision twice. LangGraph’s checkpointer handles this gracefully at the state level — resuming an already-completed thread returns the final state without re-executing nodes — but your downstream action (the payments API call) must also be idempotent. Use a stable transaction_id as an idempotency key on your payments provider and check state["executed"] before calling the external service.
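
In the execute node, that guard might look like this (payments_client and its create_transfer signature are hypothetical stand-ins for your provider's SDK):

def execute_transfer(state: TransactionState) -> dict:
    if state.get("executed"):
        # Duplicate resume (network retry) -- the transfer already ran
        return {}
    payments_client.create_transfer(              # hypothetical SDK call
        idempotency_key=state["transaction_id"],  # provider-side dedupe
        amount=state["amount"],
        destination=state["destination_account"],
    )
    return {"executed": True}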

Observability

Every interrupt event should emit a structured log and a metric. At minimum: approval.queued, approval.received, approval.latency_seconds (histogram), and approval.timeout. Wire these to LangSmith for trace-level visibility into which node triggered the interrupt and what state was serialized. LangSmith’s trace UI will show you the full graph execution timeline including the pause gap, which is invaluable for debugging stuck workflows and for compliance audits.
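
A minimal emission sketch using prometheus_client (metric names are suggestions, not a standard):

from prometheus_client import Counter, Histogram

APPROVALS_QUEUED = Counter("approval_queued_total", "Interrupts raised")
APPROVALS_RECEIVED = Counter("approval_received_total", "Decisions submitted")
APPROVAL_LATENCY = Histogram("approval_latency_seconds", "Queue-to-decision time")

# At the interrupt: APPROVALS_QUEUED.inc()
# At resume: APPROVALS_RECEIVED.inc(); APPROVAL_LATENCY.observe(elapsed_seconds)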

Pro Tip — Approval Audit Trail via State Accumulator: Use an Annotated[list, operator.add] field (like the messages field in the example above) as your audit log. Every node appends an event dict rather than overwriting. When the workflow completes, this field contains a full, ordered, immutable record of every action and decision — including who approved what and when. This is your compliance log, and it’s already in the checkpoint store without any extra infrastructure.

Testing HITL Graphs

Testing interrupt-based graphs requires simulating the pause/resume cycle. Use SqliteSaver in-memory for unit tests, and assert on the snapshot.next field to verify the graph paused at the expected node. For integration tests, run the full async cycle: initiate, assert interrupted, submit decision, assert final state. Never mock the checkpointer in integration tests — serialization bugs only appear when you actually write and read from a real backend.

test_approval_graph.py
# Unit test pattern for LangGraph interrupt-based workflows
import pytest
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.types import Command
from approval_graph import build_approval_graph, TransactionState

@pytest.mark.asyncio
async def test_approval_interrupt_pauses_at_correct_node():
    """Verify the graph pauses at request_approval before executing transfer."""
    async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
        graph = build_approval_graph(checkpointer)
        initial_state: TransactionState = {
            "thread_id": "test-thread-001",
            "transaction_id": "txn-001",
            "amount": 5000.0,
            "destination_account": "ACCT-9999",
            "currency": "USD",
            "risk_score": 0.8,
            "planned_action": {},
            "approval_status": None,
            "reviewer_id": None,
            "reviewer_comment": None,
            "executed": False,
            "messages": [],
        }
        config = {"configurable": {"thread_id": "test-thread-001"}}
        # Step 1: Run until interrupt
        await graph.ainvoke(initial_state, config=config)
        # Step 2: Verify graph is paused at request_approval
        snapshot = await graph.aget_state(config)
        assert snapshot.next == ("request_approval",), (
            f"Expected interrupt at request_approval, got: {snapshot.next}"
        )
        assert snapshot.values["planned_action"] != {}, "planned_action should be populated"
        # Step 3: Resume with approval decision
        result = await graph.ainvoke(
            Command(resume={"status": "approved", "reviewer_id": "reviewer-007", "comment": "LGTM"}),
            config=config,
        )
        # Step 4: Verify execution completed
        assert result["executed"] is True
        assert result["approval_status"] == "approved"
        assert result["reviewer_id"] == "reviewer-007"

Frequently Asked Questions

What is a LangGraph interrupt and how does it work?

A LangGraph interrupt pauses graph execution at a specific point within a node function. The agent’s full state — including memory, accumulated messages, and tool call history — is serialized to the configured checkpointer. Execution resumes only when an external caller invokes graph.ainvoke() with the same thread_id and a Command(resume=...) payload. This is the primary mechanism for implementing Human-in-the-Loop approval gates in LangGraph.

How do you persist LangGraph state across an interrupt so it survives process restarts?

You must configure a durable checkpointer when building the graph — either SqliteSaver for development or RedisSaver/PostgresSaver for production. Pass the checkpointer to graph.compile(checkpointer=your_checkpointer). Without this, state exists only in memory and is lost on any process restart, making interrupt-based workflows unreliable in production.

Can LangGraph interrupts handle asynchronous human approval (e.g., email or Slack approvals)?

Yes. The correct pattern is to pause the graph via an interrupt, persist state to a durable checkpointer, and return immediately from the API layer with a 202 Accepted and the thread_id. A separate async approval service (webhook, Slack bot, email handler) receives the human decision and calls graph.ainvoke(Command(resume=payload), config=...) to resume. This decouples human latency — which can be minutes to hours — from the application’s thread pool entirely.

What happens if a LangGraph interrupt times out and the human never responds?

LangGraph v0.2.x has no built-in TTL for interrupted workflows. You must implement timeout logic at the orchestration layer — typically a background scheduler (APScheduler, Celery beat, or a Temporal workflow) that queries for stale pending approvals and either auto-escalates, auto-rejects, or resumes the graph with a timeout signal payload. Storing the interrupt timestamp in the state object is the recommended pattern for making these queries efficient.

How does interrupt_before differ from interrupt_after in LangGraph?

interrupt_before pauses execution before the specified node runs, giving the human reviewer the ability to inspect the planned action and either approve or modify inputs before the node executes. interrupt_after pauses after the node has already run, useful for reviewing tool outputs or LLM responses before they propagate downstream. For approval workflows that gate destructive or irreversible actions, interrupt_before is almost always the correct choice. Note that these are compile-time hooks; the newer runtime interrupt() function provides equivalent control with more flexibility.


Building AI Agents That Need Human Oversight in Production?

Wiring LangGraph interrupts to a durable checkpointer, building async approval APIs, and designing multi-tier escalation logic that actually holds up under concurrent load and process restarts is non-trivial engineering. The gap between a working demo and a production-grade HITL system involves checkpoint backend selection, idempotency guarantees, timeout orchestration, audit trail design, and observability — all of which require careful architectural decisions before the first line of application code is written.

At ActiveWizards, our AI Agent Engineering team has built production HITL workflows for financial services, compliance, and healthcare — handling thousands of daily approvals with full audit trails, sub-200ms resumption latency, and zero state loss across deployments. We specialize in the hard parts: multi-agent orchestration, durable state management, and enterprise-ready agentic infrastructure.


About the author

Igor Bobriakov

AI Architect. Author of Production-Ready AI Agents. 15 years deploying production AI platforms and agentic systems for enterprise clients and deep-tech startups.