.\marker\benchmark.py
import argparse import tempfile import time from collections import defaultdict from tqdm import tqdm from marker.convert import convert_single_pdf from marker.logger import configure_logging from marker.models import load_all_models from marker.benchmark.scoring import score_text from marker.extract_text import naive_get_text import json import os import subprocess import shutil import fitz as pymupdf from tabulate import tabulate # 配置日志记录 configure_logging() # 定义函数,使用 Nougat 进行预测 def nougat_prediction(pdf_filename, batch_size=1): # 创建临时目录 out_dir = tempfile.mkdtemp() # 运行 Nougat 命令行工具进行预测 subprocess.run(["nougat", pdf_filename, "-o", out_dir, "--no-skipping", "--recompute", "--batchsize", str(batch_size)], check=True) # 获取生成的 Markdown 文件 md_file = os.listdir(out_dir)[0] with open(os.path.join(out_dir, md_file), "r") as f: data = f.read() # 删除临时目录 shutil.rmtree(out_dir) return data # 主函数 def main(): # 创建参数解析器 parser = argparse.ArgumentParser(description="Benchmark PDF to MD conversion. Needs source pdfs, and a refernece folder with the correct markdown.") # 添加参数:输入 PDF 文件夹 parser.add_argument("in_folder", help="Input PDF files") # 添加参数:参考 Markdown 文件夹 parser.add_argument("reference_folder", help="Reference folder with reference markdown files") # 添加参数:输出文件名 parser.add_argument("out_file", help="Output filename") # 添加参数:是否运行 Nougat 并比较 parser.add_argument("--nougat", action="store_true", help="Run nougat and compare", default=False) # 添加参数:Nougat 批处理大小,默认为 1 parser.add_argument("--nougat_batch_size", type=int, default=1, help="Batch size to use for nougat when making predictions.") # 添加参数:Marker 并行因子,默认为 1 parser.add_argument("--marker_parallel_factor", type=int, default=1, help="How much to multiply default parallel OCR workers and model batch sizes by.") # 添加参数:生成的 Markdown 文件输出路径 parser.add_argument("--md_out_path", type=str, default=None, help="Output path for generated markdown files") # 解析参数 args = parser.parse_args() # 定义方法列表 methods = ["naive", "marker"] if args.nougat: methods.append("nougat") # 加载所有模型 model_lst = load_all_models() # 初始化得分字典 scores = defaultdict(dict) # 获取指定文件夹中的所有文件列表 benchmark_files = os.listdir(args.in_folder) # 筛选出以".pdf"结尾的文件列表 benchmark_files = [b for b in benchmark_files if b.endswith(".pdf")] # 初始化存储时间信息的字典 times = defaultdict(dict) # 初始化存储页数信息的字典 pages = defaultdict(int) # 遍历每个 PDF 文件 for fname in tqdm(benchmark_files): # 生成对应的 markdown 文件名 md_filename = fname.rsplit(".", 1)[0] + ".md" # 获取参考文件的路径并读取内容 reference_filename = os.path.join(args.reference_folder, md_filename) with open(reference_filename, "r") as f: reference = f.read() # 获取 PDF 文件的路径并打开 pdf_filename = os.path.join(args.in_folder, fname) doc = pymupdf.open(pdf_filename) # 记录该 PDF 文件的页数 pages[fname] = len(doc) # 遍历不同的方法 for method in methods: start = time.time() # 根据不同方法进行处理 if method == "marker": full_text, out_meta = convert_single_pdf(pdf_filename, model_lst, parallel_factor=args.marker_parallel_factor) elif method == "nougat": full_text = nougat_prediction(pdf_filename, batch_size=args.nougat_batch_size) elif method == "naive": full_text = naive_get_text(doc) else: raise ValueError(f"Unknown method {method}") # 计算处理时间并记录 times[method][fname] = time.time() - start # 计算得分并记录 score = score_text(full_text, reference) scores[method][fname] = score # 如果指定了 markdown 输出路径,则将处理结果写入文件 if args.md_out_path: md_out_filename = f"{method}_{md_filename}" with open(os.path.join(args.md_out_path, md_out_filename), "w+") as f: f.write(full_text) # 计算总页数 total_pages = sum(pages.values()) # 打开输出文件,以写入模式打开,如果文件不存在则创建 with open(args.out_file, "w+") as f: # 创建一个默认字典,用于存储数据 write_data = defaultdict(dict) # 遍历每个方法 for method in methods: # 计算每个方法的总时间 total_time = sum(times[method].values()) # 为每个文件创建统计信息字典 file_stats = { fname: { "time": times[method][fname], "score": scores[method][fname], "pages": pages[fname] } for fname in benchmark_files } # 将文件统计信息和方法的平均分数、每页时间、每个文档时间存储到 write_data 中 write_data[method] = { "files": file_stats, "avg_score": sum(scores[method].values()) / len(scores[method]), "time_per_page": total_time / total_pages, "time_per_doc": total_time / len(scores[method]) } # 将 write_data 写入到输出文件中,格式化为 JSON 格式,缩进为 4 json.dump(write_data, f, indent=4) # 创建两个空列表用于存储汇总表和分数表 summary_table = [] score_table = [] # 分数表的表头为 benchmark_files score_headers = benchmark_files # 遍历每个方法 for method in methods: # 将方法、平均分数、每页时间、每个文档时间添加到汇总表中 summary_table.append([method, write_data[method]["avg_score"], write_data[method]["time_per_page"], write_data[method]["time_per_doc"]]) # 将方法和每个文件的分数添加到分数表中 score_table.append([method, *[write_data[method]["files"][h]["score"] for h in score_headers]]) # 打印汇总表,包括方法、平均分数、每页时间、每个文档时间 print(tabulate(summary_table, headers=["Method", "Average Score", "Time per page", "Time per document"])) print("") print("Scores by file") # 打印分数表,包括方法和每个文件的分数 print(tabulate(score_table, headers=["Method", *score_headers])) # 如果当前脚本被直接执行,则调用主函数 if __name__ == "__main__": main()
.\marker\chunk_convert.py
# 导入 argparse 模块,用于解析命令行参数 import argparse # 导入 subprocess 模块,用于执行外部命令 import subprocess # 定义主函数 def main(): # 创建 ArgumentParser 对象,设置描述信息 parser = argparse.ArgumentParser(description="Convert a folder of PDFs to a folder of markdown files in chunks.") # 添加命令行参数,指定输入文件夹路径 parser.add_argument("in_folder", help="Input folder with pdfs.") # 添加命令行参数,指定输出文件夹路径 parser.add_argument("out_folder", help="Output folder") # 解析命令行参数 args = parser.parse_args() # 构造要执行的 shell 命令 cmd = f"./chunk_convert.sh {args.in_folder} {args.out_folder}" # 执行 shell 脚本 subprocess.run(cmd, shell=True, check=True) # 如果当前脚本作为主程序运行,则调用主函数 if __name__ == "__main__": main()
.\marker\convert.py
# 导入必要的库 import argparse import os from typing import Dict, Optional import ray from tqdm import tqdm import math # 导入自定义模块 from marker.convert import convert_single_pdf, get_length_of_text from marker.models import load_all_models from marker.settings import settings from marker.logger import configure_logging import traceback import json # 配置日志记录 configure_logging() # 定义一个远程函数,用于处理单个 PDF 文件 @ray.remote(num_cpus=settings.RAY_CORES_PER_WORKER, num_gpus=.05 if settings.CUDA else 0) def process_single_pdf(fname: str, out_folder: str, model_refs, metadata: Optional[Dict] = None, min_length: Optional[int] = None): # 构建输出文件名和元数据文件名 out_filename = fname.rsplit(".", 1)[0] + ".md" out_filename = os.path.join(out_folder, os.path.basename(out_filename)) out_meta_filename = out_filename.rsplit(".", 1)[0] + "_meta.json" # 如果输出文件已存在,则直接返回 if os.path.exists(out_filename): return try: # 如果指定了最小文本长度,检查文件文本长度是否符合要求 if min_length: length = get_length_of_text(fname) if length < min_length: return # 转换 PDF 文件为 Markdown 格式,并获取转换后的文本和元数据 full_text, out_metadata = convert_single_pdf(fname, model_refs, metadata=metadata) # 如果转换后的文本不为空,则写入到文件中 if len(full_text.strip()) > 0: with open(out_filename, "w+", encoding='utf-8') as f: f.write(full_text) with open(out_meta_filename, "w+") as f: f.write(json.dumps(out_metadata, indent=4)) else: print(f"Empty file: {fname}. Could not convert.") except Exception as e: # 捕获异常并打印错误信息 print(f"Error converting {fname}: {e}") print(traceback.format_exc()) # 主函数 def main(): # 创建命令行参数解析器 parser = argparse.ArgumentParser(description="Convert multiple pdfs to markdown.") # 添加输入文件夹和输出文件夹参数 parser.add_argument("in_folder", help="Input folder with pdfs.") parser.add_argument("out_folder", help="Output folder") # 添加命令行参数,指定要转换的块索引 parser.add_argument("--chunk_idx", type=int, default=0, help="Chunk index to convert") # 添加命令行参数,指定并行处理的块数 parser.add_argument("--num_chunks", type=int, default=1, help="Number of chunks being processed in parallel") # 添加命令行参数,指定要转换的最大 pdf 数量 parser.add_argument("--max", type=int, default=None, help="Maximum number of pdfs to convert") # 添加命令行参数,指定要使用的工作进程数 parser.add_argument("--workers", type=int, default=5, help="Number of worker processes to use") # 添加命令行参数,指定要使用的元数据 json 文件进行过滤 parser.add_argument("--metadata_file", type=str, default=None, help="Metadata json file to use for filtering") # 添加命令行参数,指定要转换的 pdf 的最小长度 parser.add_argument("--min_length", type=int, default=None, help="Minimum length of pdf to convert") # 解析命令行参数 args = parser.parse_args() # 获取输入文件夹的绝对路径 in_folder = os.path.abspath(args.in_folder) # 获取输出文件夹的绝对路径 out_folder = os.path.abspath(args.out_folder) # 获取输入文件夹中所有文件的路径列表 files = [os.path.join(in_folder, f) for f in os.listdir(in_folder)] # 如果输出文件夹不存在,则创建输出文件夹 os.makedirs(out_folder, exist_ok=True) # 处理并行处理时的块 # 确保将所有文件放入一个块中 chunk_size = math.ceil(len(files) / args.num_chunks) start_idx = args.chunk_idx * chunk_size end_idx = start_idx + chunk_size files_to_convert = files[start_idx:end_idx] # 如果需要,限制要转换的文件数量 if args.max: files_to_convert = files_to_convert[:args.max] metadata = {} # 如果指定了元数据文件,则加载元数据 if args.metadata_file: metadata_file = os.path.abspath(args.metadata_file) with open(metadata_file, "r") as f: metadata = json.load(f) # 确定要使用的进程数 total_processes = min(len(files_to_convert), args.workers) # 初始化 Ray,设置 CPU 和 GPU 数量,存储路径等参数 ray.init( num_cpus=total_processes, num_gpus=1 if settings.CUDA else 0, storage=settings.RAY_CACHE_PATH, _temp_dir=settings.RAY_CACHE_PATH, log_to_driver=settings.DEBUG ) # 加载所有模型 model_lst = load_all_models() # 将模型列表放入 Ray 中 model_refs = ray.put(model_lst) # 根据 GPU 内存动态设置每个任务的 GPU 分配比例 gpu_frac = settings.VRAM_PER_TASK / settings.INFERENCE_RAM if settings.CUDA else 0 # 打印正在转换的 PDF 文件数量、当前处理的块索引、总块数、使用的进程数以及输出文件夹路径 print(f"Converting {len(files_to_convert)} pdfs in chunk {args.chunk_idx + 1}/{args.num_chunks} with {total_processes} processes, and storing in {out_folder}") # 为每个需要转换的 PDF 文件创建一个 Ray 任务,并指定使用的 GPU 分数 futures = [ process_single_pdf.options(num_gpus=gpu_frac).remote( filename, out_folder, model_refs, metadata=metadata.get(os.path.basename(filename)), min_length=args.min_length ) for filename in files_to_convert ] # 运行所有的 Ray 转换任务 progress_bar = tqdm(total=len(futures)) while len(futures) > 0: # 等待所有任务完成,超时时间为 7 秒 finished, futures = ray.wait( futures, timeout=7.0 ) finished_lst = ray.get(finished) # 更新进度条 if isinstance(finished_lst, list): progress_bar.update(len(finished_lst)) else: progress_bar.update(1) # 关闭 Ray 以释放资源 ray.shutdown() # 如果当前脚本被直接执行,则调用主函数 if __name__ == "__main__": main()
.\marker\convert_single.py
# 导入必要的模块 import argparse # 用于解析命令行参数 from marker.convert import convert_single_pdf # 导入 convert_single_pdf 函数 from marker.logger import configure_logging # 导入 configure_logging 函数 from marker.models import load_all_models # 导入 load_all_models 函数 import json # 导入 json 模块 # 配置日志记录 configure_logging() # 主函数 def main(): # 创建参数解析器 parser = argparse.ArgumentParser() # 添加命令行参数 parser.add_argument("filename", help="PDF file to parse") # PDF 文件名 parser.add_argument("output", help="Output file name") # 输出文件名 parser.add_argument("--max_pages", type=int, default=None, help="Maximum number of pages to parse") # 最大解析页数 parser.add_argument("--parallel_factor", type=int, default=1, help="How much to multiply default parallel OCR workers and model batch sizes by.") # 并行因子 # 解析命令行参数 args = parser.parse_args() # 获取文件名 fname = args.filename # 加载所有模型 model_lst = load_all_models() # 调用 convert_single_pdf 函数,解析 PDF 文件并返回全文和元数据 full_text, out_meta = convert_single_pdf(fname, model_lst, max_pages=args.max_pages, parallel_factor=args.parallel_factor) # 将全文写入输出文件 with open(args.output, "w+", encoding='utf-8') as f: f.write(full_text) # 生成元数据文件名 out_meta_filename = args.output.rsplit(".", 1)[0] + "_meta.json" # 将元数据写入元数据文件 with open(out_meta_filename, "w+") as f: f.write(json.dumps(out_meta, indent=4)) # 如果当前脚本被直接执行,则调用主函数 if __name__ == "__main__": main()
.\marker\marker\bbox.py
import fitz as pymupdf # 判断两个矩形框是否应该合并 def should_merge_blocks(box1, box2, tol=5): # 在 tol y 像素内,并且在右侧在 tol 像素内 merge = [ box2[0] > box1[0], # 在 x 坐标上在后面 abs(box2[1] - box1[1]) < tol, # 在 y 坐标上在 tol 像素内 abs(box2[3] - box1[3]) < tol, # 在 y 坐标上在 tol 像素内 abs(box2[0] - box1[2]) < tol, # 在 x 坐标上在 tol 像素内 ] return all(merge) # 合并两个矩形框 def merge_boxes(box1, box2): return (min(box1[0], box2[0]), min(box1[1], box2[1]), max(box2[2], box1[2]), max(box1[3], box2[3])) # 判断两个矩形框是否相交 def boxes_intersect(box1, box2): # 矩形框1与矩形框2相交 return box1[0] < box2[2] and box1[2] > box2[0] and box1[1] < box2[3] and box1[3] > box2[1] # 判断两个矩形框的相交面积占比是否大于给定百分比 def boxes_intersect_pct(box1, box2, pct=.9): # 确定相交矩形的坐标 x_left = max(box1[0], box2[0]) y_top = max(box1[1], box2[1]) x_right = min(box1[2], box2[2]) y_bottom = min(box1[3], box2[3]) if x_right < x_left or y_bottom < y_top: return 0.0 # 两个轴对齐边界框的交集始终是一个轴对齐边界框 intersection_area = (x_right - x_left) * (y_bottom - y_top) # 计算两个边界框的面积 bb1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) bb2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) iou = intersection_area / float(bb1_area + bb2_area - intersection_area) return iou > pct # 判断一个矩形框是否与多个矩形框相交 def multiple_boxes_intersect(box1, boxes): for box2 in boxes: if boxes_intersect(box1, box2): return True return False # 判断一个矩形框是否包含在另一个矩形框内 def box_contained(box1, box2): # 矩形框1在矩形框2内部 return box1[0] > box2[0] and box1[1] > box2[1] and box1[2] < box2[2] and box1[3] < box2[3] # 将归一化的矩形框坐标还原为原始坐标 def unnormalize_box(bbox, width, height): return [ width * (bbox[0] / 1000), height * (bbox[1] / 1000), width * (bbox[2] / 1000), height * (bbox[3] / 1000), ] # 修正矩形框的旋转 def correct_rotation(bbox, page): #bbox base is (x0, y0, x1, y1) # 获取页面的旋转角度 rotation = page.rotation # 如果旋转角度为0,则直接返回原始边界框 if rotation == 0: return bbox # 计算旋转后的左上角和右下角坐标 tl = pymupdf.Point(bbox[0], bbox[1]) * page.rotation_matrix br = pymupdf.Point(bbox[2], bbox[3]) * page.rotation_matrix # 根据不同的旋转角度进行边界框的调整 if rotation == 90: bbox = [br[0], tl[1], tl[0], br[1]] elif rotation == 180: bbox = [br[0], br[1], tl[0], tl[1]] elif rotation == 270: bbox = [tl[0], br[1], br[0], tl[1]] # 返回调整后的边界框 return bbox
.\marker\marker\benchmark\scoring.py
# 导入 math 模块 import math # 从 rapidfuzz 模块中导入 fuzz 和 distance 函数 from rapidfuzz import fuzz, distance # 导入 re 模块 import re # 定义最小分块字符数 CHUNK_MIN_CHARS = 25 def tokenize(text): # 定义正则表达式模式 pattern = r'([^\w\s\d\'])|([\w\']+)|(\d+)|(\n+)|( +)' # 使用正则表达式模式匹配文本 result = re.findall(pattern, text) # 将匹配结果扁平化并过滤掉空字符串 flattened_result = [item for sublist in result for item in sublist if item] return flattened_result def chunk_text(text): # 将文本按换行符分割成块 chunks = text.split("\n") # 过滤掉空白块和长度小于最小分块字符数的块 chunks = [c for c in chunks if c.strip() and len(c) > CHUNK_MIN_CHARS] return chunks def overlap_score(hypothesis_chunks, reference_chunks): # 计算长度修正因子 length_modifier = len(hypothesis_chunks) / len(reference_chunks) # 计算搜索距离 search_distance = max(len(reference_chunks) // 5, 10) chunk_scores = [] chunk_weights = [] for i, hyp_chunk in enumerate(hypothesis_chunks): max_score = 0 chunk_weight = 1 i_offset = int(i * length_modifier) chunk_range = range(max(0, i_offset-search_distance), min(len(reference_chunks), i_offset+search_distance)) for j in chunk_range: ref_chunk = reference_chunks[j] # 计算相似度得分 score = fuzz.ratio(hyp_chunk, ref_chunk, score_cutoff=30) / 100 if score > max_score: max_score = score chunk_weight = math.sqrt(len(ref_chunk)) chunk_scores.append(max_score) chunk_weights.append(chunk_weight) chunk_scores = [chunk_scores[i] * chunk_weights[i] for i in range(len(chunk_scores))] return chunk_scores, chunk_weights def score_text(hypothesis, reference): # 返回一个0-1的对齐分数 hypothesis_chunks = chunk_text(hypothesis) reference_chunks = chunk_text(reference) chunk_scores, chunk_weights = overlap_score(hypothesis_chunks, reference_chunks) return sum(chunk_scores) / sum(chunk_weights)
.\marker\marker\cleaners\bullets.py
# 导入正则表达式模块 import re # 定义函数,用于替换文本中的特殊符号为 - def replace_bullets(text): # 定义匹配特殊符号的正则表达式模式 bullet_pattern = r"(^|[\n ])[•●○■▪▫–—]( )" # 使用正则表达式替换特殊符号为 - replaced_string = re.sub(bullet_pattern, r"\1-\2", text) # 返回替换后的文本 return replaced_string
Marker 源码解析(一)(2)https://developer.aliyun.com/article/1483778