Marker 源码解析(一)(2)https://developer.aliyun.com/article/1483778
.\marker\marker\cleaners\table.py
# 从 marker.bbox 模块中导入 merge_boxes 函数 # 从 marker.schema 模块中导入 Line, Span, Block, Page 类 # 从 copy 模块中导入 deepcopy 函数 # 从 tabulate 模块中导入 tabulate 函数 # 从 typing 模块中导入 List 类型 # 导入 re 模块 # 导入 textwrap 模块 from marker.bbox import merge_boxes from marker.schema import Line, Span, Block, Page from copy import deepcopy from tabulate import tabulate from typing import List import re import textwrap # 合并表格块 def merge_table_blocks(blocks: List[Page]): # 初始化当前行列表和当前边界框 current_lines = [] current_bbox = None # 遍历每一页 for page in blocks: new_page_blocks = [] pnum = page.pnum # 遍历每个块 for block in page.blocks: # 如果块的最常见类型不是表格 if block.most_common_block_type() != "Table": # 如果当前行列表不为空 if len(current_lines) > 0: # 创建新的块对象,包含当前行列表和当前页码 new_block = Block( lines=deepcopy(current_lines), pnum=pnum, bbox=current_bbox ) new_page_blocks.append(new_block) current_lines = [] current_bbox = None # 将当前块添加到新页块列表中 new_page_blocks.append(block) continue # 将块的行添加到当前行列表中 current_lines.extend(block.lines) # 如果当前边界框为空,则设置为块的边界框,否则合并边界框 if current_bbox is None: current_bbox = block.bbox else: current_bbox = merge_boxes(current_bbox, block.bbox) # 如果当前行列表不为空 if len(current_lines) > 0: # 创建新的块对象,包含当前行列表和当前页码 new_block = Block( lines=deepcopy(current_lines), pnum=pnum, bbox=current_bbox ) new_page_blocks.append(new_block) current_lines = [] current_bbox = None # 更新当前页的块列表 page.blocks = new_page_blocks # 创建新的表格 def create_new_tables(blocks: List[Page]): # 初始化表格索引和正则表达式模式 table_idx = 0 dot_pattern = re.compile(r'(\s*\.\s*){4,}') dot_multiline_pattern = re.compile(r'.*(\s*\.\s*){4,}.*', re.DOTALL) # 遍历每一页中的文本块 for page in blocks: # 遍历每个文本块中的块 for block in page.blocks: # 如果块类型不是表格或者行数小于3,则跳过 if block.most_common_block_type() != "Table" or len(block.lines) < 3: continue # 初始化表格行列表和y坐标 table_rows = [] y_coord = None row = [] # 遍历每行文本 for line in block.lines: # 遍历每个文本块 for span in line.spans: # 如果y坐标不同于当前span的起始y坐标 if y_coord != span.y_start: # 如果当前行有内容,则添加到表格行列表中 if len(row) > 0: table_rows.append(row) row = [] y_coord = span.y_start # 获取文本内容并处理多行文本 text = span.text if dot_multiline_pattern.match(text): text = dot_pattern.sub(' ', text) row.append(text) # 如果当前行有内容,则添加到表格行列表中 if len(row) > 0: table_rows.append(row) # 如果表格行字符总长度大于300,或者第一行列数大于8或小于2,则跳过 if max([len("".join(r)) for r in table_rows]) > 300 or len(table_rows[0]) > 8 or len(table_rows[0]) < 2: continue # 格式化表格行数据并创建新的Span和Line对象 new_text = tabulate(table_rows, headers="firstrow", tablefmt="github") new_span = Span( bbox=block.bbox, span_id=f"{table_idx}_fix_table", font="Table", color=0, block_type="Table", text=new_text ) new_line = Line( bbox=block.bbox, spans=[new_span] ) # 替换原有文本块的行为新的行 block.lines = [new_line] table_idx += 1 # 返回处理过的表格数量 return table_idx
.\marker\marker\convert.py
# 导入所需的库 import fitz as pymupdf # 导入自定义模块 from marker.cleaners.table import merge_table_blocks, create_new_tables from marker.debug.data import dump_bbox_debug_data from marker.extract_text import get_text_blocks from marker.cleaners.headers import filter_header_footer, filter_common_titles from marker.cleaners.equations import replace_equations from marker.ordering import order_blocks from marker.postprocessors.editor import edit_full_text from marker.segmentation import detect_document_block_types from marker.cleaners.code import identify_code_blocks, indent_blocks from marker.cleaners.bullets import replace_bullets from marker.markdown import merge_spans, merge_lines, get_full_text from marker.schema import Page, BlockType from typing import List, Dict, Tuple, Optional import re import magic from marker.settings import settings # 定义函数,根据文件路径获取文件类型 def find_filetype(fpath): # 获取文件的 MIME 类型 mimetype = magic.from_file(fpath).lower() # 根据 MIME 类型判断文件类型 if "pdf" in mimetype: return "pdf" elif "epub" in mimetype: return "epub" elif "mobi" in mimetype: return "mobi" elif mimetype in settings.SUPPORTED_FILETYPES: return settings.SUPPORTED_FILETYPES[mimetype] else: # 输出非标准文件类型信息 print(f"Found nonstandard filetype {mimetype}") return "other" # 定义函数,为文本块添加标注 def annotate_spans(blocks: List[Page], block_types: List[BlockType]): for i, page in enumerate(blocks): page_block_types = block_types[i] page.add_block_types(page_block_types) # 定义函数,获取文本文件的长度 def get_length_of_text(fname: str) -> int: # 获取文件类型 filetype = find_filetype(fname) # 如果文件类型为其他,则返回长度为0 if filetype == "other": return 0 # 使用 pymupdf 打开文件 doc = pymupdf.open(fname, filetype=filetype) full_text = "" # 遍历每一页,获取文本内容并拼接 for page in doc: full_text += page.get_text("text", sort=True, flags=settings.TEXT_FLAGS) return len(full_text) def convert_single_pdf( fname: str, # 定义函数,将单个 PDF 文件转换为文本 model_lst: List, # 模型列表 max_pages=None, # 最大页数,默认为 None metadata: Optional[Dict]=None, # 元数据,默认为 None parallel_factor: int = 1 # 并行因子,默认为 1 ) -> Tuple[str, Dict]: # 返回类型为元组,包含字符串和字典 lang = settings.DEFAULT_LANG # 设置默认语言为系统默认语言 if metadata: # 如果有元数据 lang = metadata.get("language", settings.DEFAULT_LANG) # 获取元数据中的语言信息,如果不存在则使用系统默认语言 # 使用 Tesseract 语言,如果可用 tess_lang = settings.TESSERACT_LANGUAGES.get(lang, "eng") # 获取 Tesseract 语言设置 spell_lang = settings.SPELLCHECK_LANGUAGES.get(lang, None) # 获取拼写检查语言设置 if "eng" not in tess_lang: # 如果英语不在 Tesseract 语言中 tess_lang = f"eng+{tess_lang}" # 添加英语到 Tesseract 语言中 # 输出元数据 out_meta = {"language": lang} # 设置输出元数据的语言信息 filetype = find_filetype(fname) # 查找文件类型 if filetype == "other": # 如果文件类型为其他 return "", out_meta # 返回空字符串和输出元数据 out_meta["filetype"] = filetype # 设置输出元数据的文件类型 doc = pymupdf.open(fname, filetype=filetype) # 打开文件 if filetype != "pdf": # 如果文件类型不是 PDF conv = doc.convert_to_pdf() # 将文件转换为 PDF 格式 doc = pymupdf.open("pdf", conv) # 打开 PDF 文件 blocks, toc, ocr_stats = get_text_blocks( doc, tess_lang, spell_lang, max_pages=max_pages, parallel=int(parallel_factor * settings.OCR_PARALLEL_WORKERS) ) # 获取文本块、目录和 OCR 统计信息 out_meta["toc"] = toc # 设置输出元数据的目录信息 out_meta["pages"] = len(blocks) # 设置输出元数据的页数 out_meta["ocr_stats"] = ocr_stats # 设置输出元数据的 OCR 统计信息 if len([b for p in blocks for b in p.blocks]) == 0: # 如果没有提取到任何文本块 print(f"Could not extract any text blocks for {fname}") # 打印无法提取文本块的消息 return "", out_meta # 返回空字符串和输出元数据 # 解包模型列表 texify_model, layoutlm_model, order_model, edit_model = model_lst # 解包模型列表 block_types = detect_document_block_types( doc, blocks, layoutlm_model, batch_size=int(settings.LAYOUT_BATCH_SIZE * parallel_factor) ) # 检测文档的块类型 # 查找页眉和页脚 bad_span_ids = filter_header_footer(blocks) # 过滤页眉和页脚 out_meta["block_stats"] = {"header_footer": len(bad_span_ids)} # 设置输出元数据的块统计信息 annotate_spans(blocks, block_types) # 标注文本块 # 如果设置了标志,则转储调试数据 dump_bbox_debug_data(doc, blocks) # 转储边界框调试数据 # 根据指定的参数对文档中的块进行排序 blocks = order_blocks( doc, blocks, order_model, batch_size=int(settings.ORDERER_BATCH_SIZE * parallel_factor) ) # 识别代码块数量并更新元数据 code_block_count = identify_code_blocks(blocks) out_meta["block_stats"]["code"] = code_block_count # 缩进代码块 indent_blocks(blocks) # 合并表格块 merge_table_blocks(blocks) # 创建新的表格块并更新元数据 table_count = create_new_tables(blocks) out_meta["block_stats"]["table"] = table_count # 遍历每个页面的块 for page in blocks: for block in page.blocks: # 过滤掉坏的 span id block.filter_spans(bad_span_ids) # 过滤掉坏的 span 类型 block.filter_bad_span_types() # 替换方程式并更新元数据 filtered, eq_stats = replace_equations( doc, blocks, block_types, texify_model, batch_size=int(settings.TEXIFY_BATCH_SIZE * parallel_factor) ) out_meta["block_stats"]["equations"] = eq_stats # 复制以避免更改原始数据 merged_lines = merge_spans(filtered) text_blocks = merge_lines(merged_lines, filtered) text_blocks = filter_common_titles(text_blocks) full_text = get_full_text(text_blocks) # 处理被连接的空块 full_text = re.sub(r'\n{3,}', '\n\n', full_text) full_text = re.sub(r'(\n\s){3,}', '\n\n', full_text) # 用 - 替换项目符号字符 full_text = replace_bullets(full_text) # 使用编辑器模型后处理文本 full_text, edit_stats = edit_full_text( full_text, edit_model, batch_size=settings.EDITOR_BATCH_SIZE * parallel_factor ) out_meta["postprocess_stats"] = {"edit": edit_stats} # 返回处理后的文本和元数据 return full_text, out_meta
.\marker\marker\debug\data.py
import base64 import json import os import zlib from typing import List from marker.schema import Page from marker.settings import settings from PIL import Image import io # 定义一个函数,用于将公式的调试数据转储到文件中 def dump_equation_debug_data(doc, images, converted_spans): # 如果未设置调试数据文件夹或调试级别为0,则直接返回 if not settings.DEBUG_DATA_FOLDER or settings.DEBUG_LEVEL == 0: return # 如果图片列表为空,则直接返回 if len(images) == 0: return # 断言每个图片都有对应的转换结果 assert len(converted_spans) == len(images) data_lines = [] # 遍历图片和对应的转换结果 for idx, (pil_image, converted_span) in enumerate(zip(images, converted_spans)): # 如果转换结果为空,则跳过当前图片 if converted_span is None: continue # 将 PIL 图像保存为 BytesIO 对象 img_bytes = io.BytesIO() pil_image.save(img_bytes, format="WEBP", lossless=True) # 将图片数据进行 base64 编码 b64_image = base64.b64encode(img_bytes.getvalue()).decode("utf-8") # 将图片数据、转换后的文本和边界框信息添加到数据行中 data_lines.append({ "image": b64_image, "text": converted_span.text, "bbox": converted_span.bbox }) # 从文档名称中去除扩展名 doc_base = os.path.basename(doc.name).rsplit(".", 1)[0] # 构建调试数据文件路径 debug_file = os.path.join(settings.DEBUG_DATA_FOLDER, f"{doc_base}_equations.json") # 将数据行写入到 JSON 文件中 with open(debug_file, "w+") as f: json.dump(data_lines, f) # 定义一个函数,用于将边界框的调试数据转储到文件中 def dump_bbox_debug_data(doc, blocks: List[Page]): # 如果未设置调试数据文件夹或调试级别小于2,则直接返回 if not settings.DEBUG_DATA_FOLDER or settings.DEBUG_LEVEL < 2: return # 从文档名称中去除扩展名 doc_base = os.path.basename(doc.name).rsplit(".", 1)[0] # 构建调试数据文件路径 debug_file = os.path.join(settings.DEBUG_DATA_FOLDER, f"{doc_base}_bbox.json") debug_data = [] # 遍历每个页面的块索引和块数据 for idx, page_blocks in enumerate(blocks): # 获取当前页面对象 page = doc[idx] # 获取页面的像素图像 pix = page.get_pixmap(dpi=settings.TEXIFY_DPI, annots=False, clip=page_blocks.bbox) # 将像素图像转换为 PNG 格式的字节流 png = pix.pil_tobytes(format="PNG") # 从 PNG 字节流创建图像对象 png_image = Image.open(io.BytesIO(png)) # 获取图像的宽度和高度 width, height = png_image.size # 设置最大尺寸 max_dimension = 6000 # 如果图像宽度或高度超过最大尺寸 if width > max_dimension or height > max_dimension: # 计算缩放因子 scaling_factor = min(max_dimension / width, max_dimension / height) # 缩放图像 png_image = png_image.resize((int(width * scaling_factor), int(height * scaling_factor)), Image.ANTIALIAS) # 创建一个字节流对象 img_bytes = io.BytesIO() # 将图像以 WEBP 格式保存到字节流中 png_image.save(img_bytes, format="WEBP", lossless=True, quality=100) # 将字节流编码为 base64 字符串 b64_image = base64.b64encode(img_bytes.getvalue()).decode("utf-8") # 获取页面块的模型数据 page_data = page_blocks.model_dump() # 将图像数据添加到页面数据中 page_data["image"] = b64_image # 将页面数据添加到调试数据列表中 debug_data.append(page_data) # 将调试数据以 JSON 格式写入调试文件 with open(debug_file, "w+") as f: json.dump(debug_data, f)
.\marker\marker\extract_text.py
# 导入所需的模块 import os from typing import Tuple, List, Optional # 导入拼写检查器 SpellChecker from spellchecker import SpellChecker # 导入正确旋转的边界框函数 from marker.bbox import correct_rotation # 导入整页 OCR 函数 from marker.ocr.page import ocr_entire_page # 导入检测不良 OCR 的工具函数和字体标志分解器 from marker.ocr.utils import detect_bad_ocr, font_flags_decomposer # 导入设置模块中的设置 from marker.settings import settings # 导入 Span, Line, Block, Page 数据结构 from marker.schema import Span, Line, Block, Page # 导入线程池执行器 from concurrent.futures import ThreadPoolExecutor # 设置环境变量 TESSDATA_PREFIX 为设置模块中的 TESSDATA_PREFIX os.environ["TESSDATA_PREFIX"] = settings.TESSDATA_PREFIX # 根据垂直分组对旋转文本进行排序 def sort_rotated_text(page_blocks, tolerance=1.25): vertical_groups = {} for block in page_blocks: group_key = round(block.bbox[1] / tolerance) * tolerance if group_key not in vertical_groups: vertical_groups[group_key] = [] vertical_groups[group_key].append(block) # 对每个组进行水平排序,并将组展平为一个列表 sorted_page_blocks = [] for _, group in sorted(vertical_groups.items()): sorted_group = sorted(group, key=lambda x: x.bbox[0]) sorted_page_blocks.extend(sorted_group) return sorted_page_blocks # 获取单个页面的块信息 def get_single_page_blocks(doc, pnum: int, tess_lang: str, spellchecker: Optional[SpellChecker] = None, ocr=False) -> Tuple[List[Block], int]: # 获取文档中指定页码的页面 page = doc[pnum] # 获取页面的旋转角度 rotation = page.rotation # 如果需要进行 OCR if ocr: # 对整个页面进行 OCR,使用指定的语言和拼写检查器 blocks = ocr_entire_page(page, tess_lang, spellchecker) else: # 否则,获取页面的文本块信息,按照设置中的标志进行排序 blocks = page.get_text("dict", sort=True, flags=settings.TEXT_FLAGS)["blocks"] # 初始化页面块列表和跨度 ID page_blocks = [] span_id = 0 # 遍历每个块的索引和块内容 for block_idx, block in enumerate(blocks): # 初始化存储每个块中行的列表 block_lines = [] # 遍历每个块中的行 for l in block["lines"]: # 初始化存储每个行中span的列表 spans = [] # 遍历每个span for i, s in enumerate(l["spans"]): # 获取span的文本内容和边界框 block_text = s["text"] bbox = s["bbox"] # 创建Span对象,包括文本内容、边界框、span id、字体和颜色等信息 span_obj = Span( text=block_text, bbox=correct_rotation(bbox, page), span_id=f"{pnum}_{span_id}", font=f"{s['font']}_{font_flags_decomposer(s['flags'])}", # 在字体后面添加字体标志 color=s["color"], ascender=s["ascender"], descender=s["descender"], ) spans.append(span_obj) # 将span对象添加到spans列表中 span_id += 1 # 创建Line对象,包括spans列表和边界框 line_obj = Line( spans=spans, bbox=correct_rotation(l["bbox"], page), ) # 只选择有效的行,即边界框面积大于0的行 if line_obj.area > 0: block_lines.append(line_obj) # 将有效的行添加到block_lines列表中 # 创建Block对象,包括lines列表和边界框 block_obj = Block( lines=block_lines, bbox=correct_rotation(block["bbox"], page), pnum=pnum ) # 只选择包含多行的块 if len(block_lines) > 0: page_blocks.append(block_obj) # 将包含多行的块添加到page_blocks列表中 # 如果页面被旋转,重新对文本进行排序 if rotation > 0: page_blocks = sort_rotated_text(page_blocks) return page_blocks # 返回处理后的页面块列表 # 将单个页面转换为文本块,进行 OCR 处理 def convert_single_page(doc, pnum, tess_lang: str, spell_lang: Optional[str], no_text: bool, disable_ocr: bool = False, min_ocr_page: int = 2): # 初始化变量用于记录 OCR 页面数量、成功次数和失败次数 ocr_pages = 0 ocr_success = 0 ocr_failed = 0 spellchecker = None # 获取当前页面的边界框 page_bbox = doc[pnum].bound() # 如果指定了拼写检查语言,则创建拼写检查器对象 if spell_lang: spellchecker = SpellChecker(language=spell_lang) # 获取单个页面的文本块 blocks = get_single_page_blocks(doc, pnum, tess_lang, spellchecker) # 创建页面对象,包含文本块、页面编号和边界框 page_obj = Page(blocks=blocks, pnum=pnum, bbox=page_bbox) # 判断是否需要对页面进行 OCR 处理 conditions = [ ( no_text # 全文本为空,需要进行完整 OCR 处理 or (len(page_obj.prelim_text) > 0 and detect_bad_ocr(page_obj.prelim_text, spellchecker)) # OCR 处理不佳 ), min_ocr_page < pnum < len(doc) - 1, not disable_ocr ] if all(conditions) or settings.OCR_ALL_PAGES: # 获取当前页面对象 page = doc[pnum] # 获取包含 OCR 处理的文本块 blocks = get_single_page_blocks(doc, pnum, tess_lang, spellchecker, ocr=True) # 创建包含 OCR 处理的页面对象,包含文本块、页面编号、边界框和旋转信息 page_obj = Page(blocks=blocks, pnum=pnum, bbox=page_bbox, rotation=page.rotation) ocr_pages = 1 if len(blocks) == 0: ocr_failed = 1 else: ocr_success = 1 # 返回页面对象和 OCR 处理结果统计信息 return page_obj, {"ocr_pages": ocr_pages, "ocr_failed": ocr_failed, "ocr_success": ocr_success} # 获取文本块列表 def get_text_blocks(doc, tess_lang: str, spell_lang: Optional[str], max_pages: Optional[int] = None, parallel: int = settings.OCR_PARALLEL_WORKERS): all_blocks = [] # 获取文档的目录 toc = doc.get_toc() ocr_pages = 0 ocr_failed = 0 ocr_success = 0 # 这是一个线程,因为大部分工作在一个单独的进程中进行(tesseract) range_end = len(doc) # 判断是否全文本为空 no_text = len(naive_get_text(doc).strip()) == 0 # 如果指定了最大页面数,则限制范围 if max_pages: range_end = min(max_pages, len(doc)) # 使用线程池执行并行任务,最大工作线程数为 parallel with ThreadPoolExecutor(max_workers=parallel) as pool: # 生成参数列表,包含文档、页数、Tesseract语言、拼写语言、是否无文本的元组 args_list = [(doc, pnum, tess_lang, spell_lang, no_text) for pnum in range(range_end)] # 根据并行数选择使用 map 函数或线程池的 map 函数 if parallel == 1: func = map else: func = pool.map # 执行函数并获取结果 results = func(lambda a: convert_single_page(*a), args_list) # 遍历结果 for result in results: # 获取页面对象和 OCR 统计信息 page_obj, ocr_stats = result # 将页面对象添加到所有块列表中 all_blocks.append(page_obj) # 更新 OCR 页面数、失败数和成功数 ocr_pages += ocr_stats["ocr_pages"] ocr_failed += ocr_stats["ocr_failed"] ocr_success += ocr_stats["ocr_success"] # 返回所有块列表、目录和 OCR 统计信息 return all_blocks, toc, {"ocr_pages": ocr_pages, "ocr_failed": ocr_failed, "ocr_success": ocr_success} # 定义一个函数,用于从文档中提取文本内容 def naive_get_text(doc): # 初始化一个空字符串,用于存储提取的文本内容 full_text = "" # 遍历文档中的每一页 for page in doc: # 获取当前页的文本内容,并按照指定的参数进行排序和处理 full_text += page.get_text("text", sort=True, flags=settings.TEXT_FLAGS) # 在每一页的文本内容后添加换行符 full_text += "\n" # 返回整个文档的文本内容 return full_text
Marker 源码解析(一)(4)https://developer.aliyun.com/article/1483785