Key Takeaway
By the end of this blueprint you will have a production RAG pipeline using pgvector for dense retrieval, BM25 for sparse matching, a cross-encoder reranker for precision, and citation-aware prompt construction that lets users trace every claim back to a source document and page number.
Prerequisites
- PostgreSQL 16+ with the pgvector extension installed
- Python 3.11+ and familiarity with async patterns
- An embedding model API key (OpenAI, Cohere, or a local model via sentence-transformers)
- Basic understanding of vector similarity search concepts
- Docker for running PostgreSQL and any local embedding models
Why RAG Over Fine-Tuning?
Fine-tuning bakes knowledge into model weights, which works well for style and format but poorly for factual grounding. When your source data changes weekly or daily — internal docs, knowledge bases, product catalogs — fine-tuning cannot keep up. RAG keeps the LLM's reasoning capabilities intact while swapping the knowledge layer at query time. This means you can update your corpus without retraining, attribute every answer to a source document, and enforce access control on retrieval results without modifying the model.
RAG and fine-tuning are not mutually exclusive. Fine-tune for style and domain-specific reasoning patterns, then use RAG for factual grounding. The combination outperforms either approach alone for most enterprise use cases.
Architecture Overview
The pipeline is split into two paths: an offline ingestion path that processes documents through chunking, embedding, and indexing, and an online query path that retrieves, reranks, and synthesizes answers. A metadata store tracks document provenance so every generated response can cite its sources back to specific document sections and page numbers.
Document Ingestion and Chunking
Chunking strategy has an outsized impact on retrieval quality. Chunks that are too large dilute the signal with irrelevant context; chunks that are too small lose the context needed for coherent answers. The sweet spot depends on your content type: 200-400 tokens for dense technical documentation, 400-800 tokens for narrative content like reports and articles. We use recursive character splitting with overlap to maintain context across chunk boundaries.
"""Document chunking with recursive splitting and metadata preservation."""
from dataclasses import dataclass
from hashlib import sha256
from typing import Generator
from langchain_text_splitters import RecursiveCharacterTextSplitter
@dataclass
class Chunk:
content: str
doc_id: str
chunk_index: int
token_count: int
metadata: dict # page_number, section, heading, source_url
def chunk_document(
text: str,
doc_id: str,
metadata: dict,
chunk_size: int = 512,
chunk_overlap: int = 64,
) -> Generator[Chunk, None, None]:
"""Split a document into overlapping chunks with metadata.
Uses recursive splitting that respects paragraph and sentence
boundaries, falling back to character splitting only when necessary.
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""],
length_function=lambda t: len(t.split()), # token-approximate
)
chunks = splitter.split_text(text)
for i, chunk_text in enumerate(chunks):
yield Chunk(
content=chunk_text,
doc_id=doc_id,
chunk_index=i,
token_count=len(chunk_text.split()),
metadata={
**metadata,
"chunk_hash": sha256(chunk_text.encode()).hexdigest()[:16],
},
)Embedding and Indexing with pgvector
We use pgvector inside PostgreSQL rather than a standalone vector database. This keeps your vectors co-located with metadata, enables transactional consistency between document updates and index changes, and avoids adding another managed service to your infrastructure. The HNSW index type provides sub-linear query time with excellent recall at the cost of higher memory usage during index builds.
"""Embedding generation and pgvector storage."""
import asyncio
from typing import List
import asyncpg
import numpy as np
from openai import AsyncOpenAI
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIM = 1536
client = AsyncOpenAI()
async def embed_batch(texts: List[str], batch_size: int = 100) -> List[List[float]]:
"""Generate embeddings in batches to respect API rate limits."""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=batch,
)
all_embeddings.extend([e.embedding for e in response.data])
return all_embeddings
async def store_chunks(pool: asyncpg.Pool, chunks, embeddings):
"""Store chunks with embeddings in pgvector."""
async with pool.acquire() as conn:
await conn.executemany(
"""
INSERT INTO document_chunks
(doc_id, chunk_index, content, embedding, metadata)
VALUES ($1, $2, $3, $4::vector, $5::jsonb)
ON CONFLICT (doc_id, chunk_index)
DO UPDATE SET content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
metadata = EXCLUDED.metadata
""",
[
(c.doc_id, c.chunk_index, c.content,
str(emb), dict(c.metadata))
for c, emb in zip(chunks, embeddings)
],
)-- pgvector extension and chunks table
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE document_chunks (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
doc_id TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding vector(1536) NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (doc_id, chunk_index)
);
-- HNSW index for fast approximate nearest neighbor search
CREATE INDEX idx_chunks_embedding ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
-- GIN index on metadata for filtered searches
CREATE INDEX idx_chunks_metadata ON document_chunks
USING gin (metadata jsonb_path_ops);Hybrid Retrieval: Dense + Sparse
Dense vector search excels at semantic similarity but struggles with exact keyword matches — searching for a specific error code or product SKU can miss relevant documents that a simple text search would find instantly. Hybrid retrieval combines dense vector search with sparse lexical matching (here, PostgreSQL full-text search with `ts_rank`, a practical stand-in for BM25) and merges the results using Reciprocal Rank Fusion (RRF). This consistently outperforms either approach alone across diverse query types.
"""Hybrid retrieval combining dense vector search with BM25."""
from dataclasses import dataclass
from typing import List
import asyncpg
@dataclass
class RetrievalResult:
chunk_id: int
doc_id: str
content: str
score: float
metadata: dict
async def hybrid_search(
pool: asyncpg.Pool,
query_embedding: List[float],
query_text: str,
top_k: int = 20,
rrf_k: int = 60,
) -> List[RetrievalResult]:
"""Perform hybrid search using RRF to merge dense and sparse results.
Reciprocal Rank Fusion: score = sum(1 / (k + rank_i)) for each method.
"""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
WITH dense AS (
SELECT id, doc_id, content, metadata,
ROW_NUMBER() OVER (
ORDER BY embedding <=> $1::vector
) AS rank
FROM document_chunks
ORDER BY embedding <=> $1::vector
LIMIT $3
),
sparse AS (
SELECT id, doc_id, content, metadata,
ROW_NUMBER() OVER (
ORDER BY ts_rank(
to_tsvector('english', content),
plainto_tsquery('english', $2)
) DESC
) AS rank
FROM document_chunks
WHERE to_tsvector('english', content)
@@ plainto_tsquery('english', $2)
LIMIT $3
),
combined AS (
SELECT COALESCE(d.id, s.id) AS id,
COALESCE(d.doc_id, s.doc_id) AS doc_id,
COALESCE(d.content, s.content) AS content,
COALESCE(d.metadata, s.metadata) AS metadata,
COALESCE(1.0 / ($4 + d.rank), 0) +
COALESCE(1.0 / ($4 + s.rank), 0) AS rrf_score
FROM dense d
FULL OUTER JOIN sparse s ON d.id = s.id
)
SELECT * FROM combined ORDER BY rrf_score DESC LIMIT $3
""",
str(query_embedding), query_text, top_k, rrf_k,
)
return [
RetrievalResult(
chunk_id=r["id"], doc_id=r["doc_id"],
content=r["content"], score=r["rrf_score"],
metadata=dict(r["metadata"]),
)
for r in rows
]Cross-Encoder Reranking
The initial retrieval stage optimizes for recall — casting a wide net to avoid missing relevant documents. The reranking stage optimizes for precision — scoring each candidate against the query with a cross-encoder model that reads query and document together, producing a much more accurate relevance score than bi-encoder similarity. We retrieve 20 candidates and rerank down to the top 5.
"""Cross-encoder reranking for retrieval precision."""
from typing import List
from sentence_transformers import CrossEncoder
# Load once at module level — ~420MB model
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
def rerank(
query: str,
results: List[dict],
top_k: int = 5,
) -> List[dict]:
"""Rerank retrieval results using a cross-encoder model.
Args:
query: The user's search query.
results: List of dicts with 'content' field.
top_k: Number of top results to return after reranking.
Returns:
Reranked and truncated list of results.
"""
if not results:
return []
pairs = [(query, r["content"]) for r in results]
scores = reranker.predict(pairs)
for result, score in zip(results, scores):
result["rerank_score"] = float(score)
reranked = sorted(results, key=lambda r: r["rerank_score"], reverse=True)
return reranked[:top_k]Citation-Aware Prompt Construction
Once you have your top-k reranked chunks, the final step is constructing a prompt that instructs the LLM to ground its response in the retrieved context and cite sources. Each chunk gets a numbered reference tag, and the system prompt requires the model to include inline citations like [1], [2] in its response. Post-processing then maps these references back to document metadata for user-facing source links.
Place retrieved context before the user question in the prompt. LLMs attend more strongly to content near the beginning and end of the context window. Put the most relevant chunks first and the user question last for best results.
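The pattern described above can be sketched as a small prompt builder plus a citation post-processor. The `doc_title` and `page_number` metadata fields and the wording of the instruction are assumptions to adapt to your own schema:

```python
"""Citation-aware prompt construction and citation extraction (sketch)."""
import re
from typing import Dict, List


def build_prompt(question: str, chunks: List[dict]) -> str:
    """Build a grounded prompt with numbered reference tags.

    Chunks are assumed ordered most-relevant first; the user question
    goes last, following the context-placement tip above.
    """
    blocks = []
    for i, chunk in enumerate(chunks, start=1):
        meta = chunk.get("metadata", {})
        source = f"{meta.get('doc_title', chunk['doc_id'])}, p. {meta.get('page_number', '?')}"
        blocks.append(f"[{i}] ({source})\n{chunk['content']}")
    context = "\n\n".join(blocks)
    return (
        "Answer using ONLY the context below. Cite every claim with an "
        "inline reference like [1] or [2]. If the context does not "
        "contain the answer, say so.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


def extract_citations(response: str, chunks: List[dict]) -> Dict[int, dict]:
    """Map inline [n] citations back to chunk metadata for source links."""
    cited = {int(m) for m in re.findall(r"\[(\d+)\]", response)}
    return {n: chunks[n - 1]["metadata"] for n in sorted(cited) if 1 <= n <= len(chunks)}
```

The post-processor is deliberately forgiving: citation numbers outside the valid range are dropped rather than raised, since models occasionally hallucinate reference tags.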
Chunking Strategy Comparison
| Strategy | Chunk Size | Best For | Recall | Precision |
|---|---|---|---|---|
| Fixed-size | 512 tokens | General purpose | Medium | Medium |
| Recursive character | 256-512 tokens | Structured docs | High | Medium |
| Semantic splitting | Variable | Narrative content | High | High |
| Parent-child | 128 + 1024 tokens | Dense technical | Very High | High |
| Sliding window | 512 tokens, 128 overlap | Conversation logs | High | Medium |
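The parent-child row deserves a note: you index small child chunks for precise matching, then hand the LLM their larger parent chunks for context. A minimal sketch, where the in-memory `parents` map stands in for whatever store holds your 1024-token parent chunks:

```python
"""Parent-child expansion: search small chunks, return their parents (sketch)."""
from typing import Dict, List


def expand_to_parents(
    child_hits: List[dict],
    parents: Dict[str, str],
    max_parents: int = 3,
) -> List[str]:
    """Deduplicate child hits up to their parent chunks, preserving rank order.

    Small children give precise matching; returning the larger parent
    gives the LLM enough surrounding context for a coherent answer.
    """
    seen: set = set()
    result: List[str] = []
    for hit in child_hits:  # assumed ordered best-first
        parent_id = hit["parent_id"]
        if parent_id not in seen:
            seen.add(parent_id)
            result.append(parents[parent_id])
        if len(result) >= max_parents:
            break
    return result
```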
Monitoring Retrieval Quality
A RAG pipeline can silently degrade as your corpus grows or changes. Monitor retrieval quality continuously by tracking three key metrics: retrieval recall (are relevant documents being found?), answer faithfulness (is the LLM's response grounded in the retrieved context?), and citation accuracy (do the cited sources actually support the claims?). Log every retrieval session with the query, retrieved chunks, reranked chunks, and generated response so you can replay and debug quality issues.
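A minimal way to make sessions replayable is an append-only JSON-lines log; the record fields and log path here are assumptions, not a fixed schema:

```python
"""Retrieval session logging as JSON lines for later replay (sketch)."""
import json
import time
from pathlib import Path


def log_retrieval_session(
    log_path: Path,
    query: str,
    retrieved_ids: list,
    reranked_ids: list,
    response: str,
) -> None:
    """Append one retrieval session as a single JSON line."""
    record = {
        "ts": time.time(),
        "query": query,
        "retrieved": retrieved_ids,  # chunk ids from hybrid search
        "reranked": reranked_ids,    # chunk ids surviving the cross-encoder
        "response": response,
    }
    with log_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
```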
Watch for embedding drift: if your embedding model is updated or swapped, existing vectors become incompatible with new query embeddings. Always re-embed your entire corpus when changing embedding models, and version your embedding model alongside your index.
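One cheap guard is to pin the model name and dimension at ingestion time and fail fast on any mismatch at startup. The `stored` record here is assumed to come from a small config table you maintain alongside the index:

```python
"""Embedding-drift guard: refuse to serve on a model mismatch (sketch)."""


def check_embedding_version(stored: dict, runtime_model: str, runtime_dim: int) -> None:
    """Raise if query-time embeddings would not match the indexed vectors."""
    if stored["model"] != runtime_model or stored["dim"] != runtime_dim:
        raise RuntimeError(
            f"Embedding drift: index built with {stored['model']!r} "
            f"({stored['dim']}d) but runtime uses {runtime_model!r} "
            f"({runtime_dim}d). Re-embed the corpus before serving."
        )
```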
Version History
1.0.0 · 2026-03-01
- Initial publication with pgvector, hybrid search, and cross-encoder reranking
- Chunking strategies comparison and recursive splitter implementation
- Citation-aware prompt construction pattern
- Production checklist and monitoring guidance