LlamaIndex

How to use LlamaIndex Workflows with the Flutch SDK.

Overview

LlamaIndex Workflows is an event-driven framework for orchestrating multi-step pipelines such as RAG. See the LlamaIndex docs for framework details.

This guide covers:

  • Wrapping workflows in AbstractGraphBuilder
  • Accessing agent configuration in workflow steps
  • Using Flutch services (ModelInitializer, McpRuntimeClient)
  • Adding interactive callbacks
  • Indexing and retrieval with agent config

Builder Integration

Wrap your LlamaIndex Workflow in AbstractGraphBuilder.

Basic Setup

python
from flutch_sdk import AbstractGraphBuilder
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core import VectorStoreIndex

class RAGWorkflow(Workflow):
    def __init__(self, index: VectorStoreIndex, config: dict):
        super().__init__()
        self.index = index
        self.config = config

    @step
    async def retrieve(self, ev: StartEvent) -> StopEvent:
        query = ev.query

        # Use config
        top_k = self.config.get("retrievalSettings", {}).get("topK", 10)
        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = await retriever.aretrieve(query)

        return StopEvent(result={"documents": nodes})

class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, vector_index: VectorStoreIndex):
        super().__init__()
        self.vector_index = vector_index

    async def build_graph(self, payload=None):
        # Get agent configuration
        config = payload.graph_settings if payload else {}

        # Create workflow with config
        workflow = RAGWorkflow(
            index=self.vector_index,
            config=config
        )

        return workflow

Key points:

  • ✅ Pass agent config to workflow initialization
  • ✅ Workflow can access config in all steps
  • ✅ Return workflow instance from build_graph()
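
Once built, the workflow runs like any LlamaIndex workflow: keyword arguments to run() surface as StartEvent attributes. A minimal sketch of driving the builder directly, outside the platform (which normally does this for you):

python
# Minimal local invocation; vector_index is assumed to be built
# elsewhere (see Document Indexing below).
import asyncio

async def main():
    builder = RAGGraphV1Builder(vector_index)
    workflow = await builder.build_graph()  # no payload -> default settings

    # run() kwargs become StartEvent attributes (ev.query in retrieve above)
    result = await workflow.run(query="What is Flutch?")
    print(result["documents"])

asyncio.run(main())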

Configuration Access

Access agent settings from workflow config passed during initialization.

Workflow with Configuration

python
from llama_index.core import VectorStoreIndex
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.workflow import Workflow, step
from events import QueryEvent, RetrievalEvent  # custom events (see Complete Example)
import logging

logger = logging.getLogger(__name__)

class RAGWorkflow(Workflow):
    def __init__(self, index: VectorStoreIndex, config: dict):
        super().__init__()
        self.index = index
        self.retrieval_settings = config.get("retrievalSettings", {})
        self.rerank_settings = config.get("rerankSettings", {})
        self.model_settings = config.get("modelSettings", {})

    @step
    async def retrieve(self, ev: QueryEvent) -> RetrievalEvent:
        # Use configured retrieval settings
        top_k = self.retrieval_settings.get("topK", 10)
        threshold = self.retrieval_settings.get("similarityThreshold", 0.7)

        logger.debug(f"Retrieving top {top_k} with threshold {threshold}")

        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = await retriever.aretrieve(ev.query)

        # as_retriever() has no similarity-cutoff argument; apply the
        # threshold with a postprocessor after retrieval
        nodes = SimilarityPostprocessor(
            similarity_cutoff=threshold
        ).postprocess_nodes(nodes)

        return RetrievalEvent(
            query=ev.query,
            documents=[n.get_content() for n in nodes],
            scores=[n.score for n in nodes]
        )

Agent config flows:

Agent Config → payload.graph_settings → workflow.__init__(config) → All steps
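
For reference, a config dict consistent with the keys read throughout this guide might look like the following (illustrative values, not a schema):

python
# Illustrative graph_settings shape; the actual keys are defined by
# your agent configuration, not by the SDK.
graph_settings = {
    "retrievalSettings": {"topK": 10, "similarityThreshold": 0.7},
    "rerankSettings": {"enabled": True, "topN": 5},
    "modelSettings": {"modelId": "gpt-4o", "temperature": 0.7},
}

workflow = RAGWorkflow(index=index, config=graph_settings)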

Using Models

Use ModelInitializer to get models from agent config.

Model Initialization in Workflow

python
from flutch_sdk import ModelInitializer
from llama_index.core import VectorStoreIndex
from llama_index.core.workflow import StopEvent, Workflow, step
from events import RerankEvent  # custom event (see Complete Example)

class RAGWorkflow(Workflow):
    def __init__(
        self,
        index: VectorStoreIndex,
        config: dict,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.index = index
        self.config = config
        self.model_initializer = model_initializer

    @step
    async def generate(self, ev: RerankEvent) -> StopEvent:
        # Get model config
        model_settings = self.config.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        # Initialize model from catalog
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature
        )

        # Build context
        context = "\n\n".join(ev.documents)
        prompt = f"Context:\n{context}\n\nQuestion: {ev.query}\n\nAnswer:"

        # Generate
        response = await model.acomplete(prompt)

        return StopEvent(
            result={
                "text": str(response),
                "sources": ev.documents
            }
        )

Pass ModelInitializer in Builder

python
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(
        self,
        vector_index: VectorStoreIndex,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.vector_index = vector_index
        self.model_initializer = model_initializer

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        workflow = RAGWorkflow(
            index=self.vector_index,
            config=config,
            model_initializer=self.model_initializer  # Pass to workflow
        )

        return workflow

Benefits:

  • ✅ Models from catalog - no API keys in code
  • ✅ Each agent can use a different model
  • ✅ Centralized model management

Interactive Callbacks

Add callback buttons from workflow steps.

Callback in Workflow

python
from flutch_sdk import Callback, CallbackResult, ExtendedCallbackContext, WithCallbacks, CallbackService

# 1. Create callback handler
class DocumentCallbacks:
    @Callback("select-document")
    async def handle_selection(self, context: ExtendedCallbackContext) -> CallbackResult:
        doc_id = context.params.get("doc_id")

        # Process the selection (get_document is a hypothetical lookup
        # helper you implement yourself)
        document = await self.get_document(doc_id)

        return CallbackResult(
            success=True,
            message="Document selected",
            patch={
                "text": f"Using document: {document.title}",
                "disable_buttons": True
            }
        )

# 2. Register in builder
@WithCallbacks(DocumentCallbacks)
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"
    # ...

# 3. Issue callback from workflow step
class RAGWorkflow(Workflow):
    def __init__(self, index, config, callback_service: CallbackService):
        super().__init__()
        self.index = index
        self.config = config
        self.callback_service = callback_service

    @step
    async def present_sources(self, ev: RetrievalEvent) -> StopEvent:
        # Create buttons for each source (assumes ev.documents holds
        # retrieved node objects exposing a stable .id)
        buttons = []
        for i, doc in enumerate(ev.documents[:3]):
            token = await self.callback_service.issue(
                handler="select-document",
                params={"doc_id": doc.id}
            )
            buttons.append({
                "text": f"Source {i+1}",
                "callback_token": token
            })

        return StopEvent(
            result={
                "text": "Select a source to continue:",
                "buttons": buttons
            }
        )

Flow:

Workflow step → issue callback → User clicks → Handler executes → Continue
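
The workflow receives its CallbackService from the builder. A sketch of that wiring, assuming the service is handed to the builder's constructor (how it is obtained depends on your application setup):

python
# Sketch: passing the CallbackService through the builder. The
# constructor injection shown here is an assumption, not SDK-prescribed.
@WithCallbacks(DocumentCallbacks)
class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, vector_index, callback_service: CallbackService):
        super().__init__()
        self.vector_index = vector_index
        self.callback_service = callback_service

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}
        return RAGWorkflow(
            index=self.vector_index,
            config=config,
            callback_service=self.callback_service,
        )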

Document Indexing

Index documents with agent-specific configuration.

Vector Store Setup

python
from llama_index.core import VectorStoreIndex, Document
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from llama_index.embeddings.openai import OpenAIEmbedding
import os

class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(self, mongo_client, model_initializer: ModelInitializer):
        super().__init__()
        self.model_initializer = model_initializer

        # Initialize vector store (mongo_client is created at startup)
        self.vector_store = MongoDBAtlasVectorSearch(
            mongodb_client=mongo_client,
            db_name="flutch_vectors",
            collection_name="documents",
            index_name="vector_index"
        )

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        # Get embedding model from config
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        # Create index with configured embeddings
        index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_store,
            embed_model=embedding_model
        )

        workflow = RAGWorkflow(
            index=index,
            config=config,
            model_initializer=self.model_initializer
        )
        return workflow

Benefits:

  • ✅ Different agents use different embedding models
  • ✅ Centralized document storage
  • ✅ Per-agent indexing configuration
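
Populating the store itself is standard LlamaIndex ingestion. A minimal sketch, assuming the same vector store and embedding model as above:

python
# Minimal ingestion sketch: write documents into the shared vector store
# with the agent's configured embedding model, so documents and queries
# are embedded consistently.
from llama_index.core import Document, StorageContext, VectorStoreIndex

def index_documents(vector_store, embedding_model, texts):
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    return VectorStoreIndex.from_documents(
        [Document(text=t) for t in texts],
        storage_context=storage_context,
        embed_model=embedding_model,
    )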

Streaming

Enable streaming responses.

python
from llama_index.core.workflow import Context, Event, StopEvent, Workflow, step
from events import RerankEvent  # custom event (see Complete Example)

class StreamChunk(Event):
    delta: str

class RAGWorkflow(Workflow):
    # self.config and self.model_initializer are set in __init__ (see above)

    @step
    async def generate(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        # Initialize the model from config, as in the earlier generate step
        model_settings = self.config.get("modelSettings", {})
        model = await self.model_initializer.initialize_chat_model(
            model_id=model_settings.get("modelId", "gpt-4o"),
            temperature=model_settings.get("temperature", 0.7)
        )

        # Build context
        context = "\n\n".join(ev.documents)
        prompt = f"Context:\n{context}\n\nQuestion: {ev.query}"

        # Stream response
        response_stream = await model.astream_complete(prompt)

        # A step cannot both yield chunks and return a result, so each
        # chunk is written to the workflow event stream instead
        full_response = ""
        async for chunk in response_stream:
            full_response += chunk.delta
            ctx.write_event_to_stream(StreamChunk(delta=chunk.delta))

        return StopEvent(
            result={
                "text": full_response,
                "sources": ev.documents
            }
        )

The platform consumes the workflow event stream and relays chunks to all channels (Web, Telegram, WhatsApp).
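
Outside the platform (in a test, for example), the same stream can be consumed through the handler returned by run():

python
# Consuming the stream locally: run() returns a handler whose
# stream_events() yields every event written with write_event_to_stream.
async def run_streaming(workflow, query: str):
    handler = workflow.run(query=query)
    async for event in handler.stream_events():
        delta = getattr(event, "delta", None)  # StreamChunk events carry .delta
        if delta:
            print(delta, end="", flush=True)
    return await handler  # final StopEvent result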

Module Setup

Register everything in the FastAPI application.

Graph Module

python
from flutch_sdk import create_graph_app
from versions.v1_0_0.builder import RAGGraphV1Builder
from callbacks.document_callbacks import DocumentCallbacks

# Create builders (vector_index and model_initializer are constructed
# during application startup)
builders = [
    RAGGraphV1Builder(vector_index, model_initializer)
]

# Create app
app = create_graph_app(
    builders=builders,
    callbacks=[DocumentCallbacks]
)

Bootstrap

python
from flutch_sdk import bootstrap
from graph_module import app

if __name__ == "__main__":
    bootstrap(app)

Complete Example

Full RAG workflow with Flutch integration.

Event Definitions

python
# src/events.py
from llama_index.core.workflow import Event

class QueryEvent(Event):
    query: str

class RetrievalEvent(Event):
    query: str
    documents: list
    scores: list

class RerankEvent(Event):
    query: str
    documents: list
    relevance_scores: list

Workflow Implementation

python
# src/workflows/rag_workflow.py
from flutch_sdk import ModelInitializer
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core import VectorStoreIndex
from llama_index.core.postprocessor import SimilarityPostprocessor
from events import QueryEvent, RetrievalEvent, RerankEvent
import logging

logger = logging.getLogger(__name__)

class RAGWorkflow(Workflow):
    def __init__(
        self,
        index: VectorStoreIndex,
        config: dict,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.index = index
        self.config = config
        self.model_initializer = model_initializer

    @step
    async def start(self, ev: StartEvent) -> QueryEvent:
        return QueryEvent(query=ev.query)

    @step
    async def retrieve(self, ev: QueryEvent) -> RetrievalEvent:
        # Use configured settings
        top_k = self.config.get("retrievalSettings", {}).get("topK", 10)
        threshold = self.config.get("retrievalSettings", {}).get("similarityThreshold", 0.7)

        logger.debug(f"Retrieving top {top_k} documents")

        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = await retriever.aretrieve(ev.query)

        # as_retriever() has no similarity-cutoff argument; filter
        # low-score nodes with a postprocessor instead
        nodes = SimilarityPostprocessor(
            similarity_cutoff=threshold
        ).postprocess_nodes(nodes)

        return RetrievalEvent(
            query=ev.query,
            documents=[n.get_content() for n in nodes],
            scores=[n.score for n in nodes]
        )

    @step
    async def rerank(self, ev: RetrievalEvent) -> RerankEvent:
        # Check if reranking enabled
        rerank_enabled = self.config.get("rerankSettings", {}).get("enabled", True)

        if not rerank_enabled:
            return RerankEvent(
                query=ev.query,
                documents=ev.documents,
                relevance_scores=ev.scores
            )

        top_n = self.config.get("rerankSettings", {}).get("topN", 5)

        # CohereRerank operates on nodes, so wrap the plain-text documents
        # back into scored nodes first (reads COHERE_API_KEY from the environment)
        from llama_index.core.schema import NodeWithScore, TextNode
        from llama_index.postprocessor.cohere_rerank import CohereRerank

        nodes = [
            NodeWithScore(node=TextNode(text=doc), score=score)
            for doc, score in zip(ev.documents, ev.scores)
        ]
        reranker = CohereRerank(top_n=top_n)
        reranked = reranker.postprocess_nodes(nodes, query_str=ev.query)

        return RerankEvent(
            query=ev.query,
            documents=[n.get_content() for n in reranked],
            relevance_scores=[n.score for n in reranked]
        )

    @step
    async def generate(self, ev: RerankEvent) -> StopEvent:
        # Get model from config
        model_settings = self.config.get("modelSettings", {})
        model_id = model_settings.get("modelId", "gpt-4o")
        temperature = model_settings.get("temperature", 0.7)

        model = await self.model_initializer.initialize_chat_model(
            model_id=model_id,
            temperature=temperature
        )

        # Build context with citations
        context = "\n\n".join([
            f"[{i+1}] {doc}"
            for i, doc in enumerate(ev.documents)
        ])

        prompt = f"""Context:
{context}

Question: {ev.query}

Answer using only the provided context. Cite sources using [1], [2], etc."""

        # Generate response
        response = await model.acomplete(prompt)

        return StopEvent(
            result={
                "text": str(response),
                "sources": ev.documents,
                "relevance_scores": ev.relevance_scores
            }
        )

Builder

python
# src/versions/v1_0_0/builder.py
from flutch_sdk import AbstractGraphBuilder, ModelInitializer
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.mongodb import MongoDBAtlasVectorSearch
from workflows.rag_workflow import RAGWorkflow

class RAGGraphV1Builder(AbstractGraphBuilder):
    version = "1.0.0"

    def __init__(
        self,
        mongo_client,
        model_initializer: ModelInitializer
    ):
        super().__init__()
        self.model_initializer = model_initializer

        # Initialize vector store
        vector_store = MongoDBAtlasVectorSearch(
            mongodb_client=mongo_client,
            db_name="flutch_vectors",
            collection_name="documents",
            index_name="vector_index"
        )

        self.vector_store = vector_store

    async def build_graph(self, payload=None):
        config = payload.graph_settings if payload else {}

        # Get embedding model
        embedding_model_id = config.get("embeddingModel", "text-embedding-3-small")
        embedding_model = await self.model_initializer.initialize_embedding_model(
            model_id=embedding_model_id
        )

        # Create index
        index = VectorStoreIndex.from_vector_store(
            vector_store=self.vector_store,
            embed_model=embedding_model
        )

        # Create workflow
        workflow = RAGWorkflow(
            index=index,
            config=config,
            model_initializer=self.model_initializer
        )

        return workflow
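
A hypothetical end-to-end smoke test for the complete example (mongo_client and model_initializer come from your application bootstrap):

python
# Hypothetical smoke test; names outside this guide (mongo_client,
# model_initializer) are assumed to exist in your application.
import asyncio

async def main():
    builder = RAGGraphV1Builder(mongo_client, model_initializer)
    workflow = await builder.build_graph()  # no payload -> default settings

    result = await workflow.run(query="What does the handbook say about refunds?")
    print(result["text"])
    print(f'{len(result["sources"])} sources cited')

asyncio.run(main())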