본문 바로가기
카테고리 없음

다양한 문서 포맷(HWP, HWPX, PDF, DOC, DOCX) 텍스트 추출 자동화하기

by 용소소 2025. 6. 2.

 이전 글에서는 .hwp와 .pdf 파일에서 텍스트를 추출해 .txt로 저장하는 간단한 자동화 코드를 소개했었습니다. 이번에는 그 범위를 확장해 HWPX, DOC, DOCX까지 다양한 문서 포맷을 아우르는 변환기를 만들어보았습니다.

 

 RAG를 위한 데이터 전처리에 고통받을 분들을 위해 공유해 둡니다.

 

✅ 지원 포맷

  • .hwp (바이너리 형식의 한글 문서)
  • .hwpx (XML 기반 최신 한글 문서)
  • .pdf
  • .doc (구 버전 MS 워드, catdoc 필요)
  • .docx (신형 MS 워드, python-docx)

 

📦 사전 준비

필수 패키지 설치

pip install olefile python-docx pymupdf
sudo apt install catdoc  # .doc 파일 지원용

 

 

🧠 각 포맷별 텍스트 추출 방식

1. .hwp (바이너리 한글 파일)

OLE 구조로 되어 있으며, 내부 섹션을 압축 해제해 UTF-16 문자열을 직접 추출합니다.

def get_hwp_text(filename):
    ...

2. .pdf

PyMuPDF을 사용하여 페이지 단위로 텍스트를 가져옵니다.

def get_pdf_text(filename):
    ...

3. .docx

python-docx를 활용해 단락 단위 텍스트를 추출합니다.

def get_docx_text(filename):
    ...

4. .doc (구형 워드 문서)

리눅스에서 catdoc 명령어를 통해 처리합니다.

def get_doc_text(filename):
    ...

 

⚠️ catdoc이 없다면 sudo apt install catdoc로 설치 필요

5. .hwpx (HWP의 XML 기반 후속 형식)

ZIP 구조 내부의 XML(Contents/section0.xml)에서 텍스트 노드 추출

def get_hwpx_text(filename):
    ...
 

 

🧰 전체 변환기 구조

아래 코드는 지정한 입력 디렉토리에서 위 포맷들의 문서를 찾아 .txt로 변환해 출력 디렉토리에 저장합니다.

def convert_documents(input_dir, output_dir):
    ...

 

예시 실행

 

folder_pairs = [
    ("./data/rule", "./data/rule_txt"),
    ("./data/policy", "./data/policy_txt"),
]

for input_dir, output_dir in folder_pairs:
    convert_documents(input_dir, output_dir)

📝 마무리

이제는 .hwp, .pdf는 물론 .doc, .docx, .hwpx 파일까지도 한 번에 .txt로 변환할 수 있습니다. 다양한 정책 문서나 규정 파일을 분석할 때 특히 유용하게 활용할 수 있습니다.

 

전체 코드

 

import os
import olefile
import zlib
import struct
import fitz  # PyMuPDF
from docx import Document
import subprocess
import zipfile
import xml.etree.ElementTree as ET

def get_hwp_text(filename):
    f = olefile.OleFileIO(filename)
    dirs = f.listdir()
    if ["FileHeader"] not in dirs or ["\x05HwpSummaryInformation"] not in dirs:
        raise Exception("Not Valid HWP.")
    header = f.openstream("FileHeader")
    header_data = header.read()
    is_compressed = (header_data[36] & 1) == 1
    nums = [int(d[1][len("Section"):]) for d in dirs if d[0] == "BodyText"]
    sections = ["BodyText/Section" + str(x) for x in sorted(nums)]
    text = ""
    for section in sections:
        bodytext = f.openstream(section)
        data = bodytext.read()
        unpacked_data = zlib.decompress(data, -15) if is_compressed else data
        i = 0
        while i < len(unpacked_data):
            header = struct.unpack_from("<I", unpacked_data, i)[0]
            rec_type = header & 0x3ff
            rec_len = (header >> 20) & 0xfff
            if rec_type == 67:
                rec_data = unpacked_data[i + 4:i + 4 + rec_len]
                text += rec_data.decode('utf-16') + "\n"
            i += 4 + rec_len
    return text

def get_pdf_text(filename):
    doc = fitz.open(filename)
    return "\n".join(page.get_text() for page in doc)

def get_docx_text(filename):
    doc = Document(filename)
    return "\n".join(p.text for p in doc.paragraphs)

def get_doc_text(filename):
    try:
        result = subprocess.run(['catdoc', filename], capture_output=True, text=True)
        if result.returncode == 0:
            return result.stdout
        else:
            raise Exception(result.stderr)
    except FileNotFoundError:
        raise Exception("❌ 'catdoc' 명령어가 없습니다. 'sudo apt install catdoc'로 설치하세요.")

def get_hwpx_text(filename):
    text = ""
    with zipfile.ZipFile(filename, 'r') as z:
        with z.open('Contents/section0.xml') as f:
            tree = ET.parse(f)
            root = tree.getroot()
            for elem in root.iter():
                if elem.tag.endswith('t'):
                    text += (elem.text or '') + '\n'
    return text.strip()

# def convert_documents(input_dir, output_dir):
#     os.makedirs(output_dir, exist_ok=True)
#     for filename in os.listdir(input_dir):
#         filepath = os.path.join(input_dir, filename)
#         name, ext = os.path.splitext(filename.lower())

#         try:
#             if ext == '.hwp':
#                 text = get_hwp_text(filepath)
#             elif ext == '.pdf':
#                 text = get_pdf_text(filepath)
#             elif ext == '.docx':
#                 text = get_docx_text(filepath)
#             elif ext == '.doc':
#                 text = get_doc_text(filepath)
#             elif ext == '.hwpx':
#                 text = get_hwpx_text(filepath)
#             else:
#                 continue

#             output_path = os.path.join(output_dir, name + ".txt")
#             with open(output_path, "w", encoding="utf-8") as f:
#                 f.write(text)
#             print(f"✅ Converted: {filename} -> {output_path}")

#         except Exception as e:
#             print(f"❌ Failed to convert {filename}: {e}")

def convert_documents(input_dir, output_dir):
    os.makedirs(output_dir, exist_ok=True)
    failed_files = []  # 실패한 파일 목록 저장용

    for filename in os.listdir(input_dir):
        filepath = os.path.join(input_dir, filename)
        name, ext = os.path.splitext(filename)
        name = name.lower()

        try:
            if ext == '.hwp':
                text = get_hwp_text(filepath)
            elif ext == '.pdf':
                text = get_pdf_text(filepath)
            elif ext == '.docx':
                text = get_docx_text(filepath)
            elif ext == '.doc':
                text = get_doc_text(filepath)
            elif ext == '.hwpx':
                text = get_hwpx_text(filepath)
            else:
                continue

            base_output_path = os.path.join(output_dir, name)
            save_split_text(text, base_output_path)
            print(f"✅ Converted: {filename} -> {base_output_path}*.txt")

        except Exception as e:
            failed_files.append((filename, str(e)))
            print(f"❌ Failed to convert {filename}: {e}")

    # 실패한 파일 목록 출력
    if failed_files:
        print("\n📛 변환 실패 파일 목록:")
        for fname, reason in failed_files:
            print(f"- {fname}: {reason}")

        # 실패 목록 파일로 저장
        with open(os.path.join(output_dir, "failed_files.txt"), "w", encoding="utf-8") as f:
            for fname, reason in failed_files:
                f.write(f"{fname}\t{reason}\n")
        print(f"\n📄 실패 목록 저장됨: {os.path.join(output_dir, 'failed_files.txt')}")
    else:
        print("\n✅ 모든 파일이 성공적으로 변환되었습니다.")


def save_split_text(text, base_path, chunk_size=2000, overlap=200):
    length = len(text)
    if length <= chunk_size:
        with open(base_path + ".txt", "w", encoding="utf-8") as f:
            f.write(text)
    else:
        start = 0
        part_num = 1
        while start < length:
            end = start + chunk_size
            chunk = text[start:end]
            filename = f"{base_path}_{part_num:04}.txt"
            with open(filename, "w", encoding="utf-8") as f:
                f.write(chunk)
            part_num += 1
            start = end - overlap  # 겹치는 부분            
            
# 실행
folder_pairs = [
    ("./data/law_infection", "./data/law_infection_txt"),
    #("./data/policy", "./data/policy_txt"),
]

for input_dir, output_dir in folder_pairs:
    convert_documents(input_dir, output_dir)