Haystack

How to use Haystack pipelines with the Flutch SDK. This guide covers integration patterns, not Haystack basics.

Overview

Haystack is a framework for building production RAG pipelines. See the Haystack documentation for framework details.

This guide covers:

  • Wrapping pipelines in AbstractGraphBuilder
  • Accessing agent configuration in components
  • Using Flutch services (ModelInitializer, McpRuntimeClient)
  • Adding interactive callbacks
  • Document indexing with agent config

Builder Integration

Wrap your Haystack Pipeline in AbstractGraphBuilder.

Basic Setup

python
from flutch_sdk import AbstractGraphBuilder
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.generators import OpenAIGenerator

# OpenAIGenerator only takes a text ``prompt`` input, so retrieved documents
# are rendered into the prompt by a PromptBuilder first.
RAG_PROMPT_TEMPLATE = """
Context:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:
"""


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builds an embed → retrieve → prompt → generate pipeline from agent settings."""

    version = "1.0.0"

    def __init__(self, document_store):
        """
        Args:
            document_store: Store searched by the ``InMemoryEmbeddingRetriever``.
        """
        super().__init__()
        self.document_store = document_store

    async def build_graph(self, payload=None):
        """Return a pipeline configured from ``payload.graph_settings``.

        Reads ``retrievalSettings.topK`` (default 10) and
        ``modelSettings.model`` (default ``"gpt-4"``).

        Run it with
        ``pipeline.run({"embedder": {"text": q}, "prompt_builder": {"query": q}})``;
        the answer is in ``result["generator"]["replies"]``.
        """
        # Get agent configuration; a missing payload or unset settings means defaults.
        config = (payload.graph_settings if payload else None) or {}

        # Component settings taken from the agent config
        top_k = config.get("retrievalSettings", {}).get("topK", 10)
        model = config.get("modelSettings", {}).get("model", "gpt-4")

        pipeline = Pipeline()

        pipeline.add_component(
            "embedder",
            SentenceTransformersTextEmbedder()
        )
        pipeline.add_component(
            "retriever",
            InMemoryEmbeddingRetriever(
                document_store=self.document_store,
                top_k=top_k
            )
        )
        pipeline.add_component(
            "prompt_builder",
            PromptBuilder(template=RAG_PROMPT_TEMPLATE)
        )
        pipeline.add_component(
            "generator",
            OpenAIGenerator(model=model)
        )

        # Connect components: documents reach the generator via the prompt.
        pipeline.connect("embedder.embedding", "retriever.query_embedding")
        pipeline.connect("retriever.documents", "prompt_builder.documents")
        pipeline.connect("prompt_builder.prompt", "generator.prompt")

        return pipeline

Key points:

  • ✅ Access config via payload.graph_settings
  • ✅ Configure components with agent settings
  • ✅ Return pipeline from build_graph()

Configuration Access

Configure pipeline components with agent-specific settings.

Dynamic Pipeline Configuration

python
from flutch_sdk import AbstractGraphBuilder
from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers import InMemoryEmbeddingRetriever
import logging

logger = logging.getLogger(__name__)


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builds a retrieve → rank → generate pipeline from per-agent settings."""

    version = "1.0.0"

    def __init__(self, document_store):
        super().__init__()
        self.document_store = document_store

    async def build_graph(self, payload=None):
        """Return a pipeline whose components are configured from the agent config.

        Reads ``retrievalSettings.topK`` (default 10), ``rankingSettings.topN``
        (default 5) and ``generationSettings.model`` / ``temperature``
        (defaults ``"gpt-4"`` / 0.7).
        """
        # A missing payload or unset graph_settings falls back to the defaults.
        config = (payload.graph_settings if payload else None) or {}

        # Extract settings
        retrieval_settings = config.get("retrievalSettings", {})
        ranking_settings = config.get("rankingSettings", {})
        generation_settings = config.get("generationSettings", {})

        top_k = retrieval_settings.get("topK", 10)
        top_n = ranking_settings.get("topN", 5)
        model = generation_settings.get("model", "gpt-4")
        temperature = generation_settings.get("temperature", 0.7)

        # Lazy %-formatting: the string is only built when DEBUG is enabled.
        logger.debug("Building pipeline: top_k=%s, model=%s", top_k, model)

        # Create pipeline with configured components
        pipeline = Pipeline()

        pipeline.add_component(
            "retriever",
            InMemoryEmbeddingRetriever(
                document_store=self.document_store,
                top_k=top_k
            )
        )

        pipeline.add_component(
            "ranker",
            TransformersSimilarityRanker(top_k=top_n)
        )

        pipeline.add_component(
            "generator",
            OpenAIGenerator(
                model=model,
                generation_kwargs={"temperature": temperature}
            )
        )

        # Connect the components defined here. A full pipeline also needs a
        # text embedder feeding retriever.query_embedding and a PromptBuilder
        # feeding generator.prompt (see the Complete Example below).
        pipeline.connect("retriever.documents", "ranker.documents")

        return pipeline

Agent configuration flows through the builder like this:

Agent config → payload.graph_settings → component parameters → Pipeline

Using Models

Use ModelInitializer to get models from agent config.

Custom Generator Component

python
from flutch_sdk import ModelInitializer
from haystack import component
from haystack.dataclasses import Document
from typing import List
import asyncio
import logging

logger = logging.getLogger(__name__)


@component
class FlutchGenerator:
    """Custom generator using Flutch ModelInitializer.

    The model is resolved from ``config["modelSettings"]`` (``modelId``,
    ``temperature``) on every run, so each agent can use its own catalog model.
    """

    def __init__(self, model_initializer: ModelInitializer, config: dict):
        self.model_initializer = model_initializer
        # Tolerate a None config so the defaults in run_async apply.
        self.config = config or {}

    @component.output_types(replies=List[str])
    def run(self, documents: List[Document], query: str):
        """Synchronous entry point used by ``Pipeline.run``.

        ``Pipeline.run`` calls ``run`` without awaiting it, so an ``async def
        run`` would return a coroutine instead of the output dict. This wrapper
        drives ``run_async`` on a fresh event loop; it must not be called from
        a thread that is already running an event loop (use ``AsyncPipeline``,
        which awaits ``run_async`` directly, in that case).
        """
        return asyncio.run(self.run_async(documents=documents, query=query))

    @component.output_types(replies=List[str])
    async def run_async(self, documents: List[Document], query: str):
        """Answer ``query`` using ``documents`` as context.

        Returns:
            ``{"replies": [answer]}`` with the model response as a string.
        """
        # Get model from config
        model_settings = self.config.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        logger.debug("Using model: %s", model_id)

        # Initialize model from catalog
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature
        )

        # Build context; Document.content may be None, which str.join rejects.
        context = "\n\n".join(doc.content for doc in documents if doc.content)
        prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"

        # NOTE(review): assumes the returned model exposes ``acomplete`` — confirm
        # against the ModelInitializer return type.
        response = await model.acomplete(prompt)

        return {"replies": [str(response)]}

Use in Builder

python
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from haystack import Pipeline


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builder that plugs FlutchGenerator into a Haystack pipeline."""

    version = "1.0.0"

    def __init__(
        self,
        document_store,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.document_store = document_store
        self.model_initializer = model_initializer

    async def build_graph(self, payload=None):
        """Return a pipeline whose generator uses this agent's catalog model."""
        # A missing payload or unset graph_settings falls back to defaults.
        config = (payload.graph_settings if payload else None) or {}

        pipeline = Pipeline()

        # Use Flutch generator (the component defined in the previous snippet)
        pipeline.add_component(
            "generator",
            FlutchGenerator(
                model_initializer=self.model_initializer,
                config=config
            )
        )

        # Connect...

        return pipeline

Benefits:

  • ✅ Models from catalog - no API keys in code
  • ✅ Each agent uses different model
  • ✅ Centralized model management

Interactive Callbacks

Add callback buttons from custom components.

Callback Component

python
from flutch_sdk import (
    AbstractGraphBuilder,
    Callback,
    CallbackResult,
    CallbackService,
    ExtendedCallbackContext,
    WithCallbacks,
)
from haystack import component
from haystack.dataclasses import Document
from typing import List
import asyncio


# 1. Create callback handler
class SourceCallbacks:
    """Handles clicks on the source buttons issued by SourceSelector."""

    async def get_document(self, doc_id: str) -> Document:
        """Return the document with id ``doc_id``.

        Placeholder: connect this to your document store (for example a
        ``filter_documents`` lookup on the document id).
        """
        raise NotImplementedError(
            "Connect SourceCallbacks.get_document to your document store"
        )

    @Callback("select-source")
    async def handle_selection(self, context: ExtendedCallbackContext) -> CallbackResult:
        """Mark the chosen source as selected and disable the buttons."""
        doc_id = context.params.get("doc_id")

        # Process selection
        document = await self.get_document(doc_id)
        # Haystack Documents keep the title in ``meta``; there is no ``title`` attribute.
        title = document.meta.get("title", "Unknown")

        return CallbackResult(
            success=True,
            message="Source selected",
            patch={
                "text": f"Selected: {title}",
                "disable_buttons": True
            }
        )


# 2. Register in builder
@WithCallbacks(SourceCallbacks)
class SearchGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ...


# 3. Custom component that issues callback
@component
class SourceSelector:
    """Emits a "select a source" message with one callback button per document."""

    def __init__(self, callback_service: CallbackService, max_sources: int = 3):
        """
        Args:
            callback_service: Issues the callback tokens attached to buttons.
            max_sources: Maximum number of documents to offer as buttons.
        """
        self.callback_service = callback_service
        self.max_sources = max_sources

    @component.output_types(output=dict)
    def run(self, documents: List[Document]):
        """Synchronous entry point used by ``Pipeline.run``, which does not await ``run``.

        Must not be called from a thread already running an event loop
        (``AsyncPipeline`` awaits ``run_async`` directly instead).
        """
        return asyncio.run(self.run_async(documents=documents))

    @component.output_types(output=dict)
    async def run_async(self, documents: List[Document]):
        """Issue a ``select-source`` callback token for each of the first documents."""
        buttons = []
        for doc in documents[:self.max_sources]:
            token = await self.callback_service.issue(
                handler="select-source",
                params={"doc_id": doc.id}
            )
            buttons.append({
                "text": f"Source: {doc.meta.get('title', 'Unknown')}",
                "callback_token": token
            })

        return {
            "output": {
                "text": "Select a source:",
                "buttons": buttons
            }
        }

Flow:

Component → issue callback → User clicks → Handler executes → Continue

Document Indexing

Index documents with agent-specific configuration.

Indexing Pipeline

python
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Builder that also creates a per-agent document indexing pipeline."""

    version = "1.0.0"

    def __init__(
        self,
        document_store,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.document_store = document_store
        self.model_initializer = model_initializer

    async def create_indexing_pipeline(self, config: dict):
        """Create an indexing pipeline: convert → clean → split → embed → write.

        Args:
            config: Agent settings. ``embeddingModel`` selects the catalog
                embedding model (default ``"text-embedding-3-small"``);
                ``chunkSize`` is the number of sentences per chunk (default 3).

        Returns:
            A Haystack ``Pipeline`` that writes into ``self.document_store``.
        """
        # Tolerate a None config so the defaults below apply.
        config = config or {}

        # Get embedding model from config
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        pipeline = Pipeline()

        pipeline.add_component("converter", TextFileToDocument())
        pipeline.add_component("cleaner", DocumentCleaner())
        pipeline.add_component(
            "splitter",
            DocumentSplitter(
                split_by="sentence",
                split_length=config.get("chunkSize", 3)
            )
        )
        # NOTE(review): add_component requires a Haystack component that takes
        # ``documents`` and returns them with embeddings. Confirm that
        # initialize_embedding_model returns one; otherwise use
        # SentenceTransformersDocumentEmbedder here.
        pipeline.add_component(
            "embedder",
            embedding_model  # Use configured model
        )
        pipeline.add_component(
            "writer",
            DocumentWriter(document_store=self.document_store)
        )

        # Connect
        pipeline.connect("converter", "cleaner")
        pipeline.connect("cleaner", "splitter")
        pipeline.connect("splitter", "embedder")
        pipeline.connect("embedder", "writer")

        return pipeline

Benefits:

  • ✅ Different agents use different embedding models
  • ✅ Per-agent chunk size configuration
  • ✅ Centralized document storage

Custom Components

Create custom components that use Flutch services.

Retriever with MCP Tools

python
from flutch_sdk import McpRuntimeClient
from haystack import component
from haystack.dataclasses import Document
from typing import List
import asyncio


@component
class ToolEnhancedRetriever:
    """Retriever that can use MCP tools to expand the query before searching."""

    def __init__(
        self,
        document_store,
        mcp_client: McpRuntimeClient,
        config: dict,
        top_k: int = 10
    ):
        """
        Args:
            document_store: Store to search. It must provide ``bm25_retrieval``
                (e.g. Haystack's InMemoryDocumentStore).
            mcp_client: Client used to execute MCP tools.
            config: Agent settings; ``availableTools`` lists the enabled tools.
            top_k: Maximum number of documents to return.
        """
        self.document_store = document_store
        self.mcp_client = mcp_client
        self.enabled_tools = (config or {}).get("availableTools", [])
        self.top_k = top_k

    @component.output_types(documents=List[Document])
    def run(self, query: str):
        """Synchronous entry point used by ``Pipeline.run``, which does not await ``run``.

        Must not be called from a thread already running an event loop
        (``AsyncPipeline`` awaits ``run_async`` directly instead).
        """
        return asyncio.run(self.run_async(query=query))

    @component.output_types(documents=List[Document])
    async def run_async(self, query: str):
        """Optionally expand ``query`` with web-search results, then retrieve documents."""
        enhanced_query = query

        if "web-search" in self.enabled_tools:
            # Use web search tool to get fresh context
            result = await self.mcp_client.execute_tool(
                "web-search",
                {"query": query}
            )
            if result.get("success"):
                enhanced_query = f"{query} {result.get('data', '')}"

        # The expanded query is plain text, so use keyword (BM25) retrieval.
        # embedding_retrieval expects a query_embedding vector, not a string.
        results = self.document_store.bm25_retrieval(
            query=enhanced_query,
            top_k=self.top_k
        )

        return {"documents": results}

Benefits:

  • ✅ Tools from catalog - automatically converted
  • ✅ Each agent has different tools
  • ✅ Seamless tool integration

Module Setup

Register everything in the FastAPI application.

Graph Module

python
from flutch_sdk import ModelInitializer, create_graph_app
from versions.v1_0_0.builder import SearchGraphV1Builder
from callbacks.source_callbacks import SourceCallbacks

# Shared services injected into the builders.
# NOTE(review): construct ModelInitializer as your Flutch SDK setup requires;
# the no-argument call is a placeholder — confirm against the SDK docs.
model_initializer = ModelInitializer()

# Create builders. The builder in versions/v1_0_0/builder.py (see Complete
# Example) takes only the model initializer and opens its own document store.
builders = [
    SearchGraphV1Builder(model_initializer)
]

# Create app
app = create_graph_app(
    builders=builders,
    callbacks=[SourceCallbacks]
)

Bootstrap

python
from flutch_sdk import bootstrap
from graph_module import app


def main() -> None:
    """Start the Flutch graph service for the app defined in graph_module."""
    bootstrap(app)


if __name__ == "__main__":
    main()

Complete Example

Full RAG pipeline with Flutch integration.

Query Pipeline

python
# src/pipelines/query_pipeline.py
from flutch_sdk import ModelInitializer
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.builders import PromptBuilder
from haystack.document_stores.in_memory import InMemoryDocumentStore

from components.flutch_generator import FlutchGenerator


def _build_retriever(document_store, top_k: int):
    """Return the embedding retriever that matches ``document_store``'s backend."""
    # InMemoryEmbeddingRetriever rejects any store that is not an
    # InMemoryDocumentStore, so the MongoDB Atlas store from the builder needs
    # its own retriever.
    if isinstance(document_store, InMemoryDocumentStore):
        return InMemoryEmbeddingRetriever(document_store=document_store, top_k=top_k)

    # Imported lazily so the in-memory path does not need the MongoDB integration.
    from haystack_integrations.components.retrievers.mongodb_atlas import (
        MongoDBAtlasEmbeddingRetriever,
    )
    return MongoDBAtlasEmbeddingRetriever(document_store=document_store, top_k=top_k)


def create_query_pipeline(
    document_store,
    model_initializer: ModelInitializer,
    config: dict
):
    """Create query pipeline with agent configuration.

    Pipeline: query_embedder → retriever → ranker → prompt_builder → generator.

    Args:
        document_store: An InMemoryDocumentStore or a MongoDB Atlas document store.
        model_initializer: Passed to FlutchGenerator to resolve the chat model.
        config: Agent settings (``retrievalSettings``, ``rankingSettings``,
            ``generationSettings``); None is treated as empty.

    Returns:
        The connected Haystack ``Pipeline``; run it with inputs for
        ``query_embedder.text`` and ``prompt_builder.query``.
    """
    config = config or {}

    # Extract settings
    retrieval_settings = config.get("retrievalSettings", {})
    ranking_settings = config.get("rankingSettings", {})

    pipeline = Pipeline()

    # Embedder
    pipeline.add_component(
        "query_embedder",
        SentenceTransformersTextEmbedder()
    )

    # Retriever with configured top_k
    pipeline.add_component(
        "retriever",
        _build_retriever(document_store, retrieval_settings.get("topK", 10))
    )

    # Ranker with configured top_n
    pipeline.add_component(
        "ranker",
        TransformersSimilarityRanker(
            model=ranking_settings.get("model", "cross-encoder/ms-marco-MiniLM-L-6-v2"),
            top_k=ranking_settings.get("topN", 5)
        )
    )

    # Prompt builder
    prompt_template = """
    Context:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

    Question: {{ query }}

    Answer the question using only the provided context.
    """

    pipeline.add_component(
        "prompt_builder",
        PromptBuilder(template=prompt_template)
    )

    # Generator using Flutch model (reads generationSettings itself)
    pipeline.add_component(
        "generator",
        FlutchGenerator(
            model_initializer=model_initializer,
            config=config
        )
    )

    # Connect components
    pipeline.connect("query_embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "ranker.documents")
    pipeline.connect("ranker.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    return pipeline

Custom Generator

python
# src/components/flutch_generator.py
from flutch_sdk import ModelInitializer
from haystack import component
from typing import List
import asyncio
import logging

logger = logging.getLogger(__name__)


@component
class FlutchGenerator:
    """Haystack generator backed by a catalog model from Flutch's ModelInitializer.

    Reads ``generationSettings`` (``modelId``, ``temperature``, ``maxTokens``)
    from the agent config and sends the rendered prompt to that model.
    """

    def __init__(self, model_initializer: ModelInitializer, config: dict):
        self.model_initializer = model_initializer
        # Tolerate a None config so the defaults in run_async apply.
        self.config = config or {}

    @component.output_types(replies=List[str])
    def run(self, prompt: str):
        """Synchronous entry point used by ``Pipeline.run``.

        ``Pipeline.run`` calls ``run`` without awaiting it, so an ``async def
        run`` would return a coroutine instead of the output dict. This wrapper
        drives ``run_async`` on a fresh event loop; it must not be called from
        a thread already running an event loop (``AsyncPipeline`` awaits
        ``run_async`` directly instead).
        """
        return asyncio.run(self.run_async(prompt=prompt))

    @component.output_types(replies=List[str])
    async def run_async(self, prompt: str):
        """Generate a reply for ``prompt``.

        Returns:
            ``{"replies": [answer]}`` with the model response as a string.
        """
        # Get model settings
        model_settings = self.config.get("generationSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)
        max_tokens = model_settings.get("maxTokens", 500)

        logger.debug("Generating with %s", model_id)

        # Initialize model from catalog
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature,
            max_tokens=max_tokens
        )

        # NOTE(review): assumes the returned model exposes ``acomplete`` — confirm
        # against the ModelInitializer return type.
        response = await model.acomplete(prompt)

        return {"replies": [str(response)]}

Builder

python
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from haystack.utils import Secret
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from pipelines.query_pipeline import create_query_pipeline
import asyncio
import functools
import os


class SearchGraphV1Builder(AbstractGraphBuilder):
    """Search graph v1: answers questions with a RAG query pipeline and cites sources."""

    version = "1.0.0"

    def __init__(self, model_initializer: ModelInitializer, document_store=None):
        """
        Args:
            model_initializer: Resolves catalog models for the generator.
            document_store: Optional store to search (e.g. an in-memory store in
                tests). When omitted, a MongoDB Atlas store is created using the
                ``MONGODB_URI`` environment variable.
        """
        super().__init__()
        self.model_initializer = model_initializer

        if document_store is None:
            document_store = MongoDBAtlasDocumentStore(
                # Haystack components take credentials as Secret objects,
                # not plain strings.
                mongo_connection_string=Secret.from_env_var("MONGODB_URI"),
                database_name="flutch_docs",
                collection_name="documents",
                # Names of the Atlas Search indexes on the collection —
                # adjust to match your cluster.
                vector_search_index=os.getenv("MONGODB_VECTOR_INDEX", "vector_index"),
                full_text_search_index=os.getenv("MONGODB_FULLTEXT_INDEX", "fulltext_index"),
            )
        self.document_store = document_store

    async def build_graph(self, payload=None):
        """Create the query pipeline configured from ``payload.graph_settings``."""
        # A missing payload or unset graph_settings falls back to defaults.
        config = (payload.graph_settings if payload else None) or {}

        return create_query_pipeline(
            document_store=self.document_store,
            model_initializer=self.model_initializer,
            config=config
        )

    async def execute(self, payload):
        """Answer ``payload.message.content`` and return the reply with its sources."""
        pipeline = await self.build_graph(payload)

        query = payload.message.content
        inputs = {
            "query_embedder": {"text": query},
            "prompt_builder": {"query": query}
        }

        # Pipeline.run only returns outputs of components whose outputs are not
        # consumed downstream; the ranker feeds prompt_builder, so it must be
        # requested explicitly. Pipeline.run is synchronous, so run it in a
        # worker thread to keep the event loop free.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(pipeline.run, inputs, include_outputs_from={"ranker"})
        )

        response = result["generator"]["replies"][0]
        documents = result["ranker"]["documents"]

        # Return with sources
        return {
            "output": {
                "text": response,
                "metadata": {
                    "sources": [
                        {
                            "content": doc.content,
                            "score": doc.score,
                            "source": doc.meta.get("source")
                        }
                        for doc in documents
                    ]
                }
            }
        }