How to Build a RAG Pipeline with Clean Document Ingestion
Why Clean Ingestion Is the Most Important Part of RAG
Most RAG tutorials skip straight to embeddings and vector databases. That is a mistake.
The retrieval quality of your RAG pipeline is bounded by the quality of your ingested data. If you feed raw HTML into your embedding model, you are embedding navigation bars, cookie banners, and ad scripts alongside the actual content. If you feed a PDF through a naive text extractor, you get garbled column layouts, broken tables, and headers interleaved with body text.
This is the garbage-in, garbage-out problem applied to LLMs. Poor ingestion causes three concrete failures:
- Token waste. Boilerplate and formatting artifacts consume context window space without contributing to retrieval accuracy. A 500-word article is well under 1,000 tokens of prose, but the raw HTML of the same page, with its markup, inline scripts, and styles, can easily exceed 15,000 tokens.
- Retrieval misses. When chunks contain navigation text or broken formatting, the embedding vectors become diluted. Relevant passages score lower in similarity search because the embedding captures noise alongside signal.
- Hallucination risk. When the retrieved context is messy, the LLM has to work harder to extract meaning, and it is more likely to fill gaps with its own knowledge instead of grounding its answer in the source material.
Clean document ingestion solves all three. The rest of this tutorial shows how to build a RAG pipeline that gets ingestion right from the start.
Architecture Overview
The pipeline has five stages:
Source Documents (PDFs, URLs, DOCX, scanned images)
|
[1. Ingest] — Convert to clean markdown
|
[2. Chunk] — Split into retrieval units
|
[3. Embed] — Generate vector embeddings
|
[4. Store] — Upsert to vector database
|
[5. Query] — Retrieve + Generate answer
Each stage is independent and replaceable. You can swap the vector database without touching ingestion, and you can change your chunking strategy without re-ingesting documents, provided you keep the ingested markdown. This modularity matters in production.
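One way to keep the stages swappable is to have each stage depend only on a small interface. The sketch below is illustrative and not part of the Drive AI or ChromaDB APIs: Pipeline, Embedder, VectorStore, and InMemoryStore are hypothetical names, and the lambdas stand in for ingest_url, a chunker, and a real embedding model.

```python
from dataclasses import dataclass, field
from typing import Callable, Protocol


class Embedder(Protocol):
    def __call__(self, texts: list[str]) -> list[list[float]]: ...


class VectorStore(Protocol):
    def upsert(self, ids: list[str], texts: list[str], vectors: list[list[float]]) -> None: ...


@dataclass
class InMemoryStore:
    """Toy store used only to show that the storage stage is swappable."""
    rows: dict[str, tuple[str, list[float]]] = field(default_factory=dict)

    def upsert(self, ids: list[str], texts: list[str], vectors: list[list[float]]) -> None:
        for row_id, text, vector in zip(ids, texts, vectors):
            self.rows[row_id] = (text, vector)


@dataclass
class Pipeline:
    ingest: Callable[[str], str]          # source -> markdown
    chunk: Callable[[str], list[str]]     # markdown -> chunks
    embed: Embedder                       # chunks -> vectors
    store: VectorStore                    # vectors -> database

    def run(self, source: str) -> int:
        markdown = self.ingest(source)
        chunks = self.chunk(markdown)
        vectors = self.embed(chunks)
        ids = [f"{source}#{i}" for i in range(len(chunks))]
        self.store.upsert(ids, chunks, vectors)
        return len(chunks)


pipeline = Pipeline(
    ingest=lambda src: "# Title\n\nBody text.",            # stand-in for ingest_url
    chunk=lambda md: [p for p in md.split("\n\n") if p.strip()],
    embed=lambda texts: [[float(len(t))] for t in texts],  # fake embedding
    store=InMemoryStore(),
)
print(pipeline.run("https://example.com/doc"))  # 2
```

Swapping ChromaDB for Pinecone then means writing another class with an upsert method; the ingest, chunk, and embed stages stay untouched.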
We will use Python throughout, with the following stack:
- Ingestion: Drive AI Markdown API for document-to-markdown conversion
- Embeddings: OpenAI text-embedding-3-small
- Vector store: ChromaDB (swap for Pinecone, Weaviate, or Qdrant as needed)
- Generation: OpenAI gpt-4o
Prerequisites
Install the dependencies:
pip install requests openai chromadb
Set your API keys:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
DRIVE_AI_API_KEY = "tda_live_..."
Get a free Drive AI API key at dev.thedrive.ai. The free tier includes 100 credits per month.
Step 1: Ingest — Convert Documents to Clean Markdown
The ingestion step converts source documents into clean, structured markdown. This is where most pipelines fail because they use basic text extraction that loses structure, mangles tables, and includes boilerplate.
The Drive AI Markdown API handles this with a single GET request. It supports 107+ file types, renders JavaScript for SPAs, runs OCR with vision model proofreading on scanned documents, and strips all boilerplate automatically.
Ingesting a Webpage
import requests


def ingest_url(url: str) -> str:
    """Convert a URL to clean markdown using Drive AI."""
    response = requests.get(
        f"https://dev.thedrive.ai/md/{url}",
        headers={"X-API-Key": DRIVE_AI_API_KEY},
        # Without a timeout, requests can wait indefinitely; rendering and OCR
        # may take a while, so keep the limit generous
        timeout=120,
    )
    response.raise_for_status()
    return response.text


markdown = ingest_url("https://openai.com/index/gpt-4o")
print(markdown[:500])
The returned markdown preserves heading hierarchy, table structure, code blocks, and lists. Navigation, footers, ads, and tracking scripts are stripped.
Ingesting a PDF or Document
The same endpoint handles PDFs, Word documents, spreadsheets, and other file types. Pass the URL where the file is hosted:
# PDF hosted at a URL
markdown = ingest_url("https://arxiv.org/pdf/2312.10997")
# Google Docs, Notion pages, etc. — all work the same way
markdown = ingest_url("https://docs.google.com/document/d/...")
Batch Ingestion
For multiple documents, ingest them in parallel:
from concurrent.futures import ThreadPoolExecutor, as_completed
def ingest_batch(urls: list[str], max_workers: int = 5) -> dict[str, str]:
"""Ingest multiple URLs in parallel. Returns {url: markdown} mapping."""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {
executor.submit(ingest_url, url): url for url in urls
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
results[url] = future.result()
except Exception as e:
print(f"Failed to ingest {url}: {e}")
return results
At 1 credit per conversion on the free tier, you get 100 documents per month at no cost.
Step 2: Chunk — Split Markdown into Retrieval Units
Raw markdown needs to be split into chunks that are small enough for precise retrieval but large enough to carry meaning. There are two primary strategies.
Strategy A: Heading-Based Chunking
Split on markdown headings. This preserves semantic boundaries because authors use headings to delineate topics.
import re
def chunk_by_headings(
markdown: str,
max_chunk_size: int = 1500,
source_url: str = "",
) -> list[dict]:
"""Split markdown on headings. Oversized sections get subdivided."""
sections = re.split(r"(?=^#{1,3} )", markdown, flags=re.MULTILINE)
chunks = []
for section in sections:
section = section.strip()
if not section:
continue
# Extract heading for metadata
heading_match = re.match(r"^(#{1,3}) (.+)", section)
heading = heading_match.group(2) if heading_match else ""
if len(section) <= max_chunk_size:
chunks.append({
"text": section,
"heading": heading,
"source": source_url,
})
else:
# Subdivide oversized sections with overlap
sub_chunks = chunk_fixed_size(section, max_chunk_size, overlap=200)
for i, sub in enumerate(sub_chunks):
chunks.append({
"text": sub,
"heading": f"{heading} (part {i + 1})",
"source": source_url,
})
return chunks
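One caveat: the regex above splits on any line that starts with one to three # characters, including comment lines inside fenced code blocks (a Python # comment at the start of a line, for example). A minimal fence-aware alternative might look like this. It assumes fences open and close with ``` or ~~~ and ignores rarer cases such as longer fences or mismatched markers; split_on_headings_outside_fences is a hypothetical name.

```python
import re

HEADING = re.compile(r"^(#{1,3}) (.+)")
FENCE = re.compile(r"^ {0,3}(```|~~~)")


def split_on_headings_outside_fences(markdown: str) -> list[tuple[str, str]]:
    """Return (heading, section_text) pairs, ignoring # lines inside code fences."""
    sections: list[tuple[str, list[str]]] = [("", [])]  # preamble before any heading
    in_fence = False
    for line in markdown.splitlines():
        if FENCE.match(line):
            in_fence = not in_fence  # opening or closing fence
        match = None if in_fence else HEADING.match(line)
        if match:
            sections.append((match.group(2), [line]))
        else:
            sections[-1][1].append(line)
    result = []
    for heading, lines in sections:
        text = "\n".join(lines).strip()
        if text:
            result.append((heading, text))
    return result


doc = "# Setup\nInstall it.\n```python\n# not a heading\nx = 1\n```\n## Usage\nRun it."
for heading, text in split_on_headings_outside_fences(doc):
    print(repr(heading), len(text.splitlines()))
# 'Setup' 6
# 'Usage' 2
```

Each returned (heading, text) pair can then go through the same size check and chunk_fixed_size subdivision used in chunk_by_headings.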
Strategy B: Fixed-Size Chunking with Overlap
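When documents lack heading structure, use fixed-size chunks. The function below breaks each chunk at the nearest paragraph, line, sentence, or word boundary so it avoids cutting text mid-sentence, and it overlaps consecutive chunks so context carries across chunk boundaries. chunk_by_headings also calls this function for oversized sections, so define it before running the heading-based chunker.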
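def chunk_fixed_size(
    text: str,
    max_size: int = 1000,
    overlap: int = 200,
) -> list[str]:
    """Split text into chunks of at most max_size characters, with overlap."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + max_size, len(text))
        # Prefer to break at a paragraph, line, sentence, or word boundary
        if end < len(text):
            for sep in ["\n\n", "\n", ". ", " "]:
                last_sep = text.rfind(sep, start, end)
                if last_sep > start:
                    end = last_sep + len(sep)
                    break
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= len(text):
            break  # last chunk reached; avoids emitting a duplicate tail chunk
        # Step back by `overlap` characters, but always make forward progress
        next_start = end - overlap
        start = next_start if next_start > start else end
    return chunks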
Which Strategy to Use
Use heading-based chunking when your source documents have clear structure (articles, documentation, reports). Use fixed-size chunking for unstructured text (transcripts, emails, raw notes). In practice, the heading-based approach works well for most RAG applications because the Drive AI Markdown API preserves document structure.
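Whichever strategy you pick, sub-chunks of a long section lose their heading: only part 1 contains the heading line, so later parts are embedded without their topic. A common remedy is to prefix every chunk with its heading path before embedding. The following sketch, with hypothetical function names, tracks nested headings and returns each section's breadcrumb (for example, Guide > Install > Linux) alongside its body:

```python
import re

HEADING = re.compile(r"^(#{1,6}) (.+)")


def sections_with_breadcrumbs(markdown: str) -> list[dict]:
    """Split markdown at headings, labeling each section with its heading path."""
    stack: list[tuple[int, str]] = []  # (level, title) of currently open headings
    sections: list[dict] = []
    current: list[str] = []

    def flush() -> None:
        # Emit the lines gathered so far under the current heading path
        body = "\n".join(current).strip()
        if body:
            breadcrumb = " > ".join(title for _, title in stack)
            sections.append({"breadcrumb": breadcrumb, "text": body})
        current.clear()

    for line in markdown.splitlines():
        match = HEADING.match(line)
        if match:
            flush()
            level = len(match.group(1))
            # A heading closes any open headings at the same or a deeper level
            while stack and stack[-1][0] >= level:
                stack.pop()
            stack.append((level, match.group(2).strip()))
        else:
            current.append(line)
    flush()
    return sections


def with_context(breadcrumb: str, text: str) -> str:
    """Text to embed: the heading path followed by the chunk body."""
    return f"{breadcrumb}\n\n{text}" if breadcrumb else text


doc = "# Guide\nIntro.\n## Install\nRun pip.\n### Linux\nUse apt.\n## Usage\nCall it."
for s in sections_with_breadcrumbs(doc):
    print(s["breadcrumb"], "|", s["text"])
# Guide | Intro.
# Guide > Install | Run pip.
# Guide > Install > Linux | Use apt.
# Guide > Usage | Call it.
```

You could store the breadcrumb in the heading metadata field and embed with_context(breadcrumb, text) for every sub-chunk that chunk_fixed_size produces. Like the regex in chunk_by_headings, this sketch does not skip # lines inside code fences.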
Step 3: Embed — Generate Vector Embeddings
Convert each chunk into a vector embedding. We use OpenAI's text-embedding-3-small model here, but you can substitute any embedding model (Cohere, Voyage AI, open-source models via Sentence Transformers).
from openai import OpenAI
client = OpenAI()
def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""Generate embeddings for a list of texts."""
response = client.embeddings.create(
input=texts,
model=model,
)
return [item.embedding for item in response.data]
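Batch your embedding calls rather than sending one chunk per request. The OpenAI embeddings API accepts up to 2048 inputs per request, subject to per-request token limits, so a batch of 100 chunks is a conservative default: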
def embed_chunks(chunks: list[dict], batch_size: int = 100) -> list[dict]:
"""Add embedding vectors to each chunk."""
for i in range(0, len(chunks), batch_size):
batch = chunks[i : i + batch_size]
texts = [c["text"] for c in batch]
embeddings = embed_texts(texts)
for chunk, embedding in zip(batch, embeddings):
chunk["embedding"] = embedding
return chunks
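The fixed batch_size of 100 counts texts, not tokens. If your chunk sizes vary widely, you might cap each request by both count and approximate size. This sketch uses character count as a rough proxy for tokens (English text averages around four characters per token); batch_by_budget is a hypothetical helper, and the max_chars default is an arbitrary placeholder rather than a documented OpenAI limit:

```python
from collections.abc import Iterator


def batch_by_budget(
    texts: list[str],
    max_items: int = 2048,
    max_chars: int = 400_000,
) -> Iterator[list[str]]:
    """Yield consecutive batches capped by item count and total characters."""
    batch: list[str] = []
    size = 0
    for text in texts:
        # Close the current batch if adding this text would break either cap
        if batch and (len(batch) >= max_items or size + len(text) > max_chars):
            yield batch
            batch, size = [], 0
        batch.append(text)
        size += len(text)
    if batch:
        yield batch


print(list(batch_by_budget(["aaaa", "bb", "cccccc", "d"], max_items=3, max_chars=7)))
# [['aaaa', 'bb'], ['cccccc', 'd']]
```

Inside embed_chunks you would iterate over these batches instead of fixed slices, collect the embeddings in order, and zip them back onto the chunks. A single text longer than max_chars still gets its own batch, so split oversized chunks upstream.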
Step 4: Store — Upsert to a Vector Database
Insert the embedded chunks into a vector database for retrieval. This example uses ChromaDB, which runs locally with no infrastructure setup. For production, consider Pinecone, Weaviate, or Qdrant.
ChromaDB Setup
import chromadb
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection(
name="documents",
metadata={"hnsw:space": "cosine"},
)
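The hnsw:space setting matters later. With cosine space, the distance Chroma reports is 1 minus the cosine similarity, which is why retrieve converts results with 1 - dist; under Chroma's default l2 space, that conversion would not yield a similarity. A small pure-Python illustration of the relationship (these helpers are for explanation only; Chroma computes distances itself):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Distance as defined for an index configured with cosine space."""
    return 1.0 - cosine_similarity(a, b)


query = [1.0, 0.0]
doc = [1.0, 1.0]
dist = cosine_distance(query, doc)
print(round(dist, 4), round(1 - dist, 4))  # 0.2929 0.7071
```

Cosine distance ranges from 0 (same direction) to 2 (opposite directions), so the converted score runs from 1 down to -1.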
Upserting Chunks
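import hashlib
import uuid  # used by the Pinecone variant below


def store_chunks(chunks: list[dict], collection):
    """Store embedded chunks in ChromaDB.

    IDs are derived from each chunk's source, heading, and text, so re-running
    the pipeline on unchanged documents overwrites existing rows instead of
    adding duplicates (random IDs would make every upsert an insert).
    """
    by_id = {}
    for c in chunks:
        key = f"{c.get('source', '')}\n{c.get('heading', '')}\n{c['text']}"
        by_id[hashlib.sha256(key.encode()).hexdigest()] = c  # drops exact duplicates
    unique = list(by_id.values())
    collection.upsert(
        ids=list(by_id.keys()),
        documents=[c["text"] for c in unique],
        embeddings=[c["embedding"] for c in unique],
        metadatas=[
            {"source": c.get("source", ""), "heading": c.get("heading", "")}
            for c in unique
        ],
    )
    print(f"Stored {len(unique)} chunks in the vector database.")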
Using Pinecone Instead
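If you prefer a managed vector database, the Pinecone integration is similar. It assumes an index named documents already exists, created with dimension 1536 (the default output size of text-embedding-3-small) and the cosine metric: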
# pip install pinecone
from pinecone import Pinecone
pc = Pinecone(api_key="your-pinecone-key")
index = pc.Index("documents")
def store_chunks_pinecone(chunks: list[dict], index):
"""Store embedded chunks in Pinecone."""
vectors = [
{
"id": str(uuid.uuid4()),
"values": c["embedding"],
"metadata": {
"text": c["text"],
"source": c.get("source", ""),
"heading": c.get("heading", ""),
},
}
for c in chunks
]
# Upsert in batches of 100
for i in range(0, len(vectors), 100):
index.upsert(vectors=vectors[i : i + 100])
Step 5: Retrieve and Generate
With chunks stored, you can now query the pipeline. Embed the user's question, retrieve the most relevant chunks, and pass them to an LLM for answer generation.
def retrieve(query: str, collection, top_k: int = 5) -> list[dict]:
"""Retrieve the most relevant chunks for a query."""
query_embedding = embed_texts([query])[0]
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"],
)
return [
{
"text": doc,
"source": meta.get("source", ""),
"heading": meta.get("heading", ""),
"score": 1 - dist, # Convert distance to similarity
}
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
)
]
def generate_answer(query: str, context_chunks: list[dict]) -> str:
"""Generate an answer grounded in the retrieved context."""
context = "\n\n---\n\n".join(
f"[Source: {c['source']}]\n{c['text']}" for c in context_chunks
)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": (
"You are a helpful assistant. Answer the user's question "
"based only on the provided context. If the context does not "
"contain enough information, say so. Cite sources when possible."
),
},
{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {query}",
},
],
temperature=0,
)
return response.choices[0].message.content
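generate_answer joins every retrieved chunk into the prompt. If you raise top_k or use large chunks, you may want a context budget and numbered sources that the model can cite. A minimal sketch, assuming chunks arrive best-first as retrieve returns them; build_context and the 12,000-character default are illustrative choices, not values from this tutorial:

```python
SEPARATOR = "\n\n---\n\n"


def build_context(chunks: list[dict], max_chars: int = 12_000) -> tuple[str, list[str]]:
    """Number chunks for citation, stopping before the context exceeds max_chars.

    Returns the context string and the sources in citation order ([1] first).
    """
    parts: list[str] = []
    sources: list[str] = []
    used = 0
    for chunk in chunks:
        block = f"[{len(parts) + 1}] (source: {chunk['source']})\n{chunk['text']}"
        extra = len(block) + (len(SEPARATOR) if parts else 0)
        if used + extra > max_chars:
            break  # drop lower-ranked chunks instead of cutting one mid-text
        parts.append(block)
        sources.append(chunk["source"])
        used += extra
    return SEPARATOR.join(parts), sources
```

Put the returned context in the user message and ask the model to cite sources as [1], [2], and so on; each number maps back to the returned sources list.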
Querying the Pipeline
query = "What are the key differences between GPT-4 and GPT-4o?"
chunks = retrieve(query, collection, top_k=5)
answer = generate_answer(query, chunks)
print(answer)
Handling Different Source Types
One of the reasons clean ingestion matters is that real-world RAG pipelines ingest documents from many sources. Each type has its own extraction challenges.
Webpages and SPAs
Fetching raw HTML is not enough for JavaScript-rendered pages, because much of their content is injected by scripts after the initial load. The Drive AI Markdown API renders JavaScript before extraction, so React apps, Next.js pages, and dynamically loaded content all get captured correctly.
# All of these work identically
markdown = ingest_url("https://react-docs-site.com/getting-started")
markdown = ingest_url("https://company.notion.site/internal-docs")
markdown = ingest_url("https://medium.com/@author/article-title")
PDFs (Native and Scanned)
Native PDFs (those with selectable text) are extracted with layout analysis that preserves tables, columns, and reading order. Scanned PDFs are processed with OCR and then proofread by a vision model to correct OCR errors.
# Native PDF
markdown = ingest_url("https://example.com/annual-report-2025.pdf")
# Scanned document — OCR happens automatically
markdown = ingest_url("https://example.com/scanned-contract.pdf")
Office Documents
Word documents, Excel spreadsheets, and PowerPoint files are all supported. Tables in Excel are converted to markdown tables. Slide content from PowerPoint is extracted in reading order.
markdown = ingest_url("https://example.com/quarterly-review.docx")
markdown = ingest_url("https://example.com/financial-data.xlsx")
markdown = ingest_url("https://example.com/pitch-deck.pptx")
Using the Extract API for Structured Data
When you need structured fields rather than full markdown (for example, pulling specific metadata from invoices or contracts), use the Drive AI Extract API:
import requests
def extract_structured(file_url: str, schema: dict) -> dict:
"""Extract structured data from a document using a schema."""
response = requests.post(
"https://dev.thedrive.ai/api/v1/extract",
headers={
"X-API-Key": DRIVE_AI_API_KEY,
"Content-Type": "application/json",
},
json={
"url": file_url,
"schema": schema,
},
)
response.raise_for_status()
return response.json()
# Example: extract invoice fields
invoice_data = extract_structured(
"https://example.com/invoice.pdf",
schema={
"vendor_name": "string",
"invoice_number": "string",
"total_amount": "number",
"line_items": [{"description": "string", "amount": "number"}],
},
)
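Extracted fields are worth validating before they reach a database, since values may come back missing or with the wrong type. The sketch below checks a parsed result against the same simple schema notation used in the example. It assumes the JSON response maps field names directly to values; that response shape is an assumption, so check the Extract API documentation and unwrap any envelope first. matches_schema is a hypothetical helper:

```python
def matches_schema(value, schema) -> bool:
    """Check a value against the simple schema notation used above.

    Supports "string", "number", nested dicts, and single-element lists that
    describe the item shape. This mirrors the example schema; it is not a
    documented Drive AI validation format.
    """
    if schema == "string":
        return isinstance(value, str)
    if schema == "number":
        # bool is a subclass of int in Python, so exclude it explicitly
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if isinstance(schema, list):
        return isinstance(value, list) and all(matches_schema(v, schema[0]) for v in value)
    if isinstance(schema, dict):
        return isinstance(value, dict) and all(
            key in value and matches_schema(value[key], sub)
            for key, sub in schema.items()
        )
    raise ValueError(f"Unsupported schema node: {schema!r}")
```

Documents that fail the check can be routed to manual review instead of being written to your records.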
Production Considerations
A tutorial pipeline is not a production pipeline. Here are the things you need to add before deploying.
Caching Ingested Documents
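Do not re-ingest documents that have not changed. The simplest approach is to hash the source URL (or the file content, if you have it locally) and check a cache before calling the API. A cache keyed only on the URL cannot tell when the document behind that URL changes, so pair it with an expiry time or periodic re-ingestion.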
import hashlib
ingestion_cache: dict[str, str] = {}
def ingest_with_cache(url: str) -> str:
"""Ingest a URL, using a cache to avoid redundant API calls."""
cache_key = hashlib.sha256(url.encode()).hexdigest()
if cache_key in ingestion_cache:
return ingestion_cache[cache_key]
markdown = ingest_url(url)
ingestion_cache[cache_key] = markdown
return markdown
In production, replace the in-memory dict with Redis or a database.
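As one example of a database-backed replacement, this sketch stores markdown in SQLite (from the Python standard library) with a time-to-live, so a changed document is picked up once its entry expires. IngestionCache, the table layout, and the one-week default TTL are hypothetical choices. A sqlite3 connection cannot be shared across threads by default, so give each ingest_batch worker its own instance, or use Redis, if you ingest in parallel:

```python
import hashlib
import sqlite3
import time
from typing import Callable


class IngestionCache:
    """SQLite-backed markdown cache with a time-to-live."""

    def __init__(self, path: str = "ingestion_cache.db", ttl_seconds: float = 7 * 24 * 3600):
        self.conn = sqlite3.connect(path)
        self.ttl = ttl_seconds
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS cache "
            "(key TEXT PRIMARY KEY, markdown TEXT, fetched_at REAL)"
        )

    def get_or_fetch(self, url: str, fetch: Callable[[str], str]) -> str:
        key = hashlib.sha256(url.encode()).hexdigest()
        row = self.conn.execute(
            "SELECT markdown, fetched_at FROM cache WHERE key = ?", (key,)
        ).fetchone()
        if row is not None and time.time() - row[1] < self.ttl:
            return row[0]  # still fresh: skip the API call
        markdown = fetch(url)
        with self.conn:  # commits the transaction on success
            self.conn.execute(
                "INSERT OR REPLACE INTO cache (key, markdown, fetched_at) VALUES (?, ?, ?)",
                (key, markdown, time.time()),
            )
        return markdown
```

Usage: cache = IngestionCache(), then markdown = cache.get_or_fetch(url, ingest_url).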
Error Handling and Retries
Network requests fail. Wrap ingestion calls with retry logic.
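import time

import requests


def ingest_with_retry(url: str, max_retries: int = 3) -> str:
    """Ingest a URL with exponential backoff on transient failures."""
    for attempt in range(max_retries):
        try:
            return ingest_url(url)
        except requests.exceptions.RequestException as e:
            # Retry network errors, rate limits (429), and server errors (5xx);
            # client errors such as 401 or 404 will not fix themselves.
            status = e.response.status_code if e.response is not None else None
            transient = status is None or status == 429 or status >= 500
            if not transient or attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} for {url} in {wait}s: {e}")
            time.sleep(wait)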
Monitoring Retrieval Quality
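Log your retrieval scores. If even the top chunk's similarity falls below a threshold, an answer generated from that context is likely unreliable, so return a fallback message instead of risking a hallucinated answer. Useful thresholds vary with the embedding model and corpus, so treat the 0.3 below as a starting point and calibrate it on sample queries.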
def retrieve_with_threshold(
query: str,
collection,
top_k: int = 5,
min_score: float = 0.3,
) -> list[dict]:
"""Retrieve chunks, filtering out low-confidence results."""
chunks = retrieve(query, collection, top_k)
filtered = [c for c in chunks if c["score"] >= min_score]
if not filtered:
print(f"Warning: no chunks above threshold {min_score} for query: {query}")
return filtered
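retrieve_with_threshold filters and logs, but it does not return a fallback on its own. To actually skip generation when the context is weak, you can gate the LLM call. In this sketch, answer_or_fallback and the fallback wording are hypothetical; generate is any function with generate_answer's signature:

```python
from typing import Callable

FALLBACK = "I could not find enough relevant information in the indexed documents to answer that."


def answer_or_fallback(
    query: str,
    chunks: list[dict],
    generate: Callable[[str, list[dict]], str],
    min_score: float = 0.3,
) -> str:
    """Call the generator only when at least one chunk clears the threshold."""
    confident = [c for c in chunks if c["score"] >= min_score]
    if not confident:
        return FALLBACK  # skip the LLM call entirely
    return generate(query, confident)
```

Usage: answer_or_fallback(query, retrieve(query, collection), generate_answer).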
Incremental Updates
When source documents change, you need to update the vector store without re-processing everything. Track which chunks came from which source, delete stale chunks, and insert new ones.
def update_source(url: str, collection):
    """Re-ingest a source and replace its chunks in the vector store."""
    # Build the new chunks first, so a failed ingestion or embedding call
    # leaves the existing chunks in place
    markdown = ingest_url(url)
    chunks = chunk_by_headings(markdown, source_url=url)
    chunks = embed_chunks(chunks)
    # Then delete stale chunks from this source and store the fresh ones
    existing = collection.get(where={"source": url})
    if existing["ids"]:
        collection.delete(ids=existing["ids"])
    store_chunks(chunks, collection)
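update_source re-embeds a source every time it runs. Ingestion still costs a credit per call, but you can skip chunking, embedding, and storing when the markdown has not changed by comparing a content hash. A sketch with hypothetical names, where fingerprints is a URL-to-hash mapping you would persist yourself (for example, next to the ingestion cache):

```python
import hashlib


def content_fingerprint(markdown: str) -> str:
    """SHA-256 of the markdown, ignoring trailing whitespace on each line."""
    normalized = "\n".join(line.rstrip() for line in markdown.strip().splitlines())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def has_changed(url: str, markdown: str, fingerprints: dict[str, str]) -> bool:
    """True if url is new or its markdown differs from the recorded fingerprint."""
    return fingerprints.get(url) != content_fingerprint(markdown)
```

Record fingerprints[url] = content_fingerprint(markdown) only after the new chunks are stored, so a failed update is retried on the next run.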
Full Working Example
Here is the complete pipeline in a single script. Copy this, set your API keys, and run it.
"""
RAG Pipeline with Clean Document Ingestion
Requires: pip install requests openai chromadb
"""
import hashlib
import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import chromadb
import requests
from openai import OpenAI
# --- Configuration ---
DRIVE_AI_API_KEY = os.environ.get("DRIVE_AI_API_KEY", "tda_live_...")
openai_client = OpenAI()
# --- Step 1: Ingest ---
def ingest_url(url: str) -> str:
    response = requests.get(
        f"https://dev.thedrive.ai/md/{url}",
        headers={"X-API-Key": DRIVE_AI_API_KEY},
        timeout=120,  # avoid hanging indefinitely on a stalled request
    )
    response.raise_for_status()
    return response.text
def ingest_batch(urls: list[str], max_workers: int = 5) -> dict[str, str]:
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {executor.submit(ingest_url, url): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
results[url] = future.result()
except Exception as e:
print(f"Failed to ingest {url}: {e}")
return results
# --- Step 2: Chunk ---
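def chunk_fixed_size(text: str, max_size: int = 1000, overlap: int = 200) -> list[str]:
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + max_size, len(text))
        # Prefer to break at a paragraph, line, sentence, or word boundary
        if end < len(text):
            for sep in ["\n\n", "\n", ". ", " "]:
                last_sep = text.rfind(sep, start, end)
                if last_sep > start:
                    end = last_sep + len(sep)
                    break
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= len(text):
            break  # last chunk reached; no duplicate tail chunk
        # Overlap with the previous chunk, but always move forward
        next_start = end - overlap
        start = next_start if next_start > start else end
    return chunks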
def chunk_by_headings(
markdown: str, max_chunk_size: int = 1500, source_url: str = ""
) -> list[dict]:
sections = re.split(r"(?=^#{1,3} )", markdown, flags=re.MULTILINE)
chunks = []
for section in sections:
section = section.strip()
if not section:
continue
heading_match = re.match(r"^(#{1,3}) (.+)", section)
heading = heading_match.group(2) if heading_match else ""
if len(section) <= max_chunk_size:
chunks.append(
{"text": section, "heading": heading, "source": source_url}
)
else:
sub_chunks = chunk_fixed_size(section, max_chunk_size, overlap=200)
for i, sub in enumerate(sub_chunks):
chunks.append(
{
"text": sub,
"heading": f"{heading} (part {i + 1})",
"source": source_url,
}
)
return chunks
# --- Step 3: Embed ---
def embed_texts(
texts: list[str], model: str = "text-embedding-3-small"
) -> list[list[float]]:
response = openai_client.embeddings.create(input=texts, model=model)
return [item.embedding for item in response.data]
def embed_chunks(chunks: list[dict], batch_size: int = 100) -> list[dict]:
for i in range(0, len(chunks), batch_size):
batch = chunks[i : i + batch_size]
texts = [c["text"] for c in batch]
embeddings = embed_texts(texts)
for chunk, embedding in zip(batch, embeddings):
chunk["embedding"] = embedding
return chunks
# --- Step 4: Store ---
def get_collection():
chroma_client = chromadb.PersistentClient(path="./chroma_db")
return chroma_client.get_or_create_collection(
name="documents", metadata={"hnsw:space": "cosine"}
)
def store_chunks(chunks: list[dict], collection):
    # Deterministic IDs: re-running on unchanged documents overwrites rows
    # instead of adding duplicates
    by_id = {}
    for c in chunks:
        key = f"{c.get('source', '')}\n{c.get('heading', '')}\n{c['text']}"
        by_id[hashlib.sha256(key.encode()).hexdigest()] = c
    unique = list(by_id.values())
    collection.upsert(
        ids=list(by_id.keys()),
        documents=[c["text"] for c in unique],
        embeddings=[c["embedding"] for c in unique],
        metadatas=[
            {"source": c.get("source", ""), "heading": c.get("heading", "")}
            for c in unique
        ],
    )
    print(f"Stored {len(unique)} chunks.")
# --- Step 5: Retrieve & Generate ---
def retrieve(query: str, collection, top_k: int = 5) -> list[dict]:
query_embedding = embed_texts([query])[0]
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"],
)
return [
{
"text": doc,
"source": meta.get("source", ""),
"heading": meta.get("heading", ""),
"score": 1 - dist,
}
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
)
]
def generate_answer(query: str, context_chunks: list[dict]) -> str:
context = "\n\n---\n\n".join(
f"[Source: {c['source']}]\n{c['text']}" for c in context_chunks
)
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": (
"Answer the user's question based only on the provided context. "
"If the context does not contain enough information, say so."
),
},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
],
temperature=0,
)
return response.choices[0].message.content
# --- Run the Pipeline ---
if __name__ == "__main__":
# 1. Ingest documents
urls = [
"https://openai.com/index/gpt-4o",
"https://docs.anthropic.com/en/docs/about-claude/models",
]
documents = ingest_batch(urls)
print(f"Ingested {len(documents)} documents.")
# 2. Chunk all documents
all_chunks = []
for url, markdown in documents.items():
chunks = chunk_by_headings(markdown, source_url=url)
all_chunks.extend(chunks)
print(f"Created {len(all_chunks)} chunks.")
# 3. Embed
all_chunks = embed_chunks(all_chunks)
print("Embeddings generated.")
# 4. Store
collection = get_collection()
store_chunks(all_chunks, collection)
# 5. Query
query = "What are the key capabilities of GPT-4o?"
results = retrieve(query, collection)
answer = generate_answer(query, results)
print(f"\nQuery: {query}")
print(f"Answer: {answer}")
Save this as rag_pipeline.py, set your OPENAI_API_KEY and DRIVE_AI_API_KEY environment variables, and run it:
export OPENAI_API_KEY="sk-..."
export DRIVE_AI_API_KEY="tda_live_..."
python rag_pipeline.py
The pipeline ingests the specified URLs, chunks the content, generates embeddings, stores them in a local ChromaDB instance, and answers queries grounded in the ingested documents.
What to Build Next
This pipeline covers the core loop. From here, you can extend it with:
- LangChain or LlamaIndex integration. Use Drive AI as the document loader within your existing framework. The clean markdown output plugs directly into Document objects.
- Metadata filtering. Add fields like document_type, date_ingested, or department to your chunk metadata and filter at query time.
- Hybrid search. Combine vector similarity with keyword search (BM25) for better retrieval on exact-match queries.
- Reranking. Add a cross-encoder reranker (Cohere Rerank, cross-encoder/ms-marco-MiniLM-L-6-v2) between retrieval and generation to improve precision.
- Evaluation. Use frameworks like Ragas or DeepEval to measure retrieval accuracy, answer faithfulness, and context relevance.
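For hybrid search, a common way to merge keyword (BM25) and vector results is Reciprocal Rank Fusion, which combines rankings without requiring their scores to share a scale. This sketch assumes each retriever returns a best-first list of chunk IDs; producing the BM25 ranking is out of scope here, and reciprocal_rank_fusion is a hypothetical helper:

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several best-first ID rankings with Reciprocal Rank Fusion.

    Each ID scores sum(1 / (k + rank)) over the lists it appears in, with rank
    starting at 1. k=60 is the constant commonly used for RRF.
    """
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for deterministic output
    return sorted(scores, key=lambda d: (-scores[d], d))


vector_hits = ["a", "b", "c"]
keyword_hits = ["c", "a", "d"]
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))  # ['a', 'c', 'b', 'd']
```

Chunks that rank well in both lists rise to the top, while a chunk found by only one retriever still survives with a lower fused score.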
The ingestion layer is the foundation. Get clean data in, and every downstream component performs better.
Have questions? Reach out at contact@thedrive.ai.
