


Document Parsing Technical Guide: A Practical Pipeline for PDF Parsing, OCR, Layout Analysis, and LLM-Based Document Extraction


Introduction

Most of the knowledge an enterprise holds lives in unstructured documents: PDF reports, scanned contracts, research papers, invoices, medical records. According to McKinsey research, roughly 80% of enterprise data exists in this unstructured form; failing to put it to use means leaving most of your data assets idle.

The quality of RAG (Retrieval-Augmented Generation) systems, knowledge search engines, and document automation systems ultimately depends on how accurately the input documents are parsed. Document Parsing is the field where the principle of "garbage in, garbage out" applies more than anywhere else.

This guide walks through the entire Document Parsing process with working code: comparing PDF parsing libraries, criteria for choosing an OCR engine, layout analysis models, table extraction techniques, LLM-based multimodal document understanding, chunking strategies for RAG optimization, and building a production pipeline.

Overview of Document Parsing

Why Document Parsing Matters

Document Parsing is the technology of extracting structured information from unstructured documents. The key is to go beyond simple text extraction: understanding a document's logical structure (headings, body text, tables, figure captions, and so on) and organizing the information into meaningful units.

The main scenarios that call for Document Parsing are the following.

| Scenario | Description | Key techniques |
| --- | --- | --- |
| RAG pipelines | Split documents into chunks and store them in a vector DB | Chunking, embeddings |
| Knowledge base construction | Extract structured knowledge from internal documents | NER, relation extraction |
| Document automation | Extract key fields from invoices and contracts | Template matching, key-value extraction |
| Regulatory compliance | Automatically track changes in regulatory documents | Change detection, diff analysis |
| Research paper analysis | Extract methods, results, and citations from papers | Section classification, metadata extraction |

Document Parsing Pipeline Architecture

A typical Document Parsing pipeline consists of the following stages.

  1. Document ingestion: accept documents in various formats such as PDF, images, Word, and HTML
  2. Preprocessing: image correction, noise removal, page splitting
  3. Text extraction: native PDF text extraction or OCR
  4. Layout analysis: recognize the document structure (headings, body, tables, figures)
  5. Structured extraction: table parsing, key-value pair extraction, NER
  6. Postprocessing: text cleanup, chunking, metadata attachment
  7. Storage/indexing: store in a vector DB or search engine

The data model below captures the parsed output of such a pipeline.
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class DocumentType(Enum):
    NATIVE_PDF = "native_pdf"      # PDF with a text layer
    SCANNED_PDF = "scanned_pdf"    # scanned image PDF
    IMAGE = "image"                # images such as JPG, PNG
    MIXED_PDF = "mixed_pdf"        # mix of native and scanned pages

@dataclass
class ParsedDocument:
    text: str
    pages: list = field(default_factory=list)
    tables: list = field(default_factory=list)
    images: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    doc_type: Optional[DocumentType] = None

    def get_chunks(self, strategy: str = "recursive", chunk_size: int = 1000):
        """문서를 지정된 전략으로 청킹"""
        if strategy == "recursive":
            return self._recursive_chunk(chunk_size)
        elif strategy == "semantic":
            return self._semantic_chunk(chunk_size)
        elif strategy == "structure":
            return self._structure_based_chunk()
        return []

    def _recursive_chunk(self, chunk_size: int):
        separators = ["\n\n", "\n", ". ", " "]
        return self._split_text(self.text, separators, chunk_size)

    def _split_text(self, text: str, separators: list, chunk_size: int):
        chunks = []
        if len(text) <= chunk_size:
            return [text]
        sep = separators[0] if separators else " "
        parts = text.split(sep)
        current = ""
        for part in parts:
            if len(current) + len(part) + len(sep) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = part
            else:
                current = current + sep + part if current else part
        if current:
            chunks.append(current.strip())
        return chunks

    def _semantic_chunk(self, chunk_size: int):
        # Placeholder for embedding-based semantic chunking; falls back to recursive splitting
        return self._recursive_chunk(chunk_size)

    def _structure_based_chunk(self):
        # Structure-based chunking: one chunk per page here
        return [page.get("text", "") for page in self.pages if page.get("text")]

PDF Parsing Techniques and Tools

PDF is the most widely used document format, but from a parsing standpoint it is also one of the trickiest. Because PDF is fundamentally a "print layout" format, the logical order of the text is not guaranteed by the file structure.
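As a concrete illustration, a common heuristic for restoring reading order (a sketch, not tied to any specific library) is to sort extracted text blocks top-to-bottom, then left-to-right. The block dicts below mimic PyMuPDF's `(x0, y0, x1, y1)` bbox convention; the tolerance value is an assumption that absorbs slightly misaligned baselines.

```python
def sort_blocks_reading_order(blocks: list, y_tolerance: float = 5.0) -> list:
    """Sort text blocks top-to-bottom, then left-to-right."""
    # Bucket y0 by the tolerance so blocks on the same visual line are
    # ordered by x rather than by sub-pixel differences in y.
    return sorted(
        blocks,
        key=lambda b: (round(b["bbox"][1] / y_tolerance), b["bbox"][0]),
    )

blocks = [
    {"bbox": (300, 102, 500, 120), "text": "right column"},
    {"bbox": (50, 100, 250, 118), "text": "left column"},
    {"bbox": (50, 40, 500, 60), "text": "title"},
]
ordered = [b["text"] for b in sort_blocks_reading_order(blocks)]
# ordered == ["title", "left column", "right column"]
```

Note that this simple heuristic interleaves multi-column layouts; for two-column pages you would first need to detect column boundaries and sort within each column.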

Comparison of Major PDF Parsing Libraries

| Library | Strengths | Weaknesses | Best for |
| --- | --- | --- | --- |
| PyMuPDF (fitz) | Fast, feature-rich, image extraction | License (AGPL) | General-purpose PDF processing |
| pdfplumber | Accurate table extraction, coordinate-based access | Slow | Table-heavy documents |
| PyPDF2 | Pure Python, easy to install | Inaccurate on complex PDFs | Simple text extraction |
| Camelot | Dedicated to table extraction | Cannot process a whole PDF | When you only need tables |
| pdfminer.six | Detailed layout information | Complex API | When layout analysis is needed |

PDF Parsing with PyMuPDF

import fitz  # PyMuPDF

class PyMuPDFParser:
    """PyMuPDF 기반 PDF 파서"""

    def __init__(self, pdf_path: str):
        self.doc = fitz.open(pdf_path)
        self.pages = []

    def extract_text_with_layout(self) -> list:
        """페이지별 텍스트를 레이아웃 정보와 함께 추출"""
        results = []
        for page_num, page in enumerate(self.doc):
            blocks = page.get_text("dict")["blocks"]
            page_data = {
                "page_num": page_num + 1,
                "width": page.rect.width,
                "height": page.rect.height,
                "blocks": []
            }
            for block in blocks:
                if block["type"] == 0:  # 텍스트 블록
                    text_content = ""
                    for line in block["lines"]:
                        line_text = ""
                        for span in line["spans"]:
                            line_text += span["text"]
                        text_content += line_text + "\n"
                    page_data["blocks"].append({
                        "type": "text",
                        "bbox": block["bbox"],
                        "text": text_content.strip(),
                        "font_size": block["lines"][0]["spans"][0]["size"]
                        if block["lines"] and block["lines"][0]["spans"] else 0
                    })
                elif block["type"] == 1:  # 이미지 블록
                    page_data["blocks"].append({
                        "type": "image",
                        "bbox": block["bbox"],
                        "image_data": block.get("image", None)
                    })
            results.append(page_data)
        return results

    def extract_images(self, output_dir: str) -> list:
        """PDF에서 모든 이미지를 추출"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        image_paths = []
        for page_num, page in enumerate(self.doc):
            images = page.get_images(full=True)
            for img_idx, img in enumerate(images):
                xref = img[0]
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n < 5:  # GRAY or RGB
                    img_path = os.path.join(
                        output_dir,
                        f"page_{page_num + 1}_img_{img_idx + 1}.png"
                    )
                    pix.save(img_path)
                    image_paths.append(img_path)
                pix = None
        return image_paths

    def detect_document_type(self) -> DocumentType:
        """PDF가 네이티브인지 스캔인지 판별"""
        total_text_len = 0
        total_images = 0
        for page in self.doc:
            total_text_len += len(page.get_text())
            total_images += len(page.get_images())
        if total_text_len < 100 and total_images > 0:
            return DocumentType.SCANNED_PDF
        elif total_text_len > 100 and total_images > len(self.doc) * 0.5:
            return DocumentType.MIXED_PDF
        return DocumentType.NATIVE_PDF

    def close(self):
        self.doc.close()

Precision Parsing with pdfplumber

pdfplumber is particularly strong at table extraction and provides exact coordinate information for every character.

import pdfplumber

class PdfPlumberParser:
    """pdfplumber 기반 정밀 PDF 파서"""

    def __init__(self, pdf_path: str):
        self.pdf = pdfplumber.open(pdf_path)

    def extract_tables(self) -> list:
        """모든 페이지에서 테이블 추출"""
        all_tables = []
        for page_num, page in enumerate(self.pdf.pages):
            tables = page.extract_tables(
                table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 3,
                    "min_words_vertical": 3,
                    "min_words_horizontal": 1,
                }
            )
            for table_idx, table in enumerate(tables):
                if table and len(table) > 1:
                    headers = table[0]
                    rows = table[1:]
                    all_tables.append({
                        "page": page_num + 1,
                        "table_index": table_idx,
                        "headers": headers,
                        "rows": rows,
                        "num_rows": len(rows),
                        "num_cols": len(headers) if headers else 0
                    })
        return all_tables

    def extract_text_outside_tables(self) -> str:
        """테이블 영역을 제외한 텍스트만 추출"""
        full_text = []
        for page in self.pdf.pages:
            # collect table bounding boxes
            table_bboxes = []
            tables = page.find_tables()
            for table in tables:
                table_bboxes.append(table.bbox)
            # exclude the table regions by cropping around them
            filtered_page = page
            for bbox in table_bboxes:
                filtered_page = filtered_page.outside_bbox(bbox)
            text = filtered_page.extract_text()
            if text:
                full_text.append(text)
        return "\n\n".join(full_text)

    def close(self):
        self.pdf.close()

OCR-Based Document Recognition

Processing scanned documents or image-based PDFs requires OCR (Optical Character Recognition). OCR technology is evolving rapidly from traditional rule-based approaches to deep-learning-based ones.

Comparison of Major OCR Engines

| Engine | Languages | Accuracy | Speed | Notes |
| --- | --- | --- | --- | --- |
| Tesseract 5 | 100+ | Medium–High | Medium | Open source, most widely used |
| EasyOCR | 80+ | Medium–High | Slow | PyTorch-based, easy to install |
| PaddleOCR | 80+ | High | Fast | Developed by Baidu, high accuracy |
| Google Vision API | 100+ | Excellent | Fast | Cloud service, paid |
| Azure Document Intelligence | 100+ | Excellent | Fast | Enterprise, supports structured extraction |
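The accuracy/speed/cost trade-offs in the table suggest a simple routing strategy: run a fast local engine first and escalate to a slower or paid engine only when confidence is too low. The sketch below assumes hypothetical `run_local_ocr` and `run_cloud_ocr` callables (stand-ins for, say, a PaddleOCR wrapper and a cloud API client) that each return a dict with `text` and `confidence` keys.

```python
def ocr_with_fallback(image_path: str, run_local_ocr, run_cloud_ocr,
                      min_confidence: float = 0.85) -> dict:
    """Try the local engine first; fall back to the cloud engine on low confidence."""
    result = run_local_ocr(image_path)
    if result["confidence"] >= min_confidence:
        result["engine"] = "local"   # cheap path was good enough
        return result
    # Low confidence: pay for the more accurate engine.
    result = run_cloud_ocr(image_path)
    result["engine"] = "cloud"
    return result
```

The same pattern extends naturally to per-page routing inside a PDF, so only the genuinely hard pages incur cloud costs.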

Using Tesseract OCR

import pytesseract
from PIL import Image
import cv2
import numpy as np

class TesseractOCR:
    """Tesseract 기반 OCR 처리기"""

    def __init__(self, lang: str = "kor+eng"):
        self.lang = lang
        self.config = "--oem 3 --psm 6"  # LSTM 엔진 + 균일 텍스트 블록

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """OCR 정확도 향상을 위한 이미지 전처리"""
        img = cv2.imread(image_path)
        # convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # remove noise
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        # binarize (Otsu's method)
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        # deskew
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) > 0:
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            if abs(angle) > 0.5:
                h, w = binary.shape
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                binary = cv2.warpAffine(
                    binary, matrix, (w, h),
                    flags=cv2.INTER_CUBIC,
                    borderMode=cv2.BORDER_REPLICATE
                )
        return binary

    def extract_text(self, image_path: str, preprocess: bool = True) -> str:
        """이미지에서 텍스트 추출"""
        if preprocess:
            img = self.preprocess_image(image_path)
        else:
            img = Image.open(image_path)
        text = pytesseract.image_to_string(
            img, lang=self.lang, config=self.config
        )
        return text.strip()

    def extract_with_boxes(self, image_path: str) -> list:
        """바운딩 박스와 함께 텍스트 추출"""
        img = self.preprocess_image(image_path)
        data = pytesseract.image_to_data(
            img, lang=self.lang, config=self.config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data["text"])):
            if data["text"][i].strip():
                results.append({
                    "text": data["text"][i],
                    "confidence": data["conf"][i],
                    "bbox": {
                        "x": data["left"][i],
                        "y": data["top"][i],
                        "w": data["width"][i],
                        "h": data["height"][i]
                    },
                    "block_num": data["block_num"][i],
                    "line_num": data["line_num"][i]
                })
        return results

High-Accuracy OCR with PaddleOCR

PaddleOCR performs especially well on Asian languages (Korean, Japanese, Chinese).

from paddleocr import PaddleOCR

class PaddleOCRProcessor:
    """PaddleOCR 기반 고정확도 OCR"""

    def __init__(self, lang: str = "korean"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,  # detect text orientation
            lang=lang,
            use_gpu=True,
            det_db_thresh=0.3,
            det_db_box_thresh=0.5,
            rec_batch_num=16
        )

    def process_image(self, image_path: str) -> dict:
        """이미지에서 텍스트와 레이아웃 정보 추출"""
        result = self.ocr.ocr(image_path, cls=True)
        extracted = {
            "lines": [],
            "full_text": "",
            "confidence_avg": 0.0
        }
        if not result or not result[0]:
            return extracted

        total_conf = 0
        lines = []
        for line in result[0]:
            bbox = line[0]  # four corner coordinates
            text = line[1][0]
            confidence = line[1][1]
            lines.append({
                "text": text,
                "confidence": confidence,
                "bbox": bbox,
                "y_center": (bbox[0][1] + bbox[2][1]) / 2
            })
            total_conf += confidence

        # sort by y coordinate (reading order)
        lines.sort(key=lambda x: (x["y_center"], x["bbox"][0][0]))
        extracted["lines"] = lines
        extracted["full_text"] = "\n".join(l["text"] for l in lines)
        extracted["confidence_avg"] = (
            total_conf / len(lines) if lines else 0
        )
        return extracted

    def process_pdf(self, pdf_path: str) -> list:
        """PDF의 모든 페이지를 OCR 처리"""
        import fitz
        doc = fitz.open(pdf_path)
        results = []
        for page_num, page in enumerate(doc):
            # render the page as a high-resolution image
            mat = fitz.Matrix(2.0, 2.0)  # 2x scale
            pix = page.get_pixmap(matrix=mat)
            img_path = f"/tmp/page_{page_num}.png"
            pix.save(img_path)
            # run OCR
            ocr_result = self.process_image(img_path)
            ocr_result["page_num"] = page_num + 1
            results.append(ocr_result)
        doc.close()
        return results

Layout Analysis and Structure Extraction

Layout analysis is the process of identifying regions such as text blocks, headings, tables, figures, and captions in a document, and determining the logical reading order. Transformer-based models have recently come to dominate this area.

Major Layout Analysis Models

| Model | Developer | Core technique | Notes |
| --- | --- | --- | --- |
| LayoutLMv3 | Microsoft | Multimodal transformer | Unifies text + image + layout |
| DiT (Document Image Transformer) | Microsoft | Vision Transformer | Image-based document understanding |
| Donut | NAVER CLOVA | OCR-free approach | Understands documents directly, without OCR |
| Table Transformer | Microsoft | DETR-based | Specialized for table detection/structure recognition |
| Unstructured | Open source | Hybrid | Pipeline combining multiple models |

Document Structure Analysis with LayoutLMv3

from transformers import (
    LayoutLMv3ForTokenClassification,
    LayoutLMv3Processor,
)
from PIL import Image
import torch

class LayoutAnalyzer:
    """LayoutLMv3 기반 문서 레이아웃 분석기"""

    LABEL_MAP = {
        0: "O",
        1: "B-TITLE",
        2: "I-TITLE",
        3: "B-TEXT",
        4: "I-TEXT",
        5: "B-TABLE",
        6: "I-TABLE",
        7: "B-FIGURE",
        8: "I-FIGURE",
        9: "B-LIST",
        10: "I-LIST",
        11: "B-HEADER",
        12: "I-HEADER",
        13: "B-FOOTER",
        14: "I-FOOTER",
    }

    def __init__(self, model_name: str = "microsoft/layoutlmv3-base"):
        self.processor = LayoutLMv3Processor.from_pretrained(
            model_name, apply_ocr=True
        )
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            model_name, num_labels=len(self.LABEL_MAP)
        )
        self.model.eval()

    def analyze(self, image_path: str) -> list:
        """문서 이미지의 레이아웃 분석"""
        image = Image.open(image_path).convert("RGB")
        encoding = self.processor(
            image,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.model(**encoding)
        predictions = outputs.logits.argmax(-1).squeeze().tolist()
        tokens = self.processor.tokenizer.convert_ids_to_tokens(
            encoding["input_ids"].squeeze()
        )
        # map per-token predictions to labeled elements
        elements = []
        current_label = None
        current_text = ""
        for token, pred in zip(tokens, predictions):
            label = self.LABEL_MAP.get(pred, "O")
            if label.startswith("B-"):
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = label[2:]
                current_text = token.replace("##", "")
            elif label.startswith("I-") and current_label:
                current_text += token.replace("##", "")
            else:
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = None
                current_text = ""
        if current_text and current_label:
            elements.append({
                "type": current_label,
                "text": current_text.strip()
            })
        return elements

Unified Parsing with the Unstructured Library

Unstructured is an open-source library that supports a wide range of document formats and integrates several parsing engines behind one interface.

from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title

class UnstructuredParser:
    """Unstructured 기반 통합 문서 파서"""

    def __init__(self, strategy: str = "hi_res"):
        self.strategy = strategy  # "fast", "ocr_only", "hi_res"

    def parse_pdf(self, pdf_path: str) -> list:
        """PDF를 구조화된 요소로 파싱"""
        elements = partition_pdf(
            filename=pdf_path,
            strategy=self.strategy,
            infer_table_structure=True,
            languages=["kor", "eng"],
            extract_images_in_pdf=True,
            extract_image_block_output_dir="./extracted_images"
        )
        parsed = []
        for element in elements:
            parsed.append({
                "type": type(element).__name__,
                "text": str(element),
                "metadata": {
                    "page_number": element.metadata.page_number,
                    "coordinates": (
                        element.metadata.coordinates
                        if hasattr(element.metadata, "coordinates")
                        else None
                    ),
                    "parent_id": element.metadata.parent_id,
                }
            })
        return parsed

    def parse_and_chunk(
        self, file_path: str, max_characters: int = 1000
    ) -> list:
        """파싱 후 제목 기반 청킹까지 수행"""
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        chunks = chunk_by_title(
            elements,
            max_characters=max_characters,
            combine_text_under_n_chars=200,
            new_after_n_chars=800
        )
        return [
            {
                "text": str(chunk),
                "type": type(chunk).__name__,
                "metadata": chunk.metadata.to_dict()
            }
            for chunk in chunks
        ]

Table Extraction Techniques

Accurately extracting tables is one of the most challenging problems in Document Parsing, because tables can have complex merged cells, nested structures, and widely varying styles.

Key Challenges in Table Extraction

  • Table detection: accurately locating table regions in the document
  • Structure recognition: identifying row/column structure, merged cells, and header rows
  • Cell content extraction: extracting the text of each cell accurately
  • Borderless tables: recognizing the structure of tables without ruling lines
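For the borderless-table case in particular, pdfplumber's "text" strategies infer row and column boundaries from word alignment instead of ruling lines. The settings below are a sketch; the tolerance values are assumptions that usually need tuning per document.

```python
# Settings for borderless tables: infer edges from word positions,
# not from drawn lines (which such tables lack).
BORDERLESS_TABLE_SETTINGS = {
    "vertical_strategy": "text",     # infer columns from word x-positions
    "horizontal_strategy": "text",   # infer rows from word y-positions
    "snap_tolerance": 5,             # merge nearly-aligned edges
    "intersection_tolerance": 5,     # tolerance when pairing row/column edges
}

def extract_borderless_tables(pdf_path: str) -> list:
    """Extract tables from every page using text-alignment-based detection."""
    import pdfplumber
    tables = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            tables.extend(page.extract_tables(BORDERLESS_TABLE_SETTINGS))
    return tables
```

In practice it is common to try the "lines" strategy first and fall back to "text" only when no tables are found.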

Table Detection with Table Transformer

from transformers import (
    TableTransformerForObjectDetection,
    AutoImageProcessor,
)
from PIL import Image
import torch

class TableExtractor:
    """Table Transformer 기반 테이블 추출기"""

    def __init__(self):
        self.processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.detection_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.structure_processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )
        self.structure_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )

    def detect_tables(self, image_path: str, threshold: float = 0.7) -> list:
        """이미지에서 테이블 영역 감지"""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = self.detection_model(**inputs)
        target_sizes = torch.tensor([image.size[::-1]])
        results = self.processor.post_process_object_detection(
            outputs, threshold=threshold, target_sizes=target_sizes
        )[0]
        tables = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            tables.append({
                "score": score.item(),
                "label": self.detection_model.config.id2label[label.item()],
                "bbox": box.tolist()  # [x1, y1, x2, y2]
            })
        return tables

    def recognize_structure(
        self, image_path: str, table_bbox: list
    ) -> dict:
        """감지된 테이블의 내부 구조 인식"""
        image = Image.open(image_path).convert("RGB")
        # crop the table region
        table_image = image.crop(table_bbox)
        inputs = self.structure_processor(
            images=table_image, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.structure_model(**inputs)
        target_sizes = torch.tensor([table_image.size[::-1]])
        results = self.structure_processor.post_process_object_detection(
            outputs, threshold=0.5, target_sizes=target_sizes
        )[0]
        structure = {"rows": [], "columns": [], "cells": []}
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            label_name = self.structure_model.config.id2label[label.item()]
            entry = {"bbox": box.tolist(), "score": score.item()}
            if "row" in label_name:
                structure["rows"].append(entry)
            elif "column" in label_name:
                structure["columns"].append(entry)
            else:
                structure["cells"].append(entry)
        # sort rows top-to-bottom, columns left-to-right
        structure["rows"].sort(key=lambda x: x["bbox"][1])
        structure["columns"].sort(key=lambda x: x["bbox"][0])
        return structure

Simple Table Extraction with Camelot

import camelot
import pandas as pd

def extract_tables_with_camelot(
    pdf_path: str, pages: str = "all", flavor: str = "lattice"
) -> list:
    """Camelot으로 PDF에서 테이블 추출

    Args:
        pdf_path: PDF 파일 경로
        pages: 추출할 페이지 ("all" 또는 "1,2,3")
        flavor: "lattice" (선 기반) 또는 "stream" (공백 기반)
    """
    tables = camelot.read_pdf(
        pdf_path,
        pages=pages,
        flavor=flavor,
        strip_text="\n"
    )
    results = []
    for i, table in enumerate(tables):
        df = table.df
        results.append({
            "table_index": i,
            "page": table.page,
            "accuracy": table.accuracy,
            "data": df.to_dict(orient="records"),
            "shape": df.shape,
            "dataframe": df
        })
    return results


# Usage example
if __name__ == "__main__":
    tables = extract_tables_with_camelot(
        "financial_report.pdf",
        pages="1-5",
        flavor="lattice"
    )
    for t in tables:
        print(f"Table {t['table_index']} (page {t['page']})")
        print(f"  accuracy: {t['accuracy']:.1f}%")
        print(f"  shape: {t['shape']}")
        print(t["dataframe"].head())

LLM-Based Document Understanding

The arrival of multimodal LLMs such as GPT-4V and Claude 3.5 is fundamentally changing how documents are understood. Instead of the traditional OCR + postprocessing pipeline, a document image can be fed directly to an LLM, which understands and structures its content.

Document Processing with Multimodal LLMs

import anthropic
import base64
from pathlib import Path

class LLMDocumentProcessor:
    """LLM 기반 멀티모달 문서 처리기"""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = anthropic.Anthropic()
        self.model = model

    def _encode_image(self, image_path: str) -> tuple:
        """이미지를 base64로 인코딩"""
        path = Path(image_path)
        suffix = path.suffix.lower()
        media_type_map = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        media_type = media_type_map.get(suffix, "image/png")
        with open(image_path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        return data, media_type

    def extract_structured_data(
        self, image_path: str, schema_description: str
    ) -> str:
        """문서 이미지에서 구조화된 데이터 추출"""
        data, media_type = self._encode_image(image_path)
        prompt = f"""이 문서 이미지를 분석하여 다음 스키마에 맞는 구조화된 데이터를 JSON으로 추출해 주세요.

스키마:
{schema_description}

주의사항:
- 모든 텍스트를 정확히 추출하세요
- 테이블이 있다면 행/열 구조를 유지하세요
- 확실하지 않은 내용은 null로 표시하세요
- JSON 형식으로만 응답하세요"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )
        return response.content[0].text

    def analyze_document_layout(self, image_path: str) -> str:
        """문서 레이아웃 분석 및 구조 추출"""
        data, media_type = self._encode_image(image_path)
        prompt = """이 문서의 레이아웃을 분석하여 다음 정보를 JSON으로 반환하세요:

1. 문서 유형 (논문, 보고서, 송장, 계약서 등)
2. 섹션 구조 (제목과 계층)
3. 테이블 존재 여부와 위치 설명
4. 그림/차트 존재 여부와 설명
5. 핵심 키-값 쌍 (있는 경우)
6. 전체 텍스트의 읽기 순서

JSON으로만 응답하세요."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

    def compare_documents(
        self, image_path_1: str, image_path_2: str
    ) -> str:
        """두 문서 이미지를 비교 분석"""
        data1, mt1 = self._encode_image(image_path_1)
        data2, mt2 = self._encode_image(image_path_2)
        prompt = """두 문서를 비교하여 다음을 분석하세요:
1. 공통점
2. 차이점
3. 추가된 내용
4. 삭제된 내용
5. 변경된 내용

구조화된 JSON으로 응답하세요."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt1,
                                "data": data1,
                            },
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt2,
                                "data": data2,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

Hybrid Approach: OCR + LLM Correction

A hybrid approach that first extracts text with OCR and then uses an LLM to correct errors and organize the structure works very well in practice.

class HybridDocumentProcessor:
    """OCR + LLM 하이브리드 문서 처리기"""

    def __init__(self):
        self.ocr = PaddleOCRProcessor(lang="korean")
        self.llm = LLMDocumentProcessor()

    def process(self, image_path: str) -> dict:
        """하이브리드 방식으로 문서 처리"""
        # 1단계: OCR로 텍스트 추출
        ocr_result = self.ocr.process_image(image_path)
        raw_text = ocr_result["full_text"]
        confidence = ocr_result["confidence_avg"]

        # step 2: correct and structure the text with an LLM
        correction_prompt = f"""Review and correct the following OCR-extracted text.
Fix passages where OCR confidence is low so they fit the context,
and organize the document's logical structure (headings, body, lists, etc.) as Markdown.

OCR-extracted text (average confidence: {confidence:.2f}):
---
{raw_text}
---

Return the corrected Markdown."""

        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": correction_prompt}],
        )
        corrected_text = response.content[0].text

        return {
            "raw_ocr": raw_text,
            "ocr_confidence": confidence,
            "corrected_text": corrected_text,
            "method": "hybrid_ocr_llm"
        }

Document Chunking Strategies for RAG

If the end goal of Document Parsing is a RAG pipeline, the chunking strategy is the key factor that determines retrieval quality. Poor chunking sharply degrades retrieval accuracy and causes hallucinations through lost context.

Comparison of Chunking Strategies

| Strategy | Description | Strengths | Weaknesses |
| --- | --- | --- | --- |
| Fixed size | Split at a fixed token/character count | Simple to implement, uniform sizes | Breaks context |
| Recursive | Split by separator priority | Preserves structure, flexible | Uneven sizes |
| Semantic | Split by embedding similarity | Preserves units of meaning | Computationally expensive |
| Structure-based | Split by headings/sections | Preserves logical structure | Requires structure recognition |
| Sliding window | Split with overlap | Context continuity | More storage |

Advanced Chunking Implementation

from typing import Optional
import numpy as np

class AdvancedChunker:
    """다양한 청킹 전략을 지원하는 고급 청커"""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def fixed_size_chunk(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> list:
        """고정 크기 + 오버랩 청킹"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            # cut at a sentence boundary
            if end < len(text):
                last_period = chunk.rfind(". ")
                last_newline = chunk.rfind("\n")
                cut_point = max(last_period, last_newline)
                if cut_point > chunk_size * 0.5:
                    chunk = chunk[:cut_point + 1]
                    end = start + cut_point + 1
            chunks.append({
                "text": chunk.strip(),
                "start": start,
                "end": end,
                "index": len(chunks)
            })
            start = end - overlap
        return chunks

    def recursive_chunk(
        self,
        text: str,
        chunk_size: int = 1000,
        separators: Optional[list] = None,
    ) -> list:
        """재귀적 분할 청킹"""
        if separators is None:
            separators = ["\n\n\n", "\n\n", "\n", ". ", ", ", " "]
        chunks = []
        self._recursive_split(text, separators, chunk_size, chunks)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
            if c.strip()
        ]

    def _recursive_split(
        self, text: str, separators: list, chunk_size: int, result: list
    ):
        if len(text) <= chunk_size:
            result.append(text)
            return
        sep = separators[0] if separators else " "
        remaining_seps = separators[1:] if len(separators) > 1 else []
        parts = text.split(sep)
        current = ""
        for part in parts:
            test = current + sep + part if current else part
            if len(test) > chunk_size:
                if current:
                    if len(current) > chunk_size and remaining_seps:
                        self._recursive_split(
                            current, remaining_seps, chunk_size, result
                        )
                    else:
                        result.append(current)
                current = part
            else:
                current = test
        if current:
            if len(current) > chunk_size and remaining_seps:
                self._recursive_split(
                    current, remaining_seps, chunk_size, result
                )
            else:
                result.append(current)

    def semantic_chunk(
        self, text: str, threshold: float = 0.5, min_size: int = 100
    ) -> list:
        """Semantic chunking: split where adjacent-sentence embedding similarity drops"""
        if not self.embedding_model:
            raise ValueError("Semantic chunking requires an embedding model")
        # Split into sentences
        sentences = [s.strip() for s in text.split(". ") if s.strip()]
        if len(sentences) <= 1:
            return [{"text": text, "index": 0}]
        # Compute an embedding for each sentence
        embeddings = self.embedding_model.encode(sentences)
        # Cosine similarity between adjacent sentences
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i])
                * np.linalg.norm(embeddings[i + 1])
            )
            similarities.append(sim)
        # Split where similarity falls below the threshold
        chunks = []
        current_chunk = sentences[0]
        for i, sim in enumerate(similarities):
            if sim < threshold and len(current_chunk) >= min_size:
                chunks.append(current_chunk)
                current_chunk = sentences[i + 1]
            else:
                current_chunk += ". " + sentences[i + 1]
        if current_chunk:
            chunks.append(current_chunk)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
        ]

    def structure_based_chunk(self, parsed_elements: list) -> list:
        """Structure-based chunking using layout-analysis elements"""
        chunks = []
        current_chunk = {
            "title": "",
            "content": "",
            "tables": [],
            "metadata": {}
        }
        for element in parsed_elements:
            elem_type = element.get("type", "")
            elem_text = element.get("text", "")
            if elem_type in ("Title", "TITLE"):
                # A title marks the start of a new section
                if current_chunk["content"] or current_chunk["tables"]:
                    chunks.append(current_chunk.copy())
                current_chunk = {
                    "title": elem_text,
                    "content": "",
                    "tables": [],
                    "metadata": element.get("metadata", {})
                }
            elif elem_type in ("Table", "TABLE"):
                current_chunk["tables"].append(elem_text)
            else:
                current_chunk["content"] += elem_text + "\n"
        # Flush the last section, including table-only sections
        if current_chunk["content"] or current_chunk["tables"]:
            chunks.append(current_chunk)
        return chunks
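The split rule at the heart of semantic_chunk can be sanity-checked without a real embedding model. The sketch below substitutes toy 2-d vectors for sentence embeddings (an illustrative assumption, not real model output): a chunk boundary is placed wherever the cosine similarity between adjacent sentences drops below the threshold.

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    # Same similarity measure semantic_chunk applies to adjacent sentences
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy "embeddings": sentences 1 and 2 are close, sentence 3 changes topic
emb = [np.array([1.0, 0.0]), np.array([0.9, 0.1]), np.array([0.0, 1.0])]
sims = [cosine(emb[i], emb[i + 1]) for i in range(len(emb) - 1)]

# With threshold=0.5, the only split point falls before sentence 3
split_points = [i + 1 for i, s in enumerate(sims) if s < 0.5]
print(split_points)  # → [2]
```

With real embeddings (for example, a sentence-transformers model passed in as embedding_model), the identical threshold logic applies to 384- or 768-dimensional vectors.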

Building a Production Pipeline

This section combines the individual techniques covered so far into an end-to-end Document Parsing pipeline ready for production use.

Pipeline Architecture

The full pipeline consists of the following stages:

  1. Input handling: detect and normalize various document formats
  2. Parsing strategy selection: choose the optimal parser for the document type
  3. Text/structure extraction: OCR, layout analysis, table extraction
  4. LLM enrichment: quality improvement via multimodal LLMs
  5. Chunking and indexing: chunk for RAG, then store in a vector DB
import os
import json
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Pipeline configuration"""
    ocr_engine: str = "paddleocr"       # tesseract, paddleocr, easyocr
    ocr_lang: str = "korean"
    layout_model: str = "unstructured"  # layoutlm, unstructured
    chunking_strategy: str = "recursive"  # fixed, recursive, semantic, structure
    chunk_size: int = 1000
    chunk_overlap: int = 200
    use_llm_correction: bool = True
    llm_model: str = "claude-sonnet-4-20250514"
    output_format: str = "json"         # json, markdown

@dataclass
class ProcessedDocument:
    """Processed document result"""
    source_path: str
    doc_type: str
    pages: list = field(default_factory=list)
    full_text: str = ""
    tables: list = field(default_factory=list)
    chunks: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    processing_log: list = field(default_factory=list)


class DocumentParsingPipeline:
    """Production Document Parsing pipeline"""

    def __init__(self, config: Optional[PipelineConfig] = None):
        self.config = config or PipelineConfig()
        self.chunker = AdvancedChunker()

    def process(self, file_path: str) -> ProcessedDocument:
        """Process a document end to end"""
        result = ProcessedDocument(source_path=file_path, doc_type="")
        logger.info(f"Processing: {file_path}")

        try:
            # 1. Detect document type
            doc_type = self._detect_type(file_path)
            result.doc_type = doc_type
            result.processing_log.append(
                f"Document type detected: {doc_type}"
            )

            # 2. Select and run the parsing strategy
            if doc_type == "native_pdf":
                raw_result = self._parse_native_pdf(file_path)
            elif doc_type in ("scanned_pdf", "image"):
                raw_result = self._parse_with_ocr(file_path)
            elif doc_type == "mixed_pdf":
                raw_result = self._parse_mixed_pdf(file_path)
            else:
                raw_result = self._parse_generic(file_path)

            result.full_text = raw_result.get("text", "")
            result.tables = raw_result.get("tables", [])
            result.pages = raw_result.get("pages", [])

            # 3. Optional LLM correction
            if self.config.use_llm_correction and result.full_text:
                result.full_text = self._llm_correct(result.full_text)
                result.processing_log.append("LLM correction applied")

            # 4. Chunking
            result.chunks = self._chunk_document(result)
            result.processing_log.append(
                f"Created {len(result.chunks)} chunks "
                f"with strategy: {self.config.chunking_strategy}"
            )

            # 5. Build metadata
            result.metadata = {
                "source": file_path,
                "doc_type": doc_type,
                "total_pages": len(result.pages),
                "total_tables": len(result.tables),
                "total_chunks": len(result.chunks),
                "text_length": len(result.full_text),
                "config": {
                    "ocr_engine": self.config.ocr_engine,
                    "chunking_strategy": self.config.chunking_strategy,
                    "chunk_size": self.config.chunk_size,
                }
            }
            logger.info(
                f"Processing complete: "
                f"{len(result.chunks)} chunks created"
            )

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            result.processing_log.append(f"Error: {str(e)}")

        return result

    def _detect_type(self, file_path: str) -> str:
        """Auto-detect document type"""
        ext = Path(file_path).suffix.lower()
        if ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
            return "image"
        elif ext == ".pdf":
            import fitz
            doc = fitz.open(file_path)
            total_text = sum(len(page.get_text()) for page in doc)
            total_images = sum(len(page.get_images()) for page in doc)
            page_count = len(doc)  # capture before closing the document
            doc.close()
            if total_text < 100:
                return "scanned_pdf"
            elif total_images > page_count * 0.5:
                return "mixed_pdf"
            return "native_pdf"
        return "unknown"

    def _parse_native_pdf(self, file_path: str) -> dict:
        """Parse a native PDF"""
        parser = PyMuPDFParser(file_path)
        pages = parser.extract_text_with_layout()
        plumber = PdfPlumberParser(file_path)
        tables = plumber.extract_tables()
        text = plumber.extract_text_outside_tables()
        parser.close()
        plumber.close()
        return {"text": text, "tables": tables, "pages": pages}

    def _parse_with_ocr(self, file_path: str) -> dict:
        """OCR-based document parsing"""
        if self.config.ocr_engine == "paddleocr":
            processor = PaddleOCRProcessor(lang=self.config.ocr_lang)
            if file_path.lower().endswith(".pdf"):
                results = processor.process_pdf(file_path)
                text = "\n\n".join(r["full_text"] for r in results)
                return {"text": text, "pages": results, "tables": []}
            else:
                result = processor.process_image(file_path)
                return {
                    "text": result["full_text"],
                    "pages": [result],
                    "tables": []
                }
        else:
            ocr = TesseractOCR(lang="kor+eng")
            text = ocr.extract_text(file_path)
            return {"text": text, "pages": [], "tables": []}

    def _parse_mixed_pdf(self, file_path: str) -> dict:
        """Parse a mixed PDF: native text with OCR fallback"""
        native_result = self._parse_native_pdf(file_path)
        ocr_result = self._parse_with_ocr(file_path)
        # Prefer native text; fall back to OCR when the text layer is empty
        combined_text = native_result["text"] or ocr_result["text"]
        return {
            "text": combined_text,
            "tables": native_result["tables"],
            "pages": native_result["pages"]
        }

    def _parse_generic(self, file_path: str) -> dict:
        """Generic document parsing via Unstructured"""
        parser = UnstructuredParser(strategy="hi_res")
        elements = parser.parse_pdf(file_path)
        text = "\n\n".join(e["text"] for e in elements)
        return {"text": text, "pages": [], "tables": []}

    def _llm_correct(self, text: str) -> str:
        """Correct OCR errors with an LLM"""
        if len(text) < 100:
            return text
        import anthropic
        client = anthropic.Anthropic()
        # For long texts, correct only the leading sample and keep the rest as-is
        sample = text[:3000]
        response = client.messages.create(
            model=self.config.llm_model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Correct the errors in the following OCR-extracted text. "
                        "Fix only typos, broken characters, and line-break errors "
                        "while preserving the original meaning and structure.\n\n"
                        f"{sample}"
                    ),
                }
            ],
        )
        corrected = response.content[0].text
        # Reattach any uncorrected remainder so no text is lost
        return corrected + text[3000:]

    def _chunk_document(self, doc: ProcessedDocument) -> list:
        """Chunk the document"""
        strategy = self.config.chunking_strategy
        if strategy == "fixed":
            return self.chunker.fixed_size_chunk(
                doc.full_text,
                self.config.chunk_size,
                self.config.chunk_overlap
            )
        elif strategy == "recursive":
            return self.chunker.recursive_chunk(
                doc.full_text,
                self.config.chunk_size
            )
        elif strategy == "structure":
            if doc.pages:
                return self.chunker.structure_based_chunk(doc.pages)
            return self.chunker.recursive_chunk(
                doc.full_text, self.config.chunk_size
            )
        return self.chunker.recursive_chunk(
            doc.full_text, self.config.chunk_size
        )

    def save_results(
        self, result: ProcessedDocument, output_dir: str
    ):
        """Save processing results"""
        os.makedirs(output_dir, exist_ok=True)
        base_name = Path(result.source_path).stem
        # Save chunks
        chunks_path = os.path.join(output_dir, f"{base_name}_chunks.json")
        with open(chunks_path, "w", encoding="utf-8") as f:
            json.dump(result.chunks, f, ensure_ascii=False, indent=2)
        # Save metadata
        meta_path = os.path.join(output_dir, f"{base_name}_metadata.json")
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result.metadata, f, ensure_ascii=False, indent=2)
        # Save full text
        text_path = os.path.join(output_dir, f"{base_name}_full.txt")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(result.full_text)
        logger.info(f"Results saved to {output_dir}")


# Usage example
if __name__ == "__main__":
    config = PipelineConfig(
        ocr_engine="paddleocr",
        chunking_strategy="recursive",
        chunk_size=1000,
        chunk_overlap=200,
        use_llm_correction=True
    )
    pipeline = DocumentParsingPipeline(config)

    # Process a single document
    result = pipeline.process("research_paper.pdf")
    print(f"Created {len(result.chunks)} chunks")
    print(f"Extracted {len(result.tables)} tables")

    # Save results
    pipeline.save_results(result, "./output")

    # Store chunks in a vector DB (example)
    # from chromadb import Client
    # client = Client()
    # collection = client.get_or_create_collection("documents")
    # doc_id = Path(result.source_path).stem
    # for chunk in result.chunks:
    #     collection.add(
    #         documents=[chunk["text"]],
    #         metadatas=[result.metadata],
    #         ids=[f"{doc_id}_{chunk['index']}"]
    #     )
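The if/elif dispatch in _chunk_document can also be written as a strategy table, which makes adding new chunking strategies a one-line change. A minimal standalone sketch (the toy chunker functions here are illustration-only stand-ins, not the real AdvancedChunker methods):

```python
def fixed(text: str, size: int) -> list:
    # Toy fixed-size chunker: no overlap, no boundary snapping
    return [text[i:i + size] for i in range(0, len(text), size)]

def by_paragraph(text: str, size: int) -> list:
    # Toy structure-based chunker: one chunk per paragraph
    return [p for p in text.split("\n\n") if p.strip()]

STRATEGIES = {"fixed": fixed, "structure": by_paragraph}

def chunk(text: str, strategy: str = "fixed", size: int = 10) -> list:
    # Unknown strategy names fall back to "fixed", mirroring the
    # default branch of _chunk_document
    return STRATEGIES.get(strategy, fixed)(text, size)

print(chunk("abcdefghijklmno", "fixed", 10))  # → ['abcdefghij', 'klmno']
print(chunk("p1\n\np2", "structure"))         # → ['p1', 'p2']
```

Registering a strategy then becomes a dictionary insert rather than another elif branch.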

Batch Processing and Monitoring

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    """Batch processor for large document sets"""

    def __init__(self, pipeline: DocumentParsingPipeline, max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers

    def process_directory(self, input_dir: str, output_dir: str) -> dict:
        """Batch-process every document in a directory"""
        supported_ext = {".pdf", ".png", ".jpg", ".jpeg", ".tiff"}
        files = [
            str(f) for f in Path(input_dir).rglob("*")
            if f.suffix.lower() in supported_ext
        ]
        logger.info(f"Found {len(files)} documents to process")
        stats = {
            "total": len(files),
            "success": 0,
            "failed": 0,
            "total_chunks": 0,
            "processing_time": 0
        }
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._process_single, f, output_dir
                ): f for f in files
            }
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    stats["success"] += 1
                    stats["total_chunks"] += len(result.chunks)
                    logger.info(f"Success: {file_path}")
                except Exception as e:
                    stats["failed"] += 1
                    logger.error(f"Failed: {file_path} - {e}")

        stats["processing_time"] = time.time() - start_time
        logger.info(
            f"Batch complete: {stats['success']}/{stats['total']} "
            f"in {stats['processing_time']:.1f}s"
        )
        return stats

    def _process_single(
        self, file_path: str, output_dir: str
    ) -> ProcessedDocument:
        """Process and save a single document"""
        result = self.pipeline.process(file_path)
        self.pipeline.save_results(result, output_dir)
        return result
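The submit/as_completed fan-out pattern that BatchProcessor relies on can be exercised standalone with a stand-in work function (the fake_parse helper below is hypothetical, used only to simulate successes and failures):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_parse(path: str) -> int:
    # Stand-in for pipeline.process(); returns a fake chunk count
    if path.endswith(".bad"):
        raise ValueError("unparseable")
    return len(path)

files = ["a.pdf", "bb.pdf", "x.bad"]
stats = {"success": 0, "failed": 0, "total_chunks": 0}
with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to its file so failures can be attributed
    futures = {executor.submit(fake_parse, f): f for f in files}
    for future in as_completed(futures):
        try:
            stats["total_chunks"] += future.result()
            stats["success"] += 1
        except Exception:
            stats["failed"] += 1
print(stats)  # → {'success': 2, 'failed': 1, 'total_chunks': 11}
```

Because future.result() re-raises any exception from the worker thread, per-document failures are counted without aborting the batch, which is exactly the behavior process_directory needs.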

Conclusion

Document Parsing is the foundational technology that determines data quality for AI/LLM applications. The key points covered in this article:

PDF parsing: PyMuPDF excels at general-purpose processing, pdfplumber at table extraction. Choose the optimal tool for the document type (native, scanned, or mixed).

OCR: PaddleOCR delivers high accuracy on Asian languages, while Tesseract offers broad general-purpose coverage. Image preprocessing (binarization, deskewing, etc.) has a major impact on accuracy.

Layout analysis: Tools such as LayoutLMv3 and Unstructured can automatically recover a document's logical structure; Unstructured in particular is well suited to rapid prototyping.

Table extraction: Use tools like Table Transformer and Camelot, but complex tables (merged cells, borderless tables) still require additional post-processing.

LLM-based document understanding: Multimodal LLMs such as GPT-4V and Claude have brought dramatic improvements in OCR correction, structure analysis, and information extraction. An OCR + LLM hybrid is currently the best practical strategy.

Chunking strategies: In a RAG pipeline, retrieval quality depends directly on the chunking strategy. Structure-based chunking that reflects a document's logical organization yields the highest retrieval accuracy.

Document Parsing is evolving rapidly, and the rise of multimodal LLMs in particular is fundamentally reshaping the traditional pipeline paradigm. Given cost and latency constraints, however, a hybrid approach that combines traditional tools with LLMs remains the most realistic choice in practice.

Document Parsing Technology Guide: PDF Parsing, OCR, Layout Analysis, and LLM-Based Extraction Pipeline

Document Parsing Technology Guide

Introduction

The vast majority of enterprise knowledge resides in unstructured documents: PDF reports, scanned contracts, research papers, invoices, and medical records. According to McKinsey, approximately 80% of enterprise data exists in such unstructured formats, and failing to leverage this data effectively means leaving most of your data assets untapped.

The quality of RAG (Retrieval-Augmented Generation) systems, knowledge search engines, and document automation systems ultimately depends on how accurately input documents are parsed. The principle of "Garbage in, garbage out" applies more than ever in the field of Document Parsing.

This article systematically covers the entire Document Parsing process with practical code: PDF parsing library comparisons, OCR engine selection criteria, layout analysis models, table extraction techniques, LLM-based multimodal document understanding, chunking strategies for RAG optimization, and production pipeline construction.

Overview of Document Parsing

Why Document Parsing Matters

Document Parsing is the technology of extracting structured information from unstructured documents. Beyond simple text extraction, the key is understanding the logical structure of documents (headings, body text, tables, figure captions, etc.) and organizing information into meaningful units.

Here are the major scenarios where Document Parsing is essential:

ScenarioDescriptionKey Technology
RAG PipelineSplit documents into chunks and store in vector DBChunking, Embeddings
Knowledge Base ConstructionExtract structured knowledge from internal documentsNER, Relation Extraction
Document AutomationExtract key fields from invoices and contractsTemplate Matching, Key-Value Extraction
Regulatory ComplianceAutomatically track changes in regulatory documentsChange Detection, Comparative Analysis
Research Paper AnalysisExtract methodology, results, and citations from papersSection Classification, Metadata Extraction

Document Parsing Pipeline Architecture

A typical Document Parsing pipeline consists of the following stages:

  1. Document Ingestion: Input documents in various formats (PDF, images, Word, HTML)
  2. Preprocessing: Image correction, noise removal, page separation
  3. Text Extraction: Native PDF text extraction or OCR
  4. Layout Analysis: Document structure recognition (headings, body, tables, figures)
  5. Structured Extraction: Table parsing, key-value pair extraction, NER
  6. Post-processing: Text cleaning, chunking, metadata attachment
  7. Storage/Indexing: Store in vector DB or search engine
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class DocumentType(Enum):
    NATIVE_PDF = "native_pdf"      # PDF with text layer
    SCANNED_PDF = "scanned_pdf"    # Scanned image PDF
    IMAGE = "image"                # JPG, PNG, etc.
    MIXED_PDF = "mixed_pdf"        # Native + scanned hybrid

@dataclass
class ParsedDocument:
    text: str
    pages: list = field(default_factory=list)
    tables: list = field(default_factory=list)
    images: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    doc_type: Optional[DocumentType] = None

    def get_chunks(self, strategy: str = "recursive", chunk_size: int = 1000):
        """Chunk document using specified strategy"""
        if strategy == "recursive":
            return self._recursive_chunk(chunk_size)
        elif strategy == "semantic":
            return self._semantic_chunk(chunk_size)
        elif strategy == "structure":
            return self._structure_based_chunk()
        return []

    def _recursive_chunk(self, chunk_size: int):
        separators = ["\n\n", "\n", ". ", " "]
        return self._split_text(self.text, separators, chunk_size)

    def _split_text(self, text: str, separators: list, chunk_size: int):
        chunks = []
        if len(text) <= chunk_size:
            return [text]
        sep = separators[0] if separators else " "
        parts = text.split(sep)
        current = ""
        for part in parts:
            if len(current) + len(part) + len(sep) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = part
            else:
                current = current + sep + part if current else part
        if current:
            chunks.append(current.strip())
        return chunks

    def _semantic_chunk(self, chunk_size: int):
        # Semantic chunking implementation (embedding-based)
        return self._recursive_chunk(chunk_size)

    def _structure_based_chunk(self):
        # Document structure-based chunking
        return [page.get("text", "") for page in self.pages if page.get("text")]

PDF Parsing Technologies and Tools

PDF is the most widely used document format, yet it is also the most challenging format to parse. Since PDF is fundamentally a "print layout" format, the logical order of text is not guaranteed in the file structure.

Comparison of Major PDF Parsing Libraries

LibraryStrengthsWeaknessesBest For
PyMuPDF (fitz)Fast speed, rich features, image extractionLicense (AGPL)General PDF processing
pdfplumberAccurate table extraction, coordinate-based accessSlower speedTable-heavy documents
PyPDF2Pure Python, easy installationInaccurate with complex PDFsSimple text extraction
CamelotDedicated table extractionCannot process full PDFsWhen only tables are needed
pdfminer.sixDetailed layout informationComplex APIWhen layout analysis is needed

PDF Parsing with PyMuPDF

import fitz  # PyMuPDF

class PyMuPDFParser:
    """PyMuPDF-based PDF parser"""

    def __init__(self, pdf_path: str):
        self.doc = fitz.open(pdf_path)
        self.pages = []

    def extract_text_with_layout(self) -> list:
        """Extract text per page with layout information"""
        results = []
        for page_num, page in enumerate(self.doc):
            blocks = page.get_text("dict")["blocks"]
            page_data = {
                "page_num": page_num + 1,
                "width": page.rect.width,
                "height": page.rect.height,
                "blocks": []
            }
            for block in blocks:
                if block["type"] == 0:  # Text block
                    text_content = ""
                    for line in block["lines"]:
                        line_text = ""
                        for span in line["spans"]:
                            line_text += span["text"]
                        text_content += line_text + "\n"
                    page_data["blocks"].append({
                        "type": "text",
                        "bbox": block["bbox"],
                        "text": text_content.strip(),
                        "font_size": block["lines"][0]["spans"][0]["size"]
                        if block["lines"] and block["lines"][0]["spans"] else 0
                    })
                elif block["type"] == 1:  # Image block
                    page_data["blocks"].append({
                        "type": "image",
                        "bbox": block["bbox"],
                        "image_data": block.get("image", None)
                    })
            results.append(page_data)
        return results

    def extract_images(self, output_dir: str) -> list:
        """Extract all images from the PDF"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        image_paths = []
        for page_num, page in enumerate(self.doc):
            images = page.get_images(full=True)
            for img_idx, img in enumerate(images):
                xref = img[0]
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n < 5:  # GRAY or RGB
                    img_path = os.path.join(
                        output_dir,
                        f"page_{page_num + 1}_img_{img_idx + 1}.png"
                    )
                    pix.save(img_path)
                    image_paths.append(img_path)
                pix = None
        return image_paths

    def detect_document_type(self) -> DocumentType:
        """Determine whether PDF is native or scanned"""
        total_text_len = 0
        total_images = 0
        for page in self.doc:
            total_text_len += len(page.get_text())
            total_images += len(page.get_images())
        if total_text_len < 100 and total_images > 0:
            return DocumentType.SCANNED_PDF
        elif total_text_len > 100 and total_images > len(self.doc) * 0.5:
            return DocumentType.MIXED_PDF
        return DocumentType.NATIVE_PDF

    def close(self):
        self.doc.close()

Precision Parsing with pdfplumber

pdfplumber excels particularly in table extraction and provides precise coordinate information for each character.

import pdfplumber

class PdfPlumberParser:
    """pdfplumber-based precision PDF parser"""

    def __init__(self, pdf_path: str):
        self.pdf = pdfplumber.open(pdf_path)

    def extract_tables(self) -> list:
        """Extract tables from all pages"""
        all_tables = []
        for page_num, page in enumerate(self.pdf.pages):
            tables = page.extract_tables(
                table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 3,
                    "min_words_vertical": 3,
                    "min_words_horizontal": 1,
                }
            )
            for table_idx, table in enumerate(tables):
                if table and len(table) > 1:
                    headers = table[0]
                    rows = table[1:]
                    all_tables.append({
                        "page": page_num + 1,
                        "table_index": table_idx,
                        "headers": headers,
                        "rows": rows,
                        "num_rows": len(rows),
                        "num_cols": len(headers) if headers else 0
                    })
        return all_tables

    def extract_text_outside_tables(self) -> str:
        """Extract only text outside table regions"""
        full_text = []
        for page in self.pdf.pages:
            # Collect table bounding boxes
            table_bboxes = []
            tables = page.find_tables()
            for table in tables:
                table_bboxes.append(table.bbox)
            # Crop and exclude table regions
            filtered_page = page
            for bbox in table_bboxes:
                filtered_page = filtered_page.outside_bbox(bbox)
            text = filtered_page.extract_text()
            if text:
                full_text.append(text)
        return "\n\n".join(full_text)

    def close(self):
        self.pdf.close()

OCR-Based Document Recognition

OCR (Optical Character Recognition) is essential for processing scanned documents and image-based PDFs. OCR technology is rapidly evolving from traditional rule-based approaches to deep learning-based methods.

Comparison of Major OCR Engines

EngineLanguagesAccuracySpeedFeatures
Tesseract 5100+Medium-HighMediumOpen source, most widely used
EasyOCR80+Medium-HighSlowPyTorch-based, easy installation
PaddleOCR80+HighFastDeveloped by Baidu, high accuracy
Google Vision API100+HighestFastCloud service, paid
Azure Document Intelligence100+HighestFastEnterprise, structured extraction support

Using Tesseract OCR

import pytesseract
from PIL import Image
import cv2
import numpy as np

class TesseractOCR:
    """Tesseract-based OCR processor"""

    def __init__(self, lang: str = "eng"):
        self.lang = lang
        self.config = "--oem 3 --psm 6"  # LSTM engine + uniform text block

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """Image preprocessing to improve OCR accuracy"""
        img = cv2.imread(image_path)
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Noise removal
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        # Binarization (Otsu's method)
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        # Deskewing
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) > 0:
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            if abs(angle) > 0.5:
                h, w = binary.shape
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                binary = cv2.warpAffine(
                    binary, matrix, (w, h),
                    flags=cv2.INTER_CUBIC,
                    borderMode=cv2.BORDER_REPLICATE
                )
        return binary

    def extract_text(self, image_path: str, preprocess: bool = True) -> str:
        """Extract text from image"""
        if preprocess:
            img = self.preprocess_image(image_path)
        else:
            img = Image.open(image_path)
        text = pytesseract.image_to_string(
            img, lang=self.lang, config=self.config
        )
        return text.strip()

    def extract_with_boxes(self, image_path: str) -> list:
        """Extract text with bounding boxes"""
        img = self.preprocess_image(image_path)
        data = pytesseract.image_to_data(
            img, lang=self.lang, config=self.config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data["text"])):
            if data["text"][i].strip():
                results.append({
                    "text": data["text"][i],
                    "confidence": data["conf"][i],
                    "bbox": {
                        "x": data["left"][i],
                        "y": data["top"][i],
                        "w": data["width"][i],
                        "h": data["height"][i]
                    },
                    "block_num": data["block_num"][i],
                    "line_num": data["line_num"][i]
                })
        return results

High-Accuracy OCR with PaddleOCR

PaddleOCR demonstrates particularly high accuracy with Asian languages (Korean, Japanese, Chinese).

from paddleocr import PaddleOCR

class PaddleOCRProcessor:
    """PaddleOCR-based high-accuracy OCR"""

    def __init__(self, lang: str = "en"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,  # Text direction detection
            lang=lang,
            use_gpu=True,
            det_db_thresh=0.3,
            det_db_box_thresh=0.5,
            rec_batch_num=16
        )

    def process_image(self, image_path: str) -> dict:
        """Extract text and layout information from image"""
        result = self.ocr.ocr(image_path, cls=True)
        extracted = {
            "lines": [],
            "full_text": "",
            "confidence_avg": 0.0
        }
        if not result or not result[0]:
            return extracted

        total_conf = 0
        lines = []
        for line in result[0]:
            bbox = line[0]  # 4-point coordinates
            text = line[1][0]
            confidence = line[1][1]
            lines.append({
                "text": text,
                "confidence": confidence,
                "bbox": bbox,
                "y_center": (bbox[0][1] + bbox[2][1]) / 2
            })
            total_conf += confidence

        # Sort by y-coordinate (reading order)
        lines.sort(key=lambda x: (x["y_center"], x["bbox"][0][0]))
        extracted["lines"] = lines
        extracted["full_text"] = "\n".join(l["text"] for l in lines)
        extracted["confidence_avg"] = (
            total_conf / len(lines) if lines else 0
        )
        return extracted

    def process_pdf(self, pdf_path: str) -> list:
        """OCR process all pages of a PDF"""
        import fitz
        doc = fitz.open(pdf_path)
        results = []
        for page_num, page in enumerate(doc):
            # Convert page to high-resolution image
            mat = fitz.Matrix(2.0, 2.0)  # 2x scale
            pix = page.get_pixmap(matrix=mat)
            img_path = f"/tmp/page_{page_num}.png"
            pix.save(img_path)
            # Run OCR
            ocr_result = self.process_image(img_path)
            ocr_result["page_num"] = page_num + 1
            results.append(ocr_result)
        doc.close()
        return results
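Sorting purely by y-coordinate, as `process_image` does above, interleaves lines from side-by-side columns in multi-column documents. Below is a minimal sketch of column-aware reading order, assuming the 4-point bbox format used above; the `sort_reading_order` helper and the fixed column count are illustrative, not part of PaddleOCR.

```python
def sort_reading_order(lines: list, page_width: float, n_columns: int = 2) -> list:
    """Assign each OCR line to a column by its left x-coordinate,
    then sort column by column, top to bottom."""
    col_width = page_width / n_columns

    def key(line):
        x_left = line["bbox"][0][0]  # top-left x of the 4-point bbox
        column = min(int(x_left // col_width), n_columns - 1)
        return (column, line["y_center"], x_left)

    return sorted(lines, key=key)
```

In practice the column count is better estimated by clustering the x-coordinates of line starts than fixed up front.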

Layout Analysis and Structure Extraction

Layout analysis is the process of identifying regions such as text blocks, headings, tables, figures, and captions in documents, and determining the logical reading order. Recently, transformer-based models have been leading this field.

Major Layout Analysis Models

| Model | Developer | Core Technology | Features |
| --- | --- | --- | --- |
| LayoutLMv3 | Microsoft | Multimodal Transformer | Unified text + image + layout |
| DiT (Document Image Transformer) | Microsoft | Vision Transformer | Image-based document understanding |
| Donut | NAVER CLOVA | OCR-free approach | Direct document understanding without OCR |
| Table Transformer | Microsoft | DETR-based | Specialized in table detection/structure recognition |
| Unstructured | Open Source | Hybrid | Multi-model combination pipeline |

Document Structure Analysis with LayoutLMv3

from transformers import (
    LayoutLMv3ForTokenClassification,
    LayoutLMv3Processor,
)
from PIL import Image
import torch

class LayoutAnalyzer:
    """LayoutLMv3-based document layout analyzer"""

    LABEL_MAP = {
        0: "O",
        1: "B-TITLE",
        2: "I-TITLE",
        3: "B-TEXT",
        4: "I-TEXT",
        5: "B-TABLE",
        6: "I-TABLE",
        7: "B-FIGURE",
        8: "I-FIGURE",
        9: "B-LIST",
        10: "I-LIST",
        11: "B-HEADER",
        12: "I-HEADER",
        13: "B-FOOTER",
        14: "I-FOOTER",
    }

    def __init__(self, model_name: str = "microsoft/layoutlmv3-base"):
        self.processor = LayoutLMv3Processor.from_pretrained(
            model_name, apply_ocr=True
        )
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            model_name, num_labels=len(self.LABEL_MAP)
        )
        self.model.eval()

    def analyze(self, image_path: str) -> list:
        """Analyze document image layout"""
        image = Image.open(image_path).convert("RGB")
        encoding = self.processor(
            image,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.model(**encoding)
        predictions = outputs.logits.argmax(-1).squeeze().tolist()
        tokens = self.processor.tokenizer.convert_ids_to_tokens(
            encoding["input_ids"].squeeze()
        )
        # Map predictions to tokens
        elements = []
        current_label = None
        current_text = ""
        for token, pred in zip(tokens, predictions):
            label = self.LABEL_MAP.get(pred, "O")
            # LayoutLMv3 uses a byte-level BPE tokenizer (RoBERTa-style):
            # a leading "Ġ" marks a word boundary, so map it to a space
            # (the "##" continuation marker belongs to WordPiece, not this model)
            piece = token.replace("Ġ", " ")
            if label.startswith("B-"):
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = label[2:]
                current_text = piece
            elif label.startswith("I-") and current_label:
                current_text += piece
            else:
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = None
                current_text = ""
        if current_text and current_label:
            elements.append({
                "type": current_label,
                "text": current_text.strip()
            })
        return elements

Unified Parsing with Unstructured

Unstructured is an open-source library that supports various document formats and provides an integrated pipeline combining multiple parsing engines.

from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title

class UnstructuredParser:
    """Unstructured-based unified document parser"""

    def __init__(self, strategy: str = "hi_res"):
        self.strategy = strategy  # "fast", "ocr_only", "hi_res"

    def parse_pdf(self, pdf_path: str) -> list:
        """Parse PDF into structured elements"""
        elements = partition_pdf(
            filename=pdf_path,
            strategy=self.strategy,
            infer_table_structure=True,
            languages=["eng"],
            extract_images_in_pdf=True,
            extract_image_block_output_dir="./extracted_images"
        )
        parsed = []
        for element in elements:
            parsed.append({
                "type": type(element).__name__,
                "text": str(element),
                "metadata": {
                    "page_number": element.metadata.page_number,
                    "coordinates": (
                        element.metadata.coordinates
                        if hasattr(element.metadata, "coordinates")
                        else None
                    ),
                    "parent_id": element.metadata.parent_id,
                }
            })
        return parsed

    def parse_and_chunk(
        self, file_path: str, max_characters: int = 1000
    ) -> list:
        """Parse and chunk by title in one step"""
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        chunks = chunk_by_title(
            elements,
            max_characters=max_characters,
            combine_text_under_n_chars=200,
            new_after_n_chars=800
        )
        return [
            {
                "text": str(chunk),
                "type": type(chunk).__name__,
                "metadata": chunk.metadata.to_dict()
            }
            for chunk in chunks
        ]

Table Extraction Techniques

Accurately extracting tables from documents is one of the most challenging tasks in Document Parsing. Tables can have complex cell merging, nested structures, and diverse styles.

Key Challenges in Table Extraction

  • Table Detection: Accurately identifying table regions in documents
  • Structure Recognition: Recognizing row/column structure, merged cells, and header rows
  • Cell Content Extraction: Accurately extracting text from each cell
  • Borderless Tables: Recognizing the structure of tables without visible lines

Table Detection with Table Transformer

from transformers import (
    TableTransformerForObjectDetection,
    AutoImageProcessor,
)
from PIL import Image
import torch

class TableExtractor:
    """Table Transformer-based table extractor"""

    def __init__(self):
        self.processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.detection_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.structure_processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )
        self.structure_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )

    def detect_tables(self, image_path: str, threshold: float = 0.7) -> list:
        """Detect table regions in an image"""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = self.detection_model(**inputs)
        target_sizes = torch.tensor([image.size[::-1]])
        results = self.processor.post_process_object_detection(
            outputs, threshold=threshold, target_sizes=target_sizes
        )[0]
        tables = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            tables.append({
                "score": score.item(),
                "label": self.detection_model.config.id2label[label.item()],
                "bbox": box.tolist()  # [x1, y1, x2, y2]
            })
        return tables

    def recognize_structure(
        self, image_path: str, table_bbox: list
    ) -> dict:
        """Recognize internal structure of detected table"""
        image = Image.open(image_path).convert("RGB")
        # Crop table region
        table_image = image.crop(table_bbox)
        inputs = self.structure_processor(
            images=table_image, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.structure_model(**inputs)
        target_sizes = torch.tensor([table_image.size[::-1]])
        results = self.structure_processor.post_process_object_detection(
            outputs, threshold=0.5, target_sizes=target_sizes
        )[0]
        structure = {"rows": [], "columns": [], "cells": []}
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            label_name = self.structure_model.config.id2label[label.item()]
            entry = {"bbox": box.tolist(), "score": score.item()}
            if "row" in label_name:
                structure["rows"].append(entry)
            elif "column" in label_name:
                structure["columns"].append(entry)
            else:
                structure["cells"].append(entry)
        # Sort by coordinates
        structure["rows"].sort(key=lambda x: x["bbox"][1])
        structure["columns"].sort(key=lambda x: x["bbox"][0])
        return structure
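`recognize_structure` returns row and column boxes but not individual cells; the usual post-processing step is to intersect each row with each column to obtain a cell grid, then OCR or crop each cell region. A minimal sketch of that intersection, assuming the `[x1, y1, x2, y2]` bbox format above (the `build_cell_grid` helper is an assumption, not part of the Table Transformer API):

```python
def build_cell_grid(rows: list, columns: list) -> list:
    """Intersect sorted row and column boxes into a 2D grid of cell bboxes.
    Each entry in `rows`/`columns` is a dict with "bbox": [x1, y1, x2, y2]."""
    grid = []
    for row in rows:
        _, ry1, _, ry2 = row["bbox"]
        cells = []
        for col in columns:
            cx1, _, cx2, _ = col["bbox"]
            # A cell spans the row's vertical extent and the column's horizontal extent
            cells.append([cx1, ry1, cx2, ry2])
        grid.append(cells)
    return grid
```

Merged cells are not handled here; Table Transformer also emits "spanning cell" boxes that would need to be overlaid on this grid.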

Simple Table Extraction with Camelot

import camelot
import pandas as pd

def extract_tables_with_camelot(
    pdf_path: str, pages: str = "all", flavor: str = "lattice"
) -> list:
    """Extract tables from PDF using Camelot

    Args:
        pdf_path: Path to PDF file
        pages: Pages to extract ("all" or "1,2,3")
        flavor: "lattice" (line-based) or "stream" (whitespace-based)
    """
    tables = camelot.read_pdf(
        pdf_path,
        pages=pages,
        flavor=flavor,
        strip_text="\n"
    )
    results = []
    for i, table in enumerate(tables):
        df = table.df
        results.append({
            "table_index": i,
            "page": table.page,
            "accuracy": table.accuracy,
            "data": df.to_dict(orient="records"),
            "shape": df.shape,
            "dataframe": df
        })
    return results


# Usage example
if __name__ == "__main__":
    tables = extract_tables_with_camelot(
        "financial_report.pdf",
        pages="1-5",
        flavor="lattice"
    )
    for t in tables:
        print(f"Table {t['table_index']} (page {t['page']})")
        print(f"  Accuracy: {t['accuracy']:.1f}%")
        print(f"  Shape: {t['shape']}")
        print(t["dataframe"].head())
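For RAG, extracted tables are usually serialized to Markdown before embedding, since flattening cells into plain text loses row/column semantics. A small sketch that renders rows (lists of cell values, first row as header) into a Markdown table; the `table_to_markdown` helper is illustrative:

```python
def table_to_markdown(rows: list) -> str:
    """Render a list of rows (first row = header) as a Markdown table."""
    if not rows:
        return ""
    header, *body = rows
    lines = [
        "| " + " | ".join(str(c) for c in header) + " |",
        "| " + " | ".join("---" for _ in header) + " |",
    ]
    for row in body:
        lines.append("| " + " | ".join(str(c) for c in row) + " |")
    return "\n".join(lines)
```

With Camelot, `table.df.values.tolist()` (header row included) can feed this directly.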

LLM-Based Document Understanding

The emergence of multimodal LLMs like GPT-4V and Claude 3.5 is fundamentally transforming document understanding approaches. Instead of traditional OCR + post-processing pipelines, document images can be directly fed to LLMs for content understanding and structuring.

Document Processing with Multimodal LLMs

import anthropic
import base64
from pathlib import Path

class LLMDocumentProcessor:
    """LLM-based multimodal document processor"""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = anthropic.Anthropic()
        self.model = model

    def _encode_image(self, image_path: str) -> tuple:
        """Encode image to base64"""
        path = Path(image_path)
        suffix = path.suffix.lower()
        media_type_map = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        media_type = media_type_map.get(suffix, "image/png")
        with open(image_path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        return data, media_type

    def extract_structured_data(
        self, image_path: str, schema_description: str
    ) -> str:
        """Extract structured data from document image"""
        data, media_type = self._encode_image(image_path)
        prompt = f"""Analyze this document image and extract structured data as JSON matching the following schema.

Schema:
{schema_description}

Instructions:
- Extract all text accurately
- Preserve row/column structure for tables
- Mark uncertain content as null
- Respond only in JSON format"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )
        return response.content[0].text

    def analyze_document_layout(self, image_path: str) -> str:
        """Analyze document layout and extract structure"""
        data, media_type = self._encode_image(image_path)
        prompt = """Analyze the layout of this document and return the following information as JSON:

1. Document type (paper, report, invoice, contract, etc.)
2. Section structure (titles and hierarchy)
3. Table presence and location description
4. Figure/chart presence and description
5. Key-value pairs (if applicable)
6. Reading order of the full text

Respond only in JSON format."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

    def compare_documents(
        self, image_path_1: str, image_path_2: str
    ) -> str:
        """Compare and analyze two document images"""
        data1, mt1 = self._encode_image(image_path_1)
        data2, mt2 = self._encode_image(image_path_2)
        prompt = """Compare these two documents and analyze the following:
1. Similarities
2. Differences
3. Added content
4. Removed content
5. Modified content

Respond in structured JSON format."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt1,
                                "data": data1,
                            },
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt2,
                                "data": data2,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text
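The methods above instruct the model to respond "only in JSON", but in practice responses occasionally arrive wrapped in code fences or preceded by a short preamble. A defensive parsing sketch (the `extract_json` helper is an assumption, not part of the Anthropic SDK):

```python
import json
import re

def extract_json(response_text: str) -> dict:
    """Parse JSON from an LLM response, tolerating code fences and preamble."""
    # Prefer the contents of a fenced block if one is present
    fence = re.search(r"`{3}(?:json)?\s*(.*?)`{3}", response_text, re.DOTALL)
    candidate = fence.group(1) if fence else response_text
    # Fall back to the outermost brace pair
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end == -1:
        raise ValueError("no JSON object found in response")
    return json.loads(candidate[start:end + 1])
```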

Hybrid Approach: OCR + LLM Correction

A hybrid approach that first extracts text via OCR and then uses an LLM to correct errors and organize structure delivers excellent results in practice.

class HybridDocumentProcessor:
    """OCR + LLM hybrid document processor"""

    def __init__(self):
        self.ocr = PaddleOCRProcessor(lang="en")
        self.llm = LLMDocumentProcessor()

    def process(self, image_path: str) -> dict:
        """Process document using hybrid approach"""
        # Step 1: Extract text with OCR
        ocr_result = self.ocr.process_image(image_path)
        raw_text = ocr_result["full_text"]
        confidence = ocr_result["confidence_avg"]

        # Step 2: Correct and structure with LLM
        correction_prompt = (
            "Review and correct the following OCR-extracted text. "
            "Fix low-confidence portions based on context, "
            "and organize the logical structure (headings, body, lists) "
            "into Markdown format.\n\n"
            f"OCR extracted text (average confidence: {confidence:.2f}):\n"
            "---\n"
            f"{raw_text}\n"
            "---\n\n"
            "Return the corrected Markdown."
        )

        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": correction_prompt}],
        )
        corrected_text = response.content[0].text

        return {
            "raw_ocr": raw_text,
            "ocr_confidence": confidence,
            "corrected_text": corrected_text,
            "method": "hybrid_ocr_llm"
        }
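Running LLM correction on every page is the dominant cost of the hybrid approach, so it is common to gate the second step on OCR confidence. A minimal sketch with an illustrative threshold; `needs_llm_correction` and the 0.90 cutoff are assumptions to tune per corpus:

```python
def needs_llm_correction(
    confidence_avg: float,
    text: str,
    conf_threshold: float = 0.90,
    min_length: int = 50,
) -> bool:
    """Escalate to LLM correction only when OCR confidence is low
    and there is enough text to justify a model call."""
    if len(text.strip()) < min_length:
        return False  # too little text to correct meaningfully
    return confidence_avg < conf_threshold
```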

Document Chunking Strategies for RAG

When the ultimate goal of Document Parsing is a RAG pipeline, the chunking strategy is the key factor that determines retrieval quality. Improper chunking significantly degrades retrieval accuracy and causes hallucinations due to context loss.

Chunking Strategy Comparison

| Strategy | Description | Pros | Cons |
| --- | --- | --- | --- |
| Fixed-size | Split by fixed token/character count | Simple, uniform size | Context disruption |
| Recursive | Split by separator priority | Preserves structure, flexible | Uneven sizes |
| Semantic | Split based on embedding similarity | Preserves meaning units | High compute cost |
| Structure-based | Split by headings/sections | Maintains logical structure | Requires structure recognition |
| Sliding Window | Split with overlap | Context continuity | Increased storage |

Advanced Chunking Implementation

from typing import Optional
import numpy as np

class AdvancedChunker:
    """Advanced chunker supporting multiple chunking strategies"""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def fixed_size_chunk(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> list:
        """Fixed-size + overlap chunking"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            # Cut at sentence boundaries
            if end < len(text):
                last_period = chunk.rfind(". ")
                last_newline = chunk.rfind("\n")
                cut_point = max(last_period, last_newline)
                if cut_point > chunk_size * 0.5:
                    chunk = chunk[:cut_point + 1]
                    end = start + cut_point + 1
            chunks.append({
                "text": chunk.strip(),
                "start": start,
                "end": end,
                "index": len(chunks)
            })
            if end >= len(text):
                break  # tail already covered; avoid emitting a duplicate overlapping chunk
            start = end - overlap
        return chunks

    def recursive_chunk(
        self,
        text: str,
        chunk_size: int = 1000,
        separators: Optional[list] = None,
    ) -> list:
        """Recursive splitting chunking"""
        if separators is None:
            separators = ["\n\n\n", "\n\n", "\n", ". ", ", ", " "]
        chunks = []
        self._recursive_split(text, separators, chunk_size, chunks)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
            if c.strip()
        ]

    def _recursive_split(
        self, text: str, separators: list, chunk_size: int, result: list
    ):
        if len(text) <= chunk_size:
            result.append(text)
            return
        sep = separators[0] if separators else " "
        remaining_seps = separators[1:] if len(separators) > 1 else []
        parts = text.split(sep)
        current = ""
        for part in parts:
            test = current + sep + part if current else part
            if len(test) > chunk_size:
                if current:
                    if len(current) > chunk_size and remaining_seps:
                        self._recursive_split(
                            current, remaining_seps, chunk_size, result
                        )
                    else:
                        result.append(current)
                current = part
            else:
                current = test
        if current:
            if len(current) > chunk_size and remaining_seps:
                self._recursive_split(
                    current, remaining_seps, chunk_size, result
                )
            else:
                result.append(current)

    def semantic_chunk(
        self, text: str, threshold: float = 0.5, min_size: int = 100
    ) -> list:
        """Semantic chunking - embedding similarity based"""
        if not self.embedding_model:
            raise ValueError("Embedding model is required for semantic chunking")
        # Split into sentences
        sentences = [s.strip() for s in text.split(". ") if s.strip()]
        if len(sentences) <= 1:
            return [{"text": text, "index": 0}]
        # Compute embeddings for each sentence
        embeddings = self.embedding_model.encode(sentences)
        # Calculate similarity between adjacent sentences
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i])
                * np.linalg.norm(embeddings[i + 1])
            )
            similarities.append(sim)
        # Split at points where similarity falls below threshold
        chunks = []
        current_chunk = sentences[0]
        for i, sim in enumerate(similarities):
            if sim < threshold and len(current_chunk) >= min_size:
                chunks.append(current_chunk)
                current_chunk = sentences[i + 1]
            else:
                current_chunk += ". " + sentences[i + 1]
        if current_chunk:
            chunks.append(current_chunk)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
        ]

    def structure_based_chunk(self, parsed_elements: list) -> list:
        """Structure-based chunking using layout analysis results"""
        chunks = []
        current_chunk = {
            "title": "",
            "content": "",
            "tables": [],
            "metadata": {}
        }
        for element in parsed_elements:
            elem_type = element.get("type", "")
            elem_text = element.get("text", "")
            if elem_type in ("Title", "TITLE"):
                # Start new section
                if current_chunk["content"]:
                    chunks.append(current_chunk.copy())
                current_chunk = {
                    "title": elem_text,
                    "content": "",
                    "tables": [],
                    "metadata": element.get("metadata", {})
                }
            elif elem_type in ("Table", "TABLE"):
                current_chunk["tables"].append(elem_text)
            else:
                current_chunk["content"] += elem_text + "\n"
        if current_chunk["content"]:
            chunks.append(current_chunk)
        return chunks
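Chunks from `structure_based_chunk` keep title, body, and tables in separate fields; before indexing they are typically flattened into a single text plus a metadata record. A sketch of that final step (the `flatten_for_indexing` helper and the record shape are illustrative, not a fixed vector-store schema):

```python
def flatten_for_indexing(structure_chunks: list, source: str) -> list:
    """Turn {"title", "content", "tables", "metadata"} chunks into
    text + metadata records ready for a vector store."""
    records = []
    for i, chunk in enumerate(structure_chunks):
        parts = []
        if chunk.get("title"):
            parts.append(chunk["title"])
        if chunk.get("content"):
            parts.append(chunk["content"].strip())
        parts.extend(chunk.get("tables", []))  # tables already serialized as text
        records.append({
            "id": f"{source}#chunk-{i}",
            "text": "\n\n".join(parts),
            "metadata": {
                "source": source,
                "title": chunk.get("title", ""),
                "has_table": bool(chunk.get("tables")),
            },
        })
    return records
```

Keeping the section title inside the chunk text, not only in metadata, measurably helps retrieval because the embedding then carries the section context.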

Building a Production Pipeline

Now let's combine the individual technologies covered so far to build an end-to-end Document Parsing pipeline suitable for production environments.

Pipeline Architecture

The overall pipeline consists of the following stages:

  1. Input Processing: Detect and normalize various document formats
  2. Parsing Strategy Selection: Choose optimal parser based on document type
  3. Text/Structure Extraction: OCR, layout analysis, table extraction
  4. LLM Enhancement: Quality improvement through multimodal LLM
  5. Chunking and Indexing: Chunk for RAG and store in vector DB
import os
import json
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Pipeline configuration"""
    ocr_engine: str = "paddleocr"       # tesseract, paddleocr, easyocr
    ocr_lang: str = "en"
    layout_model: str = "unstructured"  # layoutlm, unstructured
    chunking_strategy: str = "recursive"  # fixed, recursive, semantic, structure
    chunk_size: int = 1000
    chunk_overlap: int = 200
    use_llm_correction: bool = True
    llm_model: str = "claude-sonnet-4-20250514"
    output_format: str = "json"         # json, markdown

@dataclass
class ProcessedDocument:
    """Processed document result"""
    source_path: str
    doc_type: str
    pages: list = field(default_factory=list)
    full_text: str = ""
    tables: list = field(default_factory=list)
    chunks: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    processing_log: list = field(default_factory=list)


class DocumentParsingPipeline:
    """Production Document Parsing Pipeline"""

    def __init__(self, config: Optional[PipelineConfig] = None):
        self.config = config or PipelineConfig()
        self.chunker = AdvancedChunker()

    def process(self, file_path: str) -> ProcessedDocument:
        """End-to-end document processing"""
        result = ProcessedDocument(source_path=file_path, doc_type="")
        logger.info(f"Processing: {file_path}")

        try:
            # 1. Detect document type
            doc_type = self._detect_type(file_path)
            result.doc_type = doc_type
            result.processing_log.append(
                f"Document type detected: {doc_type}"
            )

            # 2. Select and execute parsing strategy
            if doc_type == "native_pdf":
                raw_result = self._parse_native_pdf(file_path)
            elif doc_type in ("scanned_pdf", "image"):
                raw_result = self._parse_with_ocr(file_path)
            elif doc_type == "mixed_pdf":
                raw_result = self._parse_mixed_pdf(file_path)
            else:
                raw_result = self._parse_generic(file_path)

            result.full_text = raw_result.get("text", "")
            result.tables = raw_result.get("tables", [])
            result.pages = raw_result.get("pages", [])

            # 3. LLM correction (optional)
            if self.config.use_llm_correction and result.full_text:
                result.full_text = self._llm_correct(result.full_text)
                result.processing_log.append("LLM correction applied")

            # 4. Chunking
            result.chunks = self._chunk_document(result)
            result.processing_log.append(
                f"Created {len(result.chunks)} chunks "
                f"with strategy: {self.config.chunking_strategy}"
            )

            # 5. Generate metadata
            result.metadata = {
                "source": file_path,
                "doc_type": doc_type,
                "total_pages": len(result.pages),
                "total_tables": len(result.tables),
                "total_chunks": len(result.chunks),
                "text_length": len(result.full_text),
                "config": {
                    "ocr_engine": self.config.ocr_engine,
                    "chunking_strategy": self.config.chunking_strategy,
                    "chunk_size": self.config.chunk_size,
                }
            }
            logger.info(
                f"Processing complete: "
                f"{len(result.chunks)} chunks created"
            )

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            result.processing_log.append(f"Error: {str(e)}")

        return result

    def _detect_type(self, file_path: str) -> str:
        """Auto-detect document type"""
        ext = Path(file_path).suffix.lower()
        if ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
            return "image"
        elif ext == ".pdf":
            import fitz
            doc = fitz.open(file_path)
            page_count = len(doc)  # capture before close: len(doc) raises on a closed document
            total_text = sum(len(page.get_text()) for page in doc)
            total_images = sum(len(page.get_images()) for page in doc)
            doc.close()
            if total_text < 100:
                return "scanned_pdf"
            elif total_images > page_count * 0.5:
                return "mixed_pdf"
            return "native_pdf"
        return "unknown"

    def _parse_native_pdf(self, file_path: str) -> dict:
        """Parse native PDF"""
        parser = PyMuPDFParser(file_path)
        pages = parser.extract_text_with_layout()
        plumber = PdfPlumberParser(file_path)
        tables = plumber.extract_tables()
        text = plumber.extract_text_outside_tables()
        parser.close()
        plumber.close()
        return {"text": text, "tables": tables, "pages": pages}

    def _parse_with_ocr(self, file_path: str) -> dict:
        """OCR-based document parsing"""
        if self.config.ocr_engine == "paddleocr":
            processor = PaddleOCRProcessor(lang=self.config.ocr_lang)
            if file_path.lower().endswith(".pdf"):
                results = processor.process_pdf(file_path)
                text = "\n\n".join(r["full_text"] for r in results)
                return {"text": text, "pages": results, "tables": []}
            else:
                result = processor.process_image(file_path)
                return {
                    "text": result["full_text"],
                    "pages": [result],
                    "tables": []
                }
        else:
            ocr = TesseractOCR(lang="eng")
            text = ocr.extract_text(file_path)
            return {"text": text, "pages": [], "tables": []}

    def _parse_mixed_pdf(self, file_path: str) -> dict:
        """Parse mixed PDF - native + OCR"""
        native_result = self._parse_native_pdf(file_path)
        ocr_result = self._parse_with_ocr(file_path)
        combined_text = native_result["text"] or ocr_result["text"]
        return {
            "text": combined_text,
            "tables": native_result["tables"],
            "pages": native_result["pages"]
        }

    def _parse_generic(self, file_path: str) -> dict:
        """Generic document parsing (using Unstructured)"""
        parser = UnstructuredParser(strategy="hi_res")
        elements = parser.parse_pdf(file_path)
        text = "\n\n".join(e["text"] for e in elements)
        return {"text": text, "pages": [], "tables": []}

    def _llm_correct(self, text: str) -> str:
        """Text correction using LLM"""
        if len(text) < 100:
            return text
        import anthropic
        client = anthropic.Anthropic()
        # Correct only the first 3,000 characters to bound cost and latency;
        # the remainder is appended back unchanged.
        sample = text[:3000]
        response = client.messages.create(
            model=self.config.llm_model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "Correct errors in the following OCR-extracted text. "
                        "Preserve the original meaning and structure while "
                        "fixing typos, garbled characters, and line break "
                        f"errors only.\n\n{sample}"
                    ),
                }
            ],
        )
        return response.content[0].text + text[3000:]
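The correction above bounds cost by sending only the first 3,000 characters to the model. To correct an entire long document, the text can be split into overlapping windows and each window corrected in its own request; a minimal sketch of the windowing step (`correction_windows` is a hypothetical helper, not part of the pipeline above):

```python
def correction_windows(text: str, size: int = 3000, overlap: int = 200) -> list:
    """Split text into overlapping windows for per-window LLM correction.

    The overlap gives the model context across window boundaries; when
    stitching corrected windows back, the caller drops the trailing
    overlap of every window except the last.
    """
    windows = []
    start = 0
    while start < len(text):
        windows.append(text[start:start + size])
        if start + size >= len(text):
            break  # last window reaches the end of the text
        start += size - overlap
    return windows
```

Each window is then sent through the same correction prompt, so cost scales linearly with document length rather than being silently truncated.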

    def _chunk_document(self, doc: ProcessedDocument) -> list:
        """Document chunking"""
        strategy = self.config.chunking_strategy
        if strategy == "fixed":
            return self.chunker.fixed_size_chunk(
                doc.full_text,
                self.config.chunk_size,
                self.config.chunk_overlap
            )
        elif strategy == "recursive":
            return self.chunker.recursive_chunk(
                doc.full_text,
                self.config.chunk_size
            )
        elif strategy == "structure":
            if doc.pages:
                return self.chunker.structure_based_chunk(doc.pages)
            return self.chunker.recursive_chunk(
                doc.full_text, self.config.chunk_size
            )
        return self.chunker.recursive_chunk(
            doc.full_text, self.config.chunk_size
        )

    def save_results(
        self, result: ProcessedDocument, output_dir: str
    ):
        """Save processing results"""
        os.makedirs(output_dir, exist_ok=True)
        base_name = Path(result.source_path).stem
        # Save chunks
        chunks_path = os.path.join(output_dir, f"{base_name}_chunks.json")
        with open(chunks_path, "w", encoding="utf-8") as f:
            json.dump(result.chunks, f, ensure_ascii=False, indent=2)
        # Save metadata
        meta_path = os.path.join(output_dir, f"{base_name}_metadata.json")
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result.metadata, f, ensure_ascii=False, indent=2)
        # Save full text
        text_path = os.path.join(output_dir, f"{base_name}_full.txt")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(result.full_text)
        logger.info(f"Results saved to {output_dir}")


# Usage example
if __name__ == "__main__":
    config = PipelineConfig(
        ocr_engine="paddleocr",
        chunking_strategy="recursive",
        chunk_size=1000,
        chunk_overlap=200,
        use_llm_correction=True
    )
    pipeline = DocumentParsingPipeline(config)

    # Process single document
    result = pipeline.process("research_paper.pdf")
    print(f"Total {len(result.chunks)} chunks created")
    print(f"Extracted {len(result.tables)} tables")

    # Save results
    pipeline.save_results(result, "./output")

Batch Processing and Monitoring

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    """Bulk document batch processor"""

    def __init__(self, pipeline: DocumentParsingPipeline, max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers

    def process_directory(self, input_dir: str, output_dir: str) -> dict:
        """Batch process all documents in a directory"""
        supported_ext = {".pdf", ".png", ".jpg", ".jpeg", ".tiff"}
        files = [
            str(f) for f in Path(input_dir).rglob("*")
            if f.suffix.lower() in supported_ext
        ]
        logger.info(f"Found {len(files)} documents to process")
        stats = {
            "total": len(files),
            "success": 0,
            "failed": 0,
            "total_chunks": 0,
            "processing_time": 0
        }
        start_time = time.time()
        # Threads suit I/O-bound work (API-based OCR/LLM calls); for
        # CPU-bound local OCR, a ProcessPoolExecutor may scale better
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._process_single, f, output_dir
                ): f for f in files
            }
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    stats["success"] += 1
                    stats["total_chunks"] += len(result.chunks)
                    logger.info(f"Success: {file_path}")
                except Exception as e:
                    stats["failed"] += 1
                    logger.error(f"Failed: {file_path} - {e}")

        stats["processing_time"] = time.time() - start_time
        logger.info(
            f"Batch complete: {stats['success']}/{stats['total']} "
            f"in {stats['processing_time']:.1f}s"
        )
        return stats

    def _process_single(
        self, file_path: str, output_dir: str
    ) -> ProcessedDocument:
        """Process and save a single document"""
        result = self.pipeline.process(file_path)
        self.pipeline.save_results(result, output_dir)
        return result
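The `BatchProcessor` above stops at logging failures, but in production a transient error (an OCR service timeout, a locked file) often succeeds on a second attempt. A minimal retry wrapper sketch — `process_fn` stands in for any per-file callable such as `_process_single`, and the backoff delay is an illustrative assumption:

```python
import time

def process_with_retry(files, process_fn, max_retries: int = 1) -> dict:
    """Process each file, retrying failures up to max_retries extra times.

    process_fn is any callable taking a file path that raises on failure.
    Returns which files ultimately succeeded and which ultimately failed,
    so the failed list can be re-queued or inspected later.
    """
    stats = {"success": [], "failed": []}
    for path in files:
        for attempt in range(max_retries + 1):
            try:
                process_fn(path)
                stats["success"].append(path)
                break
            except Exception:
                if attempt == max_retries:
                    stats["failed"].append(path)  # exhausted retries
                else:
                    time.sleep(0.05 * (attempt + 1))  # simple linear backoff
    return stats
```

Keeping the failed list explicit makes monitoring straightforward: persistent failures can be written to a dead-letter directory for manual review instead of being lost in the logs.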

Conclusion

Document Parsing is the foundational technology that determines data quality for AI/LLM applications. Here is a summary of the key topics covered in this article.

PDF Parsing: PyMuPDF excels at general-purpose processing, while pdfplumber is strong in table extraction. The optimal tool should be selected based on document type (native/scanned/mixed).

OCR: PaddleOCR provides high accuracy for Asian languages, while Tesseract offers broad language support. Image preprocessing (binarization, deskewing, etc.) significantly impacts accuracy.

Layout Analysis: Tools like LayoutLMv3 and Unstructured enable automatic recognition of document logical structure. Unstructured is particularly well-suited for rapid prototyping.

Table Extraction: While Table Transformer and Camelot are effective tools, complex tables (merged cells, borderless tables) require additional post-processing.

LLM-Based Document Understanding: Multimodal LLMs like GPT-4V and Claude have brought breakthrough performance improvements in OCR correction, structure analysis, and information extraction. The hybrid OCR + LLM approach represents the current best practice.

Chunking Strategy: In RAG pipelines, retrieval quality directly depends on the chunking strategy. Structure-based chunking that reflects the logical structure of documents provides the highest retrieval accuracy.

Document Parsing technology is rapidly evolving, and the emergence of multimodal LLMs is fundamentally changing the existing pipeline paradigm. However, due to cost and latency constraints, the most practical choice in production is a hybrid approach that appropriately combines traditional tools with LLMs.
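The hybrid approach can be reduced to a simple routing rule: use traditional tools wherever they suffice, and spend LLM budget only on documents they handle poorly. A sketch of such a router — the 0.90 confidence threshold and the three strategy names are illustrative assumptions, not values from the article:

```python
def choose_strategy(has_text_layer: bool, ocr_confidence: float,
                    llm_budget_ok: bool) -> str:
    """Route a document to the cheapest adequate parsing strategy.

    Native text extraction is essentially free; OCR is moderate cost;
    LLM correction is reserved for low-confidence OCR output when the
    budget allows it.
    """
    if has_text_layer:
        return "native"          # text layer present: no OCR needed
    if ocr_confidence >= 0.90:
        return "ocr"             # OCR alone is trustworthy enough
    return "ocr+llm" if llm_budget_ok else "ocr"
```

In practice the thresholds should be tuned against a labeled sample of your own document mix, since OCR confidence scales differ between engines.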

References