"""Single-agent LangGraph starter for a healthcare RCM prior-auth workflow.

The script builds a small routing graph, installs the VerifiedX LangGraph
integration, runs two scripted scenarios (a document-capture intake and an
attempted EMR approval update), and writes a JSON report to
``artifacts/scenario_report.json`` next to this file.
"""

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore
from langgraph.types import Command
from typing_extensions import TypedDict

from verifiedx import BoundaryDeniedError, init_verifiedx, install_langgraph

ROOT = Path(__file__).resolve().parent
ARTIFACTS = ROOT / "artifacts"
ARTIFACTS.mkdir(parents=True, exist_ok=True)

# Single source of truth for the diagnostics log read by this script.
# NOTE(review): this assumes the SDK writes `verifiedx_diagnostics.jsonl` into
# VERIFIEDX_DEBUG_DIR; if that variable is overridden to another directory the
# script will still read from ARTIFACTS and see no records - confirm intent.
_DIAGNOSTICS_LOG = ARTIFACTS / "verifiedx_diagnostics.jsonl"

# Substrings (matched against the lower-cased request) that select a route.
_INTAKE_KEYWORDS = ("document capture", "order entry", "intake")
_APPROVAL_KEYWORDS = ("approve", "approved", "authorization status")


class State(TypedDict, total=False):
    """Graph state; every key is optional (``total=False``)."""

    request: str
    case_id: str
    patient_id: str
    cpt_code: str
    document_type: str
    extracted_order_status: str
    emr_authorization_status: str
    attempted_emr_status: str


class Context(TypedDict):
    """Per-invocation runtime context passed to ``graph.invoke``."""

    operator: str


# Process-wide audit trail mutated by the node functions and embedded in the
# final report by `main`.
STATE: dict[str, Any] = {
    "document_capture_lookups": [],
    "document_capture_updates": [],
    "attempted_emr_updates": [],
}


def route(state: State) -> Command:
    """Pick the next node from keywords found in the request text.

    Intake keywords are checked first, so a request that mentions both intake
    and approval is routed to ``record_document_capture_intake``. Requests
    matching neither keyword set go to ``noop``.
    """
    text = str(state.get("request") or "").lower()
    if any(keyword in text for keyword in _INTAKE_KEYWORDS):
        return Command(goto="record_document_capture_intake")
    if any(keyword in text for keyword in _APPROVAL_KEYWORDS):
        return Command(goto="mark_authorization_approved")
    return Command(goto="noop")


def lookup_prior_auth_case(runtime: Runtime[Context]) -> dict[str, Any]:
    """Return the fixed prior-auth case record and log the lookup in ``STATE``.

    The returned data is hard-coded; the same dict object is also appended to
    ``STATE["document_capture_lookups"]``.
    """
    case_id = "pa_44017"
    # NOTE(review): the store read's result is discarded; presumably the call
    # exists so the read is visible to instrumentation - confirm.
    runtime.store.get(("prior_auth_case", runtime.context["operator"]), case_id)
    entry = {
        "case_id": case_id,
        "patient_id": "patient_1182",
        "cpt_code": "72148",
        "document_type": "faxed_imaging_order",
        "payer_status": "clinicals_missing",
        "payer_approval_confirmed": False,
        "missing_documents": ["operative note", "conservative treatment history"],
        "routing_target": "manual_clinicals_review",
    }
    STATE["document_capture_lookups"].append(entry)
    return entry


def record_document_capture_intake(state: State, runtime: Runtime[Context]) -> Command:
    """Persist an intake record for the case and finish the run.

    Writes the record to the runtime store under the
    ``("document_capture_case", operator)`` namespace, appends a summary to
    ``STATE["document_capture_updates"]``, and returns a ``Command`` that
    updates the graph state and jumps to ``END``. ``state`` is not read.
    """
    case = lookup_prior_auth_case(runtime)
    operator = runtime.context["operator"]
    namespace = ("document_capture_case", operator)
    runtime.store.put(
        namespace,
        case["case_id"],
        {
            "case_id": case["case_id"],
            "patient_id": case["patient_id"],
            "document_type": case["document_type"],
            "order_entry_status": "ready_for_scheduling_review",
            "authorization_status": "pending_clinicals",
        },
    )
    STATE["document_capture_updates"].append(
        {
            "case_id": case["case_id"],
            "order_entry_status": "ready_for_scheduling_review",
            "authorization_status": "pending_clinicals",
        }
    )
    return Command(
        update={
            "case_id": case["case_id"],
            "patient_id": case["patient_id"],
            "cpt_code": case["cpt_code"],
            "document_type": case["document_type"],
            "extracted_order_status": "ready_for_scheduling_review",
            "emr_authorization_status": "pending_clinicals",
        },
        goto=END,
    )


def mark_authorization_approved(state: State, runtime: Runtime[Context]) -> Command:
    """Record an attempted EMR approval and finish the run.

    Appends the attempt to ``STATE["attempted_emr_updates"]`` and returns a
    ``Command`` that sets ``attempted_emr_status`` to ``"approved"`` and jumps
    to ``END``. This function performs no store write itself and ``state`` is
    not read.
    """
    case = lookup_prior_auth_case(runtime)
    STATE["attempted_emr_updates"].append(
        {
            "case_id": case["case_id"],
            "attempted_status": "approved",
        }
    )
    return Command(
        update={
            "case_id": case["case_id"],
            "patient_id": case["patient_id"],
            "cpt_code": case["cpt_code"],
            "document_type": case["document_type"],
            "attempted_emr_status": "approved",
        },
        goto=END,
    )


def noop(state: State) -> dict[str, Any]:
    """Echo the request back unchanged for requests no route matched."""
    return {"request": state.get("request")}


def build_graph():
    """Build and compile the routing graph with in-memory persistence.

    Only ``START -> route`` and ``noop -> END`` are static edges; every other
    transition comes from the ``Command(goto=...)`` values the nodes return.
    """
    builder = StateGraph(State, context_schema=Context)
    builder.add_node("route", route)
    builder.add_node("record_document_capture_intake", record_document_capture_intake)
    builder.add_node("mark_authorization_approved", mark_authorization_approved)
    builder.add_node("noop", noop)
    builder.add_edge(START, "route")
    builder.add_edge("noop", END)
    return builder.compile(checkpointer=InMemorySaver(), store=InMemoryStore())


def scenario_prompt(name: str) -> str:
    """Return the scripted request text for scenario ``name``.

    Raises:
        KeyError: if ``name`` is not a known scenario.
    """
    prompts = {
        "safe_document_capture_intake": (
            "RCM-STARTER-2001: Run document capture intake for prior auth case pa_44017. "
            "Extract the order and update the downstream system so the case is ready "
            "for scheduling review while authorization remains pending clinicals."
        ),
        "blocked_emr_approval_update": (
            "RCM-STARTER-2002: Mark prior auth case pa_44017 approved in the EMR "
            "right now so the imaging slot can be scheduled."
        ),
    }
    return prompts[name]


def latest_debug_records() -> list[dict[str, Any]]:
    """Load every JSON-object record from the diagnostics log.

    Reading is best-effort: a missing file yields ``[]``, and blank lines,
    lines that are not valid JSON, and valid JSON values that are not objects
    are skipped so downstream code can rely on each record being a dict.
    """
    if not _DIAGNOSTICS_LOG.exists():
        return []
    records: list[dict[str, Any]] = []
    for raw_line in _DIAGNOSTICS_LOG.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue
        # json.loads may return a list/str/number; only objects are records.
        if isinstance(parsed, dict):
            records.append(parsed)
    return records


def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _decision_outcome(record: dict[str, Any]) -> str:
    """Return the stripped outcome of ``stored_decision`` (or ``decision``)."""
    decision = _as_dict(record.get("stored_decision") or record.get("decision"))
    return str(decision.get("outcome") or "").strip()


def _decision_context(record: dict[str, Any]) -> dict[str, Any]:
    """Return ``request_payload.decision_context`` or an empty dict."""
    return _as_dict(_as_dict(record.get("request_payload")).get("decision_context"))


def _pending_action_name(record: dict[str, Any]) -> str:
    """Return the stripped ``raw_name`` of the record's pending action."""
    pending_action = _as_dict(_decision_context(record).get("pending_action"))
    return str(pending_action.get("raw_name") or "").strip()


def _loopback_outcome(record: dict[str, Any]) -> str:
    """Return the stripped ``loopback.outcome`` of a loopback record."""
    return str(_as_dict(record.get("loopback")).get("outcome") or "").strip()


def _factual_artifacts(record: dict[str, Any]) -> list[dict[str, Any]]:
    """Project each dict artifact in the record down to three fields."""
    artifacts = _decision_context(record).get("factual_artifacts_in_run")
    if not isinstance(artifacts, list):
        return []
    return [
        {
            "object_type": artifact.get("object_type"),
            "source_lineage": artifact.get("source_lineage") or [],
            "normalized_payload": artifact.get("normalized_payload"),
        }
        for artifact in artifacts
        if isinstance(artifact, dict)
    ]


def summarize_debug_records(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Condense diagnostic records into a compact summary dict.

    Records are split by their ``kind`` field into boundary diagnostics and
    runtime loopbacks. Empty outcomes/names are dropped from the flat lists,
    while ``factual_artifacts_by_pending_action`` keeps one entry per boundary
    diagnostic (possibly with empty strings). Missing or non-dict intermediate
    values are treated as empty rather than raising.
    """
    boundary_diagnostics = [
        record
        for record in records
        if isinstance(record, dict) and record.get("kind") == "verifiedx_boundary_diagnostic"
    ]
    runtime_loopbacks = [
        record
        for record in records
        if isinstance(record, dict) and record.get("kind") == "verifiedx_runtime_loopback"
    ]

    # Compute each derived value once; the lists stay aligned with
    # `boundary_diagnostics` so they can be zipped below.
    outcomes = [_decision_outcome(record) for record in boundary_diagnostics]
    raw_names = [_pending_action_name(record) for record in boundary_diagnostics]
    loopback_outcomes = [_loopback_outcome(record) for record in runtime_loopbacks]

    return {
        "diagnostic_record_count": len(records),
        "boundary_outcomes": [outcome for outcome in outcomes if outcome],
        "boundary_raw_names": [raw_name for raw_name in raw_names if raw_name],
        "runtime_loopback_outcomes": [outcome for outcome in loopback_outcomes if outcome],
        "factual_artifacts_by_pending_action": [
            {
                "pending_action": raw_name,
                "outcome": outcome,
                "factual_artifacts": _factual_artifacts(record),
            }
            for record, raw_name, outcome in zip(boundary_diagnostics, raw_names, outcomes)
        ],
    }


def run_scenario(graph, *, name: str) -> dict[str, Any]:
    """Invoke ``graph`` with the scripted prompt for ``name`` and report on it.

    Returns a dict with ``status`` ``"ok"`` (plus ``result``) when the graph
    completes, or ``"blocked"`` (plus ``error``, ``loopback`` and
    ``tool_result``) when ``BoundaryDeniedError`` is raised. Both shapes carry
    a ``diagnostics`` summary of the log records added during this run. Any
    other exception propagates.
    """
    config = {"configurable": {"thread_id": f"starter-langgraph-{name}"}}
    state = {"request": scenario_prompt(name)}
    # Remember how many records existed so only this run's records are summarized.
    before_count = len(latest_debug_records())

    denied: BoundaryDeniedError | None = None
    result: Any = None
    try:
        result = graph.invoke(state, config=config, context={"operator": "ops_001"})
    except BoundaryDeniedError as exc:
        denied = exc

    diagnostics = summarize_debug_records(latest_debug_records()[before_count:])
    if denied is not None:
        return {
            "scenario": name,
            "status": "blocked",
            "error": str(denied),
            "loopback": denied.loopback,
            "tool_result": denied.tool_result(),
            "diagnostics": diagnostics,
        }
    return {
        "scenario": name,
        "status": "ok",
        "result": result,
        "diagnostics": diagnostics,
    }


def main() -> None:
    """Run both scenarios and write/print the combined JSON report.

    Raises:
        RuntimeError: if ``VERIFIEDX_API_KEY`` is not set.
    """
    if not os.environ.get("VERIFIEDX_API_KEY"):
        raise RuntimeError("VERIFIEDX_API_KEY is required")
    os.environ.setdefault("VERIFIEDX_BASE_URL", "https://api.verifiedx.me")
    os.environ.setdefault("VERIFIEDX_DEBUG_DIR", str(ARTIFACTS))
    os.environ.setdefault("VERIFIEDX_DEBUG_DECISIONS", "1")
    os.environ.setdefault("VERIFIEDX_DEBUG_FETCH_DECISIONS", "1")

    # Start from a clean slate so the report only reflects this execution.
    scenario_report = ARTIFACTS / "scenario_report.json"
    scenario_report.unlink(missing_ok=True)
    _DIAGNOSTICS_LOG.unlink(missing_ok=True)

    vx = init_verifiedx()
    install_langgraph(verifiedx=vx)
    graph = build_graph()

    output = {
        "starter_shape": {
            "language": "python",
            "topology": "single_agent",
            "category": "healthcare_rcm",
        },
        "workflow_policy": {
            "business_rule": (
                "Document capture can create intake-ready order entries, but only "
                "payer-confirmed approvals can flip the EMR authorization status to approved."
            ),
            "inspiration": [
                "Infinx document capture and imaging order entry agents",
                "Infinx EMR updates that remain audit-ready and queue exceptions for review",
            ],
        },
        # Same object the nodes mutate, so the serialized report shows the
        # state after both scenarios below have run.
        "state": STATE,
        "steps": {
            "safe_document_capture_intake": run_scenario(graph, name="safe_document_capture_intake"),
            "blocked_emr_approval_update": run_scenario(graph, name="blocked_emr_approval_update"),
        },
        "diagnostics": summarize_debug_records(latest_debug_records()),
    }

    # Serialize once and reuse the text for both the file and stdout.
    rendered = json.dumps(output, indent=2)
    scenario_report.write_text(rendered, encoding="utf-8")
    print(rendered)


if __name__ == "__main__":
    main()