How to use LangGraph with Flutch SDK. This guide shows integration patterns, not LangGraph basics.
Overview
LangGraph is a framework for building stateful workflows. See LangGraph docs for framework details.
This guide covers:
- Wrapping LangGraph in AbstractGraphBuilder
- Accessing agent configuration from nodes
- Using Flutch services (ModelInitializer, McpRuntimeClient)
- Adding interactive callbacks
Builder Integration
Wrap your LangGraph workflow in AbstractGraphBuilder.
Basic Setup
from typing import Annotated, Sequence, TypedDict

from flutch_sdk import AbstractGraphBuilder
from langchain_core.messages import BaseMessage
from langgraph.checkpoint.mongodb import MongoDBSaver
from langgraph.graph import END, START, StateGraph
# add_messages is provided by LangGraph, not by langchain_core.messages.
from langgraph.graph.message import add_messages


# Define your state
class MyState(TypedDict):
    """Graph state: the running conversation."""

    # add_messages is the reducer LangGraph applies to this key when a node
    # returns a "messages" update.
    messages: Annotated[Sequence[BaseMessage], add_messages]


class MyGraphV1Builder(AbstractGraphBuilder):
    """Builds version 1.0.0 of the graph: START -> generate -> END."""

    version = "1.0.0"

    def __init__(self, checkpointer: MongoDBSaver, generate_node):
        """Store the collaborators used by build_graph().

        Args:
            checkpointer: Saver handed to ``workflow.compile``.
            generate_node: Object whose ``execute`` method is registered as
                the "generate" node.
        """
        super().__init__()
        self.checkpointer = checkpointer
        self.generate_node = generate_node

    async def build_graph(self, payload=None):
        """Assemble and compile the workflow.

        Args:
            payload: Not used by this builder.

        Returns:
            The compiled graph.
        """
        # Build LangGraph workflow
        workflow = StateGraph(MyState)
        workflow.add_node("generate", self.generate_node.execute)
        workflow.add_edge(START, "generate")
        workflow.add_edge("generate", END)

        # Compile with checkpointer (provided by Flutch)
        return workflow.compile(checkpointer=self.checkpointer)
Key points:
- ✅ Checkpointer injected by Flutch - automatic state persistence
- ✅ Return compiled graph from build_graph()
- ✅ Node methods receive state and config
Configuration Access
Access agent settings from config["configurable"]["graphSettings"] in your nodes.
Node with Configuration
import logging

from langchain_core.messages import SystemMessage

logger = logging.getLogger(__name__)


class GenerateNode:
    """Example node that reads agent settings from the run config."""

    async def execute(self, state: MyState, config=None):
        """Prepend the configured system prompt to the conversation.

        Args:
            state: Current graph state; ``state["messages"]`` is read.
            config: Run config. Agent settings are looked up under
                ``config["configurable"]["graphSettings"]``. ``None`` is
                treated as an empty config, so the defaults below apply.

        Returns:
            A state update whose "messages" list is the system message
            followed by the existing messages.
        """
        # config defaults to None, so guard before calling .get() on it.
        configurable = (config or {}).get("configurable", {})

        # Get agent-specific settings
        graph_settings = configurable.get("graphSettings", {})
        model_settings = graph_settings.get("modelSettings", {})

        system_prompt = graph_settings.get("systemPrompt", "You are a helpful assistant")
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        logger.debug("Using model: %s, temp: %s", model_id, temperature)

        # Use settings in your logic
        messages = [
            SystemMessage(content=system_prompt),
            *state["messages"],
        ]

        return {"messages": messages}
Agent config flows automatically:
Agent Config → payload.graphSettings → config["configurable"]["graphSettings"] → Your Node
Using Models
Inject ModelInitializer to create models from agent config.
Model Initialization
from flutch_sdk import ModelInitializer, McpRuntimeClient


class GenerateNode:
    """Example node that builds its chat model and tools from agent config."""

    def __init__(
        self,
        model_initializer: ModelInitializer,
        mcp_client: McpRuntimeClient,
    ):
        """Store the injected services.

        Args:
            model_initializer: Used to create the chat model.
            mcp_client: Used to fetch the tools bound to the model.
        """
        self.model_initializer = model_initializer
        self.mcp_client = mcp_client

    async def execute(self, state: MyState, config=None):
        """Invoke the configured model on the conversation.

        Args:
            state: Current graph state; ``state["messages"]`` is sent to the
                model.
            config: Run config. Agent settings are read from
                ``config["configurable"]["graphSettings"]``. ``None`` is
                treated as an empty config.

        Returns:
            A state update containing the model's reply.
        """
        # config defaults to None, so guard before calling .get() on it.
        configurable = (config or {}).get("configurable", {})
        graph_settings = configurable.get("graphSettings", {})
        model_settings = graph_settings.get("modelSettings", {})

        model_id = model_settings.get("modelId")
        enabled_tools = graph_settings.get("availableTools", [])

        # Initialize model
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=model_settings.get("temperature"),
        )

        # Add tools if configured
        if enabled_tools:
            tools = await self.mcp_client.get_tools(enabled_tools)
            model = model.bind_tools(tools)

        # Use model
        result = await model.ainvoke(state["messages"], config)

        return {"messages": [result]}
Benefits:
- ✅ Models from catalog - no API keys in code
- ✅ Tools automatically filtered and converted
- ✅ Each agent has different model/tools
Interactive Callbacks
Add callback buttons from LangGraph nodes.
Callback in Node
from flutch_sdk import (
    Callback,
    CallbackResult,
    ExtendedCallbackContext,
    WithCallbacks,
    CallbackService,
)


# 1. Create callback handler
class ApprovalCallbacks:
    """Handlers for the callbacks issued by graph nodes."""

    @Callback("approve-action")
    async def handle_approval(self, context: ExtendedCallbackContext) -> CallbackResult:
        """Run the approved action and report the outcome."""
        requested_action = context.params.get("action")

        # Execute action
        await self.execute_action(requested_action)

        message_patch = {
            "text": "✅ Action completed",
            "disable_buttons": True,
        }
        return CallbackResult(
            success=True,
            message="Action approved!",
            patch=message_patch,
        )

    async def execute_action(self, action: str):
        """Placeholder for the action logic."""
        # Action logic


# 2. Register in builder
@WithCallbacks(ApprovalCallbacks)
class MyGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ...


# 3. Issue callback from node
class PlanNode:
    """Node that asks the user to approve a plan via a callback button."""

    def __init__(self, callback_service: CallbackService):
        self.callback_service = callback_service

    async def execute(self, state: MyState, config=None):
        """Create a plan and return it with an "Approve" button."""
        plan = await create_plan(state)

        # Create callback button
        approval_token = await self.callback_service.issue(
            handler="approve-action",
            params={"action": plan},
        )
        approve_button = {"text": "Approve", "callback_token": approval_token}

        return {
            "output": {
                "text": "Please approve this plan:",
                "buttons": [approve_button],
            }
        }
Flow:
Node → issue callback → User clicks → Handler executes → Workflow continues
Streaming
Enable streaming with metadata.
from amelie.graph_service_core import StreamChannel


async def build_graph(self, payload=None):
    """Build the graph with a stream channel set on the "generate" node."""
    workflow = StateGraph(MyState)

    node_metadata = {
        "stream_channel": StreamChannel.TEXT,  # Enable streaming
    }
    workflow.add_node(
        "generate",
        self.generate_node.execute,
        metadata=node_metadata,
    )

    return workflow.compile(checkpointer=self.checkpointer)
Stream channels:
- StreamChannel.TEXT - Text tokens
- StreamChannel.TOOL_CALLS - Tool executions
- StreamChannel.REASONING - Reasoning (o1 models)
Platform automatically streams to all channels (Web, Telegram, WhatsApp).
Module Setup
Register everything in the FastAPI application.
Graph Module
from flutch_sdk import create_graph_app
from versions.v1_0_0.builder import MyGraphV1Builder
from nodes.generate_node import GenerateNode
from callbacks.approval_callbacks import ApprovalCallbacks

# NOTE(review): model_initializer, mcp_client and checkpointer are not defined
# in this snippet - presumably created earlier during application setup; confirm.

# Create nodes
generate_node = GenerateNode(model_initializer, mcp_client)

# Create builders
v1_builder = MyGraphV1Builder(checkpointer, generate_node)
builders = [v1_builder]

# Create app
app = create_graph_app(
    builders=builders,
    callbacks=[ApprovalCallbacks],
)
Bootstrap
from flutch_sdk import bootstrap
from graph_module import app


def main() -> None:
    """Start the graph application."""
    bootstrap(app)


# Only start the app when this file is run as a script.
if __name__ == "__main__":
    main()
Complete Example
Full integration with tool calling.
State Definition
# src/state_model.py
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage
# add_messages is provided by LangGraph, not by langchain_core.messages.
from langgraph.graph.message import add_messages


class MyState(TypedDict):
    """State shared by the nodes of the graph."""

    # Conversation history; add_messages is the reducer LangGraph applies
    # when a node returns a "messages" update.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Response payload written by nodes.
    output: dict
Node Implementation
# src/nodes/generate_node.py
import logging

from flutch_sdk import ModelInitializer, McpRuntimeClient
from langchain_core.messages import SystemMessage

# MyState is used in the annotations below, so it must be importable here.
from state_model import MyState

logger = logging.getLogger(__name__)


class GenerateNode:
    """Model-calling node plus the companion node that runs requested tools."""

    def __init__(
        self,
        model_initializer: ModelInitializer,
        mcp_client: McpRuntimeClient,
    ):
        """Store the injected services.

        Args:
            model_initializer: Used to create the chat model.
            mcp_client: Used to fetch and execute tools.
        """
        self.model_initializer = model_initializer
        self.mcp_client = mcp_client

    async def execute(self, state: MyState, config=None):
        """Call the configured model on the conversation.

        Args:
            state: Current graph state; ``state["messages"]`` is read.
            config: Run config. Agent settings are read from
                ``config["configurable"]["graphSettings"]``. ``None`` is
                treated as an empty config.

        Returns:
            A state update with the model reply under "messages" and a
            summary (text, model id, number of tool calls) under "output".
        """
        # config defaults to None, so guard before calling .get() on it.
        configurable = (config or {}).get("configurable", {})
        graph_settings = configurable.get("graphSettings", {})

        system_prompt = graph_settings.get("systemPrompt", "")
        model_id = graph_settings.get("modelSettings", {}).get("modelId", "gpt-4o")
        enabled_tools = graph_settings.get("availableTools", [])

        # Initialize model with tools
        model = await self.model_initializer.initialize_chat_model(model_id=model_id)

        if enabled_tools:
            tools = await self.mcp_client.get_tools(enabled_tools)
            model = model.bind_tools(tools)
            logger.debug("Configured %d tools", len(tools))

        # Prepare messages. An empty prompt adds nothing, so only prepend a
        # SystemMessage when one is configured.
        messages = list(state["messages"])
        if system_prompt:
            messages.insert(0, SystemMessage(content=system_prompt))

        # Invoke model
        result = await model.ainvoke(messages, config)
        requested_calls = getattr(result, "tool_calls", None) or []

        return {
            "messages": [result],
            "output": {
                "text": result.content,
                "metadata": {
                    "model_id": model_id,
                    "tools_used": len(requested_calls),
                },
            },
        }

    async def execute_tools(self, state: MyState, config=None):
        """Run every tool call requested by the last message.

        Args:
            state: Current graph state; the last entry of
                ``state["messages"]`` is inspected for ``tool_calls``.
            config: Not used by this method.

        Returns:
            A state update with one tool message (as a dict) per tool call.
        """
        last_message = state["messages"][-1]
        # Treat a missing or None tool_calls attribute as "no calls".
        tool_calls = getattr(last_message, "tool_calls", None) or []

        tool_messages = []
        for tool_call in tool_calls:
            result = await self.mcp_client.execute_tool(
                tool_call["name"],
                tool_call["args"],
            )
            # Report the data on success, otherwise the error.
            content = result.get("data") if result.get("success") else result.get("error")

            tool_messages.append({
                "type": "tool",
                "tool_call_id": tool_call["id"],
                "content": content,
                "name": tool_call["name"],
            })

        return {"messages": tool_messages}
Builder
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.mongodb import MongoDBSaver
from state_model import MyState
from nodes.generate_node import GenerateNode
from amelie.graph_service_core import StreamChannel


class MyGraphV1Builder(AbstractGraphBuilder):
    """Builds version 1.0.0: a generate node that loops through a tools node."""

    version = "1.0.0"

    def __init__(self, checkpointer: MongoDBSaver, generate_node: GenerateNode):
        """Keep the checkpointer and the node object used by build_graph()."""
        super().__init__()
        self.checkpointer = checkpointer
        self.generate_node = generate_node

    async def build_graph(self, payload=None):
        """Assemble the generate/tools loop and return the compiled graph."""
        workflow = StateGraph(MyState)

        # Generate node with streaming
        workflow.add_node(
            "generate",
            self.generate_node.execute,
            metadata={"stream_channel": StreamChannel.TEXT},
        )

        # Tools node
        workflow.add_node("tools", self.generate_node.execute_tools)

        # Flow
        workflow.add_edge(START, "generate")

        # Conditional: check if tools needed
        def should_continue(state: MyState) -> str:
            newest = state["messages"][-1]
            if getattr(newest, "tool_calls", []):
                return "tools"
            return END

        routes = {"tools": "tools", END: END}
        workflow.add_conditional_edges("generate", should_continue, routes)

        # Loop back after tools
        workflow.add_edge("tools", "generate")

        return workflow.compile(checkpointer=self.checkpointer)