Connectors · LangGraph
LangGraph Connector
Pause LangGraph runs with interrupt(), route encrypted approvals to every device registered for the user (web, iOS, Android, Telegram), and resume the graph from a Noxy webhook. Built on top of the Python Agent SDK.
What it does
noxy-langgraph is a thin layer over the Noxy Python SDK that turns any LangGraph node into a human-in-the-loop checkpoint. It owns three responsibilities:
- Send — builds an encrypted actionable from your graph state and calls
send_decisionon the Noxy relay. - Suspend — calls LangGraph's
interrupt()so the graph saves its state to the configured checkpointer and returns control to your server. - Resume — when Noxy posts the user's outcome to your webhook, it looks up the paused thread and calls
graph.invoke(Command(resume=…))with the decision in state.
Why a connector? LangGraph already models suspension natively with interrupt(). The connector wires it to Noxy's encrypted multi-device delivery and webhook resume so you do not write a polling loop or invent your own correlation table.
Flow
┌───────────┐ send_decision ┌──────────┐ route ┌──────────────────────┐
│ LangGraph │ ────────────────▶ │ Noxy │ ────────▶ │ Devices (web / iOS / │
│ node │ │ Relay │ │ Android / Telegram) │
└─────┬─────┘ └────┬─────┘ └─────────┬────────────┘
│ interrupt() │ │ Approve / Reject
▼ │ ▼
┌───────────┐ │ webhook (outcome) ┌──────────────┐
│Checkpoint │ ◀──────────────────────┴──────────────────── │ Your server │
│ (state) │ resume_from_webhook() │ FastAPI… │
└───────────┘ ──────────── Command(resume=…) ────────────▶ └──────────────┘
- Your graph reaches the HITL node — the node builds an actionable from current state.
- The connector calls
send_decision; the relay encrypts and fans out to every device for the identity. - The node calls
interrupt(); LangGraph persists the state and returns fromgraph.invokewith an__interrupt__marker. - The user approves or rejects on any device, or the decision TTL expires.
- Noxy posts a JSON payload to your webhook with
decisionId,identityId, andoutcome. - You call
NoxyGraphResumeHandler.resume_from_webhook(payload); the graph re-runs the HITL node, which now returns the decision to downstream state.
Requirements
- Python 3.10 or newer.
- A LangGraph graph compiled with a checkpointer — required for
interrupt()/Command(resume=…). - A Noxy app — see Create App — for the
NOXY_APP_TOKEN. - An identity for your user: phone (E.164), email, your own user id, or wallet address (
0x…). See Identity types.
Installation
# Production
pip install noxy-langgraph
# With the FastAPI webhook example included
pip install "noxy-langgraph[examples]"The package ships with noxy-sdk >= 2.1.0 and langgraph >= 0.2.0 as dependencies.
Quick start
End-to-end: a one-node graph that asks the user to approve a task before continuing.
import uuid
from typing import Optional, TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph
from noxy import NoxyConfig, init_noxy_agent_client
from noxy_langgraph import (
NOXY_SENT_DECISION_ID_KEY,
NoxyLangGraphBridge,
build_tool_call_actionable,
)
class State(TypedDict, total=False):
task: str
noxy_decision: Optional[dict]
_noxy_sent_decision_id: Optional[str]
def build_actionable(state: State) -> dict:
return build_tool_call_actionable(
tool="run_task",
args={"task": state["task"]},
title="Approve task?",
summary=state["task"],
)
client = init_noxy_agent_client(
NoxyConfig(
endpoint="https://relay.noxy.network",
auth_token="your-app-token",
decision_ttl_seconds=3600,
)
)
bridge = NoxyLangGraphBridge(client, "user@example.com") # email, phone, user_id, or 0x…
builder = StateGraph(State)
builder.add_node("noxy_hitl", bridge.create_hitl_node(build_actionable))
builder.add_edge(START, "noxy_hitl")
builder.add_edge("noxy_hitl", END)
graph = builder.compile(checkpointer=InMemorySaver())
resume_handler = bridge.create_resume_handler(graph)
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
paused = graph.invoke({"task": "Send 1 wei"}, config)
# paused["__interrupt__"][0].value contains decision_id, identity_id, actionable
# Later, in your webhook handler:
final = resume_handler.resume_from_webhook({
"decisionId": paused["__interrupt__"][0].value["decision_id"],
"identityId": "user@example.com",
"outcome": "approved", # or "rejected" / "expired" / "timeout"
})
print(final["noxy_decision"]) # → {"outcome": "approved", "approved": True, ...}Graph state schema
Two state keys are part of the contract between your graph and the connector:
| Key | Written by | Purpose |
|---|---|---|
noxy_decision (default; configurable) | HITL node, on resume | Carries the human's decision into your graph: {outcome, approved, decision_id, identity_id, received_at}. |
_noxy_sent_decision_id | Resume handler | Marks that the decision was already routed so re-running the HITL node after resume does not double-send. The constant NOXY_SENT_DECISION_ID_KEY exposes the literal. |
Always declare both as optional fields on your TypedDict state:
from noxy_langgraph import NOXY_SENT_DECISION_ID_KEY # "_noxy_sent_decision_id"
class State(TypedDict, total=False):
task: str
noxy_decision: Optional[dict]
_noxy_sent_decision_id: Optional[str]Why this exists. LangGraph re-executes a node from the top after resume. Without the _noxy_sent_decision_id guard the node would call send_decision a second time and use up another quota slot. The connector sets it via Command(update=…) so the second pass skips routing and goes straight to returning the human decision.
Building actionables
The HITL node accepts a build_actionable(state) -> dict callable. Use the helper to build a standard propose_tool_call payload:
from noxy_langgraph import build_tool_call_actionable
def build_actionable(state: State) -> dict:
return build_tool_call_actionable(
tool="transfer_funds",
args={"to": "0x000…dEaD", "amountWei": "1"},
title="Approve transfer of 1 wei?",
summary="Agent wants to send 1 wei to the burn address.",
# extra fields are merged verbatim into the actionable
extra={"chain": "ethereum", "ttlHint": "5m"},
)The shape matches the Noxy decision payload — see Decisions & Lifecycle for the full schema.
Webhook payload
Noxy posts JSON to the webhook URL configured on your app:
| Field | Type | Description |
|---|---|---|
decisionId | str | Decision to resume — snake_case decision_id is also accepted. |
identityId | str | Identity that took the decision (phone, email, user id, or wallet address). |
outcome | str | One of approved, rejected, expired, timeout. |
receivedAt | str | Optional ISO timestamp of when the device submitted the outcome. |
Timeout / expired outcomes
By default an expired or timeout outcome is propagated as a regular decision and your downstream nodes can branch on noxy_decision.outcome. If you want to short-circuit the graph or apply a default action, pass on_timeout:
def on_timeout(state, resume) -> dict:
# Apply a safe default — e.g. mark the action as skipped
return {
"noxy_decision": resume.to_state(),
"status": "timed_out_default",
}
bridge.create_hitl_node(build_actionable, on_timeout=on_timeout)API reference
NoxyLangGraphBridge(client, identity_id, *, registry=None)
One bridge per identity wires the relay client, the in-memory pending-interrupt registry, the HITL node factory, and the resume handler.
| Method | Returns | Purpose |
|---|---|---|
create_hitl_node(build_actionable, *, state_key="noxy_decision", on_timeout=None) | Callable LangGraph node | Build the node you add to your graph. |
create_resume_handler(graph) | NoxyGraphResumeHandler | Resume the compiled graph from webhook callbacks. |
NoxyGraphResumeHandler
| Method | When to use |
|---|---|
resume_from_webhook(payload: dict) -> dict | Sync webhook handler. |
resume_from_webhook_async(payload: dict) -> dict | FastAPI / async handler. On Python 3.10 it falls back to a worker thread because LangGraph's interrupt() uses sync-only contextvars; on 3.11+ it runs natively. |
resume_from_event(event) / resume_from_event_async(event) | Lower-level — use when you have already parsed a NoxyWebhookEvent. |
Helpers and types
| Symbol | Description |
|---|---|
create_noxy_hitl_node(client, identity_id, registry, build_actionable, …) | Lower-level node factory if you prefer not to use the bridge. |
build_tool_call_actionable(tool, args, title, summary, *, kind="propose_tool_call", extra=None) | Build a standard actionable payload. |
parse_webhook_payload(payload) | Validate and parse a raw webhook JSON body into a NoxyWebhookEvent. |
PendingInterrupt / PendingInterruptRegistry | Thread-safe in-memory map of decision_id → thread_id. Replace with your own implementation backed by Redis / Postgres for multi-process deployments. |
NoxyDecisionOutcome / NoxyDecisionResume / NoxyWebhookEvent | Typed wrappers around the webhook payload. |
NOXY_SENT_DECISION_ID_KEY | String constant for the private state key ("_noxy_sent_decision_id"). |
SendDecisionFailedError / UnknownDecisionError | Raised when no delivery returned a decision_id or when a webhook references an unknown decision. |
FastAPI webhook server
The package ships with an end-to-end FastAPI example. Here is the minimum:
import os
import uuid
from typing import Optional, TypedDict
from fastapi import FastAPI, HTTPException
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph
from noxy import NoxyConfig, init_noxy_agent_client
from noxy_langgraph import NoxyLangGraphBridge, build_tool_call_actionable
class State(TypedDict, total=False):
task: str
noxy_decision: Optional[dict]
status: Optional[str]
_noxy_sent_decision_id: Optional[str]
def build_actionable(state: State) -> dict:
return build_tool_call_actionable(
tool="execute_task",
args={"task": state["task"]},
title="Approve agent task?",
summary=f"The agent wants to run: {state['task']}",
)
def on_timeout(_state, resume) -> dict:
return {"noxy_decision": resume.to_state(), "status": "timed_out_default"}
def after_decision(state: State) -> dict:
decision = state.get("noxy_decision") or {}
if decision.get("approved"):
return {"status": "executed"}
if state.get("status") == "timed_out_default":
return {"status": "skipped_after_timeout"}
return {"status": "rejected"}
client = init_noxy_agent_client(
NoxyConfig(
endpoint="https://relay.noxy.network",
auth_token=os.environ["NOXY_APP_TOKEN"],
decision_ttl_seconds=3600,
)
)
bridge = NoxyLangGraphBridge(client, os.environ["NOXY_IDENTITY_ID"])
builder = StateGraph(State)
builder.add_node("noxy_hitl", bridge.create_hitl_node(build_actionable, on_timeout=on_timeout))
builder.add_node("after_decision", after_decision)
builder.add_edge(START, "noxy_hitl")
builder.add_edge("noxy_hitl", "after_decision")
builder.add_edge("after_decision", END)
graph = builder.compile(checkpointer=InMemorySaver())
resume_handler = bridge.create_resume_handler(graph)
app = FastAPI()
@app.post("/runs")
def start_run(body: dict) -> dict:
thread_id = body.get("thread_id") or str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
state = graph.invoke({"task": body.get("task", "demo task")}, config)
return {"thread_id": thread_id, "state": state}
@app.post("/webhooks/noxy")
async def noxy_webhook(payload: dict) -> dict:
try:
final = await resume_handler.resume_from_webhook_async(payload)
except Exception as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
return {"ok": True, "state": final}Run it (after pip install "noxy-langgraph[examples]"):
export NOXY_APP_TOKEN="…"
export NOXY_IDENTITY_ID="user@example.com"
uvicorn examples.webhook_server:app --reloadProduction checklist
- Persistent checkpointer. Replace
InMemorySaverwith the LangGraph Postgres or SQLite checkpointer so paused threads survive restarts. - Persistent registry. The default
PendingInterruptRegistryis in-memory. For multi-process or multi-replica deployments, swap in your own implementation backed by Redis or your database — the contract is the same three methods (register/lookup/pop). - Idempotent webhook handler. Noxy may retry a webhook delivery. The default
pop-then-resume flow is naturally idempotent (the second call returnsUnknownDecisionErrorwhich you can map to HTTP 200/410); if you need stricter guarantees, persist the outcome before resuming. - TTL. Set
decision_ttl_secondsonNoxyConfigto a sensible bound — short for synchronous user prompts, long for asynchronous approval workflows. Always handleexpired/timeoutwithon_timeoutor a downstream branch. - Observability. Log
decision_id+thread_idat routing time and webhook resume so you can correlate a single approval across the agent, relay, and device.
Troubleshooting
| Symptom | Cause & fix |
|---|---|
UnknownDecisionError on webhook | The decision id is not in the registry — either it was already resumed (retry), the registry was lost (process restart with in-memory storage), or the webhook is for a different deployment. Use a persistent registry for production and respond 200 to retried webhooks for unknown ids. |
SendDecisionFailedError when entering the HITL node | The relay returned no successful delivery with a decision_id — usually because the identity has no registered devices. Confirm the user installed a Client SDK and that the identity exactly matches. |
| Graph doesn't pause | The graph was compiled without a checkpointer. interrupt() requires one — pass checkpointer=InMemorySaver() (or Postgres / SQLite) to builder.compile. |
| HITL node re-routes the decision after resume | Your state schema does not declare _noxy_sent_decision_id; LangGraph drops unknown keys from Command(update=…). Add the field to your TypedDict. |
| Async resume hangs on Python 3.10 | LangGraph relies on sync-only contextvars for interrupt() on 3.10. The connector falls back to asyncio.to_thread automatically — make sure your event loop allows new threads. |
Where to next
LangChain Connector
Same flow, exposed as a HumanInTheLoopMiddleware for create_agent.
Python SDK
The underlying Agent SDK if you want to call send_decision directly.
Decisions & Lifecycle
Payload shape, TTL, and outcome semantics that the actionable maps to.
Best Practices
TTLs, idempotency, retries, and quota guidance for production HITL.