This guide shows how to use Haystack Pipelines with the Flutch SDK. It covers integration patterns, not Haystack basics.
Overview
Haystack is a framework for production RAG pipelines. See Haystack docs for framework details.
This guide covers:
- Wrapping pipelines in AbstractGraphBuilder
- Accessing agent configuration in components
- Using Flutch services (ModelInitializer, McpRuntimeClient)
- Adding interactive callbacks
- Document indexing with agent config
Builder Integration
Wrap your Haystack Pipeline in AbstractGraphBuilder.
Basic Setup
from flutch_sdk import AbstractGraphBuilder
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.generators import OpenAIGenerator


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builds a basic RAG query pipeline configured from agent settings."""

    version = "1.0.0"

    def __init__(self, document_store):
        super().__init__()
        self.document_store = document_store

    async def build_graph(self, payload=None):
        """Create a Haystack Pipeline using the agent's graph settings.

        Args:
            payload: Optional request payload; ``payload.graph_settings``
                carries the per-agent configuration dict.

        Returns:
            A fully connected Haystack ``Pipeline``.
        """
        # Get agent configuration
        config = payload.graph_settings if payload else {}

        # Component settings with defaults
        top_k = config.get("retrievalSettings", {}).get("topK", 10)
        model = config.get("modelSettings", {}).get("model", "gpt-4")

        pipeline = Pipeline()
        pipeline.add_component("embedder", SentenceTransformersTextEmbedder())
        pipeline.add_component(
            "retriever",
            InMemoryEmbeddingRetriever(
                document_store=self.document_store,
                top_k=top_k,
            ),
        )
        # OpenAIGenerator takes a "prompt" string input, not documents, so a
        # PromptBuilder is needed to render retrieved documents into a prompt.
        prompt_template = (
            "Context:\n"
            "{% for doc in documents %}{{ doc.content }}\n{% endfor %}\n"
            "Question: {{ query }}\nAnswer:"
        )
        pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
        pipeline.add_component("generator", OpenAIGenerator(model=model))

        # Connect components
        pipeline.connect("embedder.embedding", "retriever.query_embedding")
        pipeline.connect("retriever.documents", "prompt_builder.documents")
        pipeline.connect("prompt_builder.prompt", "generator.prompt")

        return pipeline
Key points:
- ✅ Access config via `payload.graph_settings`
- ✅ Configure components with agent settings
- ✅ Return the pipeline from `build_graph()`
Configuration Access
Configure pipeline components with agent-specific settings.
Dynamic Pipeline Configuration
from flutch_sdk import AbstractGraphBuilder
from haystack import Pipeline
# These components were referenced but not imported in the original snippet,
# which would raise NameError at build time.
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.generators import OpenAIGenerator
import logging

logger = logging.getLogger(__name__)


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builds a retrieval + ranking + generation pipeline from agent config."""

    version = "1.0.0"

    def __init__(self, document_store):
        super().__init__()
        self.document_store = document_store

    async def build_graph(self, payload=None):
        """Build the pipeline, parameterizing each component from config.

        Args:
            payload: Optional request payload; ``payload.graph_settings``
                carries the per-agent configuration dict.

        Returns:
            A Haystack ``Pipeline`` (connections elided in this example).
        """
        config = payload.graph_settings if payload else {}

        # Extract per-concern settings sub-dicts
        retrieval_settings = config.get("retrievalSettings", {})
        ranking_settings = config.get("rankingSettings", {})
        generation_settings = config.get("generationSettings", {})

        top_k = retrieval_settings.get("topK", 10)
        top_n = ranking_settings.get("topN", 5)
        model = generation_settings.get("model", "gpt-4")
        temperature = generation_settings.get("temperature", 0.7)

        logger.debug(f"Building pipeline: top_k={top_k}, model={model}")

        # Create pipeline with configured components
        pipeline = Pipeline()
        pipeline.add_component(
            "retriever",
            InMemoryEmbeddingRetriever(
                document_store=self.document_store,
                top_k=top_k,
            ),
        )
        pipeline.add_component(
            "ranker",
            TransformersSimilarityRanker(top_k=top_n),
        )
        pipeline.add_component(
            "generator",
            OpenAIGenerator(
                model=model,
                generation_kwargs={"temperature": temperature},
            ),
        )

        # Connect...
        return pipeline
Agent config flows:
Agent Config → payload.graph_settings → Component parameters → Pipeline
Using Models
Use ModelInitializer to get models from agent config.
Custom Generator Component
from flutch_sdk import ModelInitializer
from haystack import component
from haystack.dataclasses import Document
from typing import List
import logging

logger = logging.getLogger(__name__)


@component
class FlutchGenerator:
    """Generator component backed by Flutch's ModelInitializer catalog."""

    def __init__(self, model_initializer: ModelInitializer, config: dict):
        self.model_initializer = model_initializer
        self.config = config

    @component.output_types(replies=List[str])
    async def run(self, documents: List[Document], query: str):
        # Resolve model parameters from the agent configuration.
        settings = self.config.get("modelSettings", {})
        model_id = settings.get("modelId", "gpt-4o")
        temperature = settings.get("temperature", 0.7)

        logger.debug(f"Using model: {model_id}")

        # Pull the chat model from the catalog — no API keys in code.
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature,
        )

        # Assemble the RAG prompt from the retrieved documents.
        context = "\n\n".join(doc.content for doc in documents)
        prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"

        reply = await model.acomplete(prompt)
        return {"replies": [str(reply)]}
Use in Builder
class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builder that wires the Flutch-backed generator into a pipeline."""

    version = "1.0.0"

    def __init__(self, document_store, model_initializer: ModelInitializer):
        super().__init__()
        self.document_store = document_store
        self.model_initializer = model_initializer

    async def build_graph(self, payload=None):
        """Create a pipeline whose generator resolves models via Flutch.

        Args:
            payload: Optional request payload carrying ``graph_settings``.

        Returns:
            A Haystack ``Pipeline`` (connections elided in this example).
        """
        config = payload.graph_settings if payload else {}

        pipeline = Pipeline()

        # Use Flutch generator: model selection comes from the agent config.
        generator = FlutchGenerator(
            model_initializer=self.model_initializer,
            config=config,
        )
        pipeline.add_component("generator", generator)

        # Connect...
        return pipeline
Benefits:
- ✅ Models from catalog - no API keys in code
- ✅ Each agent uses different model
- ✅ Centralized model management
Interactive Callbacks
Add callback buttons from custom components.
Callback Component
from flutch_sdk import (
    Callback,
    CallbackResult,
    ExtendedCallbackContext,
    WithCallbacks,
    CallbackService,
)
from haystack import component
# These were used but not imported in the original snippet (NameError in
# SourceSelector's type annotations).
from haystack.dataclasses import Document
from typing import List


# 1. Create callback handler
class SourceCallbacks:
    """Handles user clicks on source-selection buttons."""

    @Callback("select-source")
    async def handle_selection(self, context: ExtendedCallbackContext) -> CallbackResult:
        doc_id = context.params.get("doc_id")

        # Process selection.
        # NOTE(review): get_document is not defined in this snippet — it is
        # assumed to be implemented on this handler class.
        document = await self.get_document(doc_id)

        return CallbackResult(
            success=True,
            message="Source selected",
            patch={
                "text": f"Selected: {document.title}",
                "disable_buttons": True
            }
        )


# 2. Register in builder
@WithCallbacks(SourceCallbacks)
class SearchGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ...


# 3. Custom component that issues callback
@component
class SourceSelector:
    """Emits buttons that let the user pick one of the top retrieved sources."""

    def __init__(self, callback_service: CallbackService):
        self.callback_service = callback_service

    @component.output_types(output=dict)
    async def run(self, documents: List[Document]):
        # Create buttons for each of the top three sources.
        buttons = []
        for doc in documents[:3]:
            token = await self.callback_service.issue(
                handler="select-source",
                params={"doc_id": doc.id}
            )
            buttons.append({
                "text": f"Source: {doc.meta.get('title', 'Unknown')}",
                "callback_token": token
            })

        return {
            "output": {
                "text": "Select a source:",
                "buttons": buttons
            }
        }
Flow:
Component → issue callback → User clicks → Handler executes → Continue
Document Indexing
Index documents with agent-specific configuration.
Indexing Pipeline
# AbstractGraphBuilder and ModelInitializer were referenced but not imported
# in the original snippet, which would raise NameError.
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builder that can also produce a per-agent document indexing pipeline."""

    version = "1.0.0"

    def __init__(self, document_store, model_initializer: ModelInitializer):
        super().__init__()
        self.document_store = document_store
        self.model_initializer = model_initializer

    async def create_indexing_pipeline(self, config: dict):
        """Create indexing pipeline with configured embeddings.

        Args:
            config: Agent configuration dict; reads ``embeddingModel`` and
                ``chunkSize``.

        Returns:
            A connected Haystack ``Pipeline`` for indexing documents.
        """
        # Get embedding model from config
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        # NOTE(review): the returned object is assumed to be usable as a
        # Haystack embedder component — confirm against ModelInitializer docs.
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        pipeline = Pipeline()
        pipeline.add_component("converter", TextFileToDocument())
        pipeline.add_component("cleaner", DocumentCleaner())
        pipeline.add_component(
            "splitter",
            DocumentSplitter(
                split_by="sentence",
                split_length=config.get("chunkSize", 3),
            ),
        )
        pipeline.add_component(
            "embedder",
            embedding_model  # Use configured model
        )
        pipeline.add_component(
            "writer",
            DocumentWriter(document_store=self.document_store),
        )

        # Connect
        pipeline.connect("converter", "cleaner")
        pipeline.connect("cleaner", "splitter")
        pipeline.connect("splitter", "embedder")
        pipeline.connect("embedder", "writer")

        return pipeline
Benefits:
- ✅ Different agents use different embedding models
- ✅ Per-agent chunk size configuration
- ✅ Centralized document storage
Custom Components
Create custom components that use Flutch services.
Retriever with MCP Tools
from flutch_sdk import McpRuntimeClient
from haystack import component
from haystack.dataclasses import Document
from typing import List


@component
class ToolEnhancedRetriever:
    """Retriever that can use MCP tools for enhanced search."""

    def __init__(self, document_store, mcp_client: McpRuntimeClient, config: dict):
        self.document_store = document_store
        self.mcp_client = mcp_client
        # Tools enabled for this agent, from the catalog config.
        self.enabled_tools = config.get("availableTools", [])

    @component.output_types(documents=List[Document])
    async def run(self, query: str):
        # Optionally expand the query with fresh context from a web search.
        enhanced_query = query
        if "web-search" in self.enabled_tools:
            tool_result = await self.mcp_client.execute_tool(
                "web-search",
                {"query": query},
            )
            if tool_result.get("success"):
                enhanced_query = f"{query} {tool_result.get('data', '')}"

        # Retrieve from the document store using the (possibly expanded) query.
        matches = self.document_store.embedding_retrieval(
            query=enhanced_query,
            top_k=10,
        )
        return {"documents": matches}
Benefits:
- ✅ Tools from catalog - automatically converted
- ✅ Each agent has different tools
- ✅ Seamless tool integration
Module Setup
Register the builders and callbacks in the FastAPI application.
Graph Module
from flutch_sdk import create_graph_app
from versions.v1_0_0.builder import SearchGraphV1Builder
from callbacks.source_callbacks import SourceCallbacks

# Create the graph app with builders and callback handlers.
# NOTE(review): document_store and model_initializer are assumed to be
# constructed elsewhere in the application — confirm before copying.
app = create_graph_app(
    builders=[SearchGraphV1Builder(document_store, model_initializer)],
    callbacks=[SourceCallbacks],
)
Bootstrap
# Application entry point: hands the configured graph app to the Flutch
# bootstrap runner when executed as a script.
from flutch_sdk import bootstrap
from graph_module import app

if __name__ == "__main__":
    bootstrap(app)
Complete Example
Full RAG pipeline with Flutch integration.
Query Pipeline
# src/pipelines/query_pipeline.py
from flutch_sdk import ModelInitializer
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.builders import PromptBuilder


def create_query_pipeline(document_store, model_initializer: ModelInitializer, config: dict):
    """Create query pipeline with agent configuration.

    Args:
        document_store: Haystack document store to retrieve from.
        model_initializer: Flutch service that resolves catalog models.
        config: Agent configuration dict (retrieval/ranking/generation keys).

    Returns:
        A fully connected Haystack ``Pipeline``.
    """
    # Per-concern settings sub-dicts from the agent config.
    retrieval_settings = config.get("retrievalSettings", {})
    ranking_settings = config.get("rankingSettings", {})
    generation_settings = config.get("generationSettings", {})

    pipeline = Pipeline()

    # Query embedder
    pipeline.add_component("query_embedder", SentenceTransformersTextEmbedder())

    # Retriever with configured top_k
    retriever = InMemoryEmbeddingRetriever(
        document_store=document_store,
        top_k=retrieval_settings.get("topK", 10),
    )
    pipeline.add_component("retriever", retriever)

    # Ranker with configured model and top_n
    ranker = TransformersSimilarityRanker(
        model=ranking_settings.get("model", "cross-encoder/ms-marco-MiniLM-L-6-v2"),
        top_k=ranking_settings.get("topN", 5),
    )
    pipeline.add_component("ranker", ranker)

    # Prompt builder rendering the ranked documents into a grounded prompt.
    prompt_template = """
    Context:
    {% for doc in documents %}
    {{ doc.content }}
    {% endfor %}

    Question: {{ query }}

    Answer the question using only the provided context.
    """
    pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))

    # Generator using Flutch model
    from components.flutch_generator import FlutchGenerator
    generator = FlutchGenerator(
        model_initializer=model_initializer,
        config=config,
    )
    pipeline.add_component("generator", generator)

    # Connect components
    pipeline.connect("query_embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "ranker.documents")
    pipeline.connect("ranker.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    return pipeline
Custom Generator
# src/components/flutch_generator.py
from flutch_sdk import ModelInitializer
from haystack import component
import logging

logger = logging.getLogger(__name__)


@component
class FlutchGenerator:
    """Prompt-in, replies-out generator resolving its model via Flutch."""

    def __init__(self, model_initializer: ModelInitializer, config: dict):
        self.model_initializer = model_initializer
        self.config = config

    @component.output_types(replies=list)
    async def run(self, prompt: str):
        # Model parameters come from the agent's generation settings.
        settings = self.config.get("generationSettings", {})
        model_id = settings.get("modelId", "gpt-4o")
        temperature = settings.get("temperature", 0.7)
        max_tokens = settings.get("maxTokens", 500)

        logger.debug(f"Generating with {model_id}")

        # Resolve the chat model from the catalog.
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        reply = await model.acomplete(prompt)
        return {"replies": [str(reply)]}
Builder
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from pipelines.query_pipeline import create_query_pipeline
import os


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Full RAG builder: MongoDB-backed retrieval with Flutch generation."""

    version = "1.0.0"

    def __init__(self, model_initializer: ModelInitializer):
        super().__init__()
        self.model_initializer = model_initializer

        # Initialize document store
        self.document_store = MongoDBAtlasDocumentStore(
            mongo_connection_string=os.getenv("MONGODB_URI"),
            database_name="flutch_docs",
            collection_name="documents"
        )

    async def build_graph(self, payload=None):
        """Build the query pipeline from the agent's graph settings."""
        # Get agent configuration
        config = payload.graph_settings if payload else {}

        # Create pipeline with config
        pipeline = create_query_pipeline(
            document_store=self.document_store,
            model_initializer=self.model_initializer,
            config=config,
        )
        return pipeline

    async def execute(self, payload):
        """Run the pipeline for one user message and return text + sources."""
        # Build pipeline
        pipeline = await self.build_graph(payload)

        # Extract query
        query = payload.message.content

        # Run pipeline.
        # Fixes vs. original: the ranker's mandatory "query" input was never
        # supplied (it is not connected in the pipeline), and "ranker" outputs
        # are consumed by prompt_builder, so they only appear in the result
        # when requested via include_outputs_from.
        result = pipeline.run(
            {
                "query_embedder": {"text": query},
                "ranker": {"query": query},
                "prompt_builder": {"query": query},
            },
            include_outputs_from={"ranker"},
        )

        # Extract response
        response = result["generator"]["replies"][0]
        documents = result["ranker"]["documents"]

        # Return with sources
        return {
            "output": {
                "text": response,
                "metadata": {
                    "sources": [
                        {
                            "content": doc.content,
                            "score": doc.score,
                            "source": doc.meta.get("source")
                        }
                        for doc in documents
                    ]
                }
            }
        }