Document Parsing Technology Guide: PDF Parsing, OCR, Layout Analysis, and LLM-Based Extraction Pipeline
- Introduction
- Overview of Document Parsing
- PDF Parsing Technologies and Tools
- OCR-Based Document Recognition
- Layout Analysis and Structure Extraction
- Table Extraction Techniques
- LLM-Based Document Understanding
- Document Chunking Strategies for RAG
- Building a Production Pipeline
- Conclusion
- References

Introduction
The vast majority of enterprise knowledge resides in unstructured documents: PDF reports, scanned contracts, research papers, invoices, and medical records. According to McKinsey, roughly 80% of enterprise data exists in such unstructured formats; failing to parse it effectively means leaving most of an organization's data assets untapped.
The quality of RAG (Retrieval-Augmented Generation) systems, knowledge search engines, and document automation systems ultimately depends on how accurately input documents are parsed. The principle of "Garbage in, garbage out" applies more than ever in the field of Document Parsing.
This article systematically covers the entire Document Parsing process with practical code: PDF parsing library comparisons, OCR engine selection criteria, layout analysis models, table extraction techniques, LLM-based multimodal document understanding, chunking strategies for RAG optimization, and production pipeline construction.
Overview of Document Parsing
Why Document Parsing Matters
Document Parsing is the technology of extracting structured information from unstructured documents. Beyond simple text extraction, the key is understanding the logical structure of documents (headings, body text, tables, figure captions, etc.) and organizing information into meaningful units.
Here are the major scenarios where Document Parsing is essential:
| Scenario | Description | Key Technology |
|---|---|---|
| RAG Pipeline | Split documents into chunks and store in vector DB | Chunking, Embeddings |
| Knowledge Base Construction | Extract structured knowledge from internal documents | NER, Relation Extraction |
| Document Automation | Extract key fields from invoices and contracts | Template Matching, Key-Value Extraction |
| Regulatory Compliance | Automatically track changes in regulatory documents | Change Detection, Comparative Analysis |
| Research Paper Analysis | Extract methodology, results, and citations from papers | Section Classification, Metadata Extraction |
Document Parsing Pipeline Architecture
A typical Document Parsing pipeline consists of the following stages:
- Document Ingestion: Input documents in various formats (PDF, images, Word, HTML)
- Preprocessing: Image correction, noise removal, page separation
- Text Extraction: Native PDF text extraction or OCR
- Layout Analysis: Document structure recognition (headings, body, tables, figures)
- Structured Extraction: Table parsing, key-value pair extraction, NER
- Post-processing: Text cleaning, chunking, metadata attachment
- Storage/Indexing: Store in vector DB or search engine
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class DocumentType(Enum):
    NATIVE_PDF = "native_pdf"    # PDF with text layer
    SCANNED_PDF = "scanned_pdf"  # Scanned image PDF
    IMAGE = "image"              # JPG, PNG, etc.
    MIXED_PDF = "mixed_pdf"      # Native + scanned hybrid


@dataclass
class ParsedDocument:
    text: str
    pages: list = field(default_factory=list)
    tables: list = field(default_factory=list)
    images: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    doc_type: Optional[DocumentType] = None

    def get_chunks(self, strategy: str = "recursive", chunk_size: int = 1000):
        """Chunk document using specified strategy"""
        if strategy == "recursive":
            return self._recursive_chunk(chunk_size)
        elif strategy == "semantic":
            return self._semantic_chunk(chunk_size)
        elif strategy == "structure":
            return self._structure_based_chunk()
        return []

    def _recursive_chunk(self, chunk_size: int):
        separators = ["\n\n", "\n", ". ", " "]
        return self._split_text(self.text, separators, chunk_size)

    def _split_text(self, text: str, separators: list, chunk_size: int):
        if len(text) <= chunk_size:
            return [text]
        sep = separators[0] if separators else " "
        chunks = []
        current = ""
        for part in text.split(sep):
            if len(current) + len(part) + len(sep) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = part
            else:
                current = current + sep + part if current else part
        if current:
            chunks.append(current.strip())
        return chunks

    def _semantic_chunk(self, chunk_size: int):
        # Semantic chunking (embedding-based); simplified fallback here
        return self._recursive_chunk(chunk_size)

    def _structure_based_chunk(self):
        # Document structure-based chunking
        return [page.get("text", "") for page in self.pages if page.get("text")]
```
PDF Parsing Technologies and Tools
PDF is the most widely used document format, yet it is also the most challenging format to parse. Since PDF is fundamentally a "print layout" format, the logical order of text is not guaranteed in the file structure.
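Because a PDF stores positioned drawing commands rather than a logical text flow, extractors typically have to reimpose reading order themselves, for example by grouping text blocks into visual lines and sorting top-to-bottom, then left-to-right. A minimal sketch of such a sort (the block dict shape mirrors the `bbox = [x0, y0, x1, y1]` convention used by PyMuPDF below; the function name and tolerance value are illustrative assumptions):

```python
def reading_order(blocks: list[dict], y_tolerance: float = 5.0) -> list[dict]:
    """Sort text blocks into reading order: blocks whose top edges differ
    by less than y_tolerance are treated as one visual line; lines are
    ordered top-to-bottom, blocks within a line left-to-right."""
    blocks = sorted(blocks, key=lambda b: (b["bbox"][1], b["bbox"][0]))
    lines: list[list[dict]] = []
    for block in blocks:
        if lines and abs(block["bbox"][1] - lines[-1][0]["bbox"][1]) < y_tolerance:
            lines[-1].append(block)  # same visual line as the previous block
        else:
            lines.append([block])    # start a new line
    ordered = []
    for line in lines:
        ordered.extend(sorted(line, key=lambda b: b["bbox"][0]))
    return ordered
```

Multi-column layouts need a column-detection pass before this kind of sort; a plain (y, x) ordering interleaves the columns.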
Comparison of Major PDF Parsing Libraries
| Library | Strengths | Weaknesses | Best For |
|---|---|---|---|
| PyMuPDF (fitz) | Fast speed, rich features, image extraction | License (AGPL) | General PDF processing |
| pdfplumber | Accurate table extraction, coordinate-based access | Slower speed | Table-heavy documents |
| PyPDF2 | Pure Python, easy installation | Inaccurate with complex PDFs | Simple text extraction |
| Camelot | Dedicated table extraction | Cannot process full PDFs | When only tables are needed |
| pdfminer.six | Detailed layout information | Complex API | When layout analysis is needed |
PDF Parsing with PyMuPDF
```python
import fitz  # PyMuPDF


class PyMuPDFParser:
    """PyMuPDF-based PDF parser"""

    def __init__(self, pdf_path: str):
        self.doc = fitz.open(pdf_path)
        self.pages = []

    def extract_text_with_layout(self) -> list:
        """Extract text per page with layout information"""
        results = []
        for page_num, page in enumerate(self.doc):
            blocks = page.get_text("dict")["blocks"]
            page_data = {
                "page_num": page_num + 1,
                "width": page.rect.width,
                "height": page.rect.height,
                "blocks": []
            }
            for block in blocks:
                if block["type"] == 0:  # Text block
                    text_content = ""
                    for line in block["lines"]:
                        line_text = ""
                        for span in line["spans"]:
                            line_text += span["text"]
                        text_content += line_text + "\n"
                    page_data["blocks"].append({
                        "type": "text",
                        "bbox": block["bbox"],
                        "text": text_content.strip(),
                        "font_size": block["lines"][0]["spans"][0]["size"]
                        if block["lines"] and block["lines"][0]["spans"] else 0
                    })
                elif block["type"] == 1:  # Image block
                    page_data["blocks"].append({
                        "type": "image",
                        "bbox": block["bbox"],
                        "image_data": block.get("image", None)
                    })
            results.append(page_data)
        return results

    def extract_images(self, output_dir: str) -> list:
        """Extract all images from the PDF"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        image_paths = []
        for page_num, page in enumerate(self.doc):
            images = page.get_images(full=True)
            for img_idx, img in enumerate(images):
                xref = img[0]
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n < 5:  # GRAY or RGB (skips CMYK)
                    img_path = os.path.join(
                        output_dir,
                        f"page_{page_num + 1}_img_{img_idx + 1}.png"
                    )
                    pix.save(img_path)
                    image_paths.append(img_path)
                pix = None
        return image_paths

    def detect_document_type(self) -> DocumentType:
        """Determine whether a PDF is native or scanned"""
        total_text_len = 0
        total_images = 0
        for page in self.doc:
            total_text_len += len(page.get_text())
            total_images += len(page.get_images())
        if total_text_len < 100 and total_images > 0:
            return DocumentType.SCANNED_PDF
        elif total_text_len > 100 and total_images > len(self.doc) * 0.5:
            return DocumentType.MIXED_PDF
        return DocumentType.NATIVE_PDF

    def close(self):
        self.doc.close()
```
Precision Parsing with pdfplumber
pdfplumber excels particularly in table extraction and provides precise coordinate information for each character.
```python
import pdfplumber


class PdfPlumberParser:
    """pdfplumber-based precision PDF parser"""

    def __init__(self, pdf_path: str):
        self.pdf = pdfplumber.open(pdf_path)

    def extract_tables(self) -> list:
        """Extract tables from all pages"""
        all_tables = []
        for page_num, page in enumerate(self.pdf.pages):
            tables = page.extract_tables(
                table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 3,
                    "min_words_vertical": 3,
                    "min_words_horizontal": 1,
                }
            )
            for table_idx, table in enumerate(tables):
                if table and len(table) > 1:
                    headers = table[0]
                    rows = table[1:]
                    all_tables.append({
                        "page": page_num + 1,
                        "table_index": table_idx,
                        "headers": headers,
                        "rows": rows,
                        "num_rows": len(rows),
                        "num_cols": len(headers) if headers else 0
                    })
        return all_tables

    def extract_text_outside_tables(self) -> str:
        """Extract only text outside table regions"""
        full_text = []
        for page in self.pdf.pages:
            # Collect table bounding boxes
            table_bboxes = []
            tables = page.find_tables()
            for table in tables:
                table_bboxes.append(table.bbox)
            # Exclude table regions, then extract the remaining text
            filtered_page = page
            for bbox in table_bboxes:
                filtered_page = filtered_page.outside_bbox(bbox)
            text = filtered_page.extract_text()
            if text:
                full_text.append(text)
        return "\n\n".join(full_text)

    def close(self):
        self.pdf.close()
```
OCR-Based Document Recognition
OCR (Optical Character Recognition) is essential for processing scanned documents and image-based PDFs. OCR technology is rapidly evolving from traditional rule-based approaches to deep learning-based methods.
Comparison of Major OCR Engines
| Engine | Languages | Accuracy | Speed | Features |
|---|---|---|---|---|
| Tesseract 5 | 100+ | Medium-High | Medium | Open source, most widely used |
| EasyOCR | 80+ | Medium-High | Slow | PyTorch-based, easy installation |
| PaddleOCR | 80+ | High | Fast | Developed by Baidu, high accuracy |
| Google Vision API | 100+ | Highest | Fast | Cloud service, paid |
| Azure Document Intelligence | 100+ | Highest | Fast | Enterprise, structured extraction support |
Using Tesseract OCR
```python
import pytesseract
from PIL import Image
import cv2
import numpy as np


class TesseractOCR:
    """Tesseract-based OCR processor"""

    def __init__(self, lang: str = "eng"):
        self.lang = lang
        self.config = "--oem 3 --psm 6"  # LSTM engine + uniform text block

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Image preprocessing to improve OCR accuracy"""
        img = cv2.imread(image_path)
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Noise removal
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        # Binarization (Otsu's method)
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        # Deskewing
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) > 0:
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            if abs(angle) > 0.5:
                h, w = binary.shape
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                binary = cv2.warpAffine(
                    binary, matrix, (w, h),
                    flags=cv2.INTER_CUBIC,
                    borderMode=cv2.BORDER_REPLICATE
                )
        return binary

    def extract_text(self, image_path: str, preprocess: bool = True) -> str:
        """Extract text from image"""
        if preprocess:
            img = self.preprocess_image(image_path)
        else:
            img = Image.open(image_path)
        text = pytesseract.image_to_string(
            img, lang=self.lang, config=self.config
        )
        return text.strip()

    def extract_with_boxes(self, image_path: str) -> list:
        """Extract text with bounding boxes"""
        img = self.preprocess_image(image_path)
        data = pytesseract.image_to_data(
            img, lang=self.lang, config=self.config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data["text"])):
            if data["text"][i].strip():
                results.append({
                    "text": data["text"][i],
                    "confidence": data["conf"][i],
                    "bbox": {
                        "x": data["left"][i],
                        "y": data["top"][i],
                        "w": data["width"][i],
                        "h": data["height"][i]
                    },
                    "block_num": data["block_num"][i],
                    "line_num": data["line_num"][i]
                })
        return results
```
High-Accuracy OCR with PaddleOCR
PaddleOCR demonstrates particularly high accuracy with Asian languages (Korean, Japanese, Chinese).
```python
from paddleocr import PaddleOCR


class PaddleOCRProcessor:
    """PaddleOCR-based high-accuracy OCR"""

    def __init__(self, lang: str = "en"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,  # Text direction detection
            lang=lang,
            use_gpu=True,
            det_db_thresh=0.3,
            det_db_box_thresh=0.5,
            rec_batch_num=16
        )

    def process_image(self, image_path: str) -> dict:
        """Extract text and layout information from image"""
        result = self.ocr.ocr(image_path, cls=True)
        extracted = {
            "lines": [],
            "full_text": "",
            "confidence_avg": 0.0
        }
        if not result or not result[0]:
            return extracted
        total_conf = 0
        lines = []
        for line in result[0]:
            bbox = line[0]  # 4-point coordinates
            text = line[1][0]
            confidence = line[1][1]
            lines.append({
                "text": text,
                "confidence": confidence,
                "bbox": bbox,
                "y_center": (bbox[0][1] + bbox[2][1]) / 2
            })
            total_conf += confidence
        # Sort by y-coordinate (reading order)
        lines.sort(key=lambda x: (x["y_center"], x["bbox"][0][0]))
        extracted["lines"] = lines
        extracted["full_text"] = "\n".join(l["text"] for l in lines)
        extracted["confidence_avg"] = (
            total_conf / len(lines) if lines else 0
        )
        return extracted

    def process_pdf(self, pdf_path: str) -> list:
        """OCR all pages of a PDF"""
        import fitz
        doc = fitz.open(pdf_path)
        results = []
        for page_num, page in enumerate(doc):
            # Convert page to a high-resolution image
            mat = fitz.Matrix(2.0, 2.0)  # 2x scale
            pix = page.get_pixmap(matrix=mat)
            img_path = f"/tmp/page_{page_num}.png"
            pix.save(img_path)
            # Run OCR
            ocr_result = self.process_image(img_path)
            ocr_result["page_num"] = page_num + 1
            results.append(ocr_result)
        doc.close()
        return results
```
Layout Analysis and Structure Extraction
Layout analysis is the process of identifying regions such as text blocks, headings, tables, figures, and captions in documents, and determining the logical reading order. Recently, transformer-based models have been leading this field.
Major Layout Analysis Models
| Model | Developer | Core Technology | Features |
|---|---|---|---|
| LayoutLMv3 | Microsoft | Multimodal Transformer | Unified text+image+layout |
| DiT (Document Image Transformer) | Microsoft | Vision Transformer | Image-based document understanding |
| Donut | NAVER CLOVA | OCR-free approach | Direct document understanding without OCR |
| Table Transformer | Microsoft | DETR-based | Specialized in table detection/structure recognition |
| Unstructured | Open Source | Hybrid | Multi-model combination pipeline |
Document Structure Analysis with LayoutLMv3
```python
from transformers import (
    LayoutLMv3ForTokenClassification,
    LayoutLMv3Processor,
)
from PIL import Image
import torch


class LayoutAnalyzer:
    """LayoutLMv3-based document layout analyzer"""

    LABEL_MAP = {
        0: "O",
        1: "B-TITLE",
        2: "I-TITLE",
        3: "B-TEXT",
        4: "I-TEXT",
        5: "B-TABLE",
        6: "I-TABLE",
        7: "B-FIGURE",
        8: "I-FIGURE",
        9: "B-LIST",
        10: "I-LIST",
        11: "B-HEADER",
        12: "I-HEADER",
        13: "B-FOOTER",
        14: "I-FOOTER",
    }

    def __init__(self, model_name: str = "microsoft/layoutlmv3-base"):
        self.processor = LayoutLMv3Processor.from_pretrained(
            model_name, apply_ocr=True
        )
        # Note: with a custom num_labels, the classification head is freshly
        # initialized; fine-tune on labeled layout data before relying on
        # its predictions.
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            model_name, num_labels=len(self.LABEL_MAP)
        )
        self.model.eval()

    def analyze(self, image_path: str) -> list:
        """Analyze document image layout"""
        image = Image.open(image_path).convert("RGB")
        encoding = self.processor(
            image,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.model(**encoding)
        predictions = outputs.logits.argmax(-1).squeeze().tolist()
        tokens = self.processor.tokenizer.convert_ids_to_tokens(
            encoding["input_ids"].squeeze()
        )
        # Map predictions to tokens
        elements = []
        current_label = None
        current_text = ""
        for token, pred in zip(tokens, predictions):
            label = self.LABEL_MAP.get(pred, "O")
            if label.startswith("B-"):
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = label[2:]
                current_text = token.replace("##", "")
            elif label.startswith("I-") and current_label:
                current_text += token.replace("##", "")
            else:
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = None
                current_text = ""
        if current_text and current_label:
            elements.append({
                "type": current_label,
                "text": current_text.strip()
            })
        return elements
```
Unified Parsing with Unstructured
Unstructured is an open-source library that supports various document formats and provides an integrated pipeline combining multiple parsing engines.
```python
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title


class UnstructuredParser:
    """Unstructured-based unified document parser"""

    def __init__(self, strategy: str = "hi_res"):
        self.strategy = strategy  # "fast", "ocr_only", "hi_res"

    def parse_pdf(self, pdf_path: str) -> list:
        """Parse PDF into structured elements"""
        elements = partition_pdf(
            filename=pdf_path,
            strategy=self.strategy,
            infer_table_structure=True,
            languages=["eng"],
            extract_images_in_pdf=True,
            extract_image_block_output_dir="./extracted_images"
        )
        parsed = []
        for element in elements:
            parsed.append({
                "type": type(element).__name__,
                "text": str(element),
                "metadata": {
                    "page_number": element.metadata.page_number,
                    "coordinates": (
                        element.metadata.coordinates
                        if hasattr(element.metadata, "coordinates")
                        else None
                    ),
                    "parent_id": element.metadata.parent_id,
                }
            })
        return parsed

    def parse_and_chunk(
        self, file_path: str, max_characters: int = 1000
    ) -> list:
        """Parse and chunk by title in one step"""
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        chunks = chunk_by_title(
            elements,
            max_characters=max_characters,
            combine_text_under_n_chars=200,
            new_after_n_chars=800
        )
        return [
            {
                "text": str(chunk),
                "type": type(chunk).__name__,
                "metadata": chunk.metadata.to_dict()
            }
            for chunk in chunks
        ]
```
Table Extraction Techniques
Accurately extracting tables from documents is one of the most challenging tasks in Document Parsing. Tables can have complex cell merging, nested structures, and diverse styles.
Key Challenges in Table Extraction
- Table Detection: Accurately identifying table regions in documents
- Structure Recognition: Recognizing row/column structure, merged cells, and header rows
- Cell Content Extraction: Accurately extracting text from each cell
- Borderless Tables: Recognizing the structure of tables without visible lines
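Once headers and rows have been extracted (the shape produced by the pdfplumber example above), downstream consumers such as LLM prompts and RAG chunks usually want the table serialized as text, and Markdown is a common target. A small sketch of that serialization (the function name is illustrative, not any library's API):

```python
def table_to_markdown(headers: list, rows: list) -> str:
    """Serialize an extracted table (header list + row lists) as
    GitHub-style Markdown. None cells become empty strings and pipe
    characters are escaped so they don't break the table syntax."""
    def clean(cell) -> str:
        return str(cell).replace("|", "\\|").strip() if cell is not None else ""

    lines = [
        "| " + " | ".join(clean(h) for h in headers) + " |",
        "|" + "---|" * len(headers),  # separator row
    ]
    for row in rows:
        lines.append("| " + " | ".join(clean(c) for c in row) + " |")
    return "\n".join(lines)
```

Merged cells and nested headers need flattening before this step; a plain grid serializer cannot represent them.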
Table Detection with Table Transformer
```python
from transformers import (
    TableTransformerForObjectDetection,
    AutoImageProcessor,
)
from PIL import Image
import torch


class TableExtractor:
    """Table Transformer-based table extractor"""

    def __init__(self):
        self.processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.detection_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.structure_processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )
        self.structure_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )

    def detect_tables(self, image_path: str, threshold: float = 0.7) -> list:
        """Detect table regions in an image"""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = self.detection_model(**inputs)
        target_sizes = torch.tensor([image.size[::-1]])
        results = self.processor.post_process_object_detection(
            outputs, threshold=threshold, target_sizes=target_sizes
        )[0]
        tables = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            tables.append({
                "score": score.item(),
                "label": self.detection_model.config.id2label[label.item()],
                "bbox": box.tolist()  # [x1, y1, x2, y2]
            })
        return tables

    def recognize_structure(
        self, image_path: str, table_bbox: list
    ) -> dict:
        """Recognize the internal structure of a detected table"""
        image = Image.open(image_path).convert("RGB")
        # Crop table region
        table_image = image.crop(table_bbox)
        inputs = self.structure_processor(
            images=table_image, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.structure_model(**inputs)
        target_sizes = torch.tensor([table_image.size[::-1]])
        results = self.structure_processor.post_process_object_detection(
            outputs, threshold=0.5, target_sizes=target_sizes
        )[0]
        structure = {"rows": [], "columns": [], "cells": []}
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            label_name = self.structure_model.config.id2label[label.item()]
            entry = {"bbox": box.tolist(), "score": score.item()}
            if "row" in label_name:
                structure["rows"].append(entry)
            elif "column" in label_name:
                structure["columns"].append(entry)
            else:
                structure["cells"].append(entry)
        # Sort by coordinates
        structure["rows"].sort(key=lambda x: x["bbox"][1])
        structure["columns"].sort(key=lambda x: x["bbox"][0])
        return structure
```
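The structure recognizer returns row and column boxes separately, so the cell grid still has to be recovered by intersecting them. A minimal geometric sketch of that step (bboxes as `[x1, y1, x2, y2]`, the shape returned above; this helper is an illustration, not part of the Table Transformer API, and it ignores merged cells):

```python
def build_cell_grid(rows: list[dict], columns: list[dict]) -> list[list[dict]]:
    """Intersect row and column bounding boxes into a grid of cell boxes:
    one list per row (top to bottom), cells left to right within a row."""
    rows = sorted(rows, key=lambda r: r["bbox"][1])        # top to bottom
    columns = sorted(columns, key=lambda c: c["bbox"][0])  # left to right
    grid = []
    for row in rows:
        _, y1, _, y2 = row["bbox"]
        cells = []
        for col in columns:
            x1, _, x2, _ = col["bbox"]
            # Each cell spans its column horizontally and its row vertically
            cells.append({"bbox": [x1, y1, x2, y2]})
        grid.append(cells)
    return grid
```

Each cell box can then be cropped and OCR'd, or matched against word coordinates from a prior OCR pass.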
Simple Table Extraction with Camelot
```python
import camelot
import pandas as pd


def extract_tables_with_camelot(
    pdf_path: str, pages: str = "all", flavor: str = "lattice"
) -> list:
    """Extract tables from PDF using Camelot

    Args:
        pdf_path: Path to PDF file
        pages: Pages to extract ("all" or "1,2,3")
        flavor: "lattice" (line-based) or "stream" (whitespace-based)
    """
    tables = camelot.read_pdf(
        pdf_path,
        pages=pages,
        flavor=flavor,
        strip_text="\n"
    )
    results = []
    for i, table in enumerate(tables):
        df = table.df
        results.append({
            "table_index": i,
            "page": table.page,
            "accuracy": table.accuracy,
            "data": df.to_dict(orient="records"),
            "shape": df.shape,
            "dataframe": df
        })
    return results


# Usage example
if __name__ == "__main__":
    tables = extract_tables_with_camelot(
        "financial_report.pdf",
        pages="1-5",
        flavor="lattice"
    )
    for t in tables:
        print(f"Table {t['table_index']} (page {t['page']})")
        print(f"  Accuracy: {t['accuracy']:.1f}%")
        print(f"  Shape: {t['shape']}")
        print(t["dataframe"].head())
```
LLM-Based Document Understanding
The emergence of multimodal LLMs like GPT-4V and Claude 3.5 is fundamentally transforming document understanding approaches. Instead of traditional OCR + post-processing pipelines, document images can be directly fed to LLMs for content understanding and structuring.
Document Processing with Multimodal LLMs
```python
import anthropic
import base64
from pathlib import Path


class LLMDocumentProcessor:
    """LLM-based multimodal document processor"""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = anthropic.Anthropic()
        self.model = model

    def _encode_image(self, image_path: str) -> tuple:
        """Encode image to base64"""
        path = Path(image_path)
        suffix = path.suffix.lower()
        media_type_map = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        media_type = media_type_map.get(suffix, "image/png")
        with open(image_path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        return data, media_type

    def extract_structured_data(
        self, image_path: str, schema_description: str
    ) -> str:
        """Extract structured data from document image"""
        data, media_type = self._encode_image(image_path)
        prompt = f"""Analyze this document image and extract structured data as JSON matching the following schema.
Schema:
{schema_description}
Instructions:
- Extract all text accurately
- Preserve row/column structure for tables
- Mark uncertain content as null
- Respond only in JSON format"""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )
        return response.content[0].text

    def analyze_document_layout(self, image_path: str) -> str:
        """Analyze document layout and extract structure"""
        data, media_type = self._encode_image(image_path)
        prompt = """Analyze the layout of this document and return the following information as JSON:
1. Document type (paper, report, invoice, contract, etc.)
2. Section structure (titles and hierarchy)
3. Table presence and location description
4. Figure/chart presence and description
5. Key-value pairs (if applicable)
6. Reading order of the full text
Respond only in JSON format."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

    def compare_documents(
        self, image_path_1: str, image_path_2: str
    ) -> str:
        """Compare and analyze two document images"""
        data1, mt1 = self._encode_image(image_path_1)
        data2, mt2 = self._encode_image(image_path_2)
        prompt = """Compare these two documents and analyze the following:
1. Similarities
2. Differences
3. Added content
4. Removed content
5. Modified content
Respond in structured JSON format."""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt1,
                                "data": data1,
                            },
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt2,
                                "data": data2,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text
```
Hybrid Approach: OCR + LLM Correction
A hybrid approach that first extracts text via OCR and then uses an LLM to correct errors and organize structure delivers excellent results in practice.
```python
class HybridDocumentProcessor:
    """OCR + LLM hybrid document processor"""

    def __init__(self):
        self.ocr = PaddleOCRProcessor(lang="en")
        self.llm = LLMDocumentProcessor()

    def process(self, image_path: str) -> dict:
        """Process document using hybrid approach"""
        # Step 1: Extract text with OCR
        ocr_result = self.ocr.process_image(image_path)
        raw_text = ocr_result["full_text"]
        confidence = ocr_result["confidence_avg"]
        # Step 2: Correct and structure with LLM
        correction_prompt = (
            "Review and correct the following OCR-extracted text. "
            "Fix low-confidence portions based on context, "
            "and organize the logical structure (headings, body, lists) "
            "into Markdown format.\n\n"
            f"OCR extracted text (average confidence: {confidence:.2f}):\n"
            "---\n"
            f"{raw_text}\n"
            "---\n\n"
            "Return the corrected Markdown."
        )
        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": correction_prompt}],
        )
        corrected_text = response.content[0].text
        return {
            "raw_ocr": raw_text,
            "ocr_confidence": confidence,
            "corrected_text": corrected_text,
            "method": "hybrid_ocr_llm"
        }
```
Document Chunking Strategies for RAG
When the ultimate goal of Document Parsing is a RAG pipeline, the chunking strategy is the key factor that determines retrieval quality. Improper chunking significantly degrades retrieval accuracy and causes hallucinations due to context loss.
Chunking Strategy Comparison
| Strategy | Description | Pros | Cons |
|---|---|---|---|
| Fixed-size | Split by fixed token/character count | Simple, uniform size | Context disruption |
| Recursive | Split by separator priority | Preserves structure, flexible | Uneven sizes |
| Semantic | Split based on embedding similarity | Preserves meaning units | High compute cost |
| Structure-based | Split by headings/sections | Maintains logical structure | Requires structure recognition |
| Sliding Window | Split with overlap | Context continuity | Increased storage |
Advanced Chunking Implementation
```python
from typing import Optional
import numpy as np


class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def fixed_size_chunk(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> list:
        """Fixed-size + overlap chunking"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            # Cut at sentence boundaries
            if end < len(text):
                last_period = chunk.rfind(". ")
                last_newline = chunk.rfind("\n")
                cut_point = max(last_period, last_newline)
                if cut_point > chunk_size * 0.5:
                    chunk = chunk[:cut_point + 1]
                    end = start + cut_point + 1
            chunks.append({
                "text": chunk.strip(),
                "start": start,
                "end": end,
                "index": len(chunks)
            })
            start = end - overlap
        return chunks

    def recursive_chunk(
        self,
        text: str,
        chunk_size: int = 1000,
        separators: Optional[list] = None,
    ) -> list:
        """Recursive splitting chunking"""
        if separators is None:
            separators = ["\n\n\n", "\n\n", "\n", ". ", ", ", " "]
        chunks = []
        self._recursive_split(text, separators, chunk_size, chunks)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
            if c.strip()
        ]

    def _recursive_split(
        self, text: str, separators: list, chunk_size: int, result: list
    ):
        if len(text) <= chunk_size:
            result.append(text)
            return
        sep = separators[0] if separators else " "
        remaining_seps = separators[1:] if len(separators) > 1 else []
        parts = text.split(sep)
        current = ""
        for part in parts:
            test = current + sep + part if current else part
            if len(test) > chunk_size:
                if current:
                    if len(current) > chunk_size and remaining_seps:
                        self._recursive_split(
                            current, remaining_seps, chunk_size, result
                        )
                    else:
                        result.append(current)
                current = part
            else:
                current = test
        if current:
            if len(current) > chunk_size and remaining_seps:
                self._recursive_split(
                    current, remaining_seps, chunk_size, result
                )
            else:
                result.append(current)

    def semantic_chunk(
        self, text: str, threshold: float = 0.5, min_size: int = 100
    ) -> list:
        """Semantic chunking - embedding similarity based"""
        if not self.embedding_model:
            raise ValueError("Embedding model is required for semantic chunking")
        # Split into sentences
        sentences = [s.strip() for s in text.split(". ") if s.strip()]
        if len(sentences) <= 1:
            return [{"text": text, "index": 0}]
        # Compute embeddings for each sentence
        embeddings = self.embedding_model.encode(sentences)
        # Calculate cosine similarity between adjacent sentences
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i])
                * np.linalg.norm(embeddings[i + 1])
            )
            similarities.append(sim)
        # Split at points where similarity falls below the threshold
        chunks = []
        current_chunk = sentences[0]
        for i, sim in enumerate(similarities):
            if sim < threshold and len(current_chunk) >= min_size:
                chunks.append(current_chunk)
                current_chunk = sentences[i + 1]
            else:
                current_chunk += ". " + sentences[i + 1]
        if current_chunk:
            chunks.append(current_chunk)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
        ]

    def structure_based_chunk(self, parsed_elements: list) -> list:
        """Structure-based chunking using layout analysis results"""
        chunks = []
        current_chunk = {
            "title": "",
            "content": "",
            "tables": [],
            "metadata": {}
        }
        for element in parsed_elements:
            elem_type = element.get("type", "")
            elem_text = element.get("text", "")
            if elem_type in ("Title", "TITLE"):
                # Start new section
                if current_chunk["content"]:
                    chunks.append(current_chunk.copy())
                current_chunk = {
                    "title": elem_text,
                    "content": "",
                    "tables": [],
                    "metadata": element.get("metadata", {})
                }
            elif elem_type in ("Table", "TABLE"):
                current_chunk["tables"].append(elem_text)
            else:
                current_chunk["content"] += elem_text + "\n"
        if current_chunk["content"]:
            chunks.append(current_chunk)
        return chunks
```
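Before indexing, the structure-based chunks above (title + content + tables) are typically flattened into embed-ready strings, carrying their metadata alongside. A hedged sketch of that last step (the field names match the `structure_based_chunk` output; the serialization format itself is a design choice, not a standard):

```python
def serialize_chunks(chunks: list[dict]) -> list[dict]:
    """Flatten structure-based chunks into (text, metadata) records ready
    for embedding: prefix the section title, then append body and tables."""
    records = []
    for i, chunk in enumerate(chunks):
        parts = []
        if chunk.get("title"):
            parts.append(f"# {chunk['title']}")
        if chunk.get("content"):
            parts.append(chunk["content"].strip())
        for table in chunk.get("tables", []):
            parts.append(table)
        records.append({
            "text": "\n\n".join(parts),
            "metadata": {
                "chunk_index": i,
                "section_title": chunk.get("title", ""),
                "num_tables": len(chunk.get("tables", [])),
                **chunk.get("metadata", {}),  # e.g. page number, coordinates
            },
        })
    return records
```

Keeping the section title inside the embedded text, not just in metadata, tends to help retrieval because queries often mention section topics explicitly.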
Building a Production Pipeline
Now let's combine the individual technologies covered so far to build an end-to-end Document Parsing pipeline suitable for production environments.
Pipeline Architecture
The overall pipeline consists of the following stages:
- Input Processing: Detect and normalize various document formats
- Parsing Strategy Selection: Choose optimal parser based on document type
- Text/Structure Extraction: OCR, layout analysis, table extraction
- LLM Enhancement: Quality improvement through multimodal LLM
- Chunking and Indexing: Chunk for RAG and store in vector DB
import os
import json
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class PipelineConfig:
"""Pipeline configuration"""
ocr_engine: str = "paddleocr" # tesseract, paddleocr, easyocr
ocr_lang: str = "en"
layout_model: str = "unstructured" # layoutlm, unstructured
chunking_strategy: str = "recursive" # fixed, recursive, semantic, structure
chunk_size: int = 1000
chunk_overlap: int = 200
use_llm_correction: bool = True
llm_model: str = "claude-sonnet-4-20250514"
output_format: str = "json" # json, markdown
@dataclass
class ProcessedDocument:
"""Processed document result"""
source_path: str
doc_type: str
pages: list = field(default_factory=list)
full_text: str = ""
tables: list = field(default_factory=list)
chunks: list = field(default_factory=list)
metadata: dict = field(default_factory=dict)
processing_log: list = field(default_factory=list)
class DocumentParsingPipeline:
"""Production Document Parsing Pipeline"""
def __init__(self, config: Optional[PipelineConfig] = None):
self.config = config or PipelineConfig()
self.chunker = AdvancedChunker()
def process(self, file_path: str) -> ProcessedDocument:
"""End-to-end document processing"""
result = ProcessedDocument(source_path=file_path, doc_type="")
logger.info(f"Processing: {file_path}")
try:
# 1. Detect document type
doc_type = self._detect_type(file_path)
result.doc_type = doc_type
result.processing_log.append(
f"Document type detected: {doc_type}"
)
# 2. Select and execute parsing strategy
if doc_type == "native_pdf":
raw_result = self._parse_native_pdf(file_path)
elif doc_type in ("scanned_pdf", "image"):
raw_result = self._parse_with_ocr(file_path)
elif doc_type == "mixed_pdf":
raw_result = self._parse_mixed_pdf(file_path)
else:
raw_result = self._parse_generic(file_path)
result.full_text = raw_result.get("text", "")
result.tables = raw_result.get("tables", [])
result.pages = raw_result.get("pages", [])
# 3. LLM correction (optional)
if self.config.use_llm_correction and result.full_text:
result.full_text = self._llm_correct(result.full_text)
result.processing_log.append("LLM correction applied")
# 4. Chunking
result.chunks = self._chunk_document(result)
result.processing_log.append(
f"Created {len(result.chunks)} chunks "
f"with strategy: {self.config.chunking_strategy}"
)
# 5. Generate metadata
result.metadata = {
"source": file_path,
"doc_type": doc_type,
"total_pages": len(result.pages),
"total_tables": len(result.tables),
"total_chunks": len(result.chunks),
"text_length": len(result.full_text),
"config": {
"ocr_engine": self.config.ocr_engine,
"chunking_strategy": self.config.chunking_strategy,
"chunk_size": self.config.chunk_size,
}
}
logger.info(
f"Processing complete: "
f"{len(result.chunks)} chunks created"
)
except Exception as e:
logger.error(f"Error processing {file_path}: {e}")
result.processing_log.append(f"Error: {str(e)}")
return result
def _detect_type(self, file_path: str) -> str:
"""Auto-detect document type"""
ext = Path(file_path).suffix.lower()
if ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
return "image"
        elif ext == ".pdf":
            import fitz  # PyMuPDF
            doc = fitz.open(file_path)
            page_count = len(doc)
            total_text = sum(len(page.get_text()) for page in doc)
            total_images = sum(len(page.get_images()) for page in doc)
            doc.close()
            if total_text < 100:
                return "scanned_pdf"
            # Heuristic: more than one image per two pages suggests mixed content
            elif total_images > page_count * 0.5:
                return "mixed_pdf"
            return "native_pdf"
return "unknown"
def _parse_native_pdf(self, file_path: str) -> dict:
"""Parse native PDF"""
parser = PyMuPDFParser(file_path)
pages = parser.extract_text_with_layout()
plumber = PdfPlumberParser(file_path)
tables = plumber.extract_tables()
text = plumber.extract_text_outside_tables()
parser.close()
plumber.close()
return {"text": text, "tables": tables, "pages": pages}
def _parse_with_ocr(self, file_path: str) -> dict:
"""OCR-based document parsing"""
if self.config.ocr_engine == "paddleocr":
processor = PaddleOCRProcessor(lang=self.config.ocr_lang)
if file_path.lower().endswith(".pdf"):
results = processor.process_pdf(file_path)
text = "\n\n".join(r["full_text"] for r in results)
return {"text": text, "pages": results, "tables": []}
else:
result = processor.process_image(file_path)
return {
"text": result["full_text"],
"pages": [result],
"tables": []
}
else:
ocr = TesseractOCR(lang="eng")
text = ocr.extract_text(file_path)
return {"text": text, "pages": [], "tables": []}
def _parse_mixed_pdf(self, file_path: str) -> dict:
"""Parse mixed PDF - native + OCR"""
native_result = self._parse_native_pdf(file_path)
ocr_result = self._parse_with_ocr(file_path)
        # Prefer the native text layer; fall back to OCR output when it is empty
        combined_text = native_result["text"] or ocr_result["text"]
return {
"text": combined_text,
"tables": native_result["tables"],
"pages": native_result["pages"]
}
def _parse_generic(self, file_path: str) -> dict:
"""Generic document parsing (using Unstructured)"""
parser = UnstructuredParser(strategy="hi_res")
elements = parser.parse_pdf(file_path)
text = "\n\n".join(e["text"] for e in elements)
return {"text": text, "pages": [], "tables": []}
def _llm_correct(self, text: str) -> str:
"""Text correction using LLM"""
if len(text) < 100:
return text
import anthropic
client = anthropic.Anthropic()
        # Correct only a bounded sample to control cost and latency,
        # then re-attach the uncorrected remainder instead of discarding it
        sample = text[:3000]
        remainder = text[3000:]
        response = client.messages.create(
            model=self.config.llm_model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Correct errors in the following OCR-extracted text. "
                        "Preserve the original meaning and structure while "
                        "fixing typos, garbled characters, and line break "
                        f"errors only.\n\n{sample}"
                    ),
                }
            ],
        )
        return response.content[0].text + remainder
def _chunk_document(self, doc: ProcessedDocument) -> list:
"""Document chunking"""
strategy = self.config.chunking_strategy
if strategy == "fixed":
return self.chunker.fixed_size_chunk(
doc.full_text,
self.config.chunk_size,
self.config.chunk_overlap
)
elif strategy == "recursive":
return self.chunker.recursive_chunk(
doc.full_text,
self.config.chunk_size
)
elif strategy == "structure":
if doc.pages:
return self.chunker.structure_based_chunk(doc.pages)
return self.chunker.recursive_chunk(
doc.full_text, self.config.chunk_size
)
return self.chunker.recursive_chunk(
doc.full_text, self.config.chunk_size
)
def save_results(
self, result: ProcessedDocument, output_dir: str
):
"""Save processing results"""
os.makedirs(output_dir, exist_ok=True)
base_name = Path(result.source_path).stem
# Save chunks
chunks_path = os.path.join(output_dir, f"{base_name}_chunks.json")
with open(chunks_path, "w", encoding="utf-8") as f:
json.dump(result.chunks, f, ensure_ascii=False, indent=2)
# Save metadata
meta_path = os.path.join(output_dir, f"{base_name}_metadata.json")
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(result.metadata, f, ensure_ascii=False, indent=2)
# Save full text
text_path = os.path.join(output_dir, f"{base_name}_full.txt")
with open(text_path, "w", encoding="utf-8") as f:
f.write(result.full_text)
logger.info(f"Results saved to {output_dir}")
# Usage example
if __name__ == "__main__":
config = PipelineConfig(
ocr_engine="paddleocr",
chunking_strategy="recursive",
chunk_size=1000,
chunk_overlap=200,
use_llm_correction=True
)
pipeline = DocumentParsingPipeline(config)
# Process single document
result = pipeline.process("research_paper.pdf")
print(f"Total {len(result.chunks)} chunks created")
print(f"Extracted {len(result.tables)} tables")
# Save results
pipeline.save_results(result, "./output")
Batch Processing and Monitoring
Production workloads rarely consist of a single file. The batch processor below runs the pipeline over an entire directory with a thread pool and tracks simple success/failure statistics.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchProcessor:
"""Bulk document batch processor"""
def __init__(self, pipeline: DocumentParsingPipeline, max_workers: int = 4):
self.pipeline = pipeline
self.max_workers = max_workers
def process_directory(self, input_dir: str, output_dir: str) -> dict:
"""Batch process all documents in a directory"""
supported_ext = {".pdf", ".png", ".jpg", ".jpeg", ".tiff"}
files = [
str(f) for f in Path(input_dir).rglob("*")
if f.suffix.lower() in supported_ext
]
logger.info(f"Found {len(files)} documents to process")
stats = {
"total": len(files),
"success": 0,
"failed": 0,
"total_chunks": 0,
"processing_time": 0
}
start_time = time.time()
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(
self._process_single, f, output_dir
): f for f in files
}
for future in as_completed(futures):
file_path = futures[future]
try:
result = future.result()
stats["success"] += 1
stats["total_chunks"] += len(result.chunks)
logger.info(f"Success: {file_path}")
except Exception as e:
stats["failed"] += 1
logger.error(f"Failed: {file_path} - {e}")
stats["processing_time"] = time.time() - start_time
logger.info(
f"Batch complete: {stats['success']}/{stats['total']} "
f"in {stats['processing_time']:.1f}s"
)
return stats
def _process_single(
self, file_path: str, output_dir: str
) -> ProcessedDocument:
"""Process and save a single document"""
result = self.pipeline.process(file_path)
self.pipeline.save_results(result, output_dir)
return result
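The statistics above are aggregate counts; for monitoring, per-document latency is often worth recording as well, since slow outliers (large scanned PDFs, LLM correction timeouts) are what typically degrade a batch. Below is a minimal standalone sketch of the same thread-pool pattern with per-task timing; `process_stub` is a stand-in for `pipeline.process`, and all names here are illustrative:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_stub(name: str) -> int:
    """Stand-in for DocumentParsingPipeline.process(); returns a fake chunk count."""
    time.sleep(0.01)  # simulate parsing work
    return len(name)

def timed(path: str):
    """Run one task and measure its wall-clock duration."""
    t0 = time.perf_counter()
    result = process_stub(path)
    return path, result, time.perf_counter() - t0

def run_batch(files, max_workers=4):
    """Process files in a thread pool, recording per-file latency."""
    stats = {"success": 0, "failed": 0, "per_file_seconds": {}}
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(timed, f) for f in files]
        for fut in as_completed(futures):
            try:
                path, result, seconds = fut.result()
                results[path] = result
                stats["per_file_seconds"][path] = seconds
                stats["success"] += 1
            except Exception:
                stats["failed"] += 1
    return results, stats

results, stats = run_batch(["a.pdf", "long_report.pdf"])
```

In a real deployment the timing dict would feed a metrics backend rather than a return value, but the shape of the instrumentation is the same.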
Conclusion
Document Parsing is the foundational technology that determines data quality for AI/LLM applications. Here is a summary of the key topics covered in this article.
PDF Parsing: PyMuPDF excels at general-purpose processing, while pdfplumber is strong in table extraction. The optimal tool should be selected based on document type (native/scanned/mixed).
OCR: PaddleOCR provides high accuracy for Asian languages, while Tesseract offers broad language support. Image preprocessing (binarization, deskewing, etc.) significantly impacts accuracy.
Layout Analysis: Tools like LayoutLMv3 and Unstructured enable automatic recognition of document logical structure. Unstructured is particularly well-suited for rapid prototyping.
Table Extraction: While Table Transformer and Camelot are effective tools, complex tables (merged cells, borderless tables) require additional post-processing.
LLM-Based Document Understanding: Multimodal LLMs such as GPT-4V and Claude have substantially improved OCR correction, structure analysis, and information extraction. A hybrid OCR + LLM approach is the current best practice.
Chunking Strategy: In RAG pipelines, retrieval quality directly depends on the chunking strategy. Structure-based chunking that reflects the logical structure of documents provides the highest retrieval accuracy.
Document Parsing technology is rapidly evolving, and the emergence of multimodal LLMs is fundamentally changing the existing pipeline paradigm. However, due to cost and latency constraints, the most practical choice in production is a hybrid approach that appropriately combines traditional tools with LLMs.
References
- PyMuPDF Documentation: https://pymupdf.readthedocs.io/
- pdfplumber Documentation: https://github.com/jsvine/pdfplumber
- PaddleOCR Documentation: https://github.com/PaddlePaddle/PaddleOCR
- Tesseract OCR: https://github.com/tesseract-ocr/tesseract
- LayoutLMv3 Paper: "LayoutLMv3: Pre-training for Document AI with Unified Text and Image Masking" (Huang et al., 2022)
- Table Transformer: https://github.com/microsoft/table-transformer
- Unstructured Documentation: https://docs.unstructured.io/
- Donut Paper: "OCR-free Document Understanding Transformer" (Kim et al., 2022)
- LangChain Document Loaders: https://python.langchain.com/docs/modules/data_connection/document_loaders/
- Camelot: https://camelot-py.readthedocs.io/