Skip to content
Published on

Document Parsing 기술 가이드: PDF 파싱·OCR·레이아웃 분석·LLM 기반 문서 추출 실전 파이프라인

Authors
  • Name
    Twitter

Document Parsing 기술 가이드

들어가며

기업이 보유한 지식의 대부분은 PDF 보고서, 스캔된 계약서, 연구 논문, 송장, 의료 기록 등 비정형 문서에 담겨 있다. McKinsey의 조사에 따르면 기업 데이터의 약 80%가 이러한 비정형 형태로 존재하며, 이를 효과적으로 활용하지 못하면 데이터 자산의 대부분을 방치하는 셈이다.

RAG(Retrieval-Augmented Generation) 시스템, 지식 검색 엔진, 문서 자동화 시스템의 품질은 결국 입력 문서를 얼마나 정확하게 파싱하느냐에 달려 있다. "Garbage in, garbage out"이라는 원칙이 그 어느 때보다 적용되는 분야가 바로 Document Parsing이다.

이 글에서는 PDF 파싱 라이브러리 비교, OCR 엔진 선택 기준, 레이아웃 분석 모델, 테이블 추출 기법, LLM 기반 멀티모달 문서 이해, RAG 최적화를 위한 청킹 전략, 프로덕션 파이프라인 구축까지 Document Parsing의 전 과정을 실전 코드와 함께 체계적으로 다룬다.

Document Parsing의 개요

왜 Document Parsing이 중요한가

Document Parsing은 비정형 문서에서 구조화된 정보를 추출하는 기술이다. 단순한 텍스트 추출을 넘어, 문서의 논리적 구조(제목, 본문, 표, 그림 캡션 등)를 이해하고 의미 있는 단위로 정보를 조직화하는 것이 핵심이다.

Document Parsing이 필요한 주요 시나리오는 다음과 같다.

시나리오설명핵심 기술
RAG 파이프라인문서를 청크로 분할하여 벡터 DB에 저장청킹, 임베딩
지식 베이스 구축사내 문서에서 구조화된 지식 추출NER, 관계 추출
문서 자동화송장, 계약서에서 핵심 필드 추출템플릿 매칭, 키-값 추출
규제 컴플라이언스규제 문서 변경사항 자동 추적변경 감지, 비교 분석
연구 논문 분석논문에서 방법론, 결과, 인용 추출섹션 분류, 메타데이터 추출

Document Parsing 파이프라인 아키텍처

일반적인 Document Parsing 파이프라인은 다음과 같은 단계로 구성된다.

  1. 문서 수집: PDF, 이미지, Word, HTML 등 다양한 형식의 문서 입력
  2. 전처리: 이미지 보정, 노이즈 제거, 페이지 분리
  3. 텍스트 추출: 네이티브 PDF 텍스트 추출 또는 OCR
  4. 레이아웃 분석: 문서 구조 인식 (제목, 본문, 표, 그림)
  5. 구조화 추출: 테이블 파싱, 키-값 쌍 추출, NER
  6. 후처리: 텍스트 정제, 청킹, 메타데이터 부착
  7. 저장/인덱싱: 벡터 DB 또는 검색 엔진에 저장
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class DocumentType(Enum):
    NATIVE_PDF = "native_pdf"      # 텍스트 레이어가 있는 PDF
    SCANNED_PDF = "scanned_pdf"    # 스캔된 이미지 PDF
    IMAGE = "image"                # JPG, PNG 등 이미지
    MIXED_PDF = "mixed_pdf"        # 네이티브 + 스캔 혼합

@dataclass
class ParsedDocument:
    text: str
    pages: list = field(default_factory=list)
    tables: list = field(default_factory=list)
    images: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    doc_type: Optional[DocumentType] = None

    def get_chunks(self, strategy: str = "recursive", chunk_size: int = 1000):
        """문서를 지정된 전략으로 청킹"""
        if strategy == "recursive":
            return self._recursive_chunk(chunk_size)
        elif strategy == "semantic":
            return self._semantic_chunk(chunk_size)
        elif strategy == "structure":
            return self._structure_based_chunk()
        return []

    def _recursive_chunk(self, chunk_size: int):
        separators = ["\n\n", "\n", ". ", " "]
        return self._split_text(self.text, separators, chunk_size)

    def _split_text(self, text: str, separators: list, chunk_size: int):
        chunks = []
        if len(text) <= chunk_size:
            return [text]
        sep = separators[0] if separators else " "
        parts = text.split(sep)
        current = ""
        for part in parts:
            if len(current) + len(part) + len(sep) > chunk_size:
                if current:
                    chunks.append(current.strip())
                current = part
            else:
                current = current + sep + part if current else part
        if current:
            chunks.append(current.strip())
        return chunks

    def _semantic_chunk(self, chunk_size: int):
        # 시맨틱 청킹 구현 (임베딩 기반)
        return self._recursive_chunk(chunk_size)

    def _structure_based_chunk(self):
        # 문서 구조 기반 청킹
        return [page.get("text", "") for page in self.pages if page.get("text")]

PDF 파싱 기술과 도구

PDF는 가장 널리 사용되는 문서 형식이지만, 파싱 관점에서는 가장 까다로운 형식이기도 하다. PDF는 본질적으로 "인쇄 레이아웃" 포맷이기 때문에, 텍스트의 논리적 순서가 파일 구조에 보장되지 않는다.

주요 PDF 파싱 라이브러리 비교

라이브러리장점단점적합한 용도
PyMuPDF (fitz)빠른 속도, 풍부한 기능, 이미지 추출라이선스 (AGPL)범용 PDF 처리
pdfplumber정확한 테이블 추출, 좌표 기반 접근속도가 느림테이블 중심 문서
PyPDF2순수 Python, 설치 간편복잡한 PDF에서 부정확간단한 텍스트 추출
Camelot테이블 추출 전용PDF 전체 처리 불가테이블만 필요할 때
pdfminer.six상세한 레이아웃 정보API가 복잡함레이아웃 분석 필요 시

PyMuPDF를 활용한 PDF 파싱

import fitz  # PyMuPDF

class PyMuPDFParser:
    """PyMuPDF 기반 PDF 파서"""

    def __init__(self, pdf_path: str):
        self.doc = fitz.open(pdf_path)
        self.pages = []

    def extract_text_with_layout(self) -> list:
        """페이지별 텍스트를 레이아웃 정보와 함께 추출"""
        results = []
        for page_num, page in enumerate(self.doc):
            blocks = page.get_text("dict")["blocks"]
            page_data = {
                "page_num": page_num + 1,
                "width": page.rect.width,
                "height": page.rect.height,
                "blocks": []
            }
            for block in blocks:
                if block["type"] == 0:  # 텍스트 블록
                    text_content = ""
                    for line in block["lines"]:
                        line_text = ""
                        for span in line["spans"]:
                            line_text += span["text"]
                        text_content += line_text + "\n"
                    page_data["blocks"].append({
                        "type": "text",
                        "bbox": block["bbox"],
                        "text": text_content.strip(),
                        "font_size": block["lines"][0]["spans"][0]["size"]
                        if block["lines"] and block["lines"][0]["spans"] else 0
                    })
                elif block["type"] == 1:  # 이미지 블록
                    page_data["blocks"].append({
                        "type": "image",
                        "bbox": block["bbox"],
                        "image_data": block.get("image", None)
                    })
            results.append(page_data)
        return results

    def extract_images(self, output_dir: str) -> list:
        """PDF에서 모든 이미지를 추출"""
        import os
        os.makedirs(output_dir, exist_ok=True)
        image_paths = []
        for page_num, page in enumerate(self.doc):
            images = page.get_images(full=True)
            for img_idx, img in enumerate(images):
                xref = img[0]
                pix = fitz.Pixmap(self.doc, xref)
                if pix.n < 5:  # GRAY 또는 RGB
                    img_path = os.path.join(
                        output_dir,
                        f"page_{page_num + 1}_img_{img_idx + 1}.png"
                    )
                    pix.save(img_path)
                    image_paths.append(img_path)
                pix = None
        return image_paths

    def detect_document_type(self) -> DocumentType:
        """PDF가 네이티브인지 스캔인지 판별"""
        total_text_len = 0
        total_images = 0
        for page in self.doc:
            total_text_len += len(page.get_text())
            total_images += len(page.get_images())
        if total_text_len < 100 and total_images > 0:
            return DocumentType.SCANNED_PDF
        elif total_text_len > 100 and total_images > len(self.doc) * 0.5:
            return DocumentType.MIXED_PDF
        return DocumentType.NATIVE_PDF

    def close(self):
        self.doc.close()

pdfplumber를 활용한 정밀 파싱

pdfplumber는 특히 테이블 추출에 강점이 있으며, 각 문자의 정확한 좌표 정보를 제공한다.

import pdfplumber

class PdfPlumberParser:
    """pdfplumber 기반 정밀 PDF 파서"""

    def __init__(self, pdf_path: str):
        self.pdf = pdfplumber.open(pdf_path)

    def extract_tables(self) -> list:
        """모든 페이지에서 테이블 추출"""
        all_tables = []
        for page_num, page in enumerate(self.pdf.pages):
            tables = page.extract_tables(
                table_settings={
                    "vertical_strategy": "lines",
                    "horizontal_strategy": "lines",
                    "snap_tolerance": 3,
                    "join_tolerance": 3,
                    "edge_min_length": 3,
                    "min_words_vertical": 3,
                    "min_words_horizontal": 1,
                }
            )
            for table_idx, table in enumerate(tables):
                if table and len(table) > 1:
                    headers = table[0]
                    rows = table[1:]
                    all_tables.append({
                        "page": page_num + 1,
                        "table_index": table_idx,
                        "headers": headers,
                        "rows": rows,
                        "num_rows": len(rows),
                        "num_cols": len(headers) if headers else 0
                    })
        return all_tables

    def extract_text_outside_tables(self) -> str:
        """테이블 영역을 제외한 텍스트만 추출"""
        full_text = []
        for page in self.pdf.pages:
            # 테이블 바운딩 박스 수집
            table_bboxes = []
            tables = page.find_tables()
            for table in tables:
                table_bboxes.append(table.bbox)
            # 테이블 영역 크롭 후 제외
            filtered_page = page
            for bbox in table_bboxes:
                filtered_page = filtered_page.outside_bbox(bbox)
            text = filtered_page.extract_text()
            if text:
                full_text.append(text)
        return "\n\n".join(full_text)

    def close(self):
        self.pdf.close()

OCR 기반 문서 인식

스캔된 문서나 이미지 기반 PDF를 처리하려면 OCR(Optical Character Recognition)이 필수적이다. OCR 기술은 전통적인 규칙 기반 방식에서 딥러닝 기반으로 빠르게 발전하고 있다.

주요 OCR 엔진 비교

엔진지원 언어정확도속도특징
Tesseract 5100+중~상중간오픈소스, 가장 널리 사용
EasyOCR80+중~상느림PyTorch 기반, 설치 간편
PaddleOCR80+빠름Baidu 개발, 높은 정확도
Google Vision API100+최상빠름클라우드 서비스, 유료
Azure Document Intelligence100+최상빠름엔터프라이즈, 구조화 추출 지원

Tesseract OCR 활용

import pytesseract
from PIL import Image
import cv2
import numpy as np

class TesseractOCR:
    """Tesseract 기반 OCR 처리기"""

    def __init__(self, lang: str = "kor+eng"):
        self.lang = lang
        self.config = "--oem 3 --psm 6"  # LSTM 엔진 + 균일 텍스트 블록

    def preprocess_image(self, image_path: str) -> np.ndarray:
        """OCR 정확도 향상을 위한 이미지 전처리"""
        img = cv2.imread(image_path)
        # 그레이스케일 변환
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # 노이즈 제거
        denoised = cv2.fastNlMeansDenoising(gray, h=10)
        # 이진화 (Otsu's method)
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )
        # 기울기 보정
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) > 0:
            angle = cv2.minAreaRect(coords)[-1]
            if angle < -45:
                angle = -(90 + angle)
            else:
                angle = -angle
            if abs(angle) > 0.5:
                h, w = binary.shape
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                binary = cv2.warpAffine(
                    binary, matrix, (w, h),
                    flags=cv2.INTER_CUBIC,
                    borderMode=cv2.BORDER_REPLICATE
                )
        return binary

    def extract_text(self, image_path: str, preprocess: bool = True) -> str:
        """이미지에서 텍스트 추출"""
        if preprocess:
            img = self.preprocess_image(image_path)
        else:
            img = Image.open(image_path)
        text = pytesseract.image_to_string(
            img, lang=self.lang, config=self.config
        )
        return text.strip()

    def extract_with_boxes(self, image_path: str) -> list:
        """바운딩 박스와 함께 텍스트 추출"""
        img = self.preprocess_image(image_path)
        data = pytesseract.image_to_data(
            img, lang=self.lang, config=self.config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data["text"])):
            if data["text"][i].strip():
                results.append({
                    "text": data["text"][i],
                    "confidence": data["conf"][i],
                    "bbox": {
                        "x": data["left"][i],
                        "y": data["top"][i],
                        "w": data["width"][i],
                        "h": data["height"][i]
                    },
                    "block_num": data["block_num"][i],
                    "line_num": data["line_num"][i]
                })
        return results

PaddleOCR로 고정확도 OCR 구현

PaddleOCR은 특히 아시아 언어(한국어, 일본어, 중국어)에서 높은 정확도를 보여준다.

from paddleocr import PaddleOCR

class PaddleOCRProcessor:
    """PaddleOCR 기반 고정확도 OCR"""

    def __init__(self, lang: str = "korean"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,  # 텍스트 방향 감지
            lang=lang,
            use_gpu=True,
            det_db_thresh=0.3,
            det_db_box_thresh=0.5,
            rec_batch_num=16
        )

    def process_image(self, image_path: str) -> dict:
        """이미지에서 텍스트와 레이아웃 정보 추출"""
        result = self.ocr.ocr(image_path, cls=True)
        extracted = {
            "lines": [],
            "full_text": "",
            "confidence_avg": 0.0
        }
        if not result or not result[0]:
            return extracted

        total_conf = 0
        lines = []
        for line in result[0]:
            bbox = line[0]  # 4개 꼭짓점 좌표
            text = line[1][0]
            confidence = line[1][1]
            lines.append({
                "text": text,
                "confidence": confidence,
                "bbox": bbox,
                "y_center": (bbox[0][1] + bbox[2][1]) / 2
            })
            total_conf += confidence

        # y좌표 기준으로 정렬 (읽기 순서)
        lines.sort(key=lambda x: (x["y_center"], x["bbox"][0][0]))
        extracted["lines"] = lines
        extracted["full_text"] = "\n".join(l["text"] for l in lines)
        extracted["confidence_avg"] = (
            total_conf / len(lines) if lines else 0
        )
        return extracted

    def process_pdf(self, pdf_path: str) -> list:
        """PDF의 모든 페이지를 OCR 처리"""
        import fitz
        doc = fitz.open(pdf_path)
        results = []
        for page_num, page in enumerate(doc):
            # 페이지를 고해상도 이미지로 변환
            mat = fitz.Matrix(2.0, 2.0)  # 2x 스케일
            pix = page.get_pixmap(matrix=mat)
            img_path = f"/tmp/page_{page_num}.png"
            pix.save(img_path)
            # OCR 실행
            ocr_result = self.process_image(img_path)
            ocr_result["page_num"] = page_num + 1
            results.append(ocr_result)
        doc.close()
        return results

레이아웃 분석과 구조 추출

레이아웃 분석은 문서에서 텍스트 블록, 제목, 표, 그림, 캡션 등의 영역을 식별하고 논리적 읽기 순서를 결정하는 과정이다. 최근에는 트랜스포머 기반 모델이 이 분야를 주도하고 있다.

주요 레이아웃 분석 모델

모델개발사핵심 기술특징
LayoutLMv3Microsoft멀티모달 트랜스포머텍스트+이미지+레이아웃 통합
DiT (Document Image Transformer)MicrosoftVision Transformer이미지 기반 문서 이해
DonutNAVER CLOVAOCR-free 접근OCR 없이 직접 문서 이해
Table TransformerMicrosoftDETR 기반테이블 감지/구조 인식 특화
Unstructured오픈소스하이브리드여러 모델 조합 파이프라인

LayoutLMv3를 활용한 문서 구조 분석

from transformers import (
    LayoutLMv3ForTokenClassification,
    LayoutLMv3Processor,
)
from PIL import Image
import torch

class LayoutAnalyzer:
    """LayoutLMv3 기반 문서 레이아웃 분석기"""

    LABEL_MAP = {
        0: "O",
        1: "B-TITLE",
        2: "I-TITLE",
        3: "B-TEXT",
        4: "I-TEXT",
        5: "B-TABLE",
        6: "I-TABLE",
        7: "B-FIGURE",
        8: "I-FIGURE",
        9: "B-LIST",
        10: "I-LIST",
        11: "B-HEADER",
        12: "I-HEADER",
        13: "B-FOOTER",
        14: "I-FOOTER",
    }

    def __init__(self, model_name: str = "microsoft/layoutlmv3-base"):
        self.processor = LayoutLMv3Processor.from_pretrained(
            model_name, apply_ocr=True
        )
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            model_name, num_labels=len(self.LABEL_MAP)
        )
        self.model.eval()

    def analyze(self, image_path: str) -> list:
        """문서 이미지의 레이아웃 분석"""
        image = Image.open(image_path).convert("RGB")
        encoding = self.processor(
            image,
            return_tensors="pt",
            truncation=True,
            max_length=512
        )
        with torch.no_grad():
            outputs = self.model(**encoding)
        predictions = outputs.logits.argmax(-1).squeeze().tolist()
        tokens = self.processor.tokenizer.convert_ids_to_tokens(
            encoding["input_ids"].squeeze()
        )
        # 토큰별 예측 결과 매핑
        elements = []
        current_label = None
        current_text = ""
        for token, pred in zip(tokens, predictions):
            label = self.LABEL_MAP.get(pred, "O")
            if label.startswith("B-"):
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = label[2:]
                current_text = token.replace("##", "")
            elif label.startswith("I-") and current_label:
                current_text += token.replace("##", "")
            else:
                if current_text and current_label:
                    elements.append({
                        "type": current_label,
                        "text": current_text.strip()
                    })
                current_label = None
                current_text = ""
        if current_text and current_label:
            elements.append({
                "type": current_label,
                "text": current_text.strip()
            })
        return elements

Unstructured 라이브러리를 활용한 통합 파싱

Unstructured는 다양한 문서 형식을 지원하는 오픈소스 라이브러리로, 여러 파싱 엔진을 통합하여 제공한다.

from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title

class UnstructuredParser:
    """Unstructured 기반 통합 문서 파서"""

    def __init__(self, strategy: str = "hi_res"):
        self.strategy = strategy  # "fast", "ocr_only", "hi_res"

    def parse_pdf(self, pdf_path: str) -> list:
        """PDF를 구조화된 요소로 파싱"""
        elements = partition_pdf(
            filename=pdf_path,
            strategy=self.strategy,
            infer_table_structure=True,
            languages=["kor", "eng"],
            extract_images_in_pdf=True,
            extract_image_block_output_dir="./extracted_images"
        )
        parsed = []
        for element in elements:
            parsed.append({
                "type": type(element).__name__,
                "text": str(element),
                "metadata": {
                    "page_number": element.metadata.page_number,
                    "coordinates": (
                        element.metadata.coordinates
                        if hasattr(element.metadata, "coordinates")
                        else None
                    ),
                    "parent_id": element.metadata.parent_id,
                }
            })
        return parsed

    def parse_and_chunk(
        self, file_path: str, max_characters: int = 1000
    ) -> list:
        """파싱 후 제목 기반 청킹까지 수행"""
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        chunks = chunk_by_title(
            elements,
            max_characters=max_characters,
            combine_text_under_n_chars=200,
            new_after_n_chars=800
        )
        return [
            {
                "text": str(chunk),
                "type": type(chunk).__name__,
                "metadata": chunk.metadata.to_dict()
            }
            for chunk in chunks
        ]

테이블 추출 기법

문서에서 테이블을 정확하게 추출하는 것은 Document Parsing에서 가장 도전적인 과제 중 하나이다. 테이블은 복잡한 셀 병합, 중첩 구조, 다양한 스타일을 가질 수 있기 때문이다.

테이블 추출의 주요 과제

  • 테이블 감지: 문서에서 테이블 영역을 정확히 식별
  • 구조 인식: 행/열 구조, 병합된 셀, 헤더 행 인식
  • 셀 내용 추출: 각 셀의 텍스트를 정확히 추출
  • 무선 테이블(borderless table): 선이 없는 테이블의 구조 인식

Table Transformer를 활용한 테이블 감지

from transformers import (
    TableTransformerForObjectDetection,
    AutoImageProcessor,
)
from PIL import Image
import torch

class TableExtractor:
    """Table Transformer 기반 테이블 추출기"""

    def __init__(self):
        self.processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.detection_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-detection"
        )
        self.structure_processor = AutoImageProcessor.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )
        self.structure_model = TableTransformerForObjectDetection.from_pretrained(
            "microsoft/table-transformer-structure-recognition"
        )

    def detect_tables(self, image_path: str, threshold: float = 0.7) -> list:
        """이미지에서 테이블 영역 감지"""
        image = Image.open(image_path).convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = self.detection_model(**inputs)
        target_sizes = torch.tensor([image.size[::-1]])
        results = self.processor.post_process_object_detection(
            outputs, threshold=threshold, target_sizes=target_sizes
        )[0]
        tables = []
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            tables.append({
                "score": score.item(),
                "label": self.detection_model.config.id2label[label.item()],
                "bbox": box.tolist()  # [x1, y1, x2, y2]
            })
        return tables

    def recognize_structure(
        self, image_path: str, table_bbox: list
    ) -> dict:
        """감지된 테이블의 내부 구조 인식"""
        image = Image.open(image_path).convert("RGB")
        # 테이블 영역 크롭
        table_image = image.crop(table_bbox)
        inputs = self.structure_processor(
            images=table_image, return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.structure_model(**inputs)
        target_sizes = torch.tensor([table_image.size[::-1]])
        results = self.structure_processor.post_process_object_detection(
            outputs, threshold=0.5, target_sizes=target_sizes
        )[0]
        structure = {"rows": [], "columns": [], "cells": []}
        for score, label, box in zip(
            results["scores"], results["labels"], results["boxes"]
        ):
            label_name = self.structure_model.config.id2label[label.item()]
            entry = {"bbox": box.tolist(), "score": score.item()}
            if "row" in label_name:
                structure["rows"].append(entry)
            elif "column" in label_name:
                structure["columns"].append(entry)
            else:
                structure["cells"].append(entry)
        # y좌표 기준 정렬
        structure["rows"].sort(key=lambda x: x["bbox"][1])
        structure["columns"].sort(key=lambda x: x["bbox"][0])
        return structure

Camelot을 활용한 간편한 테이블 추출

import camelot
import pandas as pd

def extract_tables_with_camelot(
    pdf_path: str, pages: str = "all", flavor: str = "lattice"
) -> list:
    """Camelot으로 PDF에서 테이블 추출

    Args:
        pdf_path: PDF 파일 경로
        pages: 추출할 페이지 ("all" 또는 "1,2,3")
        flavor: "lattice" (선 기반) 또는 "stream" (공백 기반)
    """
    tables = camelot.read_pdf(
        pdf_path,
        pages=pages,
        flavor=flavor,
        strip_text="\n"
    )
    results = []
    for i, table in enumerate(tables):
        df = table.df
        results.append({
            "table_index": i,
            "page": table.page,
            "accuracy": table.accuracy,
            "data": df.to_dict(orient="records"),
            "shape": df.shape,
            "dataframe": df
        })
    return results


# 사용 예시
if __name__ == "__main__":
    tables = extract_tables_with_camelot(
        "financial_report.pdf",
        pages="1-5",
        flavor="lattice"
    )
    for t in tables:
        print(f"테이블 {t['table_index']} (페이지 {t['page']})")
        print(f"  정확도: {t['accuracy']:.1f}%")
        print(f"  크기: {t['shape']}")
        print(t["dataframe"].head())

LLM 기반 문서 이해

최근 GPT-4V, Claude 3.5 등 멀티모달 LLM의 등장으로 문서 이해 방식이 근본적으로 변화하고 있다. 기존의 OCR + 후처리 파이프라인 대신, 문서 이미지를 직접 LLM에 입력하여 내용을 이해하고 구조화할 수 있다.

멀티모달 LLM을 활용한 문서 처리

import anthropic
import base64
from pathlib import Path

class LLMDocumentProcessor:
    """LLM 기반 멀티모달 문서 처리기"""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = anthropic.Anthropic()
        self.model = model

    def _encode_image(self, image_path: str) -> tuple:
        """이미지를 base64로 인코딩"""
        path = Path(image_path)
        suffix = path.suffix.lower()
        media_type_map = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        media_type = media_type_map.get(suffix, "image/png")
        with open(image_path, "rb") as f:
            data = base64.standard_b64encode(f.read()).decode("utf-8")
        return data, media_type

    def extract_structured_data(
        self, image_path: str, schema_description: str
    ) -> str:
        """문서 이미지에서 구조화된 데이터 추출"""
        data, media_type = self._encode_image(image_path)
        prompt = f"""이 문서 이미지를 분석하여 다음 스키마에 맞는 구조화된 데이터를 JSON으로 추출해 주세요.

스키마:
{schema_description}

주의사항:
- 모든 텍스트를 정확히 추출하세요
- 테이블이 있다면 행/열 구조를 유지하세요
- 확실하지 않은 내용은 null로 표시하세요
- JSON 형식으로만 응답하세요"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {
                            "type": "text",
                            "text": prompt,
                        },
                    ],
                }
            ],
        )
        return response.content[0].text

    def analyze_document_layout(self, image_path: str) -> str:
        """문서 레이아웃 분석 및 구조 추출"""
        data, media_type = self._encode_image(image_path)
        prompt = """이 문서의 레이아웃을 분석하여 다음 정보를 JSON으로 반환하세요:

1. 문서 유형 (논문, 보고서, 송장, 계약서 등)
2. 섹션 구조 (제목과 계층)
3. 테이블 존재 여부와 위치 설명
4. 그림/차트 존재 여부와 설명
5. 핵심 키-값 쌍 (있는 경우)
6. 전체 텍스트의 읽기 순서

JSON으로만 응답하세요."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": data,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

    def compare_documents(
        self, image_path_1: str, image_path_2: str
    ) -> str:
        """두 문서 이미지를 비교 분석"""
        data1, mt1 = self._encode_image(image_path_1)
        data2, mt2 = self._encode_image(image_path_2)
        prompt = """두 문서를 비교하여 다음을 분석하세요:
1. 공통점
2. 차이점
3. 추가된 내용
4. 삭제된 내용
5. 변경된 내용

구조화된 JSON으로 응답하세요."""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt1,
                                "data": data1,
                            },
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": mt2,
                                "data": data2,
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        )
        return response.content[0].text

하이브리드 접근: OCR + LLM 보정

OCR로 먼저 텍스트를 추출한 후, LLM으로 오류를 보정하고 구조를 정리하는 하이브리드 방식이 실무에서 높은 효과를 보인다.

class HybridDocumentProcessor:
    """OCR + LLM 하이브리드 문서 처리기"""

    def __init__(self):
        self.ocr = PaddleOCRProcessor(lang="korean")
        self.llm = LLMDocumentProcessor()

    def process(self, image_path: str) -> dict:
        """하이브리드 방식으로 문서 처리"""
        # 1단계: OCR로 텍스트 추출
        ocr_result = self.ocr.process_image(image_path)
        raw_text = ocr_result["full_text"]
        confidence = ocr_result["confidence_avg"]

        # 2단계: LLM으로 보정 및 구조화
        correction_prompt = f"""다음 OCR 추출 텍스트를 검토하고 보정하세요.
OCR 신뢰도가 낮은 부분을 문맥에 맞게 수정하고,
문서의 논리적 구조(제목, 본문, 목록 등)를 Markdown으로 정리하세요.

OCR 추출 텍스트 (평균 신뢰도: {confidence:.2f}):
---
{raw_text}
---

보정된 Markdown을 반환하세요."""

        import anthropic
        client = anthropic.Anthropic()
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": correction_prompt}],
        )
        corrected_text = response.content[0].text

        return {
            "raw_ocr": raw_text,
            "ocr_confidence": confidence,
            "corrected_text": corrected_text,
            "method": "hybrid_ocr_llm"
        }

RAG를 위한 문서 청킹 전략

Document Parsing의 최종 목표가 RAG 파이프라인이라면, 청킹(Chunking) 전략이 검색 품질을 결정짓는 핵심 요소이다. 부적절한 청킹은 검색 정확도를 크게 떨어뜨리고, 맥락 손실로 인한 환각(hallucination)을 유발한다.

청킹 전략 비교

전략설명장점단점
고정 크기일정 토큰/문자 수로 분할구현 간단, 균일한 크기문맥 단절
재귀적 분할구분자 우선순위로 분할구조 유지, 유연함크기 불균일
시맨틱임베딩 유사도 기반 분할의미 단위 보존계산 비용 높음
문서 구조 기반헤딩/섹션 기반 분할논리적 구조 유지구조 인식 필요
슬라이딩 윈도우오버랩 포함 분할맥락 연속성저장 공간 증가

고급 청킹 구현

from typing import Optional
import numpy as np

class AdvancedChunker:
    """다양한 청킹 전략을 지원하는 고급 청커"""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def fixed_size_chunk(
        self, text: str, chunk_size: int = 1000, overlap: int = 200
    ) -> list:
        """고정 크기 + 오버랩 청킹"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            # 문장 경계에서 자르기
            if end < len(text):
                last_period = chunk.rfind(". ")
                last_newline = chunk.rfind("\n")
                cut_point = max(last_period, last_newline)
                if cut_point > chunk_size * 0.5:
                    chunk = chunk[:cut_point + 1]
                    end = start + cut_point + 1
            chunks.append({
                "text": chunk.strip(),
                "start": start,
                "end": end,
                "index": len(chunks)
            })
            start = end - overlap
        return chunks

    def recursive_chunk(
        self,
        text: str,
        chunk_size: int = 1000,
        separators: Optional[list] = None,
    ) -> list:
        """재귀적 분할 청킹"""
        if separators is None:
            separators = ["\n\n\n", "\n\n", "\n", ". ", ", ", " "]
        chunks = []
        self._recursive_split(text, separators, chunk_size, chunks)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
            if c.strip()
        ]

    def _recursive_split(
        self, text: str, separators: list, chunk_size: int, result: list
    ):
        if len(text) <= chunk_size:
            result.append(text)
            return
        sep = separators[0] if separators else " "
        remaining_seps = separators[1:] if len(separators) > 1 else []
        parts = text.split(sep)
        current = ""
        for part in parts:
            test = current + sep + part if current else part
            if len(test) > chunk_size:
                if current:
                    if len(current) > chunk_size and remaining_seps:
                        self._recursive_split(
                            current, remaining_seps, chunk_size, result
                        )
                    else:
                        result.append(current)
                current = part
            else:
                current = test
        if current:
            if len(current) > chunk_size and remaining_seps:
                self._recursive_split(
                    current, remaining_seps, chunk_size, result
                )
            else:
                result.append(current)

    def semantic_chunk(
        self, text: str, threshold: float = 0.5, min_size: int = 100
    ) -> list:
        """시맨틱 청킹 - 임베딩 유사도 기반"""
        if not self.embedding_model:
            raise ValueError("시맨틱 청킹에는 임베딩 모델이 필요합니다")
        # 문장 단위로 분리
        sentences = [s.strip() for s in text.split(". ") if s.strip()]
        if len(sentences) <= 1:
            return [{"text": text, "index": 0}]
        # 각 문장의 임베딩 계산
        embeddings = self.embedding_model.encode(sentences)
        # 인접 문장 간 유사도 계산
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = np.dot(embeddings[i], embeddings[i + 1]) / (
                np.linalg.norm(embeddings[i])
                * np.linalg.norm(embeddings[i + 1])
            )
            similarities.append(sim)
        # 유사도가 임계값 아래인 지점에서 분할
        chunks = []
        current_chunk = sentences[0]
        for i, sim in enumerate(similarities):
            if sim < threshold and len(current_chunk) >= min_size:
                chunks.append(current_chunk)
                current_chunk = sentences[i + 1]
            else:
                current_chunk += ". " + sentences[i + 1]
        if current_chunk:
            chunks.append(current_chunk)
        return [
            {"text": c, "index": i}
            for i, c in enumerate(chunks)
        ]

    def structure_based_chunk(self, parsed_elements: list) -> list:
        """문서 구조 기반 청킹 - 레이아웃 분석 결과 활용"""
        chunks = []
        current_chunk = {
            "title": "",
            "content": "",
            "tables": [],
            "metadata": {}
        }
        for element in parsed_elements:
            elem_type = element.get("type", "")
            elem_text = element.get("text", "")
            if elem_type in ("Title", "TITLE"):
                # 새로운 섹션 시작
                if current_chunk["content"]:
                    chunks.append(current_chunk.copy())
                current_chunk = {
                    "title": elem_text,
                    "content": "",
                    "tables": [],
                    "metadata": element.get("metadata", {})
                }
            elif elem_type in ("Table", "TABLE"):
                current_chunk["tables"].append(elem_text)
            else:
                current_chunk["content"] += elem_text + "\n"
        if current_chunk["content"]:
            chunks.append(current_chunk)
        return chunks

실전 파이프라인 구축

지금까지 다룬 개별 기술들을 조합하여, 프로덕션 환경에서 사용할 수 있는 엔드투엔드 Document Parsing 파이프라인을 구축한다.

파이프라인 아키텍처

전체 파이프라인은 다음과 같은 단계로 구성된다.

  1. 입력 처리: 다양한 문서 형식 감지 및 정규화
  2. 파싱 전략 선택: 문서 유형에 따른 최적 파서 선택
  3. 텍스트/구조 추출: OCR, 레이아웃 분석, 테이블 추출
  4. LLM 보강: 멀티모달 LLM을 통한 품질 향상
  5. 청킹 및 인덱싱: RAG용 청킹 후 벡터 DB에 저장
import os
import json
import logging
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """파이프라인 설정"""
    ocr_engine: str = "paddleocr"       # tesseract, paddleocr, easyocr
    ocr_lang: str = "korean"
    layout_model: str = "unstructured"  # layoutlm, unstructured
    chunking_strategy: str = "recursive"  # fixed, recursive, semantic, structure
    chunk_size: int = 1000
    chunk_overlap: int = 200
    use_llm_correction: bool = True
    llm_model: str = "claude-sonnet-4-20250514"
    output_format: str = "json"         # json, markdown

@dataclass
class ProcessedDocument:
    """처리된 문서 결과"""
    source_path: str
    doc_type: str
    pages: list = field(default_factory=list)
    full_text: str = ""
    tables: list = field(default_factory=list)
    chunks: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    processing_log: list = field(default_factory=list)


class DocumentParsingPipeline:
    """프로덕션 Document Parsing 파이프라인"""

    def __init__(self, config: Optional[PipelineConfig] = None):
        self.config = config or PipelineConfig()
        self.chunker = AdvancedChunker()

    def process(self, file_path: str) -> ProcessedDocument:
        """문서를 엔드투엔드로 처리"""
        result = ProcessedDocument(source_path=file_path, doc_type="")
        logger.info(f"Processing: {file_path}")

        try:
            # 1. 문서 유형 감지
            doc_type = self._detect_type(file_path)
            result.doc_type = doc_type
            result.processing_log.append(
                f"Document type detected: {doc_type}"
            )

            # 2. 파싱 전략 선택 및 실행
            if doc_type == "native_pdf":
                raw_result = self._parse_native_pdf(file_path)
            elif doc_type in ("scanned_pdf", "image"):
                raw_result = self._parse_with_ocr(file_path)
            elif doc_type == "mixed_pdf":
                raw_result = self._parse_mixed_pdf(file_path)
            else:
                raw_result = self._parse_generic(file_path)

            result.full_text = raw_result.get("text", "")
            result.tables = raw_result.get("tables", [])
            result.pages = raw_result.get("pages", [])

            # 3. LLM 보정 (선택적)
            if self.config.use_llm_correction and result.full_text:
                result.full_text = self._llm_correct(result.full_text)
                result.processing_log.append("LLM correction applied")

            # 4. 청킹
            result.chunks = self._chunk_document(result)
            result.processing_log.append(
                f"Created {len(result.chunks)} chunks "
                f"with strategy: {self.config.chunking_strategy}"
            )

            # 5. 메타데이터 생성
            result.metadata = {
                "source": file_path,
                "doc_type": doc_type,
                "total_pages": len(result.pages),
                "total_tables": len(result.tables),
                "total_chunks": len(result.chunks),
                "text_length": len(result.full_text),
                "config": {
                    "ocr_engine": self.config.ocr_engine,
                    "chunking_strategy": self.config.chunking_strategy,
                    "chunk_size": self.config.chunk_size,
                }
            }
            logger.info(
                f"Processing complete: "
                f"{len(result.chunks)} chunks created"
            )

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")
            result.processing_log.append(f"Error: {str(e)}")

        return result

    def _detect_type(self, file_path: str) -> str:
        """문서 유형 자동 감지"""
        ext = Path(file_path).suffix.lower()
        if ext in (".jpg", ".jpeg", ".png", ".tiff", ".bmp"):
            return "image"
        elif ext == ".pdf":
            import fitz
            doc = fitz.open(file_path)
            total_text = sum(len(page.get_text()) for page in doc)
            total_images = sum(len(page.get_images()) for page in doc)
            doc.close()
            if total_text < 100:
                return "scanned_pdf"
            elif total_images > len(doc) * 0.5:
                return "mixed_pdf"
            return "native_pdf"
        return "unknown"

    def _parse_native_pdf(self, file_path: str) -> dict:
        """네이티브 PDF 파싱"""
        parser = PyMuPDFParser(file_path)
        pages = parser.extract_text_with_layout()
        plumber = PdfPlumberParser(file_path)
        tables = plumber.extract_tables()
        text = plumber.extract_text_outside_tables()
        parser.close()
        plumber.close()
        return {"text": text, "tables": tables, "pages": pages}

    def _parse_with_ocr(self, file_path: str) -> dict:
        """OCR 기반 문서 파싱"""
        if self.config.ocr_engine == "paddleocr":
            processor = PaddleOCRProcessor(lang=self.config.ocr_lang)
            if file_path.lower().endswith(".pdf"):
                results = processor.process_pdf(file_path)
                text = "\n\n".join(r["full_text"] for r in results)
                return {"text": text, "pages": results, "tables": []}
            else:
                result = processor.process_image(file_path)
                return {
                    "text": result["full_text"],
                    "pages": [result],
                    "tables": []
                }
        else:
            ocr = TesseractOCR(lang="kor+eng")
            text = ocr.extract_text(file_path)
            return {"text": text, "pages": [], "tables": []}

    def _parse_mixed_pdf(self, file_path: str) -> dict:
        """혼합 PDF 파싱 - 네이티브 + OCR"""
        native_result = self._parse_native_pdf(file_path)
        ocr_result = self._parse_with_ocr(file_path)
        # 네이티브 텍스트가 있는 페이지는 네이티브, 없으면 OCR 사용
        combined_text = native_result["text"] or ocr_result["text"]
        return {
            "text": combined_text,
            "tables": native_result["tables"],
            "pages": native_result["pages"]
        }

    def _parse_generic(self, file_path: str) -> dict:
        """일반 문서 파싱 (Unstructured 활용)"""
        parser = UnstructuredParser(strategy="hi_res")
        elements = parser.parse_pdf(file_path)
        text = "\n\n".join(e["text"] for e in elements)
        return {"text": text, "pages": [], "tables": []}

    def _llm_correct(self, text: str) -> str:
        """LLM을 사용한 텍스트 보정"""
        if len(text) < 100:
            return text
        import anthropic
        client = anthropic.Anthropic()
        # 텍스트가 너무 길면 앞부분만 보정 예시
        sample = text[:3000] if len(text) > 3000 else text
        response = client.messages.create(
            model=self.config.llm_model,
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": (
                        "다음 OCR 추출 텍스트의 오류를 보정하세요. "
                        "원본의 의미와 구조를 유지하면서 오타, 깨진 문자, "
                        "줄바꿈 오류만 수정하세요.\n\n"
                        f"{sample}"
                    ),
                }
            ],
        )
        return response.content[0].text

    def _chunk_document(self, doc: ProcessedDocument) -> list:
        """문서 청킹"""
        strategy = self.config.chunking_strategy
        if strategy == "fixed":
            return self.chunker.fixed_size_chunk(
                doc.full_text,
                self.config.chunk_size,
                self.config.chunk_overlap
            )
        elif strategy == "recursive":
            return self.chunker.recursive_chunk(
                doc.full_text,
                self.config.chunk_size
            )
        elif strategy == "structure":
            if doc.pages:
                return self.chunker.structure_based_chunk(doc.pages)
            return self.chunker.recursive_chunk(
                doc.full_text, self.config.chunk_size
            )
        return self.chunker.recursive_chunk(
            doc.full_text, self.config.chunk_size
        )

    def save_results(
        self, result: ProcessedDocument, output_dir: str
    ):
        """처리 결과 저장"""
        os.makedirs(output_dir, exist_ok=True)
        base_name = Path(result.source_path).stem
        # 청크 저장
        chunks_path = os.path.join(output_dir, f"{base_name}_chunks.json")
        with open(chunks_path, "w", encoding="utf-8") as f:
            json.dump(result.chunks, f, ensure_ascii=False, indent=2)
        # 메타데이터 저장
        meta_path = os.path.join(output_dir, f"{base_name}_metadata.json")
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result.metadata, f, ensure_ascii=False, indent=2)
        # 전체 텍스트 저장
        text_path = os.path.join(output_dir, f"{base_name}_full.txt")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(result.full_text)
        logger.info(f"Results saved to {output_dir}")


# 사용 예시
if __name__ == "__main__":
    config = PipelineConfig(
        ocr_engine="paddleocr",
        chunking_strategy="recursive",
        chunk_size=1000,
        chunk_overlap=200,
        use_llm_correction=True
    )
    pipeline = DocumentParsingPipeline(config)

    # 단일 문서 처리
    result = pipeline.process("research_paper.pdf")
    print(f"총 {len(result.chunks)}개 청크 생성")
    print(f"테이블 {len(result.tables)}개 추출")

    # 결과 저장
    pipeline.save_results(result, "./output")

    # 벡터 DB에 청크 저장 (예시)
    # from chromadb import Client
    # client = Client()
    # collection = client.get_or_create_collection("documents")
    # for chunk in result.chunks:
    #     collection.add(
    #         documents=[chunk["text"]],
    #         metadatas=[result.metadata],
    #         ids=[f"{base_name}_{chunk['index']}"]
    #     )

배치 처리 및 모니터링

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchProcessor:
    """대량 문서 배치 처리기"""

    def __init__(self, pipeline: DocumentParsingPipeline, max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers

    def process_directory(self, input_dir: str, output_dir: str) -> dict:
        """디렉토리의 모든 문서를 배치 처리"""
        supported_ext = {".pdf", ".png", ".jpg", ".jpeg", ".tiff"}
        files = [
            str(f) for f in Path(input_dir).rglob("*")
            if f.suffix.lower() in supported_ext
        ]
        logger.info(f"Found {len(files)} documents to process")
        stats = {
            "total": len(files),
            "success": 0,
            "failed": 0,
            "total_chunks": 0,
            "processing_time": 0
        }
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._process_single, f, output_dir
                ): f for f in files
            }
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    stats["success"] += 1
                    stats["total_chunks"] += len(result.chunks)
                    logger.info(f"Success: {file_path}")
                except Exception as e:
                    stats["failed"] += 1
                    logger.error(f"Failed: {file_path} - {e}")

        stats["processing_time"] = time.time() - start_time
        logger.info(
            f"Batch complete: {stats['success']}/{stats['total']} "
            f"in {stats['processing_time']:.1f}s"
        )
        return stats

    def _process_single(
        self, file_path: str, output_dir: str
    ) -> ProcessedDocument:
        """단일 문서 처리 및 저장"""
        result = self.pipeline.process(file_path)
        self.pipeline.save_results(result, output_dir)
        return result

마치며

Document Parsing은 AI/LLM 애플리케이션의 데이터 품질을 결정짓는 핵심 기반 기술이다. 이 글에서 다룬 내용을 정리하면 다음과 같다.

PDF 파싱: PyMuPDF는 범용 처리에, pdfplumber는 테이블 추출에 강점이 있다. 문서 유형(네이티브/스캔/혼합)에 따라 최적 도구를 선택해야 한다.

OCR: PaddleOCR은 아시아 언어에서 높은 정확도를, Tesseract는 범용성을 제공한다. 이미지 전처리(이진화, 기울기 보정 등)가 정확도에 큰 영향을 미친다.

레이아웃 분석: LayoutLMv3, Unstructured 같은 도구를 활용하면 문서의 논리적 구조를 자동으로 파악할 수 있다. 특히 Unstructured는 빠르게 프로토타이핑하기에 적합하다.

테이블 추출: Table Transformer, Camelot 등을 활용하되, 복잡한 테이블(병합 셀, 무선 테이블)에는 추가적인 후처리가 필요하다.

LLM 기반 문서 이해: GPT-4V, Claude 등의 멀티모달 LLM은 OCR 보정, 구조 분석, 정보 추출에서 획기적인 성능 향상을 가져왔다. OCR + LLM 하이브리드 접근이 현재 최선의 실무 전략이다.

청킹 전략: RAG 파이프라인에서 검색 품질은 청킹 전략에 직접적으로 의존한다. 문서의 논리적 구조를 반영하는 구조 기반 청킹이 가장 높은 검색 정확도를 제공한다.

Document Parsing 기술은 빠르게 발전하고 있으며, 특히 멀티모달 LLM의 등장은 기존의 파이프라인 패러다임을 근본적으로 바꾸고 있다. 하지만 비용과 지연 시간의 제약으로, 실무에서는 전통적인 도구와 LLM을 적절히 조합하는 하이브리드 접근이 가장 현실적인 선택이다.

참고자료