# How to Use LlamaIndex Workflows with Flutch SDK

## Overview

LlamaIndex Workflows is an event-driven framework for building multi-step LLM applications such as RAG pipelines. See the LlamaIndex docs for framework details.
This guide covers:
- Wrapping workflows in AbstractGraphBuilder
- Accessing agent configuration in workflow steps
- Using Flutch services (ModelInitializer, McpRuntimeClient)
- Adding interactive callbacks
- Indexing and retrieval with agent config
## Builder Integration

Wrap your LlamaIndex `Workflow` in `AbstractGraphBuilder`.

### Basic Setup
```python
from flutch_sdk import AbstractGraphBuilder
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core import VectorStoreIndex


class RAGWorkflow(Workflow):
    def __init__(self, index: VectorStoreIndex, config: dict):
        super().__init__()
        self.index = index
        self.config = config

    @step
    async def retrieve(self, ev: StartEvent) -> StopEvent:
        query = ev.query

        # Use config
        top_k = self.config.get("retrievalSettings", {}).get("topK", 10)

        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = await retriever.aretrieve(query)

        return StopEvent(result={"documents": nodes})


class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, vector_index: VectorStoreIndex):
        super().__init__()
        self.vector_index = vector_index

    async def build_graph(self, payload=None):
        # Get agent configuration
        config = payload.graph_settings if payload else {}

        # Create workflow with config
        workflow = RAGWorkflow(
            index=self.vector_index,
            config=config
        )

        return workflow
```
Key points:
- ✅ Pass agent config to workflow initialization
- ✅ Workflow can access config in all steps
- ✅ Return the workflow instance from `build_graph()`
## Configuration Access

Access agent settings through the config passed to the workflow during initialization.

### Workflow with Configuration
```python
from llama_index.core import VectorStoreIndex
from llama_index.core.workflow import Workflow, step
import logging

# Custom events - see Event Definitions in the Complete Example below
from events import QueryEvent, RetrievalEvent

logger = logging.getLogger(__name__)


class RAGWorkflow(Workflow):
    def __init__(self, index: VectorStoreIndex, config: dict):
        super().__init__()
        self.index = index
        self.retrieval_settings = config.get("retrievalSettings", {})
        self.rerank_settings = config.get("rerankSettings", {})
        self.model_settings = config.get("modelSettings", {})

    @step
    async def retrieve(self, ev: QueryEvent) -> RetrievalEvent:
        # Use configured retrieval settings
        top_k = self.retrieval_settings.get("topK", 10)
        threshold = self.retrieval_settings.get("similarityThreshold", 0.7)

        logger.debug(f"Retrieving top {top_k} with threshold {threshold}")

        retriever = self.index.as_retriever(
            similarity_top_k=top_k,
            similarity_cutoff=threshold
        )
        nodes = await retriever.aretrieve(ev.query)

        return RetrievalEvent(
            query=ev.query,
            documents=[n.get_content() for n in nodes],
            scores=[n.score for n in nodes]
        )
```
Agent config flows:

Agent config → `payload.graph_settings` → `workflow.__init__(config)` → all steps
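For reference, here is a hypothetical `graph_settings` dict covering the keys this guide reads; the concrete values are illustrative, not a schema:

```python
# Hypothetical agent configuration as delivered in payload.graph_settings.
# Keys mirror what the workflow reads in this guide; values are examples only.
example_graph_settings = {
    "retrievalSettings": {"topK": 10, "similarityThreshold": 0.7},
    "rerankSettings": {"enabled": True, "topN": 5},
    "modelSettings": {"modelId": "gpt-4o", "temperature": 0.7},
    "embeddingModel": "text-embedding-3-small",
}

# index: your VectorStoreIndex
workflow = RAGWorkflow(index=index, config=example_graph_settings)
```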
## Using Models

Use `ModelInitializer` to get models from the agent config.

### Model Initialization in Workflow
```python
from flutch_sdk import ModelInitializer
from llama_index.core import VectorStoreIndex
from llama_index.core.workflow import Workflow, StopEvent, step

# Custom event - see Event Definitions in the Complete Example below
from events import RerankEvent


class RAGWorkflow(Workflow):
    def __init__(
        self,
        index: VectorStoreIndex,
        config: dict,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.index = index
        self.config = config
        self.model_initializer = model_initializer

    @step
    async def generate(self, ev: RerankEvent) -> StopEvent:
        # Get model config
        model_settings = self.config.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        # Initialize model from catalog
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature
        )

        # Build context
        context = "\n\n".join(ev.documents)
        prompt = f"Context:\n{context}\n\nQuestion: {ev.query}\n\nAnswer:"

        # Generate
        response = await model.acomplete(prompt)

        return StopEvent(
            result={
                "text": str(response),
                "sources": ev.documents
            }
        )
```
### Pass ModelInitializer in Builder
```python
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(
        self,
        vector_index: VectorStoreIndex,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.vector_index = vector_index
        self.model_initializer = model_initializer

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        workflow = RAGWorkflow(
            index=self.vector_index,
            config=config,
            model_initializer=self.model_initializer  # Pass to workflow
        )

        return workflow
```
Benefits:
- ✅ Models come from the catalog - no API keys in code
- ✅ Each agent can use a different model
- ✅ Centralized model management
## Interactive Callbacks

Add callback buttons from workflow steps.

### Callback in Workflow
```python
from flutch_sdk import (
    Callback,
    CallbackResult,
    ExtendedCallbackContext,
    WithCallbacks,
    CallbackService,
)


# 1. Create callback handler
class DocumentCallbacks:
    @Callback("select-document")
    async def handle_selection(self, context: ExtendedCallbackContext) -> CallbackResult:
        doc_id = context.params.get("doc_id")

        # Process selection (get_document is your own lookup logic)
        document = await self.get_document(doc_id)

        return CallbackResult(
            success=True,
            message="Document selected",
            patch={
                "text": f"Using document: {document.title}",
                "disable_buttons": True
            }
        )


# 2. Register in builder
@WithCallbacks(DocumentCallbacks)
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ...


# 3. Issue callback from workflow step
class RAGWorkflow(Workflow):
    def __init__(self, index, config, callback_service: CallbackService):
        super().__init__()
        self.index = index
        self.config = config
        self.callback_service = callback_service

    @step
    async def present_sources(self, ev: RetrievalEvent) -> StopEvent:
        # Create buttons for each source (here ev.documents holds
        # node objects that expose an id)
        buttons = []
        for i, doc in enumerate(ev.documents[:3]):
            token = await self.callback_service.issue(
                handler="select-document",
                params={"doc_id": doc.id}
            )
            buttons.append({
                "text": f"Source {i+1}",
                "callback_token": token
            })

        return StopEvent(
            result={
                "text": "Select a source to continue:",
                "buttons": buttons
            }
        )
```
Flow:
Workflow step → issue callback → User clicks → Handler executes → Continue
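The workflow above expects a `CallbackService` in its constructor, so the builder must supply it. A minimal sketch, assuming the service is injected into the builder; how your application obtains the `CallbackService` instance may differ:

```python
# Hypothetical wiring: forward an injected CallbackService to the workflow.
@WithCallbacks(DocumentCallbacks)
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, vector_index, callback_service: CallbackService):
        super().__init__()
        self.vector_index = vector_index
        self.callback_service = callback_service

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}
        return RAGWorkflow(
            index=self.vector_index,
            config=config,
            callback_service=self.callback_service,
        )
```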
## Document Indexing

Index documents with agent-specific configuration (an ingestion sketch follows the setup below).

### Vector Store Setup
```python
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch


class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, mongo_client, model_initializer: ModelInitializer):
        super().__init__()
        self.model_initializer = model_initializer

        # Initialize vector store
        self.vector_store = MongoDBAtlasVectorSearch(
            mongodb_client=mongo_client,
            db_name="flutch_vectors",
            collection_name="documents",
            index_name="vector_index"
        )

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        # Get embedding model from config
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        # Create index with configured embeddings
        index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_store,
            embed_model=embedding_model
        )

        workflow = RAGWorkflow(index=index, config=config)
        return workflow
```
Benefits:
- ✅ Different agents use different embedding models
- ✅ Centralized document storage
- ✅ Per-agent indexing configuration
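The builder above reads from an existing collection; ingestion itself is not covered by the SDK snippets in this guide. Below is a minimal sketch of indexing raw texts into the same vector store with plain LlamaIndex, assuming a hypothetical `index_documents` helper on the builder:

```python
from llama_index.core import Document, StorageContext, VectorStoreIndex

# Illustrative helper: index raw texts into the shared vector store using
# the agent's configured embedding model. Where this lives in your app
# (builder method, admin endpoint, offline job) is up to you.
async def index_documents(self, texts: list[str], config: dict) -> VectorStoreIndex:
    embed_model = await self.model_initializer.initialize_embedding_model(
        model_id=config.get("embeddingModel", "text-embedding-3-small")
    )
    storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
    documents = [Document(text=text) for text in texts]
    return VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        embed_model=embed_model,
    )
```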
## Streaming

Enable streaming responses by writing chunks to the workflow event stream.
```python
from llama_index.core.workflow import Context, Event, StopEvent, Workflow, step

from events import RerankEvent


class RAGWorkflow(Workflow):
    @step
    async def generate(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        # Build context
        context = "\n\n".join(ev.documents)
        prompt = f"Context:\n{context}\n\nQuestion: {ev.query}"

        # Model comes from the catalog, as shown in Using Models above
        model = await self.model_initializer.initialize_chat_model(
            model_id=self.config.get("modelSettings", {}).get("modelId", "gpt-4o")
        )
        response_stream = await model.astream_complete(prompt)

        # Write chunks to the workflow event stream - the platform relays
        # them to the user (the `delta` field name is illustrative)
        full_response = ""
        async for chunk in response_stream:
            full_response += chunk.delta
            ctx.write_event_to_stream(Event(delta=chunk.delta))

        return StopEvent(
            result={
                "text": full_response,
                "sources": ev.documents
            }
        )
```
Platform automatically streams to all channels (Web, Telegram, WhatsApp).
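Outside the platform, the same event stream can be consumed with the standard LlamaIndex handler API, which is useful for local testing (the `delta` attribute matches the streaming snippet above):

```python
# Local smoke test: run the workflow and print streamed deltas as they arrive.
async def stream_locally(workflow: RAGWorkflow) -> None:
    handler = workflow.run(query="What does the contract say about renewal?")
    async for event in handler.stream_events():
        delta = getattr(event, "delta", None)
        if delta:
            print(delta, end="", flush=True)
    result = await handler  # resolves to the StopEvent result
    print("\nSources:", len(result["sources"]))
```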
## Module Setup

Register everything in the FastAPI application.

### Graph Module
```python
from flutch_sdk import create_graph_app
from versions.v1_0_0.builder import RAGGraphV1Builder
from callbacks.document_callbacks import DocumentCallbacks

# Create builders
builders = [
    RAGGraphV1Builder(vector_index, model_initializer)
]

# Create app
app = create_graph_app(
    builders=builders,
    callbacks=[DocumentCallbacks]
)
```
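`vector_index` and `model_initializer` must exist before the builder list is created. A hypothetical wiring sketch; the `ModelInitializer()` constructor call and the default embedding setup are assumptions to adapt to your deployment:

```python
# Hypothetical dependency wiring - only the final names match the builder
# signature above; construct these however your deployment dictates.
import os

from flutch_sdk import ModelInitializer
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from pymongo import MongoClient

mongo_client = MongoClient(os.environ["MONGODB_URI"])
vector_store = MongoDBAtlasVectorSearch(
    mongodb_client=mongo_client,
    db_name="flutch_vectors",
    collection_name="documents",
    index_name="vector_index",
)
# Uses the default embed model unless one is passed explicitly
vector_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
model_initializer = ModelInitializer()  # constructor args depend on your setup
```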
### Bootstrap

```python
from flutch_sdk import bootstrap
from graph_module import app

if __name__ == "__main__":
    bootstrap(app)
```
## Complete Example

Full RAG workflow with Flutch integration.

### Event Definitions
```python
# src/events.py
from llama_index.core.workflow import Event


class QueryEvent(Event):
    query: str


class RetrievalEvent(Event):
    query: str
    documents: list
    scores: list


class RerankEvent(Event):
    query: str
    documents: list
    relevance_scores: list
```
### Workflow Implementation
```python
# src/workflows/rag_workflow.py
from flutch_sdk import ModelInitializer
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core import VectorStoreIndex
from llama_index.postprocessor.cohere_rerank import CohereRerank
from events import QueryEvent, RetrievalEvent, RerankEvent
import logging

logger = logging.getLogger(__name__)


class RAGWorkflow(Workflow):
    def __init__(
        self,
        index: VectorStoreIndex,
        config: dict,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.index = index
        self.config = config
        self.model_initializer = model_initializer

    @step
    async def start(self, ev: StartEvent) -> QueryEvent:
        return QueryEvent(query=ev.query)

    @step
    async def retrieve(self, ev: QueryEvent) -> RetrievalEvent:
        # Use configured settings
        retrieval_settings = self.config.get("retrievalSettings", {})
        top_k = retrieval_settings.get("topK", 10)
        threshold = retrieval_settings.get("similarityThreshold", 0.7)

        logger.debug(f"Retrieving top {top_k} documents")

        retriever = self.index.as_retriever(
            similarity_top_k=top_k,
            similarity_cutoff=threshold
        )
        nodes = await retriever.aretrieve(ev.query)

        # Keep the NodeWithScore objects so the reranker can operate on them
        return RetrievalEvent(
            query=ev.query,
            documents=nodes,
            scores=[n.score for n in nodes]
        )

    @step
    async def rerank(self, ev: RetrievalEvent) -> RerankEvent:
        # Check if reranking is enabled
        rerank_settings = self.config.get("rerankSettings", {})

        if not rerank_settings.get("enabled", True):
            return RerankEvent(
                query=ev.query,
                documents=[n.get_content() for n in ev.documents],
                relevance_scores=ev.scores
            )

        top_n = rerank_settings.get("topN", 5)

        # Rerank the retrieved nodes
        reranker = CohereRerank(top_n=top_n)
        reranked = await reranker.apostprocess_nodes(
            ev.documents,
            query_str=ev.query
        )

        return RerankEvent(
            query=ev.query,
            documents=[n.get_content() for n in reranked],
            relevance_scores=[n.score for n in reranked]
        )

    @step
    async def generate(self, ev: RerankEvent) -> StopEvent:
        # Get model from config
        model_settings = self.config.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature
        )

        # Build context with citations
        context = "\n\n".join([
            f"[{i+1}] {doc}" for i, doc in enumerate(ev.documents)
        ])

        prompt = f"""Context:
{context}

Question: {ev.query}

Answer using only the provided context. Cite sources using [1], [2], etc."""

        # Generate response
        response = await model.acomplete(prompt)

        return StopEvent(
            result={
                "text": str(response),
                "sources": ev.documents,
                "relevance_scores": ev.relevance_scores
            }
        )
```
### Builder
```python
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from workflows.rag_workflow import RAGWorkflow


class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, mongo_client, model_initializer: ModelInitializer):
        super().__init__()
        self.model_initializer = model_initializer

        # Initialize vector store
        self.vector_store = MongoDBAtlasVectorSearch(
            mongodb_client=mongo_client,
            db_name="flutch_vectors",
            collection_name="documents",
            index_name="vector_index"
        )

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        # Get embedding model
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        # Create index
        index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_store,
            embed_model=embedding_model
        )

        # Create workflow
        workflow = RAGWorkflow(
            index=index,
            config=config,
            model_initializer=self.model_initializer
        )

        return workflow
```
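To sanity-check the complete example outside the platform, the workflow the builder returns can be run directly with the standard LlamaIndex API. The payload shape below is a hypothetical stand-in for what Flutch passes to `build_graph`, and `mongo_client`/`model_initializer` come from wiring like the Module Setup sketch:

```python
# Hypothetical local run - SimpleNamespace stands in for the Flutch payload.
import asyncio
from types import SimpleNamespace


async def main():
    builder = RAGGraphV1Builder(mongo_client, model_initializer)
    payload = SimpleNamespace(graph_settings={
        "retrievalSettings": {"topK": 5},
        "modelSettings": {"modelId": "gpt-4o", "temperature": 0.2},
    })
    workflow = await builder.build_graph(payload)

    # run() kicks off StartEvent -> start -> retrieve -> rerank -> generate
    result = await workflow.run(query="What are the key terms?")
    print(result["text"])


asyncio.run(main())
```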