LangGraph Python

How to use LangGraph with the Flutch SDK. This guide shows integration patterns, not LangGraph basics.

Overview

LangGraph is a framework for building stateful workflows. See LangGraph docs for framework details.

This guide covers:

  • Wrapping LangGraph in AbstractGraphBuilder
  • Accessing agent configuration from nodes
  • Using Flutch services (ModelInitializer, McpRuntimeClient)
  • Adding interactive callbacks

Builder Integration

Wrap your LangGraph workflow in AbstractGraphBuilder.

Basic Setup

python
from flutch_sdk import AbstractGraphBuilder
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.mongodb import MongoDBSaver
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage

# Define your state: `add_messages` merges message updates into the history.
class MyState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

class MyGraphV1Builder(AbstractGraphBuilder):
    """Builds the v1 graph: a single `generate` node wired between START and END."""

    version = "1.0.0"

    def __init__(self, checkpointer: MongoDBSaver, generate_node):
        super().__init__()
        self.checkpointer = checkpointer  # injected by Flutch for state persistence
        self.generate_node = generate_node

    async def build_graph(self, payload=None):
        # Build LangGraph workflow
        workflow = StateGraph(MyState)
        workflow.add_node("generate", self.generate_node.execute)
        workflow.add_edge(START, "generate")
        workflow.add_edge("generate", END)

        # Compile with checkpointer (provided by Flutch)
        return workflow.compile(checkpointer=self.checkpointer)

Key points:

  • ✅ Checkpointer injected by Flutch - automatic state persistence
  • ✅ Return compiled graph from build_graph()
  • ✅ Node methods receive state and config

Configuration Access

Access agent settings from config["configurable"]["graphSettings"] in your nodes.

Node with Configuration

python
import logging

from langchain_core.messages import SystemMessage

logger = logging.getLogger(__name__)

class GenerateNode:
    """Node that prepends the configured system prompt to the message history."""

    async def execute(self, state: MyState, config=None):
        # Guard: LangGraph normally supplies `config`, but tolerate direct calls
        # where it is None instead of crashing on `None.get(...)`.
        configurable = (config or {}).get("configurable", {})
        graph_settings = configurable.get("graphSettings", {})
        system_prompt = graph_settings.get("systemPrompt", "You are a helpful assistant")
        model_settings = graph_settings.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        # Lazy %-style args: the string is only built if DEBUG is enabled.
        logger.debug("Using model: %s, temp: %s", model_id, temperature)

        # Use settings in your logic
        messages = [
            SystemMessage(content=system_prompt),
            *state["messages"]
        ]

        return {"messages": messages}

Agent config flows automatically:

Agent Config → payload.graphSettings → config["configurable"]["graphSettings"] → Your Node

Using Models

Inject ModelInitializer to create models from agent config.

Model Initialization

python
from flutch_sdk import ModelInitializer, McpRuntimeClient

class GenerateNode:
    """Node that resolves the configured model (plus optional tools) and invokes it."""

    def __init__(
        self,
        model_initializer: ModelInitializer,
        mcp_client: McpRuntimeClient
    ):
        self.model_initializer = model_initializer
        self.mcp_client = mcp_client

    async def execute(self, state: MyState, config=None):
        # Guard against a missing config so direct invocation doesn't crash.
        graph_settings = (config or {}).get("configurable", {}).get("graphSettings", {})
        model_settings = graph_settings.get("modelSettings", {})
        enabled_tools = graph_settings.get("availableTools", [])

        # Initialize model from the catalog - no API keys in code
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_settings.get("modelId"),
            temperature=model_settings.get("temperature")
        )

        # Add tools if configured
        if enabled_tools:
            tools = await self.mcp_client.get_tools(enabled_tools)
            model = model.bind_tools(tools)

        # Use model; forward `config` so LangChain callbacks/tracing propagate.
        result = await model.ainvoke(state["messages"], config)

        return {"messages": [result]}

Benefits:

  • ✅ Models from catalog - no API keys in code
  • ✅ Tools automatically filtered and converted
  • ✅ Each agent has different model/tools

Interactive Callbacks

Add callback buttons from LangGraph nodes.

Callback in Node

python
from flutch_sdk import Callback, CallbackResult, ExtendedCallbackContext, WithCallbacks, CallbackService

# 1. Create callback handler
class ApprovalCallbacks:
    """Handles the "approve-action" button issued by a graph node."""

    @Callback("approve-action")
    async def handle_approval(self, context: ExtendedCallbackContext) -> CallbackResult:
        # The approved action travels in the callback params.
        pending_action = context.params.get("action")

        # Execute action
        await self.execute_action(pending_action)

        # Confirm to the user and disable the buttons on the original message.
        patch = {
            "text": "✅ Action completed",
            "disable_buttons": True
        }
        return CallbackResult(success=True, message="Action approved!", patch=patch)

    async def execute_action(self, action: str):
        # Action logic
        pass

# 2. Register in builder
# Attach the handler class so Flutch can route "approve-action" clicks here.
@WithCallbacks(ApprovalCallbacks)
class MyGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ... (nodes and build_graph as shown in the Builder Integration section)

# 3. Issue callback from node
class PlanNode:
    """Node that proposes a plan and asks the user to approve it via a button."""

    def __init__(self, callback_service: CallbackService):
        self.callback_service = callback_service

    async def execute(self, state: MyState, config=None):
        plan = await create_plan(state)

        # Issue a one-time token bound to the "approve-action" handler.
        token = await self.callback_service.issue(
            handler="approve-action",
            params={"action": plan}
        )

        approve_button = {"text": "Approve", "callback_token": token}
        return {
            "output": {
                "text": "Please approve this plan:",
                "buttons": [approve_button]
            }
        }

Flow:

Node → issue callback → User clicks → Handler executes → Workflow continues

Streaming

Enable streaming with metadata.

python
from amelie.graph_service_core import StreamChannel

async def build_graph(self, payload=None):
    """Build a single-node graph whose `generate` output streams as text tokens."""
    workflow = StateGraph(MyState)
    workflow.add_node(
        "generate",
        self.generate_node.execute,
        metadata={
            "stream_channel": StreamChannel.TEXT  # Enable streaming
        }
    )

    # Wire the node in: LangGraph refuses to compile a graph with no entrypoint.
    workflow.add_edge(START, "generate")
    workflow.add_edge("generate", END)

    return workflow.compile(checkpointer=self.checkpointer)

Stream channels:

  • StreamChannel.TEXT - Text tokens
  • StreamChannel.TOOL_CALLS - Tool executions
  • StreamChannel.REASONING - Reasoning (o1 models)

The platform automatically streams output to all connected channels (Web, Telegram, WhatsApp).

Module Setup

Register everything in the FastAPI application.

Graph Module

python
from flutch_sdk import create_graph_app
from versions.v1_0_0.builder import MyGraphV1Builder
from nodes.generate_node import GenerateNode
from callbacks.approval_callbacks import ApprovalCallbacks

# Create nodes (model_initializer / mcp_client / checkpointer are assumed to be
# constructed earlier in this module, e.g. via Flutch's DI - TODO confirm source)
generate_node = GenerateNode(model_initializer, mcp_client)

# Create builders - one entry per published graph version
builders = [
    MyGraphV1Builder(checkpointer, generate_node)
]

# Create app: wires builders and callback handlers into a FastAPI application
app = create_graph_app(
    builders=builders,
    callbacks=[ApprovalCallbacks]
)

Bootstrap

python
from flutch_sdk import bootstrap
from graph_module import app

if __name__ == "__main__":
    # Hand the app to the Flutch runner (server startup, registration, etc.).
    bootstrap(app)

Complete Example

Full integration with tool calling.

State Definition

python
# src/state_model.py
from typing import TypedDict, Annotated, Sequence
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class MyState(TypedDict):
    # Conversation history; `add_messages` appends/merges updates instead of replacing.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Channel-agnostic response payload (text, buttons, metadata) for the platform.
    output: dict

Node Implementation

python
# src/nodes/generate_node.py
from flutch_sdk import ModelInitializer, McpRuntimeClient
from langchain_core.messages import SystemMessage
import logging

logger = logging.getLogger(__name__)

class GenerateNode:
    """Generates a model response and executes any tool calls it requests."""

    def __init__(
        self,
        model_initializer: ModelInitializer,
        mcp_client: McpRuntimeClient
    ):
        self.model_initializer = model_initializer
        self.mcp_client = mcp_client

    async def execute(self, state: MyState, config=None):
        """Invoke the configured chat model over the conversation history.

        Returns a state update with the model message and a renderable `output`.
        """
        # Guard: tolerate a missing config (e.g. direct invocation) instead of
        # crashing on `None.get(...)`.
        graph_settings = (config or {}).get("configurable", {}).get("graphSettings", {})
        system_prompt = graph_settings.get("systemPrompt", "")
        model_id = graph_settings.get("modelSettings", {}).get("modelId", "gpt-4o")
        enabled_tools = graph_settings.get("availableTools", [])

        # Initialize model with tools
        model = await self.model_initializer.initialize_chat_model(model_id=model_id)

        if enabled_tools:
            tools = await self.mcp_client.get_tools(enabled_tools)
            model = model.bind_tools(tools)
            logger.debug("Configured %d tools", len(tools))

        # Prepend the agent's system prompt to the running conversation.
        messages = [
            SystemMessage(content=system_prompt),
            *state["messages"]
        ]

        # Forward `config` so LangChain callbacks/tracing propagate.
        result = await model.ainvoke(messages, config)

        return {
            "messages": [result],
            "output": {
                "text": result.content,
                "metadata": {
                    "model_id": model_id,
                    # `or []` guards a tool_calls attribute that exists but is None.
                    "tools_used": len(getattr(result, "tool_calls", []) or [])
                }
            }
        }

    async def execute_tools(self, state: MyState, config=None):
        """Run every tool call on the last message and return tool result messages."""
        last_message = state["messages"][-1]
        tool_calls = getattr(last_message, "tool_calls", []) or []

        tool_messages = []
        for tool_call in tool_calls:
            result = await self.mcp_client.execute_tool(
                tool_call["name"],
                tool_call["args"]
            )

            # On failure, surface the error text as tool output so the model can react.
            content = result.get("data") if result.get("success") else result.get("error")
            tool_messages.append({
                "type": "tool",
                "tool_call_id": tool_call["id"],
                "content": content,
                "name": tool_call["name"]
            })

        return {"messages": tool_messages}

Builder

python
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.mongodb import MongoDBSaver
from state_model import MyState
from nodes.generate_node import GenerateNode
from amelie.graph_service_core import StreamChannel

class MyGraphV1Builder(AbstractGraphBuilder):
    """Builds the v1 tool-calling graph: a generate ⇄ tools loop with streaming."""

    version = "1.0.0"

    def __init__(
        self,
        checkpointer: MongoDBSaver,
        generate_node: GenerateNode
    ):
        super().__init__()
        self.checkpointer = checkpointer
        self.generate_node = generate_node

    async def build_graph(self, payload=None):
        graph = StateGraph(MyState)

        # The generate node streams text tokens to the client.
        graph.add_node(
            "generate",
            self.generate_node.execute,
            metadata={"stream_channel": StreamChannel.TEXT}
        )
        graph.add_node("tools", self.generate_node.execute_tools)

        # Entry point
        graph.add_edge(START, "generate")

        def should_continue(state: MyState) -> str:
            # Route to the tools node while the last message requests tool calls.
            last = state["messages"][-1]
            if getattr(last, "tool_calls", []):
                return "tools"
            return END

        graph.add_conditional_edges(
            "generate",
            should_continue,
            {"tools": "tools", END: END}
        )

        # After tool execution, hand the results back to the model.
        graph.add_edge("tools", "generate")

        return graph.compile(checkpointer=self.checkpointer)