How to Build a RAG Pipeline with Clean Document Ingestion

Why Clean Ingestion Is the Most Important Part of RAG

Most RAG tutorials skip straight to embeddings and vector databases. That is a mistake.

The retrieval quality of your RAG pipeline is bounded by the quality of your ingested data. If you feed raw HTML into your embedding model, you are embedding navigation bars, cookie banners, and ad scripts alongside the actual content. If you feed a PDF through a naive text extractor, you get garbled column layouts, broken tables, and headers interleaved with body text.

This is the garbage-in, garbage-out problem applied to LLMs. Poor ingestion causes three concrete failures:

  • Token waste. Boilerplate and formatting artifacts consume context window space without contributing to retrieval accuracy. A 500-word article extracted from raw HTML can easily become 15,000 tokens of noise.
  • Retrieval misses. When chunks contain navigation text or broken formatting, the embedding vectors become diluted. Relevant passages score lower in similarity search because the embedding captures noise alongside signal.
  • Hallucination risk. When the retrieved context is messy, the LLM has to work harder to extract meaning, and it may fill gaps with its own knowledge rather than grounding its answer in the source material.

Clean document ingestion solves all three. The rest of this tutorial shows how to build a RAG pipeline that gets ingestion right from the start.
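
To make the token-waste point concrete, here is a minimal sketch that compares the size of a raw HTML page with the text a reader would actually see. It uses only the standard library; the sample page and the visible_text helper are illustrative assumptions rather than part of any API used later, and character counts are only a rough proxy for tokens.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collects text nodes, skipping the contents of <script> and <style>."""

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0
        self.parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0 and data.strip():
            self.parts.append(data.strip())


def visible_text(html: str) -> str:
    """Return the text nodes of an HTML string joined by single spaces."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.parts)


# Hypothetical page: a little content surrounded by markup and boilerplate.
sample_html = (
    "<html><head><style>body{margin:0}</style>"
    "<script>window.track = function () {};</script></head>"
    "<body><nav><a href='/'>Home</a><a href='/pricing'>Pricing</a></nav>"
    "<article><h1>Title</h1><p>The actual content.</p></article>"
    "<footer>Accept cookies</footer></body></html>"
)

text = visible_text(sample_html)
noise_ratio = 1 - len(text) / len(sample_html)
print(f"{len(sample_html)} raw chars -> {len(text)} visible chars")
print(f"Share of input that was markup or script: {noise_ratio:.0%}")
```

Note that stripping tags is not the same as clean ingestion: the navigation labels and the cookie notice still survive in the extracted text, which is the kind of boilerplate a dedicated ingestion step is meant to remove.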

Architecture Overview

The pipeline has five stages:

Source Documents (PDFs, URLs, DOCX, scanned images)
        |
    [1. Ingest] — Convert to clean markdown
        |
    [2. Chunk]  — Split into retrieval units
        |
    [3. Embed]  — Generate vector embeddings
        |
    [4. Store]  — Upsert to vector database
        |
    [5. Query]  — Retrieve + Generate answer

Each stage is independent and replaceable. You can swap the vector database without touching ingestion. You can change your chunking strategy without re-ingesting documents. This modularity matters in production.
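
One way to keep the stages replaceable is to treat each one as a plain callable and wire them together in a small container. The sketch below is an illustration of that idea, not how the rest of this tutorial is structured; the Pipeline class and the stand-in stages are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Pipeline:
    """Each stage is a plain callable, so any one can be swapped independently."""

    ingest: Callable[[str], str]
    chunk: Callable[[str], list[str]]
    embed: Callable[[list[str]], list[list[float]]]
    store: Callable[[list[str], list[list[float]]], None]

    def index(self, url: str) -> int:
        """Run ingest -> chunk -> embed -> store and return the chunk count."""
        markdown = self.ingest(url)
        chunks = self.chunk(markdown)
        vectors = self.embed(chunks)
        self.store(chunks, vectors)
        return len(chunks)


# Stand-in stages for illustration only; real ones would call an API or a database.
memory_store: list[tuple[str, list[float]]] = []

pipeline = Pipeline(
    ingest=lambda url: f"# Title\n\nBody fetched from {url}",
    chunk=lambda md: [part for part in md.split("\n\n") if part],
    embed=lambda texts: [[float(len(t))] for t in texts],
    store=lambda texts, vecs: memory_store.extend(zip(texts, vecs)),
)

print(pipeline.index("https://example.com/doc"))
```

Swapping the vector database then means passing a different store callable; the other three stages are untouched.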

We will use Python throughout, with the following stack:

  • Ingestion: Drive AI Markdown API for document-to-markdown conversion
  • Embeddings: OpenAI text-embedding-3-small
  • Vector store: ChromaDB (swap for Pinecone, Weaviate, or Qdrant as needed)
  • Generation: OpenAI gpt-4o

Prerequisites

Install the dependencies:

pip install thedriveai requests openai chromadb tiktoken

Set your API keys:

import os

os.environ["OPENAI_API_KEY"] = "sk-..."
DRIVE_AI_API_KEY = "tda_live_..."

Get a free Drive AI API key at dev.thedrive.ai. The free tier includes 100 credits per month.

Step 1: Ingest — Convert Documents to Clean Markdown

The ingestion step converts source documents into clean, structured markdown. This is where most pipelines fail because they use basic text extraction that loses structure, mangles tables, and includes boilerplate.

The Drive AI Markdown API handles this with a single GET request. It supports 107+ file types, renders JavaScript for SPAs, runs OCR with vision model proofreading on scanned documents, and strips all boilerplate automatically.

Ingesting a Webpage

import requests

def ingest_url(url: str) -> str:
    """Convert a URL to clean markdown using Drive AI."""
    response = requests.get(
        f"https://dev.thedrive.ai/md/{url}",
        headers={"X-API-Key": DRIVE_AI_API_KEY},
        timeout=60,  # fail instead of hanging on a stalled connection
    )
    response.raise_for_status()
    return response.text


markdown = ingest_url("https://openai.com/index/gpt-4o")
print(markdown[:500])

The returned markdown preserves heading hierarchy, table structure, code blocks, and lists. Navigation, footers, ads, and tracking scripts are stripped.

Ingesting a PDF or Document

The same endpoint handles PDFs, Word documents, spreadsheets, and other file types. Pass the URL where the file is hosted:

# PDF hosted at a URL
markdown = ingest_url("https://arxiv.org/pdf/2312.10997")

# Google Docs, Notion pages, etc. — all work the same way
markdown = ingest_url("https://docs.google.com/document/d/...")

Batch Ingestion

For multiple documents, ingest them in parallel:

from concurrent.futures import ThreadPoolExecutor, as_completed

def ingest_batch(urls: list[str], max_workers: int = 5) -> dict[str, str]:
    """Ingest multiple URLs in parallel. Returns {url: markdown} mapping."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(ingest_url, url): url for url in urls
        }
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception as e:
                print(f"Failed to ingest {url}: {e}")
    return results

At 1 credit per conversion on the free tier, you get 100 documents per month at no cost.
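
Because every conversion costs a credit, it can pay to remove duplicate URLs before calling ingest_batch. The following is a minimal sketch using only the standard library; normalize_url and dedupe_urls are hypothetical helpers, and the normalization rules (ignoring fragments and trailing slashes) are assumptions that will not hold for every site.

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_url(url: str) -> str:
    """Lowercase scheme and host, drop the fragment, and trim a trailing slash."""
    parts = urlsplit(url.strip())
    path = parts.path.rstrip("/") or "/"
    return urlunsplit(
        (parts.scheme.lower(), parts.netloc.lower(), path, parts.query, "")
    )


def dedupe_urls(urls: list[str]) -> list[str]:
    """Return URLs with duplicates removed, preserving first-seen order."""
    seen: set[str] = set()
    unique: list[str] = []
    for url in urls:
        key = normalize_url(url)
        if key not in seen:
            seen.add(key)
            unique.append(url)
    return unique


urls = [
    "https://Example.com/a/",
    "https://example.com/a#top",
    "https://example.com/b",
]
print(dedupe_urls(urls))
```

The original spelling of the first occurrence is kept, so the URL sent to the API is exactly what the caller supplied.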

Step 2: Chunk — Split Markdown into Retrieval Units

Raw markdown needs to be split into chunks that are small enough for precise retrieval but large enough to carry meaning. There are two primary strategies. Both functions below measure chunk size in characters, not tokens.

Strategy A: Heading-Based Chunking

Split on markdown headings. This preserves semantic boundaries because authors use headings to delineate topics.

import re

def chunk_by_headings(
    markdown: str,
    max_chunk_size: int = 1500,
    source_url: str = "",
) -> list[dict]:
    """Split markdown on headings. Oversized sections get subdivided."""
    sections = re.split(r"(?=^#{1,3} )", markdown, flags=re.MULTILINE)
    chunks = []

    for section in sections:
        section = section.strip()
        if not section:
            continue

        # Extract heading for metadata
        heading_match = re.match(r"^(#{1,3}) (.+)", section)
        heading = heading_match.group(2) if heading_match else ""

        if len(section) <= max_chunk_size:
            chunks.append({
                "text": section,
                "heading": heading,
                "source": source_url,
            })
        else:
            # Subdivide oversized sections with overlap
            sub_chunks = chunk_fixed_size(section, max_chunk_size, overlap=200)
            for i, sub in enumerate(sub_chunks):
                chunks.append({
                    "text": sub,
                    "heading": f"{heading} (part {i + 1})",
                    "source": source_url,
                })

    return chunks

Strategy B: Fixed-Size Chunking with Overlap

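When documents lack heading structure, use fixed-size chunks. Break at paragraph or sentence boundaries where possible to avoid splitting mid-sentence, and overlap consecutive chunks so context at the edges is not lost.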

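def chunk_fixed_size(
    text: str,
    max_size: int = 1000,
    overlap: int = 200,
) -> list[str]:
    """Split text into fixed-size chunks with overlap."""
    if not 0 <= overlap < max_size:
        raise ValueError("overlap must be non-negative and smaller than max_size")

    chunks = []
    start = 0

    while start < len(text):
        end = min(start + max_size, len(text))

        # Avoid splitting mid-sentence: prefer a paragraph break, then a line
        # break, then a sentence end, then a space. Only accept break points
        # past the overlap region so the next start always moves forward.
        if end < len(text):
            for sep in ["\n\n", "\n", ". ", " "]:
                last_sep = text.rfind(sep, start + overlap, end)
                if last_sep != -1:
                    end = last_sep + len(sep)
                    break

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= len(text):
            break  # the tail is already covered; do not re-chunk it
        start = end - overlap

    return chunks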

Which Strategy to Use

Use heading-based chunking when your source documents have clear structure (articles, documentation, reports). Use fixed-size chunking for unstructured text (transcripts, emails, raw notes). In practice, the heading-based approach works well for most RAG applications because the Drive AI Markdown API preserves document structure.
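
If a pipeline handles a mix of structured and unstructured sources, the choice can be made per document. The sketch below counts headings and falls back to fixed-size chunking when there are too few; pick_chunking_strategy and its min_headings default are assumptions for illustration, not a tuned rule.

```python
import re

# Matches level 1-3 markdown headings at the start of a line.
HEADING_RE = re.compile(r"^#{1,3} \S", re.MULTILINE)


def pick_chunking_strategy(markdown: str, min_headings: int = 2) -> str:
    """Return "headings" when the document has enough structure, else "fixed"."""
    heading_count = len(HEADING_RE.findall(markdown))
    return "headings" if heading_count >= min_headings else "fixed"


article = "# Intro\n\nSome text.\n\n## Details\n\nMore text."
transcript = "so anyway, as I was saying\nissue # 12 came up again"

print(pick_chunking_strategy(article))
print(pick_chunking_strategy(transcript))
```

One caveat: a line-anchored pattern like this also matches lines such as "# comment" inside fenced code blocks, so documents with a lot of shell or Python snippets may look more structured than they are.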

Step 3: Embed — Generate Vector Embeddings

Convert each chunk into a vector embedding. We use OpenAI's text-embedding-3-small model here, but you can substitute any embedding model (Cohere, Voyage AI, open-source models via Sentence Transformers).

from openai import OpenAI

client = OpenAI()

def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    """Generate embeddings for a list of texts."""
    response = client.embeddings.create(
        input=texts,
        model=model,
    )
    return [item.embedding for item in response.data]

Batch your embedding calls. The OpenAI embeddings API accepts up to 2048 texts per request:

def embed_chunks(chunks: list[dict], batch_size: int = 100) -> list[dict]:
    """Add embedding vectors to each chunk."""
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c["text"] for c in batch]
        embeddings = embed_texts(texts)
        for chunk, embedding in zip(batch, embeddings):
            chunk["embedding"] = embedding
    return chunks

Step 4: Store — Upsert to a Vector Database

Insert the embedded chunks into a vector database for retrieval. This example uses ChromaDB, which runs locally with no infrastructure setup. For production, consider Pinecone, Weaviate, or Qdrant.

ChromaDB Setup

import chromadb

chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection(
    name="documents",
    metadata={"hnsw:space": "cosine"},
)

Upserting Chunks

import uuid

def store_chunks(chunks: list[dict], collection):
    """Store embedded chunks in ChromaDB."""
    ids = [str(uuid.uuid4()) for _ in chunks]
    documents = [c["text"] for c in chunks]
    embeddings = [c["embedding"] for c in chunks]
    metadatas = [
        {"source": c.get("source", ""), "heading": c.get("heading", "")}
        for c in chunks
    ]

    collection.upsert(
        ids=ids,
        documents=documents,
        embeddings=embeddings,
        metadatas=metadatas,
    )
    print(f"Stored {len(chunks)} chunks in the vector database.")
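
Note that store_chunks assigns a random uuid4 to every chunk, so upsert never matches an existing record and re-running the pipeline on the same documents adds duplicates. A possible alternative is to derive each ID from the chunk's source and its position within that source. The sketch below shows one way to do that; stable_chunk_ids is a hypothetical helper, and it assumes chunks from one source always arrive in document order.

```python
import hashlib
from collections import defaultdict


def stable_chunk_ids(chunks: list[dict]) -> list[str]:
    """Build one ID per chunk from its source and its position within that source."""
    position: dict[str, int] = defaultdict(int)
    ids = []
    for chunk in chunks:
        source = chunk.get("source", "")
        raw = f"{source}#{position[source]}"
        ids.append(hashlib.sha256(raw.encode("utf-8")).hexdigest())
        position[source] += 1
    return ids


chunks = [
    {"source": "https://example.com/a", "text": "first"},
    {"source": "https://example.com/a", "text": "second"},
    {"source": "https://example.com/b", "text": "third"},
]
ids = stable_chunk_ids(chunks)
print(ids[0][:12], ids[1][:12], ids[2][:12])
```

Passing these IDs to collection.upsert overwrites the previous version of each chunk instead of adding a copy. If a document shrinks, its old trailing chunks remain in the store, so this works best combined with deleting by source before re-indexing.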

Using Pinecone Instead

If you prefer a managed vector database, the Pinecone integration is similar:

# pip install pinecone  (older releases were published as pinecone-client)
from pinecone import Pinecone

pc = Pinecone(api_key="your-pinecone-key")
index = pc.Index("documents")

def store_chunks_pinecone(chunks: list[dict], index):
    """Store embedded chunks in Pinecone."""
    vectors = [
        {
            "id": str(uuid.uuid4()),
            "values": c["embedding"],
            "metadata": {
                "text": c["text"],
                "source": c.get("source", ""),
                "heading": c.get("heading", ""),
            },
        }
        for c in chunks
    ]
    # Upsert in batches of 100
    for i in range(0, len(vectors), 100):
        index.upsert(vectors=vectors[i : i + 100])

Step 5: Retrieve and Generate

With chunks stored, you can now query the pipeline. Embed the user's question, retrieve the most relevant chunks, and pass them to an LLM for answer generation.

def retrieve(query: str, collection, top_k: int = 5) -> list[dict]:
    """Retrieve the most relevant chunks for a query."""
    query_embedding = embed_texts([query])[0]
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )
    return [
        {
            "text": doc,
            "source": meta.get("source", ""),
            "heading": meta.get("heading", ""),
            "score": 1 - dist,  # Convert distance to similarity
        }
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        )
    ]


def generate_answer(query: str, context_chunks: list[dict]) -> str:
    """Generate an answer grounded in the retrieved context."""
    context = "\n\n---\n\n".join(
        f"[Source: {c['source']}]\n{c['text']}" for c in context_chunks
    )

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. Answer the user's question "
                    "based only on the provided context. If the context does not "
                    "contain enough information, say so. Cite sources when possible."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {query}",
            },
        ],
        temperature=0,
    )
    return response.choices[0].message.content

Querying the Pipeline

query = "What are the key differences between GPT-4 and GPT-4o?"
chunks = retrieve(query, collection, top_k=5)
answer = generate_answer(query, chunks)
print(answer)
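
The collection was created with "hnsw:space": "cosine", so the distances returned by the query are cosine distances, and 1 - dist in retrieve recovers cosine similarity. To show what that number means, here is a brute-force version of the same ranking in plain Python. It is only an illustration with made-up two-dimensional vectors; a vector database uses an approximate index rather than comparing against every stored vector.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_top_k(
    query_vec: list[float], vectors: list[list[float]], k: int = 2
) -> list[tuple[float, int]]:
    """Rank stored vectors by cosine similarity to the query, highest first."""
    scored = [(cosine_similarity(query_vec, v), i) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


stored = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
top = brute_force_top_k([1.0, 0.0], stored, k=2)
for score, index in top:
    print(f"vector {index}: similarity {score:.3f}, distance {1 - score:.3f}")
```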

Handling Different Source Types

One of the reasons clean ingestion matters is that real-world RAG pipelines ingest documents from many sources. Each type has its own extraction challenges.

Webpages and SPAs

Standard HTML fetching fails on JavaScript-rendered pages. The Drive AI Markdown API renders JavaScript before extraction, so React apps, Next.js pages, and dynamically loaded content all get captured correctly.

# All of these work identically
markdown = ingest_url("https://react-docs-site.com/getting-started")
markdown = ingest_url("https://company.notion.site/internal-docs")
markdown = ingest_url("https://medium.com/@author/article-title")

PDFs (Native and Scanned)

Native PDFs (those with selectable text) are extracted with layout analysis that preserves tables, columns, and reading order. Scanned PDFs are processed with OCR and then proofread by a vision model to correct OCR errors.

# Native PDF
markdown = ingest_url("https://example.com/annual-report-2025.pdf")

# Scanned document — OCR happens automatically
markdown = ingest_url("https://example.com/scanned-contract.pdf")

Office Documents

Word documents, Excel spreadsheets, and PowerPoint files are all supported. Tables in Excel are converted to markdown tables. Slide content from PowerPoint is extracted in reading order.

markdown = ingest_url("https://example.com/quarterly-review.docx")
markdown = ingest_url("https://example.com/financial-data.xlsx")
markdown = ingest_url("https://example.com/pitch-deck.pptx")

Using the Extract API for Structured Data

When you need structured fields rather than full markdown (for example, pulling specific metadata from invoices or contracts), use the Drive AI Extract API:

import requests

def extract_structured(file_url: str, schema: dict) -> dict:
    """Extract structured data from a document using a schema."""
    response = requests.post(
        "https://dev.thedrive.ai/api/v1/extract",
        headers={
            "X-API-Key": DRIVE_AI_API_KEY,
            "Content-Type": "application/json",
        },
        json={
            "url": file_url,
            "schema": schema,
        },
    )
    response.raise_for_status()
    return response.json()

# Example: extract invoice fields
invoice_data = extract_structured(
    "https://example.com/invoice.pdf",
    schema={
        "vendor_name": "string",
        "invoice_number": "string",
        "total_amount": "number",
        "line_items": [{"description": "string", "amount": "number"}],
    },
)

Production Considerations

A tutorial pipeline is not a production pipeline. Here are the things you need to add before deploying.

Caching Ingested Documents

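Do not re-ingest documents that have not changed. Hash the source URL or file content and check the cache before calling the API. The example below keys on the URL alone, so it cannot tell when a page has changed; key on the file content, or expire entries after a set time, if your sources are updated in place.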

import hashlib

ingestion_cache: dict[str, str] = {}

def ingest_with_cache(url: str) -> str:
    """Ingest a URL, using a cache to avoid redundant API calls."""
    cache_key = hashlib.sha256(url.encode()).hexdigest()
    if cache_key in ingestion_cache:
        return ingestion_cache[cache_key]

    markdown = ingest_url(url)
    ingestion_cache[cache_key] = markdown
    return markdown

In production, replace the in-memory dict with Redis or a database.
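
As one possible shape for a persistent cache, the sketch below stores markdown in SQLite (from the standard library) and expires entries after a time-to-live. MarkdownCache, its table layout, and the one-day default are assumptions for illustration; the fetch argument stands in for a function such as ingest_url.

```python
import hashlib
import sqlite3
import time
from typing import Callable


class MarkdownCache:
    """A small SQLite-backed cache keyed by URL hash, with a time-to-live."""

    def __init__(self, path: str = "ingest_cache.db", ttl_seconds: float = 86400):
        self.ttl_seconds = ttl_seconds
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS cache "
            "(key TEXT PRIMARY KEY, markdown TEXT NOT NULL, fetched_at REAL NOT NULL)"
        )
        self.conn.commit()

    def get_or_fetch(self, url: str, fetch: Callable[[str], str]) -> str:
        """Return cached markdown if it is still fresh, otherwise call fetch(url)."""
        key = hashlib.sha256(url.encode("utf-8")).hexdigest()
        row = self.conn.execute(
            "SELECT markdown, fetched_at FROM cache WHERE key = ?", (key,)
        ).fetchone()
        if row is not None and time.time() - row[1] < self.ttl_seconds:
            return row[0]
        markdown = fetch(url)
        self.conn.execute(
            "INSERT OR REPLACE INTO cache (key, markdown, fetched_at) VALUES (?, ?, ?)",
            (key, markdown, time.time()),
        )
        self.conn.commit()
        return markdown


# Demonstration with a fake fetcher and an in-memory database.
calls: list[str] = []


def fake_fetch(url: str) -> str:
    calls.append(url)
    return f"# Markdown for {url}"


cache = MarkdownCache(":memory:")
cache.get_or_fetch("https://example.com/doc", fake_fetch)
cache.get_or_fetch("https://example.com/doc", fake_fetch)
print(f"fetch was called {len(calls)} time(s)")
```

A sqlite3 connection is tied to the thread that created it by default, so this sketch is meant for single-threaded use; a parallel ingester would need one connection per worker or a shared service such as Redis.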

Error Handling and Retries

Network requests fail. Wrap ingestion calls with retry logic.

import time

def ingest_with_retry(url: str, max_retries: int = 3) -> str:
    """Ingest a URL with exponential backoff on failure."""
    for attempt in range(max_retries):
        try:
            return ingest_url(url)
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} for {url} in {wait}s: {e}")
            time.sleep(wait)
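
The function above retries on every request error, including responses such as 404 that will not succeed on a second attempt. A common refinement is to retry only transient failures and to add random jitter so parallel workers do not retry in lockstep. The helpers below are a sketch of that policy; the set of retryable status codes and the delay parameters are assumptions, not values documented by any API used here.

```python
import random
from typing import Optional

# Status codes commonly treated as transient (an assumption; adjust per API).
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}


def is_retryable(status_code: Optional[int]) -> bool:
    """Retry connection-level failures (no status code) and transient HTTP codes."""
    return status_code is None or status_code in RETRYABLE_STATUS


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


for attempt in range(4):
    print(f"attempt {attempt}: wait up to {min(30.0, 2.0 ** attempt):.0f}s")
```

Inside ingest_with_retry, the status code can be read with getattr(e.response, "status_code", None), since requests attaches the response to the exception when one was received.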

Monitoring Retrieval Quality

Log your retrieval scores. If the top chunk's similarity score drops below a threshold, the answer is likely unreliable. Return a fallback message instead of a hallucinated answer.

def retrieve_with_threshold(
    query: str,
    collection,
    top_k: int = 5,
    min_score: float = 0.3,
) -> list[dict]:
    """Retrieve chunks, filtering out low-confidence results."""
    chunks = retrieve(query, collection, top_k)
    filtered = [c for c in chunks if c["score"] >= min_score]
    if not filtered:
        print(f"Warning: no chunks above threshold {min_score} for query: {query}")
    return filtered
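
The function above only logs a warning; the caller still has to decide what to tell the user. A minimal sketch of that last step follows. answer_or_fallback and FALLBACK_MESSAGE are hypothetical names, and generate stands in for a function with the same signature as generate_answer.

```python
from typing import Callable

FALLBACK_MESSAGE = "I could not find enough relevant information to answer that."


def answer_or_fallback(
    query: str,
    chunks: list[dict],
    generate: Callable[[str, list[dict]], str],
    min_score: float = 0.3,
) -> str:
    """Call the generator only when at least one chunk clears the threshold."""
    confident = [c for c in chunks if c.get("score", 0.0) >= min_score]
    if not confident:
        return FALLBACK_MESSAGE
    return generate(query, confident)


def stub_generate(query: str, context_chunks: list[dict]) -> str:
    """Stand-in for an LLM call; reports how many chunks it was given."""
    return f"answer from {len(context_chunks)} chunk(s)"


weak = [{"text": "unrelated", "score": 0.1}]
mixed = [{"text": "relevant", "score": 0.8}, {"text": "unrelated", "score": 0.1}]

print(answer_or_fallback("q", weak, stub_generate))
print(answer_or_fallback("q", mixed, stub_generate))
```

The right min_score depends on the embedding model and the corpus, so it is worth calibrating against logged scores for queries with known good answers.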

Incremental Updates

When source documents change, you need to update the vector store without re-processing everything. Track which chunks came from which source, delete stale chunks, and insert new ones.

def update_source(url: str, collection):
    """Re-ingest a source and replace its chunks in the vector store."""
    # Delete existing chunks from this source
    existing = collection.get(where={"source": url})
    if existing["ids"]:
        collection.delete(ids=existing["ids"])

    # Re-ingest, chunk, embed, and store
    markdown = ingest_url(url)
    chunks = chunk_by_headings(markdown, source_url=url)
    chunks = embed_chunks(chunks)
    store_chunks(chunks, collection)
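
update_source re-embeds a document every time it is called, even when the content is identical. One way to avoid that cost is to fingerprint the ingested markdown and skip the chunk, embed, and store steps when the fingerprint has not changed. The sketch below assumes the fingerprints live in a plain dict; in practice they would sit in the same database as the cache.

```python
import hashlib


def content_fingerprint(markdown: str) -> str:
    """Hash the ingested markdown so changes can be detected cheaply."""
    return hashlib.sha256(markdown.encode("utf-8")).hexdigest()


def needs_update(url: str, markdown: str, fingerprints: dict[str, str]) -> bool:
    """Return True and record the new fingerprint when the content has changed."""
    new = content_fingerprint(markdown)
    if fingerprints.get(url) == new:
        return False
    fingerprints[url] = new
    return True


fingerprints: dict[str, str] = {}
url = "https://example.com/doc"

print(needs_update(url, "# Doc\n\nversion 1", fingerprints))
print(needs_update(url, "# Doc\n\nversion 1", fingerprints))
print(needs_update(url, "# Doc\n\nversion 2", fingerprints))
```

This sketch records the fingerprint before the new chunks are stored. If the embedding or storage step can fail, it is safer to record it only after store_chunks returns.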

Full Working Example

Here is the complete pipeline in a single script. Copy this, set your API keys, and run it.

"""
RAG Pipeline with Clean Document Ingestion
Requires: pip install thedriveai requests openai chromadb
"""

import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import chromadb
import requests
from openai import OpenAI

# --- Configuration ---
DRIVE_AI_API_KEY = os.environ.get("DRIVE_AI_API_KEY", "tda_live_...")
openai_client = OpenAI()


# --- Step 1: Ingest ---
def ingest_url(url: str) -> str:
    response = requests.get(
        f"https://dev.thedrive.ai/md/{url}",
        headers={"X-API-Key": DRIVE_AI_API_KEY},
        timeout=60,  # fail instead of hanging on a stalled connection
    )
    response.raise_for_status()
    return response.text


def ingest_batch(urls: list[str], max_workers: int = 5) -> dict[str, str]:
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(ingest_url, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception as e:
                print(f"Failed to ingest {url}: {e}")
    return results


# --- Step 2: Chunk ---
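def chunk_fixed_size(text: str, max_size: int = 1000, overlap: int = 200) -> list[str]:
    if not 0 <= overlap < max_size:
        raise ValueError("overlap must be non-negative and smaller than max_size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + max_size, len(text))
        if end < len(text):
            # Only break past the overlap region so the next start moves forward
            for sep in ["\n\n", "\n", ". ", " "]:
                last_sep = text.rfind(sep, start + overlap, end)
                if last_sep != -1:
                    end = last_sep + len(sep)
                    break
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= len(text):
            break  # the tail is already covered; do not re-chunk it
        start = end - overlap
    return chunks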


def chunk_by_headings(
    markdown: str, max_chunk_size: int = 1500, source_url: str = ""
) -> list[dict]:
    sections = re.split(r"(?=^#{1,3} )", markdown, flags=re.MULTILINE)
    chunks = []
    for section in sections:
        section = section.strip()
        if not section:
            continue
        heading_match = re.match(r"^(#{1,3}) (.+)", section)
        heading = heading_match.group(2) if heading_match else ""
        if len(section) <= max_chunk_size:
            chunks.append(
                {"text": section, "heading": heading, "source": source_url}
            )
        else:
            sub_chunks = chunk_fixed_size(section, max_chunk_size, overlap=200)
            for i, sub in enumerate(sub_chunks):
                chunks.append(
                    {
                        "text": sub,
                        "heading": f"{heading} (part {i + 1})",
                        "source": source_url,
                    }
                )
    return chunks


# --- Step 3: Embed ---
def embed_texts(
    texts: list[str], model: str = "text-embedding-3-small"
) -> list[list[float]]:
    response = openai_client.embeddings.create(input=texts, model=model)
    return [item.embedding for item in response.data]


def embed_chunks(chunks: list[dict], batch_size: int = 100) -> list[dict]:
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c["text"] for c in batch]
        embeddings = embed_texts(texts)
        for chunk, embedding in zip(batch, embeddings):
            chunk["embedding"] = embedding
    return chunks


# --- Step 4: Store ---
def get_collection():
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    return chroma_client.get_or_create_collection(
        name="documents", metadata={"hnsw:space": "cosine"}
    )


def store_chunks(chunks: list[dict], collection):
    ids = [str(uuid.uuid4()) for _ in chunks]
    documents = [c["text"] for c in chunks]
    embeddings = [c["embedding"] for c in chunks]
    metadatas = [
        {"source": c.get("source", ""), "heading": c.get("heading", "")}
        for c in chunks
    ]
    collection.upsert(
        ids=ids,
        documents=documents,
        embeddings=embeddings,
        metadatas=metadatas,
    )
    print(f"Stored {len(chunks)} chunks.")


# --- Step 5: Retrieve & Generate ---
def retrieve(query: str, collection, top_k: int = 5) -> list[dict]:
    query_embedding = embed_texts([query])[0]
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )
    return [
        {
            "text": doc,
            "source": meta.get("source", ""),
            "heading": meta.get("heading", ""),
            "score": 1 - dist,
        }
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        )
    ]


def generate_answer(query: str, context_chunks: list[dict]) -> str:
    context = "\n\n---\n\n".join(
        f"[Source: {c['source']}]\n{c['text']}" for c in context_chunks
    )
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer the user's question based only on the provided context. "
                    "If the context does not contain enough information, say so."
                ),
            },
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
        temperature=0,
    )
    return response.choices[0].message.content


# --- Run the Pipeline ---
if __name__ == "__main__":
    # 1. Ingest documents
    urls = [
        "https://openai.com/index/gpt-4o",
        "https://docs.anthropic.com/en/docs/about-claude/models",
    ]
    documents = ingest_batch(urls)
    print(f"Ingested {len(documents)} documents.")

    # 2. Chunk all documents
    all_chunks = []
    for url, markdown in documents.items():
        chunks = chunk_by_headings(markdown, source_url=url)
        all_chunks.extend(chunks)
    print(f"Created {len(all_chunks)} chunks.")

    # 3. Embed
    all_chunks = embed_chunks(all_chunks)
    print("Embeddings generated.")

    # 4. Store
    collection = get_collection()
    store_chunks(all_chunks, collection)

    # 5. Query
    query = "What are the key capabilities of GPT-4o?"
    results = retrieve(query, collection)
    answer = generate_answer(query, results)
    print(f"\nQuery: {query}")
    print(f"Answer: {answer}")

Save this as rag_pipeline.py, set your OPENAI_API_KEY and DRIVE_AI_API_KEY environment variables, and run it:

export OPENAI_API_KEY="sk-..."
export DRIVE_AI_API_KEY="tda_live_..."
python rag_pipeline.py

The pipeline ingests the specified URLs, chunks the content, generates embeddings, stores them in a local ChromaDB instance, and answers queries grounded in the ingested documents.

What to Build Next

This pipeline covers the core loop. From here, you can extend it with:

  • LangChain or LlamaIndex integration. Use Drive AI as the document loader within your existing framework. The clean markdown output plugs directly into Document objects.
  • Metadata filtering. Add fields like document_type, date_ingested, or department to your chunk metadata and filter at query time.
  • Hybrid search. Combine vector similarity with keyword search (BM25) for better retrieval on exact-match queries.
  • Reranking. Add a cross-encoder reranker (Cohere Rerank, cross-encoder/ms-marco-MiniLM-L-6-v2) between retrieval and generation to improve precision.
  • Evaluation. Use frameworks like Ragas or DeepEval to measure retrieval accuracy, answer faithfulness, and context relevance.
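
As a taste of the hybrid search item, one widely used way to combine a vector ranking with a keyword ranking is reciprocal rank fusion, which needs only the rank positions from each retriever. The sketch below is a generic illustration with made-up document IDs; the constant k = 60 is the value commonly used in the literature, not something tuned for this pipeline.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked ID lists; each appearance contributes 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


vector_ranking = ["a", "b", "c"]   # e.g. from the vector store
keyword_ranking = ["b", "d", "a"]  # e.g. from a BM25 index

fused = reciprocal_rank_fusion([vector_ranking, keyword_ranking])
print(fused)
```

Documents that appear near the top of both lists rise above documents that only one retriever found, which is the behavior you want for queries that mix exact terms with looser phrasing.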

The ingestion layer is the foundation. Get clean data in, and every downstream component performs better.

Have questions? Reach out at contact@thedrive.ai.
