Marker 源码解析(一)(1)

简介: Marker 源码解析(一)

.\marker\benchmark.py

import argparse
import tempfile
import time
from collections import defaultdict
from tqdm import tqdm
from marker.convert import convert_single_pdf
from marker.logger import configure_logging
from marker.models import load_all_models
from marker.benchmark.scoring import score_text
from marker.extract_text import naive_get_text
import json
import os
import subprocess
import shutil
import fitz as pymupdf
from tabulate import tabulate
# 配置日志记录
configure_logging()
# 定义函数,使用 Nougat 进行预测
def nougat_prediction(pdf_filename, batch_size=1):
    # 创建临时目录
    out_dir = tempfile.mkdtemp()
    # 运行 Nougat 命令行工具进行预测
    subprocess.run(["nougat", pdf_filename, "-o", out_dir, "--no-skipping", "--recompute", "--batchsize", str(batch_size)], check=True)
    # 获取生成的 Markdown 文件
    md_file = os.listdir(out_dir)[0]
    with open(os.path.join(out_dir, md_file), "r") as f:
        data = f.read()
    # 删除临时目录
    shutil.rmtree(out_dir)
    return data
# 主函数
def main():
    # 创建参数解析器
    parser = argparse.ArgumentParser(description="Benchmark PDF to MD conversion.  Needs source pdfs, and a refernece folder with the correct markdown.")
    # 添加参数:输入 PDF 文件夹
    parser.add_argument("in_folder", help="Input PDF files")
    # 添加参数:参考 Markdown 文件夹
    parser.add_argument("reference_folder", help="Reference folder with reference markdown files")
    # 添加参数:输出文件名
    parser.add_argument("out_file", help="Output filename")
    # 添加参数:是否运行 Nougat 并比较
    parser.add_argument("--nougat", action="store_true", help="Run nougat and compare", default=False)
    # 添加参数:Nougat 批处理大小,默认为 1
    parser.add_argument("--nougat_batch_size", type=int, default=1, help="Batch size to use for nougat when making predictions.")
    # 添加参数:Marker 并行因子,默认为 1
    parser.add_argument("--marker_parallel_factor", type=int, default=1, help="How much to multiply default parallel OCR workers and model batch sizes by.")
    # 添加参数:生成的 Markdown 文件输出路径
    parser.add_argument("--md_out_path", type=str, default=None, help="Output path for generated markdown files")
    # 解析参数
    args = parser.parse_args()
    # 定义方法列表
    methods = ["naive", "marker"]
    if args.nougat:
        methods.append("nougat")
    # 加载所有模型
    model_lst = load_all_models()
    # 初始化得分字典
    scores = defaultdict(dict)
    # 获取指定文件夹中的所有文件列表
    benchmark_files = os.listdir(args.in_folder)
    # 筛选出以".pdf"结尾的文件列表
    benchmark_files = [b for b in benchmark_files if b.endswith(".pdf")]
    # 初始化存储时间信息的字典
    times = defaultdict(dict)
    # 初始化存储页数信息的字典
    pages = defaultdict(int)
    # 遍历每个 PDF 文件
    for fname in tqdm(benchmark_files):
        # 生成对应的 markdown 文件名
        md_filename = fname.rsplit(".", 1)[0] + ".md"
        # 获取参考文件的路径并读取内容
        reference_filename = os.path.join(args.reference_folder, md_filename)
        with open(reference_filename, "r") as f:
            reference = f.read()
        # 获取 PDF 文件的路径并打开
        pdf_filename = os.path.join(args.in_folder, fname)
        doc = pymupdf.open(pdf_filename)
        # 记录该 PDF 文件的页数
        pages[fname] = len(doc)
        # 遍历不同的方法
        for method in methods:
            start = time.time()
            # 根据不同方法进行处理
            if method == "marker":
                full_text, out_meta = convert_single_pdf(pdf_filename, model_lst, parallel_factor=args.marker_parallel_factor)
            elif method == "nougat":
                full_text = nougat_prediction(pdf_filename, batch_size=args.nougat_batch_size)
            elif method == "naive":
                full_text = naive_get_text(doc)
            else:
                raise ValueError(f"Unknown method {method}")
            # 计算处理时间并记录
            times[method][fname] = time.time() - start
            # 计算得分并记录
            score = score_text(full_text, reference)
            scores[method][fname] = score
            # 如果指定了 markdown 输出路径,则将处理结果写入文件
            if args.md_out_path:
                md_out_filename = f"{method}_{md_filename}"
                with open(os.path.join(args.md_out_path, md_out_filename), "w+") as f:
                    f.write(full_text)
    # 计算总页数
    total_pages = sum(pages.values())
    # 打开输出文件,以写入模式打开,如果文件不存在则创建
    with open(args.out_file, "w+") as f:
        # 创建一个默认字典,用于存储数据
        write_data = defaultdict(dict)
        # 遍历每个方法
        for method in methods:
            # 计算每个方法的总时间
            total_time = sum(times[method].values())
            # 为每个文件创建统计信息字典
            file_stats = {
                fname:
                {
                    "time": times[method][fname],
                    "score": scores[method][fname],
                    "pages": pages[fname]
                }
                for fname in benchmark_files
            }
            # 将文件统计信息和方法的平均分数、每页时间、每个文档时间存储到 write_data 中
            write_data[method] = {
                "files": file_stats,
                "avg_score": sum(scores[method].values()) / len(scores[method]),
                "time_per_page": total_time / total_pages,
                "time_per_doc": total_time / len(scores[method])
            }
        # 将 write_data 写入到输出文件中,格式化为 JSON 格式,缩进为 4
        json.dump(write_data, f, indent=4)
    # 创建两个空列表用于存储汇总表和分数表
    summary_table = []
    score_table = []
    # 分数表的表头为 benchmark_files
    score_headers = benchmark_files
    # 遍历每个方法
    for method in methods:
        # 将方法、平均分数、每页时间、每个文档时间添加到汇总表中
        summary_table.append([method, write_data[method]["avg_score"], write_data[method]["time_per_page"], write_data[method]["time_per_doc"]])
        # 将方法和每个文件的分数添加到分数表中
        score_table.append([method, *[write_data[method]["files"][h]["score"] for h in score_headers]])
    # 打印汇总表,包括方法、平均分数、每页时间、每个文档时间
    print(tabulate(summary_table, headers=["Method", "Average Score", "Time per page", "Time per document"]))
    print("")
    print("Scores by file")
    # 打印分数表,包括方法和每个文件的分数
    print(tabulate(score_table, headers=["Method", *score_headers]))
# 如果当前脚本被直接执行,则调用主函数
if __name__ == "__main__":
    main()

.\marker\chunk_convert.py

# 导入 argparse 模块,用于解析命令行参数
import argparse
# 导入 subprocess 模块,用于执行外部命令
import subprocess
# 定义主函数
def main():
    # 创建 ArgumentParser 对象,设置描述信息
    parser = argparse.ArgumentParser(description="Convert a folder of PDFs to a folder of markdown files in chunks.")
    # 添加命令行参数,指定输入文件夹路径
    parser.add_argument("in_folder", help="Input folder with pdfs.")
    # 添加命令行参数,指定输出文件夹路径
    parser.add_argument("out_folder", help="Output folder")
    # 解析命令行参数
    args = parser.parse_args()
    # 构造要执行的 shell 命令
    cmd = f"./chunk_convert.sh {args.in_folder} {args.out_folder}"
    # 执行 shell 脚本
    subprocess.run(cmd, shell=True, check=True)
# 如果当前脚本作为主程序运行,则调用主函数
if __name__ == "__main__":
    main()

.\marker\convert.py

# 导入必要的库
import argparse
import os
from typing import Dict, Optional
import ray
from tqdm import tqdm
import math
# 导入自定义模块
from marker.convert import convert_single_pdf, get_length_of_text
from marker.models import load_all_models
from marker.settings import settings
from marker.logger import configure_logging
import traceback
import json
# 配置日志记录
configure_logging()
# 定义一个远程函数,用于处理单个 PDF 文件
@ray.remote(num_cpus=settings.RAY_CORES_PER_WORKER, num_gpus=.05 if settings.CUDA else 0)
def process_single_pdf(fname: str, out_folder: str, model_refs, metadata: Optional[Dict] = None, min_length: Optional[int] = None):
    # 构建输出文件名和元数据文件名
    out_filename = fname.rsplit(".", 1)[0] + ".md"
    out_filename = os.path.join(out_folder, os.path.basename(out_filename))
    out_meta_filename = out_filename.rsplit(".", 1)[0] + "_meta.json"
    
    # 如果输出文件已存在,则直接返回
    if os.path.exists(out_filename):
        return
    
    try:
        # 如果指定了最小文本长度,检查文件文本长度是否符合要求
        if min_length:
            length = get_length_of_text(fname)
            if length < min_length:
                return
        
        # 转换 PDF 文件为 Markdown 格式,并获取转换后的文本和元数据
        full_text, out_metadata = convert_single_pdf(fname, model_refs, metadata=metadata)
        
        # 如果转换后的文本不为空,则写入到文件中
        if len(full_text.strip()) > 0:
            with open(out_filename, "w+", encoding='utf-8') as f:
                f.write(full_text)
            with open(out_meta_filename, "w+") as f:
                f.write(json.dumps(out_metadata, indent=4))
        else:
            print(f"Empty file: {fname}.  Could not convert.")
    except Exception as e:
        # 捕获异常并打印错误信息
        print(f"Error converting {fname}: {e}")
        print(traceback.format_exc())
# 主函数
def main():
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description="Convert multiple pdfs to markdown.")
    
    # 添加输入文件夹和输出文件夹参数
    parser.add_argument("in_folder", help="Input folder with pdfs.")
    parser.add_argument("out_folder", help="Output folder")
    # 添加命令行参数,指定要转换的块索引
    parser.add_argument("--chunk_idx", type=int, default=0, help="Chunk index to convert")
    # 添加命令行参数,指定并行处理的块数
    parser.add_argument("--num_chunks", type=int, default=1, help="Number of chunks being processed in parallel")
    # 添加命令行参数,指定要转换的最大 pdf 数量
    parser.add_argument("--max", type=int, default=None, help="Maximum number of pdfs to convert")
    # 添加命令行参数,指定要使用的工作进程数
    parser.add_argument("--workers", type=int, default=5, help="Number of worker processes to use")
    # 添加命令行参数,指定要使用的元数据 json 文件进行过滤
    parser.add_argument("--metadata_file", type=str, default=None, help="Metadata json file to use for filtering")
    # 添加命令行参数,指定要转换的 pdf 的最小长度
    parser.add_argument("--min_length", type=int, default=None, help="Minimum length of pdf to convert")
    # 解析命令行参数
    args = parser.parse_args()
    # 获取输入文件夹的绝对路径
    in_folder = os.path.abspath(args.in_folder)
    # 获取输出文件夹的绝对路径
    out_folder = os.path.abspath(args.out_folder)
    # 获取输入文件夹中所有文件的路径列表
    files = [os.path.join(in_folder, f) for f in os.listdir(in_folder)]
    # 如果输出文件夹不存在,则创建输出文件夹
    os.makedirs(out_folder, exist_ok=True)
    # 处理并行处理时的块
    # 确保将所有文件放入一个块中
    chunk_size = math.ceil(len(files) / args.num_chunks)
    start_idx = args.chunk_idx * chunk_size
    end_idx = start_idx + chunk_size
    files_to_convert = files[start_idx:end_idx]
    # 如果需要,限制要转换的文件数量
    if args.max:
        files_to_convert = files_to_convert[:args.max]
    metadata = {}
    # 如果指定了元数据文件,则加载元数据
    if args.metadata_file:
        metadata_file = os.path.abspath(args.metadata_file)
        with open(metadata_file, "r") as f:
            metadata = json.load(f)
    # 确定要使用的进程数
    total_processes = min(len(files_to_convert), args.workers)
    # 初始化 Ray,设置 CPU 和 GPU 数量,存储路径等参数
    ray.init(
        num_cpus=total_processes,
        num_gpus=1 if settings.CUDA else 0,
        storage=settings.RAY_CACHE_PATH,
        _temp_dir=settings.RAY_CACHE_PATH,
        log_to_driver=settings.DEBUG
    )
    # 加载所有模型
    model_lst = load_all_models()
    # 将模型列表放入 Ray 中
    model_refs = ray.put(model_lst)
    # 根据 GPU 内存动态设置每个任务的 GPU 分配比例
    gpu_frac = settings.VRAM_PER_TASK / settings.INFERENCE_RAM if settings.CUDA else 0
    # 打印正在转换的 PDF 文件数量、当前处理的块索引、总块数、使用的进程数以及输出文件夹路径
    print(f"Converting {len(files_to_convert)} pdfs in chunk {args.chunk_idx + 1}/{args.num_chunks} with {total_processes} processes, and storing in {out_folder}")
    
    # 为每个需要转换的 PDF 文件创建一个 Ray 任务,并指定使用的 GPU 分数
    futures = [
        process_single_pdf.options(num_gpus=gpu_frac).remote(
            filename,
            out_folder,
            model_refs,
            metadata=metadata.get(os.path.basename(filename)),
            min_length=args.min_length
        ) for filename in files_to_convert
    ]
    # 运行所有的 Ray 转换任务
    progress_bar = tqdm(total=len(futures))
    while len(futures) > 0:
        # 等待所有任务完成,超时时间为 7 秒
        finished, futures = ray.wait(
            futures, timeout=7.0
        )
        finished_lst = ray.get(finished)
        # 更新进度条
        if isinstance(finished_lst, list):
            progress_bar.update(len(finished_lst))
        else:
            progress_bar.update(1)
    # 关闭 Ray 以释放资源
    ray.shutdown()
# 如果当前脚本被直接执行,则调用主函数
if __name__ == "__main__":
    main()

.\marker\convert_single.py

# 导入必要的模块
import argparse  # 用于解析命令行参数
from marker.convert import convert_single_pdf  # 导入 convert_single_pdf 函数
from marker.logger import configure_logging  # 导入 configure_logging 函数
from marker.models import load_all_models  # 导入 load_all_models 函数
import json  # 导入 json 模块
# 配置日志记录
configure_logging()
# 主函数
def main():
    # 创建参数解析器
    parser = argparse.ArgumentParser()
    # 添加命令行参数
    parser.add_argument("filename", help="PDF file to parse")  # PDF 文件名
    parser.add_argument("output", help="Output file name")  # 输出文件名
    parser.add_argument("--max_pages", type=int, default=None, help="Maximum number of pages to parse")  # 最大解析页数
    parser.add_argument("--parallel_factor", type=int, default=1, help="How much to multiply default parallel OCR workers and model batch sizes by.")  # 并行因子
    # 解析命令行参数
    args = parser.parse_args()
    # 获取文件名
    fname = args.filename
    # 加载所有模型
    model_lst = load_all_models()
    # 调用 convert_single_pdf 函数,解析 PDF 文件并返回全文和元数据
    full_text, out_meta = convert_single_pdf(fname, model_lst, max_pages=args.max_pages, parallel_factor=args.parallel_factor)
    # 将全文写入输出文件
    with open(args.output, "w+", encoding='utf-8') as f:
        f.write(full_text)
    # 生成元数据文件名
    out_meta_filename = args.output.rsplit(".", 1)[0] + "_meta.json"
    # 将元数据写入元数据文件
    with open(out_meta_filename, "w+") as f:
        f.write(json.dumps(out_meta, indent=4))
# 如果当前脚本被直接执行,则调用主函数
if __name__ == "__main__":
    main()

.\marker\marker\bbox.py

import fitz as pymupdf
# 判断两个矩形框是否应该合并
def should_merge_blocks(box1, box2, tol=5):
    # 在 tol y 像素内,并且在右侧在 tol 像素内
    merge = [
        box2[0] > box1[0], # 在 x 坐标上在后面
        abs(box2[1] - box1[1]) < tol, # 在 y 坐标上在 tol 像素内
        abs(box2[3] - box1[3]) < tol, # 在 y 坐标上在 tol 像素内
        abs(box2[0] - box1[2]) < tol, # 在 x 坐标上在 tol 像素内
    ]
    return all(merge)
# 合并两个矩形框
def merge_boxes(box1, box2):
    return (min(box1[0], box2[0]), min(box1[1], box2[1]), max(box2[2], box1[2]), max(box1[3], box2[3]))
# 判断两个矩形框是否相交
def boxes_intersect(box1, box2):
    # 矩形框1与矩形框2相交
    return box1[0] < box2[2] and box1[2] > box2[0] and box1[1] < box2[3] and box1[3] > box2[1]
# 判断两个矩形框的相交面积占比是否大于给定百分比
def boxes_intersect_pct(box1, box2, pct=.9):
    # 确定相交矩形的坐标
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])
    if x_right < x_left or y_bottom < y_top:
        return 0.0
    # 两个轴对齐边界框的交集始终是一个轴对齐边界框
    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    # 计算两个边界框的面积
    bb1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    bb2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    iou = intersection_area / float(bb1_area + bb2_area - intersection_area)
    return iou > pct
# 判断一个矩形框是否与多个矩形框相交
def multiple_boxes_intersect(box1, boxes):
    for box2 in boxes:
        if boxes_intersect(box1, box2):
            return True
    return False
# 判断一个矩形框是否包含在另一个矩形框内
def box_contained(box1, box2):
    # 矩形框1在矩形框2内部
    return box1[0] > box2[0] and box1[1] > box2[1] and box1[2] < box2[2] and box1[3] < box2[3]
# 将归一化的矩形框坐标还原为原始坐标
def unnormalize_box(bbox, width, height):
    return [
        width * (bbox[0] / 1000),
        height * (bbox[1] / 1000),
        width * (bbox[2] / 1000),
        height * (bbox[3] / 1000),
    ]
# 修正矩形框的旋转
def correct_rotation(bbox, page):
    #bbox base is (x0, y0, x1, y1)
    # 获取页面的旋转角度
    rotation = page.rotation
    # 如果旋转角度为0,则直接返回原始边界框
    if rotation == 0:
        return bbox
    # 计算旋转后的左上角和右下角坐标
    tl = pymupdf.Point(bbox[0], bbox[1]) * page.rotation_matrix
    br = pymupdf.Point(bbox[2], bbox[3]) * page.rotation_matrix
    # 根据不同的旋转角度进行边界框的调整
    if rotation == 90:
        bbox = [br[0], tl[1], tl[0], br[1]]
    elif rotation == 180:
        bbox = [br[0], br[1], tl[0], tl[1]]
    elif rotation == 270:
        bbox = [tl[0], br[1], br[0], tl[1]]
    # 返回调整后的边界框
    return bbox

.\marker\marker\benchmark\scoring.py

# 导入 math 模块
import math
# 从 rapidfuzz 模块中导入 fuzz 和 distance 函数
from rapidfuzz import fuzz, distance
# 导入 re 模块
import re
# 定义最小分块字符数
CHUNK_MIN_CHARS = 25
def tokenize(text):
    # 定义正则表达式模式
    pattern = r'([^\w\s\d\'])|([\w\']+)|(\d+)|(\n+)|( +)'
    # 使用正则表达式模式匹配文本
    result = re.findall(pattern, text)
    # 将匹配结果扁平化并过滤掉空字符串
    flattened_result = [item for sublist in result for item in sublist if item]
    return flattened_result
def chunk_text(text):
    # 将文本按换行符分割成块
    chunks = text.split("\n")
    # 过滤掉空白块和长度小于最小分块字符数的块
    chunks = [c for c in chunks if c.strip() and len(c) > CHUNK_MIN_CHARS]
    return chunks
def overlap_score(hypothesis_chunks, reference_chunks):
    # 计算长度修正因子
    length_modifier = len(hypothesis_chunks) / len(reference_chunks)
    # 计算搜索距离
    search_distance = max(len(reference_chunks) // 5, 10)
    chunk_scores = []
    chunk_weights = []
    for i, hyp_chunk in enumerate(hypothesis_chunks):
        max_score = 0
        chunk_weight = 1
        i_offset = int(i * length_modifier)
        chunk_range = range(max(0, i_offset-search_distance), min(len(reference_chunks), i_offset+search_distance))
        for j in chunk_range:
            ref_chunk = reference_chunks[j]
            # 计算相似度得分
            score = fuzz.ratio(hyp_chunk, ref_chunk, score_cutoff=30) / 100
            if score > max_score:
                max_score = score
                chunk_weight = math.sqrt(len(ref_chunk))
        chunk_scores.append(max_score)
        chunk_weights.append(chunk_weight)
    chunk_scores = [chunk_scores[i] * chunk_weights[i] for i in range(len(chunk_scores))]
    return chunk_scores, chunk_weights
def score_text(hypothesis, reference):
    # 返回一个0-1的对齐分数
    hypothesis_chunks = chunk_text(hypothesis)
    reference_chunks = chunk_text(reference)
    chunk_scores, chunk_weights = overlap_score(hypothesis_chunks, reference_chunks)
    return sum(chunk_scores) / sum(chunk_weights)

.\marker\marker\cleaners\bullets.py

# 导入正则表达式模块
import re
# 定义函数,用于替换文本中的特殊符号为 -
def replace_bullets(text):
    # 定义匹配特殊符号的正则表达式模式
    bullet_pattern = r"(^|[\n ])[•●○■▪▫–—]( )"
    # 使用正则表达式替换特殊符号为 -
    replaced_string = re.sub(bullet_pattern, r"\1-\2", text)
    # 返回替换后的文本
    return replaced_string

Marker 源码解析(一)(2)https://developer.aliyun.com/article/1483778

相关文章
|
15小时前
PandasTA 源码解析(二十三)
PandasTA 源码解析(二十三)
39 0
|
15小时前
PandasTA 源码解析(二十二)(3)
PandasTA 源码解析(二十二)
34 0
|
15小时前
PandasTA 源码解析(二十二)(2)
PandasTA 源码解析(二十二)
38 2
|
15小时前
PandasTA 源码解析(二十二)(1)
PandasTA 源码解析(二十二)
30 0
|
15小时前
PandasTA 源码解析(二十一)(4)
PandasTA 源码解析(二十一)
22 1
|
15小时前
PandasTA 源码解析(二十一)(3)
PandasTA 源码解析(二十一)
17 0
|
15小时前
PandasTA 源码解析(二十一)(2)
PandasTA 源码解析(二十一)
25 1
|
15小时前
PandasTA 源码解析(二十一)(1)
PandasTA 源码解析(二十一)
22 2
|
15小时前
PandasTA 源码解析(二十)(1)
PandasTA 源码解析(二十)
15 0
|
15小时前
PandasTA 源码解析(十九)(3)
PandasTA 源码解析(十九)
12 2

推荐镜像

更多