Document Parsing Technology Guide: PDF Parsing, OCR, Layout Analysis, and LLM-Based Extraction Pipeline


Introduction

The vast majority of enterprise knowledge resides in unstructured documents: PDF reports, scanned contracts, research papers, invoices, and medical records. According to McKinsey, roughly 80% of enterprise data lives in such unstructured formats, so organizations that cannot parse it reliably leave most of their data assets untapped.

The quality of RAG (Retrieval-Augmented Generation) systems, knowledge search engines, and document automation systems ultimately depends on how accurately input documents are parsed. The principle of "Garbage in, garbage out" applies more than ever in the field of Document Parsing.

This article systematically covers the entire Document Parsing process with practical code: PDF parsing library comparisons, OCR engine selection criteria, layout analysis models, table extraction techniques, LLM-based multimodal document understanding, chunking strategies for RAG optimization, and production pipeline construction.

Overview of Document Parsing

Why Document Parsing Matters

Document Parsing is the technology of extracting structured information from unstructured documents. Beyond simple text extraction, the key is understanding the logical structure of documents (headings, body text, tables, figure captions, etc.) and organizing information into meaningful units.

Here are the major scenarios where Document Parsing is essential:

| Scenario | Description | Key Technology |
|---|---|---|
| RAG Pipeline | Split documents into chunks and store in vector DB | Chunking, Embeddings |
| Knowledge Base Construction | Extract structured knowledge from internal documents | NER, Relation Extraction |
| Document Automation | Extract key fields from invoices and contracts | Template Matching, Key-Value Extraction |
| Regulatory Compliance | Automatically track changes in regulatory documents | Change Detection, Comparative Analysis |
| Research Paper Analysis | Extract methodology, results, and citations from papers | Section Classification, Metadata Extraction |

Document Parsing Pipeline Architecture

A typical Document Parsing pipeline consists of the following stages:

  1. Document Ingestion: Input documents in various formats (PDF, images, Word, HTML)
  2. Preprocessing: Image correction, noise removal, page separation
  3. Text Extraction: Native PDF text extraction or OCR
  4. Layout Analysis: Document structure recognition (headings, body, tables, figures)
  5. Structured Extraction: Table parsing, key-value pair extraction, NER
  6. Post-processing: Text cleaning, chunking, metadata attachment
  7. Storage/Indexing: Store in vector DB or search engine

The ParsedDocument data model below carries the pipeline's output through these stages:

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class DocumentType(Enum):
    NATIVE_PDF = "native_pdf"      # PDF with text layer
    SCANNED_PDF = "scanned_pdf"    # Scanned image PDF
    IMAGE = "image"                # JPG, PNG, etc.
    MIXED_PDF = "mixed_pdf"        # Native + scanned hybrid

@dataclass
class ParsedDocument:
    text: str
    pages: list = field(default_factory=list)
    tables: list = field(default_factory=list)
    images: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    doc_type: Optional[DocumentType] = None

    def get_chunks(self, strategy: str = "recursive", chunk_size: int = 1000):
        """Chunk document using specified strategy"""
        if strategy == "recursive":
            return self._recursive_chunk(chunk_size)
        elif strategy == "semantic":
            return self._semantic_chunk(chunk_size)
        elif strategy == "structure":
            return self._structure_based_chunk()
        return []

    def _recursive_chunk(self, chunk_size: int):
        separators = ["\n\n", "\n", ". ", " "]
        return self._split_text(self.text, separators, chunk_size)

    def _split_text(self, text: str, separators: list, chunk_size: int):
        if len(text) <= chunk_size:
            return [text]
        if not separators:
            # No separators left: fall back to hard splits at chunk_size
            return [text[i:i + chunk_size]
                    for i in range(0, len(text), chunk_size)]
        sep, rest = separators[0], separators[1:]
        chunks = []
        current = ""
        for part in text.split(sep):
            if len(part) > chunk_size:
                # Oversized part: flush the buffer, then recurse with
                # the next, finer-grained separator
                if current:
                    chunks.append(current.strip())
                    current = ""
                chunks.extend(self._split_text(part, rest, chunk_size))
            elif len(current) + len(part) + len(sep) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = part
            else:
                current = current + sep + part if current else part
        if current:
            chunks.append(current.strip())
        return chunks

    def _semantic_chunk(self, chunk_size: int):
        # Semantic chunking implementation (embedding-based)
        return self._recursive_chunk(chunk_size)

    def _structure_based_chunk(self):
        # Document structure-based chunking
        return [page.get("text", "") for page in self.pages if page.get("text")]

PDF Parsing Technologies and Tools

PDF is the most widely used document format, yet also one of the hardest to parse. Because PDF is fundamentally a print-layout format, the logical reading order of text is not guaranteed by the file structure.
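
A concrete symptom: text spans come back in the order they were drawn, not the order a human reads them, so parsers typically re-sort by coordinates. The following stdlib-only sketch shows a naive re-sort; the span tuples are hypothetical illustration data, not the output of any particular library:

```python
# Spans as (x, y, text) in draw order, which need not match reading order
spans = [
    (300, 50, "right cell, row 1"),
    (50, 50, "left cell, row 1"),
    (50, 70, "left cell, row 2"),
    (300, 70, "right cell, row 2"),
]

def reading_order(spans, line_tol=5):
    """Sort top-to-bottom, then left-to-right within a line band."""
    return sorted(spans, key=lambda s: (round(s[1] / line_tol), s[0]))

ordered = [text for _, _, text in reading_order(spans)]
```

Real layouts (multi-column pages, sidebars) need smarter ordering than a single left-to-right band; this is only the baseline that library-level layout analysis improves on.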

Comparison of Major PDF Parsing Libraries

| Library | Strengths | Weaknesses | Best For |
|---|---|---|---|
| PyMuPDF (fitz) | Fast speed, rich features, image extraction | License (AGPL) | General PDF processing |
| pdfplumber | Accurate table extraction, coordinate-based access | Slower speed | Table-heavy documents |
| PyPDF2 | Pure Python, easy installation | Inaccurate with complex PDFs | Simple text extraction |
| Camelot | Dedicated table extraction | Cannot process full PDFs | When only tables are needed |
| pdfminer.six | Detailed layout information | Complex API | When layout analysis is needed |

PDF Parsing with PyMuPDF

import fitz  # PyMuPDF

class PyMuPDFParser:
    """PyMuPDF-based PDF parser"""

    def __init__(self, pdf_path: str):
        self.doc = fitz.open(pdf_path)
        self.pages = []

    def extract_text_with_layout(self) -> list:
        """Extract text per page with layout information"""
        results = []
        for page_num, page in enumerate(self.doc):
            blocks = page.get_text("dict")["blocks"]
            page_data = {
                "page_num": page_num + 1,
                "width": page.rect.width,
                "height": page.rect.height,
                "blocks": []
            }
            for block in blocks:
                if block["type"] == 0:  # Text block
                    text_content = ""
                    for line in block["lines"]:
                        line_text = ""
                        for span in line["spans"]:
                            line_text += span["text"]
                        text_content += line_text + "\n"
                    page_data["blocks"].append({
                        "type": "text",
                        "bbox": block["bbox"],
                        "text": text_content.strip(),
                        "font_size": block["lines"][0]["spans"][0]["size"]
                        if block["lines"] and block["lines"][0]["spans"] else 0
                    })
                elif block["type"] == 1:  # Image block
                    page_data["blocks"].append({
                        "type": "image",
                        "bbox": block["bbox"],
                        "image_data": block.get("image", None)
                    })
            results.append(page_data)
        return results

    def extract_images(self, output_dir: str) -> list:
        """Extract all images from the PDF"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        image_paths = []
        for page_num, page in enumerate(self.doc):
            images = page.get_images(full=True)
            for img_idx, img in enumerate(images):
                xref = img[0]
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n - pix.alpha >= 4:  # CMYK: convert to RGB before saving
                    pix = fitz.Pixmap(fitz.csRGB, pix)
                img_path = os.path.join(
                    output_dir,
                    f"page_{page_num + 1}_img_{img_idx + 1}.png"
                )
                pix.save(img_path)
                image_paths.append(img_path)
                pix = None
        return image_paths

    def detect_document_type(self) -> DocumentType:
        """Determine whether PDF is native or scanned"""
        total_text_len = 0
        total_images = 0
        for page in self.doc:
            total_text_len += len(page.get_text())
            total_images += len(page.get_images())
        if total_text_len < 100 and total_images > 0:
            return DocumentType.SCANNED_PDF
        elif total_text_len > 100 and total_images > len(self.doc) * 0.5:
            return DocumentType.MIXED_PDF
        return DocumentType.NATIVE_PDF

    def close(self):
        self.doc.close()
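
The native/scanned heuristic above can be restated independently of PyMuPDF as a function over per-page statistics; the (text_chars, image_count) tuples here are hypothetical inputs mirroring the thresholds used in detect_document_type:

```python
def classify_pdf(page_stats):
    """Classify a PDF from per-page (text_chars, image_count) tuples."""
    text_chars = sum(chars for chars, _ in page_stats)
    image_count = sum(imgs for _, imgs in page_stats)
    if text_chars < 100 and image_count > 0:
        return "scanned_pdf"   # almost no text layer, but images present
    if text_chars > 100 and image_count > len(page_stats) * 0.5:
        return "mixed_pdf"     # text layer plus many page images
    return "native_pdf"
```

The thresholds (100 characters, 0.5 images per page) are rough defaults; tune them on your own document mix.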

Precision Parsing with pdfplumber

pdfplumber excels particularly in table extraction and provides precise coordinate information for each character.

import pdfplumber

class PdfPlumberParser:
    """pdfplumber-based precision PDF parser"""

    def __init__(self, pdf_path: str):
        self.pdf = pdfplumber.open(pdf_path)

    def extract_tables(self) -> list:
        """Extract tables from all pages"""
        all_tables = []
        for page_num, page in enumerate(self.pdf.pages):
            tables = page.extract_tables(
                table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 3,
                    "min_words_vertical": 3,
                    "min_words_horizontal": 1,
                }
            )
            for table_idx, table in enumerate(tables):
                if table and len(table) > 1:
                    headers = table[0]
                    rows = table[1:]
                    all_tables.append({
                        "page": page_num + 1,
                        "table_index": table_idx,
                        "headers": headers,
                        "rows": rows,
                        "num_rows": len(rows),
                        "num_cols": len(headers) if headers else 0
                    })
        return all_tables

    def extract_text_outside_tables(self) -> str:
        """Extract only text outside table regions"""
        full_text = []
        for page in self.pdf.pages:
            # Collect table bounding boxes
            table_bboxes = []
            tables = page.find_tables()
            for table in tables:
                table_bboxes.append(table.bbox)
            # Crop and exclude table regions
            filtered_page = page
            for bbox in table_bboxes:
                filtered_page = filtered_page.outside_bbox(bbox)
            text = filtered_page.extract_text()
            if text:
                full_text.append(text)
        return "\n\n".join(full_text)

    def close(self):
        self.pdf.close()

OCR-Based Document Recognition

OCR (Optical Character Recognition) is essential for processing scanned documents and image-based PDFs. OCR technology is rapidly evolving from traditional rule-based approaches to deep learning-based methods.

Comparison of Major OCR Engines

| Engine | Languages | Accuracy | Speed | Features |
|---|---|---|---|---|
| Tesseract 5 | 100+ | Medium-High | Medium | Open source, most widely used |
| EasyOCR | 80+ | Medium-High | Slow | PyTorch-based, easy installation |
| PaddleOCR | 80+ | High | Fast | Developed by Baidu, high accuracy |
| Google Vision API | 100+ | Highest | Fast | Cloud service, paid |
| Azure Document Intelligence | 100+ | Highest | Fast | Enterprise, structured extraction support |

Using Tesseract OCR

import pytesseract
from PIL import Image
import cv2
import numpy as np

class TesseractOCR:
    """Tesseract-based OCR processor"""

    def __init__(self, lang: str = "eng"):
        self.lang = lang
        self.config = "--oem 3 --psm 6"  # LSTM engine + uniform text block

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Image preprocessing to improve OCR accuracy"""
        img = cv2.imread(image_path)
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Noise removal
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        # Binarization (Otsu's method)
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        # Deskewing
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) > 0:
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            if abs(angle) > 0.5:
                h, w = binary.shape
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                binary = cv2.warpAffine(
                    binary, matrix, (w, h),
                    flags=cv2.INTER_CUBIC,
                    borderMode=cv2.BORDER_REPLICATE
                )
        return binary

    def extract_text(self, image_path: str, preprocess: bool = True) -> str:
        """Extract text from image"""
        if preprocess:
            img = self.preprocess_image(image_path)
        else:
            img = Image.open(image_path)
        text = pytesseract.image_to_string(
            img, lang=self.lang, config=self.config
        )
        return text.strip()

    def extract_with_boxes(self, image_path: str) -> list:
        """Extract text with bounding boxes"""
        img = self.preprocess_image(image_path)
        data = pytesseract.image_to_data(
            img, lang=self.lang, config=self.config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data["text"])):
            if data["text"][i].strip():
                results.append({
                    "text": data["text"][i],
                    "confidence": data["conf"][i],
                    "bbox": {
                        "x": data["left"][i],
                        "y": data["top"][i],
                        "w": data["width"][i],
                        "h": data["height"][i]
                    },
                    "block_num": data["block_num"][i],
                    "line_num": data["line_num"][i]
                })
        return results
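
The deskewing step in preprocess_image relies on a quirk of cv2.minAreaRect, which reports angles in (-90, 0]. That normalization can be checked in isolation without OpenCV:

```python
def normalize_skew_angle(angle):
    """Map a cv2.minAreaRect angle to the small corrective rotation
    applied by the deskew step (magnitude at most 45 degrees)."""
    if angle < -45:
        return -(90 + angle)
    return -angle
```

A page tilted a couple of degrees clockwise yields an angle near 0 (e.g. -2, corrected by +2), while a near-vertical minimum-area rectangle yields an angle near -90 (e.g. -88, corrected by -2).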

High-Accuracy OCR with PaddleOCR

PaddleOCR demonstrates particularly high accuracy with Asian languages (Korean, Japanese, Chinese).

from paddleocr import PaddleOCR

class PaddleOCRProcessor:
    """PaddleOCR-based high-accuracy OCR"""

    def __init__(self, lang: str = "en"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,  # Text direction detection
            lang=lang,
            use_gpu=True,
            det_db_thresh=0.3,
            det_db_box_thresh=0.5,
            rec_batch_num=16
        )

    def process_image(self, image_path: str) -> dict:
        """Extract text and layout information from image"""
        result = self.ocr.ocr(image_path, cls=True)
        extracted = {
            "lines": [],
            "full_text": "",
            "confidence_avg": 0.0
        }
        if not result or not result[0]:
            return extracted

        total_conf = 0
        lines = []
        for line in result[0]:
            bbox = line[0]  # 4-point coordinates
            text = line[1][0]
            confidence = line[1][1]
            lines.append({
                "text": text,
                "confidence": confidence,
                "bbox": bbox,
                "y_center": (bbox[0][1] + bbox[2][1]) / 2
            })
            total_conf += confidence

        # Sort by y-coordinate (reading order)
        lines.sort(key=lambda x: (x["y_center"], x["bbox"][0][0]))
        extracted["lines"] = lines
        extracted["full_text"] = "\n".join(l["text"] for l in lines)
        extracted["confidence_avg"] = (
            total_conf / len(lines) if lines else 0
        )
        return extracted

    def process_pdf(self, pdf_path: str) -> list:
        """OCR process all pages of a PDF"""
        import fitz
        doc = fitz.open(pdf_path)
        results = []
        for page_num, page in enumerate(doc):
            # Convert page to high-resolution image
            mat = fitz.Matrix(2.0, 2.0)  # 2x scale
            pix = page.get_pixmap(matrix=mat)
            img_path = f"/tmp/page_{page_num}.png"
            pix.save(img_path)
            # Run OCR
            ocr_result = self.process_image(img_path)
            ocr_result["page_num"] = page_num + 1
            results.append(ocr_result)
        doc.close()
        return results

Layout Analysis and Structure Extraction

Layout analysis is the process of identifying regions such as text blocks, headings, tables, figures, and captions in documents, and determining the logical reading order. Recently, transformer-based models have been leading this field.

Major Layout Analysis Models

| Model | Developer | Core Technology | Features |
|---|---|---|---|
| LayoutLMv3 | Microsoft | Multimodal Transformer | Unified text+image+layout |
| DiT (Document Image Transformer) | Microsoft | Vision Transformer | Image-based document understanding |
| Donut | NAVER CLOVA | OCR-free approach | Direct document understanding without OCR |
| Table Transformer | Microsoft | DETR-based | Specialized in table detection/structure recognition |
| Unstructured | Open Source | Hybrid | Multi-model combination pipeline |

Document Structure Analysis with LayoutLMv3

from transformers import (
    LayoutLMv3ForTokenClassification,
    LayoutLMv3Processor,
)
from PIL import Image
import torch

class LayoutAnalyzer:
    """LayoutLMv3-based document layout analyzer"""

    LABEL_MAP = {
        0: "O",
        1: "B-TITLE",
        2: "I-TITLE",
        3: "B-TEXT",
        4: "I-TEXT",
        5: "B-TABLE",
        6: "I-TABLE",
        7: "B-FIGURE",
        8: "I-FIGURE",
        9: "B-LIST",
        10: "I-LIST",
        11: "B-HEADER",
        12: "I-HEADER",
        13: "B-FOOTER",
        14: "I-FOOTER",
    }

    def __init__(self, model_name: str = "microsoft/layoutlmv3-base"):
        # Note: the base checkpoint is not fine-tuned for these labels, so
        # its predictions are effectively random. In practice, load a
        # checkpoint fine-tuned on a layout dataset (e.g. PubLayNet or
        # DocLayNet) or fine-tune the base model first.
        self.processor = LayoutLMv3Processor.from_pretrained(
            model_name, apply_ocr=True
        )
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            model_name, num_labels=len(self.LABEL_MAP)
        )
        self.model.eval()

    def analyze(self, image_path: str) -> list:
        """Analyze document image layout"""
        image = Image.open(image_path).convert("RGB")
        encoding = self.processor(
            image,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.model(**encoding)
        predictions = outputs.logits.argmax(-1).squeeze().tolist()
        tokens = self.processor.tokenizer.convert_ids_to_tokens(
            encoding["input_ids"].squeeze()
        )
        # Map predictions to tokens
        elements = []
        current_label = None
        current_text = ""
        for token, pred in zip(tokens, predictions):
            label = self.LABEL_MAP.get(pred, "O")
            if label.startswith("B-"):
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = label[2:]
                current_text = token.replace("##", "")
            elif label.startswith("I-") and current_label:
                current_text += token.replace("##", "")
            else:
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = None
                current_text = ""
        if current_text and current_label:
            elements.append({
                "type": current_label,
                "text": current_text.strip()
            })
        return elements
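
The B-/I- merging inside analyze is a general BIO-decoding pattern; a minimal standalone version over plain (token, tag) pairs makes it easier to test on its own (the sample tags below are illustrative):

```python
def decode_bio(tagged):
    """Merge (token, BIO-tag) pairs into (label, text) spans."""
    spans, label, buf = [], None, []
    for token, tag in tagged:
        if tag.startswith("B-"):
            if buf:
                spans.append((label, " ".join(buf)))
            label, buf = tag[2:], [token]
        elif tag.startswith("I-") and label == tag[2:]:
            buf.append(token)
        else:  # "O" or an inconsistent I- tag closes the open span
            if buf:
                spans.append((label, " ".join(buf)))
            label, buf = None, []
    if buf:
        spans.append((label, " ".join(buf)))
    return spans

tags = [("Annual", "B-TITLE"), ("Report", "I-TITLE"),
        ("Revenue", "B-TEXT"), ("grew", "I-TEXT")]
```

One design choice worth noting: an I- tag whose label disagrees with the open span closes it rather than extending it, which keeps noisy model output from bleeding across element boundaries.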

Unified Parsing with Unstructured

Unstructured is an open-source library that supports various document formats and provides an integrated pipeline combining multiple parsing engines.

from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title

class UnstructuredParser:
    """Unstructured-based unified document parser"""

    def __init__(self, strategy: str = "hi_res"):
        self.strategy = strategy  # "fast", "ocr_only", "hi_res"

    def parse_pdf(self, pdf_path: str) -> list:
        """Parse PDF into structured elements"""
        elements = partition_pdf(
            filename=pdf_path,
            strategy=self.strategy,
            infer_table_structure=True,
            languages=["eng"],
            extract_images_in_pdf=True,
            extract_image_block_output_dir="./extracted_images"
        )
        parsed = []
        for element in elements:
            parsed.append({
                "type": type(element).__name__,
                "text": str(element),
                "metadata": {
                    "page_number": element.metadata.page_number,
                    "coordinates": (
                        element.metadata.coordinates
                        if hasattr(element.metadata, "coordinates")
                        else None
                    ),
                    "parent_id": element.metadata.parent_id,
                }
            })
        return parsed

    def parse_and_chunk(
        self, file_path: str, max_characters: int = 1000
    ) -> list:
        """Parse and chunk by title in one step"""
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        chunks = chunk_by_title(
            elements,
            max_characters=max_characters,
            combine_text_under_n_chars=200,
            new_after_n_chars=800
        )
        return [
            {
                "text": str(chunk),
                "type": type(chunk).__name__,
                "metadata": chunk.metadata.to_dict()
            }
            for chunk in chunks
        ]

Table Extraction Techniques

Accurately extracting tables from documents is one of the most challenging tasks in Document Parsing. Tables can have complex cell merging, nested structures, and diverse styles.

Key Challenges in Table Extraction

  • Table Detection: Accurately identifying table regions in documents
  • Structure Recognition: Recognizing row/column structure, merged cells, and header rows
  • Cell Content Extraction: Accurately extracting text from each cell
  • Borderless Tables: Recognizing the structure of tables without visible lines
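
For the borderless case, one common plain-text heuristic treats runs of two or more spaces as column gaps, since single spaces usually separate words within a cell. A stdlib sketch:

```python
import re

def split_borderless_row(line):
    """Split a text row into cells on runs of two or more spaces."""
    return [cell for cell in re.split(r" {2,}", line.strip()) if cell]
```

This works for tables rendered with aligned whitespace (e.g. OCR output of simple grids) but fails for cells that themselves contain wide gaps; coordinate-based methods like the Table Transformer below are more robust.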

Table Detection with Table Transformer

from transformers import (
    TableTransformerForObjectDetection,
    AutoImageProcessor,
)
from PIL import Image
import torch

class TableExtractor:
    """Table Transformer-based table extractor"""

    def __init__(self):
        self.processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.detection_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.structure_processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )
        self.structure_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )

    def detect_tables(self, image_path: str, threshold: float = 0.7) -> list:
        """Detect table regions in an image"""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = self.detection_model(**inputs)
        target_sizes = torch.tensor([image.size[::-1]])
        results = self.processor.post_process_object_detection(
            outputs, threshold=threshold, target_sizes=target_sizes
        )[0]
        tables = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            tables.append({
                "score": score.item(),
                "label": self.detection_model.config.id2label[label.item()],
                "bbox": box.tolist()  # [x1, y1, x2, y2]
            })
        return tables

    def recognize_structure(
        self, image_path: str, table_bbox: list
    ) -> dict:
        """Recognize internal structure of detected table"""
        image = Image.open(image_path).convert("RGB")
        # Crop table region
        table_image = image.crop(table_bbox)
        inputs = self.structure_processor(
            images=table_image, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.structure_model(**inputs)
        target_sizes = torch.tensor([table_image.size[::-1]])
        results = self.structure_processor.post_process_object_detection(
            outputs, threshold=0.5, target_sizes=target_sizes
        )[0]
        structure = {"rows": [], "columns": [], "cells": []}
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            label_name = self.structure_model.config.id2label[label.item()]
            entry = {"bbox": box.tolist(), "score": score.item()}
            if "row" in label_name:
                structure["rows"].append(entry)
            elif "column" in label_name:
                structure["columns"].append(entry)
            else:
                structure["cells"].append(entry)
        # Sort by coordinates
        structure["rows"].sort(key=lambda x: x["bbox"][1])
        structure["columns"].sort(key=lambda x: x["bbox"][0])
        return structure
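
The row and column boxes returned by recognize_structure still need to be intersected into a cell grid before text can be assigned to cells. A geometry-only sketch, using the same [x1, y1, x2, y2] bbox format (the sample boxes are illustrative):

```python
def boxes_to_cells(rows, cols):
    """Intersect sorted row and column bboxes into a row/col-indexed grid."""
    cells = []
    for r, row in enumerate(rows):
        for c, col in enumerate(cols):
            x1, y1 = max(row[0], col[0]), max(row[1], col[1])
            x2, y2 = min(row[2], col[2]), min(row[3], col[3])
            if x2 > x1 and y2 > y1:  # keep only non-empty intersections
                cells.append({"row": r, "col": c, "bbox": [x1, y1, x2, y2]})
    return cells

rows = [[0, 0, 100, 20], [0, 20, 100, 40]]
cols = [[0, 0, 50, 40], [50, 0, 100, 40]]
```

Spanning (merged) cells detected by the structure model can then be overlaid on this grid to mark which row/column positions they cover.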

Simple Table Extraction with Camelot

import camelot
import pandas as pd

def extract_tables_with_camelot(
    pdf_path: str, pages: str = "all", flavor: str = "lattice"
) -> list:
    """Extract tables from PDF using Camelot

    Args:
        pdf_path: Path to PDF file
        pages: Pages to extract ("all" or "1,2,3")
        flavor: "lattice" (line-based) or "stream" (whitespace-based)
    """
    tables = camelot.read_pdf(
        pdf_path,
        pages=pages,
        flavor=flavor,
        strip_text="\n"
    )
    results = []
    for i, table in enumerate(tables):
        df = table.df
        results.append({
            "table_index": i,
            "page": table.page,
            "accuracy": table.accuracy,
            "data": df.to_dict(orient="records"),
            "shape": df.shape,
            "dataframe": df
        })
    return results


# Usage example
if __name__ == "__main__":
    tables = extract_tables_with_camelot(
        "financial_report.pdf",
        pages="1-5",
        flavor="lattice"
    )
    for t in tables:
        print(f"Table {t['table_index']} (page {t['page']})")
        print(f"  Accuracy: {t['accuracy']:.1f}%")
        print(f"  Shape: {t['shape']}")
        print(t["dataframe"].head())

LLM-Based Document Understanding

The emergence of multimodal LLMs like GPT-4V and Claude 3.5 is fundamentally transforming document understanding approaches. Instead of traditional OCR + post-processing pipelines, document images can be directly fed to LLMs for content understanding and structuring.

Document Processing with Multimodal LLMs

import anthropic
import base64
from pathlib import Path

class LLMDocumentProcessor:
    """LLM-based multimodal document processor"""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = anthropic.Anthropic()
        self.model = model

    def _encode_image(self, image_path: str) -> tuple:
        """Encode image to base64"""
        path = Path(image_path)
        suffix = path.suffix.lower()
        media_type_map = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        media_type = media_type_map.get(suffix, "image/png")
        with open(image_path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        return data, media_type

    def extract_structured_data(
        self, image_path: str, schema_description: str
    ) -> str:
        """Extract structured data from document image"""
        data, media_type = self._encode_image(image_path)
        prompt = f"""Analyze this document image and extract structured data as JSON matching the following schema.

Schema:
{schema_description}

Instructions:
- Extract all text accurately
- Preserve row/column structure for tables
- Mark uncertain content as null
- Respond only in JSON format"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )
        return response.content[0].text

    def analyze_document_layout(self, image_path: str) -> str:
        """Analyze document layout and extract structure"""
        data, media_type = self._encode_image(image_path)
        prompt = """Analyze the layout of this document and return the following information as JSON:

1. Document type (paper, report, invoice, contract, etc.)
2. Section structure (titles and hierarchy)
3. Table presence and location description
4. Figure/chart presence and description
5. Key-value pairs (if applicable)
6. Reading order of the full text

Respond only in JSON format."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

    def compare_documents(
        self, image_path_1: str, image_path_2: str
    ) -> str:
        """Compare and analyze two document images"""
        data1, mt1 = self._encode_image(image_path_1)
        data2, mt2 = self._encode_image(image_path_2)
        prompt = """Compare these two documents and analyze the following:
1. Similarities
2. Differences
3. Added content
4. Removed content
5. Modified content

Respond in structured JSON format."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt1,
                                "data": data1,
                            },
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt2,
                                "data": data2,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

Hybrid Approach: OCR + LLM Correction

A hybrid approach that first extracts text via OCR and then uses an LLM to correct errors and organize structure delivers excellent results in practice.

class HybridDocumentProcessor:
    """OCR + LLM hybrid document processor"""

    def __init__(self):
        self.ocr = PaddleOCRProcessor(lang="en")
        self.llm = LLMDocumentProcessor()

    def process(self, image_path: str) -> dict:
        """Process document using hybrid approach"""
        # Step 1: Extract text with OCR
        ocr_result = self.ocr.process_image(image_path)
        raw_text = ocr_result["full_text"]
        confidence = ocr_result["confidence_avg"]

        # Step 2: Correct and structure with LLM
        correction_prompt = (
            "Review and correct the following OCR-extracted text. "
            "Fix low-confidence portions based on context, "
            "and organize the logical structure (headings, body, lists) "
            "into Markdown format.\n\n"
            f"OCR extracted text (average confidence: {confidence:.2f}):\n"
            "---\n"
            f"{raw_text}\n"
            "---\n\n"
            "Return the corrected Markdown."
        )

        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": correction_prompt}],
        )
        corrected_text = response.content[0].text

        return {
            "raw_ocr": raw_text,
            "ocr_confidence": confidence,
            "corrected_text": corrected_text,
            "method": "hybrid_ocr_llm"
        }

Document Chunking Strategies for RAG

When the ultimate goal of Document Parsing is a RAG pipeline, the chunking strategy is the key factor that determines retrieval quality. Improper chunking significantly degrades retrieval accuracy and causes hallucinations due to context loss.
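To make the failure mode concrete, the toy comparison below (illustrative only, not taken from any library) contrasts a naive fixed-width split, which cuts mid-word and mid-sentence, with a separator-aware split that packs whole sentences into chunks:

```python
text = ("Document parsing extracts structure. Chunking then splits the text. "
        "Retrieval quality depends on where those splits fall.")

# Naive fixed-width split: boundaries fall mid-word and mid-sentence
naive = [text[i:i + 40] for i in range(0, len(text), 40)]

# Separator-aware split: break on ". " and pack sentences up to ~80 chars
chunks, current = [], ""
for sentence in text.split(". "):
    sentence = sentence.rstrip(".") + "."
    candidate = (current + " " + sentence).strip()
    if len(candidate) > 80 and current:
        chunks.append(current)
        current = sentence
    else:
        current = candidate
if current:
    chunks.append(current)

print(naive[0])   # ends mid-word: "...extracts structure. Chu"
print(chunks[0])  # two whole sentences, intact
```

Every chunk from the second approach ends at a sentence boundary, which is exactly what an embedding model needs to produce a coherent vector per chunk.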

Chunking Strategy Comparison

| Strategy | Description | Pros | Cons |
| --- | --- | --- | --- |
| Fixed-size | Split by fixed token/character count | Simple, uniform size | Context disruption |
| Recursive | Split by separator priority | Preserves structure, flexible | Uneven sizes |
| Semantic | Split based on embedding similarity | Preserves meaning units | High compute cost |
| Structure-based | Split by headings/sections | Maintains logical structure | Requires structure recognition |
| Sliding Window | Split with overlap | Context continuity | Increased storage |

Advanced Chunking Implementation

from typing import Optional
import numpy as np

class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def fixed_size_chunk(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> list:
        """Fixed-size + overlap chunking"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            # Cut at sentence boundaries
            if end < len(text):
                last_period = chunk.rfind(". ")
                last_newline = chunk.rfind("\n")
                cut_point = max(last_period, last_newline)
                if cut_point > chunk_size * 0.5:
                    chunk = chunk[:cut_point + 1]
                    end = start + cut_point + 1
            chunks.append({
                "text": chunk.strip(),
                "start": start,
                "end": end,
                "index": len(chunks)
            })
            # Ensure forward progress even if overlap exceeds the cut-back chunk
            start = max(end - overlap, start + 1)
        return chunks

    def recursive_chunk(
        self,
        text: str,
        chunk_size: int = 1000,
        separators: Optional[list] = None,
    ) -> list:
        """Recursive splitting chunking"""
        if separators is None:
            separators = ["\n\n\n", "\n\n", "\n", ". ", ", ", " "]
        chunks = []
        self._recursive_split(text, separators, chunk_size, chunks)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
            if c.strip()
        ]

    def _recursive_split(
        self, text: str, separators: list, chunk_size: int, result: list
    ):
        if len(text) <= chunk_size:
            result.append(text)
            return
        sep = separators[0] if separators else " "
        remaining_seps = separators[1:] if len(separators) > 1 else []
        parts = text.split(sep)
        current = ""
        for part in parts:
            test = current + sep + part if current else part
            if len(test) > chunk_size:
                if current:
                    if len(current) > chunk_size and remaining_seps:
                        self._recursive_split(
                            current, remaining_seps, chunk_size, result
                        )
                    else:
                        result.append(current)
                current = part
            else:
                current = test
        if current:
            if len(current) > chunk_size and remaining_seps:
                self._recursive_split(
                    current, remaining_seps, chunk_size, result
                )
            else:
                result.append(current)

    def semantic_chunk(
        self, text: str, threshold: float = 0.5, min_size: int = 100
    ) -> list:
        """Semantic chunking - embedding similarity based"""
        if not self.embedding_model:
            raise ValueError("Embedding model is required for semantic chunking")
        # Naive sentence split; swap in a proper sentence tokenizer for production
        sentences = [s.strip() for s in text.split(". ") if s.strip()]
        if len(sentences) <= 1:
            return [{"text": text, "index": 0}]
        # Compute embeddings for each sentence
        embeddings = self.embedding_model.encode(sentences)
        # Calculate similarity between adjacent sentences
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i])
                * np.linalg.norm(embeddings[i + 1])
            )
            similarities.append(sim)
        # Split at points where similarity falls below threshold
        chunks = []
        current_chunk = sentences[0]
        for i, sim in enumerate(similarities):
            if sim < threshold and len(current_chunk) >= min_size:
                chunks.append(current_chunk)
                current_chunk = sentences[i + 1]
            else:
                current_chunk += ". " + sentences[i + 1]
        if current_chunk:
            chunks.append(current_chunk)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
        ]

    def structure_based_chunk(self, parsed_elements: list) -> list:
        """Structure-based chunking using layout analysis results"""
        chunks = []
        current_chunk = {
            "title": "",
            "content": "",
            "tables": [],
            "metadata": {}
        }
        for element in parsed_elements:
            elem_type = element.get("type", "")
            elem_text = element.get("text", "")
            if elem_type in ("Title", "TITLE"):
                # Start a new section; flush the previous one if it holds anything
                if current_chunk["content"] or current_chunk["tables"]:
                    chunks.append(current_chunk.copy())
                current_chunk = {
                    "title": elem_text,
                    "content": "",
                    "tables": [],
                    "metadata": element.get("metadata", {})
                }
            elif elem_type in ("Table", "TABLE"):
                current_chunk["tables"].append(elem_text)
            else:
                current_chunk["content"] += elem_text + "\n"
        if current_chunk["content"] or current_chunk["tables"]:
            chunks.append(current_chunk)
        return chunks

Building a Production Pipeline

Now let's combine the individual technologies covered so far to build an end-to-end Document Parsing pipeline suitable for production environments.

Pipeline Architecture

The overall pipeline consists of the following stages:

  1. Input Processing: Detect and normalize various document formats
  2. Parsing Strategy Selection: Choose optimal parser based on document type
  3. Text/Structure Extraction: OCR, layout analysis, table extraction
  4. LLM Enhancement: Quality improvement through multimodal LLM
  5. Chunking and Indexing: Chunk for RAG and store in vector DB

import os
import json
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Pipeline configuration"""
    ocr_engine: str = "paddleocr"       # tesseract, paddleocr, easyocr
    ocr_lang: str = "en"
    layout_model: str = "unstructured"  # layoutlm, unstructured
    chunking_strategy: str = "recursive"  # fixed, recursive, semantic, structure
    chunk_size: int = 1000
    chunk_overlap: int = 200
    use_llm_correction: bool = True
    llm_model: str = "claude-sonnet-4-20250514"
    output_format: str = "json"         # json, markdown

@dataclass
class ProcessedDocument:
    """Processed document result"""
    source_path: str
    doc_type: str
    pages: list = field(default_factory=list)
    full_text: str = ""
    tables: list = field(default_factory=list)
    chunks: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    processing_log: list = field(default_factory=list)


class DocumentParsingPipeline:
    """Production Document Parsing Pipeline"""

    def __init__(self, config: Optional[PipelineConfig] = None):
        self.config = config or PipelineConfig()
        self.chunker = AdvancedChunker()

    def process(self, file_path: str) -> ProcessedDocument:
        """End-to-end document processing"""
        result = ProcessedDocument(source_path=file_path, doc_type="")
        logger.info(f"Processing: {file_path}")

        try:
            # 1. Detect document type
            doc_type = self._detect_type(file_path)
            result.doc_type = doc_type
            result.processing_log.append(
                f"Document type detected: {doc_type}"
            )

            # 2. Select and execute parsing strategy
            if doc_type == "native_pdf":
                raw_result = self._parse_native_pdf(file_path)
            elif doc_type in ("scanned_pdf", "image"):
                raw_result = self._parse_with_ocr(file_path)
            elif doc_type == "mixed_pdf":
                raw_result = self._parse_mixed_pdf(file_path)
            else:
                raw_result = self._parse_generic(file_path)

            result.full_text = raw_result.get("text", "")
            result.tables = raw_result.get("tables", [])
            result.pages = raw_result.get("pages", [])

            # 3. LLM correction (optional)
            if self.config.use_llm_correction and result.full_text:
                result.full_text = self._llm_correct(result.full_text)
                result.processing_log.append("LLM correction applied")

            # 4. Chunking
            result.chunks = self._chunk_document(result)
            result.processing_log.append(
                f"Created {len(result.chunks)} chunks "
                f"with strategy: {self.config.chunking_strategy}"
            )

            # 5. Generate metadata
            result.metadata = {
                "source": file_path,
                "doc_type": doc_type,
                "total_pages": len(result.pages),
                "total_tables": len(result.tables),
                "total_chunks": len(result.chunks),
                "text_length": len(result.full_text),
                "config": {
                    "ocr_engine": self.config.ocr_engine,
                    "chunking_strategy": self.config.chunking_strategy,
                    "chunk_size": self.config.chunk_size,
                }
            }
            logger.info(
                f"Processing complete: "
                f"{len(result.chunks)} chunks created"
            )

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            result.processing_log.append(f"Error: {str(e)}")

        return result

    def _detect_type(self, file_path: str) -> str:
        """Auto-detect document type"""
        ext = Path(file_path).suffix.lower()
        if ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
            return "image"
        elif ext == ".pdf":
            import fitz  # PyMuPDF
            doc = fitz.open(file_path)
            page_count = len(doc)
            total_text = sum(len(page.get_text()) for page in doc)
            total_images = sum(len(page.get_images()) for page in doc)
            doc.close()
            if total_text < 100:
                return "scanned_pdf"
            elif total_images > page_count * 0.5:
                return "mixed_pdf"
            return "native_pdf"
        return "unknown"

    def _parse_native_pdf(self, file_path: str) -> dict:
        """Parse native PDF"""
        parser = PyMuPDFParser(file_path)
        pages = parser.extract_text_with_layout()
        plumber = PdfPlumberParser(file_path)
        tables = plumber.extract_tables()
        text = plumber.extract_text_outside_tables()
        parser.close()
        plumber.close()
        return {"text": text, "tables": tables, "pages": pages}

    def _parse_with_ocr(self, file_path: str) -> dict:
        """OCR-based document parsing"""
        if self.config.ocr_engine == "paddleocr":
            processor = PaddleOCRProcessor(lang=self.config.ocr_lang)
            if file_path.lower().endswith(".pdf"):
                results = processor.process_pdf(file_path)
                text = "\n\n".join(r["full_text"] for r in results)
                return {"text": text, "pages": results, "tables": []}
            else:
                result = processor.process_image(file_path)
                return {
                    "text": result["full_text"],
                    "pages": [result],
                    "tables": []
                }
        else:
            ocr = TesseractOCR(lang="eng")
            text = ocr.extract_text(file_path)
            return {"text": text, "pages": [], "tables": []}

    def _parse_mixed_pdf(self, file_path: str) -> dict:
        """Parse mixed PDF - prefer the native text layer, fall back to OCR"""
        native_result = self._parse_native_pdf(file_path)
        combined_text = native_result["text"]
        if not combined_text:
            # Only pay the OCR cost when the native text layer is empty
            combined_text = self._parse_with_ocr(file_path)["text"]
        return {
            "text": combined_text,
            "tables": native_result["tables"],
            "pages": native_result["pages"]
        }

    def _parse_generic(self, file_path: str) -> dict:
        """Generic document parsing (using Unstructured)"""
        parser = UnstructuredParser(strategy="hi_res")
        elements = parser.parse_pdf(file_path)
        text = "\n\n".join(e["text"] for e in elements)
        return {"text": text, "pages": [], "tables": []}

    def _llm_correct(self, text: str) -> str:
        """Text correction using LLM"""
        if len(text) < 100:
            return text
        import anthropic
        client = anthropic.Anthropic()
        # Correct only the head of long documents to bound cost,
        # then re-attach the uncorrected remainder so no text is lost
        sample = text[:3000]
        remainder = text[3000:]
        response = client.messages.create(
            model=self.config.llm_model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Correct errors in the following OCR-extracted text. "
                        "Preserve the original meaning and structure while "
                        "fixing typos, garbled characters, and line break "
                        f"errors only.\n\n{sample}"
                    ),
                }
            ],
        )
        return response.content[0].text + remainder

    def _chunk_document(self, doc: ProcessedDocument) -> list:
        """Document chunking"""
        strategy = self.config.chunking_strategy
        if strategy == "fixed":
            return self.chunker.fixed_size_chunk(
                doc.full_text,
                self.config.chunk_size,
                self.config.chunk_overlap
            )
        elif strategy == "recursive":
            return self.chunker.recursive_chunk(
                doc.full_text,
                self.config.chunk_size
            )
        elif strategy == "structure":
            if doc.pages:
                return self.chunker.structure_based_chunk(doc.pages)
            return self.chunker.recursive_chunk(
                doc.full_text, self.config.chunk_size
            )
        return self.chunker.recursive_chunk(
            doc.full_text, self.config.chunk_size
        )

    def save_results(
        self, result: ProcessedDocument, output_dir: str
    ):
        """Save processing results"""
        os.makedirs(output_dir, exist_ok=True)
        base_name = Path(result.source_path).stem
        # Save chunks
        chunks_path = os.path.join(output_dir, f"{base_name}_chunks.json")
        with open(chunks_path, "w", encoding="utf-8") as f:
            json.dump(result.chunks, f, ensure_ascii=False, indent=2)
        # Save metadata
        meta_path = os.path.join(output_dir, f"{base_name}_metadata.json")
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result.metadata, f, ensure_ascii=False, indent=2)
        # Save full text
        text_path = os.path.join(output_dir, f"{base_name}_full.txt")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(result.full_text)
        logger.info(f"Results saved to {output_dir}")


# Usage example
if __name__ == "__main__":
    config = PipelineConfig(
        ocr_engine="paddleocr",
        chunking_strategy="recursive",
        chunk_size=1000,
        chunk_overlap=200,
        use_llm_correction=True
    )
    pipeline = DocumentParsingPipeline(config)

    # Process single document
    result = pipeline.process("research_paper.pdf")
    print(f"Total {len(result.chunks)} chunks created")
    print(f"Extracted {len(result.tables)} tables")

    # Save results
    pipeline.save_results(result, "./output")

Batch Processing and Monitoring

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    """Bulk document batch processor"""

    def __init__(self, pipeline: DocumentParsingPipeline, max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers

    def process_directory(self, input_dir: str, output_dir: str) -> dict:
        """Batch process all documents in a directory"""
        supported_ext = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}
        files = [
            str(f) for f in Path(input_dir).rglob("*")
            if f.suffix.lower() in supported_ext
        ]
        logger.info(f"Found {len(files)} documents to process")
        stats = {
            "total": len(files),
            "success": 0,
            "failed": 0,
            "total_chunks": 0,
            "processing_time": 0
        }
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._process_single, f, output_dir
                ): f for f in files
            }
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    stats["success"] += 1
                    stats["total_chunks"] += len(result.chunks)
                    logger.info(f"Success: {file_path}")
                except Exception as e:
                    stats["failed"] += 1
                    logger.error(f"Failed: {file_path} - {e}")

        stats["processing_time"] = time.time() - start_time
        logger.info(
            f"Batch complete: {stats['success']}/{stats['total']} "
            f"in {stats['processing_time']:.1f}s"
        )
        return stats

    def _process_single(
        self, file_path: str, output_dir: str
    ) -> ProcessedDocument:
        """Process and save a single document"""
        result = self.pipeline.process(file_path)
        self.pipeline.save_results(result, output_dir)
        return result

Conclusion

Document Parsing is the foundational technology that determines data quality for AI/LLM applications. Here is a summary of the key topics covered in this article.

PDF Parsing: PyMuPDF excels at general-purpose processing, while pdfplumber is strong in table extraction. The optimal tool should be selected based on document type (native/scanned/mixed).

OCR: PaddleOCR provides high accuracy for Asian languages, while Tesseract offers broad language support. Image preprocessing (binarization, deskewing, etc.) significantly impacts accuracy.
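As a minimal illustration of the binarization step mentioned above, Otsu's method picks the threshold that maximizes the between-class variance of the grayscale histogram. The pure-Python sketch below works on a flat list of pixel values; a real pipeline would use OpenCV's `cv2.threshold` with the `THRESH_OTSU` flag instead:

```python
def otsu_threshold(pixels: list) -> int:
    """Find the 8-bit threshold maximizing between-class variance."""
    hist = [0] * 256
    for p in pixels:
        hist[p] += 1
    total = len(pixels)
    sum_all = sum(i * hist[i] for i in range(256))
    sum_bg, w_bg = 0.0, 0
    best_t, best_var = 0, -1.0
    for t in range(256):
        w_bg += hist[t]          # pixels at or below t -> background class
        if w_bg == 0:
            continue
        w_fg = total - w_bg      # pixels above t -> foreground class
        if w_fg == 0:
            break
        sum_bg += t * hist[t]
        mu_bg = sum_bg / w_bg
        mu_fg = (sum_all - sum_bg) / w_fg
        var_between = w_bg * w_fg * (mu_bg - mu_fg) ** 2
        if var_between > best_var:
            best_var, best_t = var_between, t
    return best_t

def binarize(pixels: list, t: int) -> list:
    """Map grayscale values to pure black (0) or white (255)."""
    return [0 if p <= t else 255 for p in pixels]

# A bimodal "image": dark text pixels (~10) on a light background (~200)
page = [10] * 50 + [200] * 50
t = otsu_threshold(page)
assert binarize(page, t) == [0] * 50 + [255] * 50
```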

Layout Analysis: Tools like LayoutLMv3 and Unstructured enable automatic recognition of document logical structure. Unstructured is particularly well-suited for rapid prototyping.

Table Extraction: While Table Transformer and Camelot are effective tools, complex tables (merged cells, borderless tables) require additional post-processing.

LLM-Based Document Understanding: Multimodal LLMs like GPT-4V and Claude have brought breakthrough performance improvements in OCR correction, structure analysis, and information extraction. The hybrid OCR + LLM approach represents the current best practice.

Chunking Strategy: In RAG pipelines, retrieval quality directly depends on the chunking strategy. Structure-based chunking that reflects the logical structure of documents typically yields the highest retrieval accuracy.

Document Parsing technology is rapidly evolving, and the emergence of multimodal LLMs is fundamentally changing the existing pipeline paradigm. However, due to cost and latency constraints, the most practical choice in production is a hybrid approach that appropriately combines traditional tools with LLMs.
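One concrete way to combine them is confidence-based routing: run the cheap OCR path on everything and escalate to the LLM only when quality signals warrant it. The function and thresholds below are illustrative assumptions, not values from any benchmark:

```python
def route_document(ocr_confidence: float, has_tables: bool,
                   page_count: int, llm_budget_pages: int = 20) -> str:
    """Pick a processing path for one document (toy heuristic)."""
    if ocr_confidence >= 0.95 and not has_tables:
        return "ocr_only"   # clean text layer: skip the LLM entirely
    if page_count <= llm_budget_pages and (ocr_confidence < 0.80 or has_tables):
        return "llm_full"   # hard document, small enough to afford full LLM
    return "hybrid"         # default: OCR text + targeted LLM correction

print(route_document(0.98, False, 300))  # ocr_only
print(route_document(0.62, True, 5))     # llm_full
print(route_document(0.90, False, 300))  # hybrid
```

In practice the routing signals would come from the pipeline itself, e.g. the average OCR confidence and table count already collected in `ProcessedDocument.metadata`.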
