이전 글에서는 .hwp와 .pdf 파일에서 텍스트를 추출해 .txt로 저장하는 간단한 자동화 코드를 소개했었습니다. 이번에는 그 범위를 확장해 HWPX, DOC, DOCX까지 다양한 문서 포맷을 아우르는 변환기를 만들어보았습니다.
RAG를 위한 데이터 전처리에 고통받을 분들을 위해 공유해 둡니다.
✅ 지원 포맷
- .hwp (바이너리 형식의 한글 문서)
- .hwpx (XML 기반 최신 한글 문서)
- .doc (구 버전 MS 워드, catdoc 필요)
- .docx (신형 MS 워드, python-docx)
📦 사전 준비
필수 패키지 설치
pip install olefile python-docx pymupdf
sudo apt install catdoc # .doc 파일 지원용
🧠 각 포맷별 텍스트 추출 방식
1. .hwp (바이너리 한글 파일)
OLE 구조로 되어 있으며, 내부 섹션을 압축 해제해 UTF-16 문자열을 직접 추출합니다.
def get_hwp_text(filename):
...
2. .pdf
PyMuPDF을 사용하여 페이지 단위로 텍스트를 가져옵니다.
def get_pdf_text(filename):
...
3. .docx
python-docx를 활용해 단락 단위 텍스트를 추출합니다.
def get_docx_text(filename):
...
4. .doc (구형 워드 문서)
리눅스에서 catdoc 명령어를 통해 처리합니다.
def get_doc_text(filename):
...
⚠️ catdoc이 없다면 sudo apt install catdoc로 설치 필요
5. .hwpx (HWP의 XML 기반 후속 형식)
ZIP 구조 내부의 XML(Contents/section0.xml)에서 텍스트 노드 추출
def get_hwpx_text(filename):
...
🧰 전체 변환기 구조
아래 코드는 지정한 입력 디렉토리에서 위 포맷들의 문서를 찾아 .txt로 변환해 출력 디렉토리에 저장합니다.
def convert_documents(input_dir, output_dir):
...
예시 실행
folder_pairs = [
("./data/rule", "./data/rule_txt"),
("./data/policy", "./data/policy_txt"),
]
for input_dir, output_dir in folder_pairs:
convert_documents(input_dir, output_dir)
📝 마무리
이제는 .hwp, .pdf는 물론 .doc, .docx, .hwpx 파일까지도 한 번에 .txt로 변환할 수 있습니다. 다양한 정책 문서나 규정 파일을 분석할 때 특히 유용하게 활용할 수 있습니다.
전체 코드
import os
import olefile
import zlib
import struct
import fitz # PyMuPDF
from docx import Document
import subprocess
import zipfile
import xml.etree.ElementTree as ET
def get_hwp_text(filename):
f = olefile.OleFileIO(filename)
dirs = f.listdir()
if ["FileHeader"] not in dirs or ["\x05HwpSummaryInformation"] not in dirs:
raise Exception("Not Valid HWP.")
header = f.openstream("FileHeader")
header_data = header.read()
is_compressed = (header_data[36] & 1) == 1
nums = [int(d[1][len("Section"):]) for d in dirs if d[0] == "BodyText"]
sections = ["BodyText/Section" + str(x) for x in sorted(nums)]
text = ""
for section in sections:
bodytext = f.openstream(section)
data = bodytext.read()
unpacked_data = zlib.decompress(data, -15) if is_compressed else data
i = 0
while i < len(unpacked_data):
header = struct.unpack_from("<I", unpacked_data, i)[0]
rec_type = header & 0x3ff
rec_len = (header >> 20) & 0xfff
if rec_type == 67:
rec_data = unpacked_data[i + 4:i + 4 + rec_len]
text += rec_data.decode('utf-16') + "\n"
i += 4 + rec_len
return text
def get_pdf_text(filename):
doc = fitz.open(filename)
return "\n".join(page.get_text() for page in doc)
def get_docx_text(filename):
doc = Document(filename)
return "\n".join(p.text for p in doc.paragraphs)
def get_doc_text(filename):
try:
result = subprocess.run(['catdoc', filename], capture_output=True, text=True)
if result.returncode == 0:
return result.stdout
else:
raise Exception(result.stderr)
except FileNotFoundError:
raise Exception("❌ 'catdoc' 명령어가 없습니다. 'sudo apt install catdoc'로 설치하세요.")
def get_hwpx_text(filename):
text = ""
with zipfile.ZipFile(filename, 'r') as z:
with z.open('Contents/section0.xml') as f:
tree = ET.parse(f)
root = tree.getroot()
for elem in root.iter():
if elem.tag.endswith('t'):
text += (elem.text or '') + '\n'
return text.strip()
# def convert_documents(input_dir, output_dir):
# os.makedirs(output_dir, exist_ok=True)
# for filename in os.listdir(input_dir):
# filepath = os.path.join(input_dir, filename)
# name, ext = os.path.splitext(filename.lower())
# try:
# if ext == '.hwp':
# text = get_hwp_text(filepath)
# elif ext == '.pdf':
# text = get_pdf_text(filepath)
# elif ext == '.docx':
# text = get_docx_text(filepath)
# elif ext == '.doc':
# text = get_doc_text(filepath)
# elif ext == '.hwpx':
# text = get_hwpx_text(filepath)
# else:
# continue
# output_path = os.path.join(output_dir, name + ".txt")
# with open(output_path, "w", encoding="utf-8") as f:
# f.write(text)
# print(f"✅ Converted: {filename} -> {output_path}")
# except Exception as e:
# print(f"❌ Failed to convert {filename}: {e}")
def convert_documents(input_dir, output_dir):
os.makedirs(output_dir, exist_ok=True)
failed_files = [] # 실패한 파일 목록 저장용
for filename in os.listdir(input_dir):
filepath = os.path.join(input_dir, filename)
name, ext = os.path.splitext(filename)
name = name.lower()
try:
if ext == '.hwp':
text = get_hwp_text(filepath)
elif ext == '.pdf':
text = get_pdf_text(filepath)
elif ext == '.docx':
text = get_docx_text(filepath)
elif ext == '.doc':
text = get_doc_text(filepath)
elif ext == '.hwpx':
text = get_hwpx_text(filepath)
else:
continue
base_output_path = os.path.join(output_dir, name)
save_split_text(text, base_output_path)
print(f"✅ Converted: {filename} -> {base_output_path}*.txt")
except Exception as e:
failed_files.append((filename, str(e)))
print(f"❌ Failed to convert {filename}: {e}")
# 실패한 파일 목록 출력
if failed_files:
print("\n📛 변환 실패 파일 목록:")
for fname, reason in failed_files:
print(f"- {fname}: {reason}")
# 실패 목록 파일로 저장
with open(os.path.join(output_dir, "failed_files.txt"), "w", encoding="utf-8") as f:
for fname, reason in failed_files:
f.write(f"{fname}\t{reason}\n")
print(f"\n📄 실패 목록 저장됨: {os.path.join(output_dir, 'failed_files.txt')}")
else:
print("\n✅ 모든 파일이 성공적으로 변환되었습니다.")
def save_split_text(text, base_path, chunk_size=2000, overlap=200):
length = len(text)
if length <= chunk_size:
with open(base_path + ".txt", "w", encoding="utf-8") as f:
f.write(text)
else:
start = 0
part_num = 1
while start < length:
end = start + chunk_size
chunk = text[start:end]
filename = f"{base_path}_{part_num:04}.txt"
with open(filename, "w", encoding="utf-8") as f:
f.write(chunk)
part_num += 1
start = end - overlap # 겹치는 부분
# 실행
folder_pairs = [
("./data/law_infection", "./data/law_infection_txt"),
#("./data/policy", "./data/policy_txt"),
]
for input_dir, output_dir in folder_pairs:
convert_documents(input_dir, output_dir)