A RAG pipeline consists of two main parts: the ingestion pipeline (preparing and indexing data) and the query pipeline (answering questions). Both need careful design, because a weakness in either one limits the quality of the final answer.
The ingestion pipeline processes raw data and makes it available for retrieval:
Raw Data → Loader → Splitter → Enricher → Embedder → Vector Store
```python
# 1. Load documents
from langchain_community.document_loaders import PyPDFLoader

loader = PyPDFLoader("company_handbook.pdf")
documents = loader.load()

# 2. Create chunks
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)
chunks = splitter.split_documents(documents)

# 3. Enrich metadata
# classify_content and generate_summary are placeholders for your own helpers
for chunk in chunks:
    chunk.metadata["category"] = classify_content(chunk.page_content)
    chunk.metadata["summary"] = generate_summary(chunk.page_content)

# 4. Create embeddings and store
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

vectorstore = Chroma.from_documents(chunks, OpenAIEmbeddings())
```
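The enrichment step calls `classify_content` and `generate_summary` without defining them. The following is a minimal sketch of what such helpers could look like, assuming a keyword-based classifier and a first-sentence summary as cheap stand-ins. The category names and keyword lists are hypothetical, and in practice either helper might be an LLM call instead.

```python
import re

# Hypothetical categories and keywords; replace with ones that fit your corpus.
CATEGORY_KEYWORDS = {
    "hr": ["vacation", "leave", "salary", "benefits"],
    "it": ["password", "laptop", "vpn", "software"],
    "finance": ["expense", "invoice", "budget", "reimbursement"],
}


def classify_content(text: str) -> str:
    """Return the category whose keywords occur most often, or 'general'."""
    words = re.findall(r"[a-z]+", text.lower())
    counts = {
        category: sum(words.count(keyword) for keyword in keywords)
        for category, keywords in CATEGORY_KEYWORDS.items()
    }
    best = max(counts, key=counts.get)
    return best if counts[best] > 0 else "general"


def generate_summary(text: str, max_chars: int = 200) -> str:
    """Use the first sentence (truncated to max_chars) as an extractive summary."""
    first_sentence = re.split(r"(?<=[.!?])\s+", text.strip(), maxsplit=1)[0]
    return first_sentence[:max_chars]
```

Because both functions take plain strings, they can be swapped for model-based versions later without touching the ingestion loop.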
The query pipeline answers questions based on indexed data:
Question → Query Transformation → Retrieval → Re-Ranking → Context → LLM → Answer
Before retrieval, the question can be optimized:
| Technique | Description | When to use |
|---|---|---|
| Query rewriting | LLM rephrases the question | Colloquial questions |
| Query expansion | Add synonyms and related terms | Low recall |
| HyDE | Generate a hypothetical answer and use it as the query | Complex questions |
| Step-back | Formulate a more abstract question | Overly specific questions |
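The four techniques in the table can be sketched as small functions. This is an illustrative sketch, not a library API: it assumes the LLM is available as a plain callable that takes a prompt and returns text, and the prompt wording and the `synonyms` mapping are hypothetical.

```python
from typing import Callable, Dict, List

LLM = Callable[[str], str]  # assumed interface: prompt in, completion text out


def rewrite_query(question: str, llm: LLM) -> str:
    """Query rewriting: let the LLM rephrase a colloquial question."""
    prompt = (
        "Rewrite the following question as a concise, formal search query. "
        "Return only the rewritten query.\n\nQuestion: " + question
    )
    return llm(prompt).strip()


def expand_query(question: str, synonyms: Dict[str, List[str]]) -> str:
    """Query expansion: append known synonyms for words in the question."""
    extra: List[str] = []
    for word in question.lower().split():
        extra.extend(synonyms.get(word.strip("?.,!"), []))
    return " ".join([question, *extra])


def hyde_query(question: str, llm: LLM) -> str:
    """HyDE: generate a hypothetical answer and search with that text."""
    prompt = (
        "Write a short passage that plausibly answers the question.\n\n"
        "Question: " + question
    )
    return llm(prompt).strip()


def step_back_query(question: str, llm: LLM) -> str:
    """Step-back: ask for a more general version of a very specific question."""
    prompt = (
        "Formulate a more general question that covers the topic of the "
        "following question. Return only the question.\n\nQuestion: " + question
    )
    return llm(prompt).strip()
```

Each function returns a string that can be passed to the retriever in place of the original question.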
After retrieval, results are re-sorted for higher relevance:
```python
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank

reranker = CohereRerank(model="rerank-v3.5", top_n=3)
retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=vectorstore.as_retriever(search_kwargs={"k": 20}),
)
# Fetches 20 results, re-ranks to top 3
```
The two phases trade speed for quality, which is why they are combined:

| Phase | Speed | Quality |
|---|---|---|
| Retrieval (vector search) | Fast (ms) | Good |
| Re-ranking (cross-encoder) | Slow (100 ms+) | Excellent |
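The two-stage pattern itself is independent of any library. Here is a minimal sketch, assuming both stages are plain scoring functions: the toy `word_overlap` scorer stands in for vector search and `jaccard` stands in for a cross-encoder. Both are illustrative placeholders, not the real models.

```python
from typing import Callable, List, Sequence

Scorer = Callable[[str, str], float]  # (query, document) -> relevance score


def retrieve_then_rerank(
    query: str,
    documents: Sequence[str],
    cheap_score: Scorer,
    precise_score: Scorer,
    k: int = 20,
    top_n: int = 3,
) -> List[str]:
    """Score everything cheaply, then apply the expensive scorer to k candidates only."""
    # Stage 1: the cheap scorer runs over the whole collection.
    candidates = sorted(
        documents, key=lambda doc: cheap_score(query, doc), reverse=True
    )[:k]
    # Stage 2: the expensive scorer only sees the k survivors.
    reranked = sorted(
        candidates, key=lambda doc: precise_score(query, doc), reverse=True
    )
    return reranked[:top_n]


def word_overlap(query: str, doc: str) -> float:
    """Toy stand-in for vector search: number of shared words."""
    return float(len(set(query.lower().split()) & set(doc.lower().split())))


def jaccard(query: str, doc: str) -> float:
    """Toy stand-in for a cross-encoder: shared words relative to all words."""
    q, d = set(query.lower().split()), set(doc.lower().split())
    return len(q & d) / len(q | d) if q | d else 0.0
```

The expensive scorer is called at most `k` times per query, so its cost stays bounded regardless of collection size.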
Chunks often contain irrelevant parts. Contextual compression extracts only relevant passages:
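```python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# Assumes `llm` (a chat model) and `base_retriever` are already defined
compressor = LLMChainExtractor.from_llm(llm)
retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=base_retriever,
)
```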
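To make the idea concrete without an LLM, here is a rough sketch that keeps only the sentences of a chunk that share a term with the question. This keyword filter is a simplified stand-in for LLM-based extraction, not how `LLMChainExtractor` works internally. The `STOPWORDS` set and the `min_overlap` parameter are assumptions chosen for illustration.

```python
import re

# Small illustrative stopword list; a real system would use a fuller one.
STOPWORDS = {"a", "an", "the", "how", "do", "i", "is", "to", "of", "for",
             "what", "when", "where"}


def compress_chunk(chunk: str, question: str, min_overlap: int = 1) -> str:
    """Keep only sentences sharing at least min_overlap terms with the question."""
    question_terms = set(re.findall(r"\w+", question.lower())) - STOPWORDS
    sentences = re.split(r"(?<=[.!?])\s+", chunk.strip())
    kept = [
        sentence
        for sentence in sentences
        if len(question_terms & set(re.findall(r"\w+", sentence.lower()))) >= min_overlap
    ]
    return " ".join(kept)
```

Exact-match filtering misses paraphrases, which is the main reason to use an LLM or embedding-based compressor in practice.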
Some questions require multiple retrieval steps:
Question: "How has revenue developed for the product with the highest customer satisfaction score?"
Hop 1: "Which product has the highest customer satisfaction score?"
→ Product X
Hop 2: "How has Product X's revenue developed?"
→ Revenue data
```python
# Iterative retrieval
# has_sufficient_context and generate_followup are placeholders, e.g. LLM calls
def multi_hop_retrieve(question: str, max_hops: int = 3):
    context = []
    current_query = question
    for hop in range(max_hops):
        results = retriever.invoke(current_query)
        context.extend(results)
        # Check if sufficient context is available
        if has_sufficient_context(context, question):
            break
        # Generate follow-up query based on existing context
        current_query = generate_followup(question, context)
    return context
```
```text
              ┌─── Query Rewrite ───┐
              │                     │
User Query ──▶ Router ──▶ Retriever ──▶ Re-Ranker ──▶ LLM ──▶ Answer
              │                     │
              └─── Metadata Filter ─┘
```
Practical tip: Start with a simple pipeline of retriever plus LLM. Add re-ranking if relevance isn't sufficient. Query transformation pays off when users phrase their questions colloquially. Multi-hop is only needed for complex questions, so measure before you optimize.
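One simple way to measure is a hit rate at k over a small labeled set of questions. The sketch below assumes a hypothetical `retrieve` callable that returns document IDs in ranked order and an evaluation set of `(question, relevant_id)` pairs. Both are illustrative and not part of any library.

```python
from typing import Callable, List, Sequence, Tuple


def hit_rate_at_k(
    retrieve: Callable[[str], List[str]],
    eval_set: Sequence[Tuple[str, str]],
    k: int = 5,
) -> float:
    """Fraction of questions whose relevant document appears in the top k results."""
    if not eval_set:
        raise ValueError("eval_set must not be empty")
    hits = 0
    for question, relevant_id in eval_set:
        retrieved_ids = retrieve(question)[:k]
        if relevant_id in retrieved_ids:
            hits += 1
    return hits / len(eval_set)
```

Running this before and after adding a component such as re-ranking shows whether the extra latency buys a measurable gain on your own data.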