Connectors · LangGraph

LangGraph Connector

Pause LangGraph runs with interrupt(), route encrypted approvals to every device registered for the user (web, iOS, Android, Telegram), and resume the graph from a Noxy webhook. Built on top of the Python Agent SDK.

What it does

noxy-langgraph is a thin layer over the Noxy Python SDK that turns any LangGraph node into a human-in-the-loop checkpoint. It owns three responsibilities:

  • Send — builds an encrypted actionable from your graph state and calls send_decision on the Noxy relay.
  • Suspend — calls LangGraph's interrupt() so the graph saves its state to the configured checkpointer and returns control to your server.
  • Resume — when Noxy posts the user's outcome to your webhook, it looks up the paused thread and calls graph.invoke(Command(resume=…)) with the decision in state.

Why a connector? LangGraph already models suspension natively with interrupt(). The connector wires it to Noxy's encrypted multi-device delivery and webhook resume so you do not write a polling loop or invent your own correlation table.

Flow

┌───────────┐   send_decision   ┌──────────┐   route   ┌──────────────────────┐
│ LangGraph │ ────────────────▶ │  Noxy    │ ────────▶ │ Devices (web / iOS / │
│   node    │                   │  Relay   │           │ Android / Telegram)  │
└─────┬─────┘                   └────┬─────┘           └─────────┬────────────┘
      │ interrupt()                  │                           │ Approve / Reject
      ▼                              │                           ▼
┌───────────┐                        │  webhook (outcome)  ┌──────────────┐
│Checkpoint │ ◀──────────────────────┴──────────────────── │  Your server │
│  (state)  │                resume_from_webhook()         │   FastAPI…   │
└───────────┘ ──────────── Command(resume=…) ────────────▶ └──────────────┘
  1. Your graph reaches the HITL node — the node builds an actionable from current state.
  2. The connector calls send_decision; the relay encrypts and fans out to every device for the identity.
  3. The node calls interrupt(); LangGraph persists the state and returns from graph.invoke with an __interrupt__ marker.
  4. The user approves or rejects on any device, or the decision TTL expires.
  5. Noxy posts a JSON payload to your webhook with decisionId, identityId, and outcome.
  6. You call NoxyGraphResumeHandler.resume_from_webhook(payload); the graph re-runs the HITL node, which now returns the decision to downstream state.

Requirements

  • Python 3.10 or newer.
  • A LangGraph graph compiled with a checkpointer — required for interrupt() / Command(resume=…).
  • A Noxy app — see Create App — for the NOXY_APP_TOKEN.
  • An identity for your user: phone (E.164), email, your own user id, or wallet address (0x…). See Identity types.

Installation

# Production
pip install noxy-langgraph

# With the FastAPI webhook example included
pip install "noxy-langgraph[examples]"

The package ships with noxy-sdk >= 2.1.0 and langgraph >= 0.2.0 as dependencies.

Quick start

End-to-end: a one-node graph that asks the user to approve a task before continuing.

import uuid
from typing import Optional, TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph
from noxy import NoxyConfig, init_noxy_agent_client

from noxy_langgraph import (
    NOXY_SENT_DECISION_ID_KEY,
    NoxyLangGraphBridge,
    build_tool_call_actionable,
)


class State(TypedDict, total=False):
    task: str
    noxy_decision: Optional[dict]
    _noxy_sent_decision_id: Optional[str]


def build_actionable(state: State) -> dict:
    return build_tool_call_actionable(
        tool="run_task",
        args={"task": state["task"]},
        title="Approve task?",
        summary=state["task"],
    )


client = init_noxy_agent_client(
    NoxyConfig(
        endpoint="https://relay.noxy.network",
        auth_token="your-app-token",
        decision_ttl_seconds=3600,
    )
)
bridge = NoxyLangGraphBridge(client, "user@example.com")  # email, phone, user_id, or 0x…

builder = StateGraph(State)
builder.add_node("noxy_hitl", bridge.create_hitl_node(build_actionable))
builder.add_edge(START, "noxy_hitl")
builder.add_edge("noxy_hitl", END)

graph = builder.compile(checkpointer=InMemorySaver())
resume_handler = bridge.create_resume_handler(graph)

config = {"configurable": {"thread_id": str(uuid.uuid4())}}
paused = graph.invoke({"task": "Send 1 wei"}, config)
# paused["__interrupt__"][0].value contains decision_id, identity_id, actionable

# Later, in your webhook handler:
final = resume_handler.resume_from_webhook({
    "decisionId": paused["__interrupt__"][0].value["decision_id"],
    "identityId": "user@example.com",
    "outcome": "approved",   # or "rejected" / "expired" / "timeout"
})
print(final["noxy_decision"])  # → {"outcome": "approved", "approved": True, ...}

Graph state schema

Two state keys are part of the contract between your graph and the connector:

KeyWritten byPurpose
noxy_decision (default; configurable)HITL node, on resumeCarries the human's decision into your graph: {outcome, approved, decision_id, identity_id, received_at}.
_noxy_sent_decision_idResume handlerMarks that the decision was already routed so re-running the HITL node after resume does not double-send. The constant NOXY_SENT_DECISION_ID_KEY exposes the literal.

Always declare both as optional fields on your TypedDict state:

from noxy_langgraph import NOXY_SENT_DECISION_ID_KEY  # "_noxy_sent_decision_id"

class State(TypedDict, total=False):
    task: str
    noxy_decision: Optional[dict]
    _noxy_sent_decision_id: Optional[str]

Why this exists. LangGraph re-executes a node from the top after resume. Without the _noxy_sent_decision_id guard the node would call send_decision a second time and use up another quota slot. The connector sets it via Command(update=…) so the second pass skips routing and goes straight to returning the human decision.

Building actionables

The HITL node accepts a build_actionable(state) -> dict callable. Use the helper to build a standard propose_tool_call payload:

from noxy_langgraph import build_tool_call_actionable

def build_actionable(state: State) -> dict:
    return build_tool_call_actionable(
        tool="transfer_funds",
        args={"to": "0x000…dEaD", "amountWei": "1"},
        title="Approve transfer of 1 wei?",
        summary="Agent wants to send 1 wei to the burn address.",
        # extra fields are merged verbatim into the actionable
        extra={"chain": "ethereum", "ttlHint": "5m"},
    )

The shape matches the Noxy decision payload — see Decisions & Lifecycle for the full schema.

Webhook payload

Noxy posts JSON to the webhook URL configured on your app:

FieldTypeDescription
decisionIdstrDecision to resume — snake_case decision_id is also accepted.
identityIdstrIdentity that took the decision (phone, email, user id, or wallet address).
outcomestrOne of approved, rejected, expired, timeout.
receivedAtstrOptional ISO timestamp of when the device submitted the outcome.

Timeout / expired outcomes

By default an expired or timeout outcome is propagated as a regular decision and your downstream nodes can branch on noxy_decision.outcome. If you want to short-circuit the graph or apply a default action, pass on_timeout:

def on_timeout(state, resume) -> dict:
    # Apply a safe default — e.g. mark the action as skipped
    return {
        "noxy_decision": resume.to_state(),
        "status": "timed_out_default",
    }

bridge.create_hitl_node(build_actionable, on_timeout=on_timeout)

API reference

NoxyLangGraphBridge(client, identity_id, *, registry=None)

One bridge per identity wires the relay client, the in-memory pending-interrupt registry, the HITL node factory, and the resume handler.

MethodReturnsPurpose
create_hitl_node(build_actionable, *, state_key="noxy_decision", on_timeout=None)Callable LangGraph nodeBuild the node you add to your graph.
create_resume_handler(graph)NoxyGraphResumeHandlerResume the compiled graph from webhook callbacks.

NoxyGraphResumeHandler

MethodWhen to use
resume_from_webhook(payload: dict) -> dictSync webhook handler.
resume_from_webhook_async(payload: dict) -> dictFastAPI / async handler. On Python 3.10 it falls back to a worker thread because LangGraph's interrupt() uses sync-only contextvars; on 3.11+ it runs natively.
resume_from_event(event) / resume_from_event_async(event)Lower-level — use when you have already parsed a NoxyWebhookEvent.

Helpers and types

SymbolDescription
create_noxy_hitl_node(client, identity_id, registry, build_actionable, …)Lower-level node factory if you prefer not to use the bridge.
build_tool_call_actionable(tool, args, title, summary, *, kind="propose_tool_call", extra=None)Build a standard actionable payload.
parse_webhook_payload(payload)Validate and parse a raw webhook JSON body into a NoxyWebhookEvent.
PendingInterrupt / PendingInterruptRegistryThread-safe in-memory map of decision_id → thread_id. Replace with your own implementation backed by Redis / Postgres for multi-process deployments.
NoxyDecisionOutcome / NoxyDecisionResume / NoxyWebhookEventTyped wrappers around the webhook payload.
NOXY_SENT_DECISION_ID_KEYString constant for the private state key ("_noxy_sent_decision_id").
SendDecisionFailedError / UnknownDecisionErrorRaised when no delivery returned a decision_id or when a webhook references an unknown decision.

FastAPI webhook server

The package ships with an end-to-end FastAPI example. Here is the minimum:

import os
import uuid
from typing import Optional, TypedDict

from fastapi import FastAPI, HTTPException
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph
from noxy import NoxyConfig, init_noxy_agent_client

from noxy_langgraph import NoxyLangGraphBridge, build_tool_call_actionable


class State(TypedDict, total=False):
    task: str
    noxy_decision: Optional[dict]
    status: Optional[str]
    _noxy_sent_decision_id: Optional[str]


def build_actionable(state: State) -> dict:
    return build_tool_call_actionable(
        tool="execute_task",
        args={"task": state["task"]},
        title="Approve agent task?",
        summary=f"The agent wants to run: {state['task']}",
    )


def on_timeout(_state, resume) -> dict:
    return {"noxy_decision": resume.to_state(), "status": "timed_out_default"}


def after_decision(state: State) -> dict:
    decision = state.get("noxy_decision") or {}
    if decision.get("approved"):
        return {"status": "executed"}
    if state.get("status") == "timed_out_default":
        return {"status": "skipped_after_timeout"}
    return {"status": "rejected"}


client = init_noxy_agent_client(
    NoxyConfig(
        endpoint="https://relay.noxy.network",
        auth_token=os.environ["NOXY_APP_TOKEN"],
        decision_ttl_seconds=3600,
    )
)
bridge = NoxyLangGraphBridge(client, os.environ["NOXY_IDENTITY_ID"])

builder = StateGraph(State)
builder.add_node("noxy_hitl", bridge.create_hitl_node(build_actionable, on_timeout=on_timeout))
builder.add_node("after_decision", after_decision)
builder.add_edge(START, "noxy_hitl")
builder.add_edge("noxy_hitl", "after_decision")
builder.add_edge("after_decision", END)

graph = builder.compile(checkpointer=InMemorySaver())
resume_handler = bridge.create_resume_handler(graph)

app = FastAPI()


@app.post("/runs")
def start_run(body: dict) -> dict:
    thread_id = body.get("thread_id") or str(uuid.uuid4())
    config = {"configurable": {"thread_id": thread_id}}
    state = graph.invoke({"task": body.get("task", "demo task")}, config)
    return {"thread_id": thread_id, "state": state}


@app.post("/webhooks/noxy")
async def noxy_webhook(payload: dict) -> dict:
    try:
        final = await resume_handler.resume_from_webhook_async(payload)
    except Exception as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return {"ok": True, "state": final}

Run it (after pip install "noxy-langgraph[examples]"):

export NOXY_APP_TOKEN="…"
export NOXY_IDENTITY_ID="user@example.com"
uvicorn examples.webhook_server:app --reload

Production checklist

  • Persistent checkpointer. Replace InMemorySaver with the LangGraph Postgres or SQLite checkpointer so paused threads survive restarts.
  • Persistent registry. The default PendingInterruptRegistry is in-memory. For multi-process or multi-replica deployments, swap in your own implementation backed by Redis or your database — the contract is the same three methods (register / lookup / pop).
  • Idempotent webhook handler. Noxy may retry a webhook delivery. The default pop-then-resume flow is naturally idempotent (the second call returns UnknownDecisionError which you can map to HTTP 200/410); if you need stricter guarantees, persist the outcome before resuming.
  • TTL. Set decision_ttl_seconds on NoxyConfig to a sensible bound — short for synchronous user prompts, long for asynchronous approval workflows. Always handle expired/timeout with on_timeout or a downstream branch.
  • Observability. Log decision_id + thread_id at routing time and webhook resume so you can correlate a single approval across the agent, relay, and device.

Troubleshooting

SymptomCause & fix
UnknownDecisionError on webhookThe decision id is not in the registry — either it was already resumed (retry), the registry was lost (process restart with in-memory storage), or the webhook is for a different deployment. Use a persistent registry for production and respond 200 to retried webhooks for unknown ids.
SendDecisionFailedError when entering the HITL nodeThe relay returned no successful delivery with a decision_id — usually because the identity has no registered devices. Confirm the user installed a Client SDK and that the identity exactly matches.
Graph doesn't pauseThe graph was compiled without a checkpointer. interrupt() requires one — pass checkpointer=InMemorySaver() (or Postgres / SQLite) to builder.compile.
HITL node re-routes the decision after resumeYour state schema does not declare _noxy_sent_decision_id; LangGraph drops unknown keys from Command(update=…). Add the field to your TypedDict.
Async resume hangs on Python 3.10LangGraph relies on sync-only contextvars for interrupt() on 3.10. The connector falls back to asyncio.to_thread automatically — make sure your event loop allows new threads.

Where to next