一、方案背景与核心痛点
在资讯自动化采集工程落地过程中,基于 RSS 渠道构建资讯聚合系统普遍存在四大行业痛点:资讯源重复报道泛滥、大量低价值标题党内容、原始正文文本碎片化、稿件信息良莠不齐。传统基于正则、关键词规则的过滤与摘要方案泛化能力弱,无法适配多变资讯文风,难以自动化提取事件核心要素、研判稿件信息价值并输出标准化结构化文本。
本方案依托 Python 3.10 + 生态,融合 LLM 大模型能力,搭建一套全链路可落地的资讯自动化处理流水线,完整覆盖多源采集、双层去重与质量过滤、大模型结构化深度摘要、标准化报表导出四大核心模块,全程无人工介入。方案摒弃硬编码规则匹配,由大模型自主萃取事实要素、判别内容价值、输出规范分析文本;同时前置完成内容去重逻辑,大幅削减无效 Token 消耗,降低大模型接口调用成本。
二、整体流水线架构
数据流分层架构:RSS/HTML异构资讯源 → 多源采集层 → SimHash去重与质量过滤层 → LLM结构化深度摘要层 → Markdown标准化输出层 核心设计优势:将去重、短文本过滤、标题党拦截逻辑前置至大模型调用环节之前,仅将清洗完成、无重复、具备有效信息量的稿件送入大模型处理,从源头压缩 Token 使用量,显著降低 API 调用开销。
三、分层模块工程实现
(一)采集层:多源资讯抓取与文本清洗组件
采集层仅负责资讯元数据拉取、HTML 标签剥离、原始干净文本提取,不执行任何摘要与内容分析逻辑;通过 URL MD5 哈希完成一级粗去重,过滤已抓取重复链接,避免重复请求。 引入feedparser解析 RSS 订阅源、requests搭建持久会话池降低请求开销、BeautifulSoup实现页面标签净化,内置通用 UA 伪装、超时容错、HTML 正文剥离兜底逻辑,兼容 RSS 摘要缺失场景下的全文爬取。
python
运行
```import feedparser
import requests
import hashlib
from dataclasses import dataclass, field
from bs4 import BeautifulSoup
from typing import Optional, List
import time
import random
@dataclass
class Article:
"""资讯结构化元数据实体类"""
title: str = ""
url: str = ""
source: str = ""
published: str = ""
raw_content: str = ""
clean_content: str = ""
word_count: int = 0
uid: str = ""
seen: bool = False
class ArticleFetcher:
"""异构资讯多源采集引擎"""
def init(self):
# 持久化请求会话,复用连接池优化网络性能
self.session = requests.Session()
self.session.headers.update({
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
),
})
# 全局URL指纹集合,实现链接级粗去重
self.seen_urls: set = set()
def fetch_rss(self, feed_url: str, source: str) -> List[Article]:
"""RSS订阅源批量资讯拉取接口"""
articles = []
try:
feed = feedparser.parse(feed_url)
# 限制单次解析条目上限,避免单次流量过载
for entry in feed.entries[:20]:
raw_text = self._get_rss_entry_content(entry)
article = Article(
title=getattr(entry, "title", ""),
url=getattr(entry, "link", ""),
source=source,
published=getattr(entry, "published", ""),
raw_content=raw_text,
)
# URL哈希判重,仅新增未收录资讯
if self._is_new_article(article):
articles.append(article)
print(f"[{source}] 成功采集新增资讯 {len(articles)} 篇")
except Exception as e:
print(f"[{source}] RSS源解析异常,异常信息:{str(e)}")
return articles
def fetch_html_fulltext(self, url: str, source: str) -> Optional[Article]:
"""RSS正文缺失兜底:单页面HTML全文采集"""
try:
resp = self.session.get(url, timeout=15)
soup = BeautifulSoup(resp.text, "html.parser")
# 清除广告、导航、脚本等无效DOM节点
for invalid_tag in soup.select("script, style, nav, .ad, .advertisement"):
invalid_tag.decompose()
# 精准定位正文容器标签
content_block = soup.select_one("article, .content, #article-body, main")
pure_text = content_block.get_text(separator="\n", strip=True) if content_block else ""
return Article(
title=soup.title.string or "",
url=url, source=source,
raw_content=pure_text[:2000],
)
except Exception as e:
print(f"[HTML采集] 链接{url[:40]} 请求失败,异常:{str(e)}")
return None
def _get_rss_entry_content(self, entry) -> str:
"""解析RSS条目摘要/正文字段,兼容多平台RSS结构差异"""
for field in ("summary", "content"):
field_val = getattr(entry, field, None)
if not field_val:
continue
# 兼容数组型content结构与字符串摘要
raw_html = field_val[0].get("value", str(field_val)) if isinstance(field_val, list) else str(field_val)
return self._html_to_plaintext(raw_html)
return ""
def _html_to_plaintext(self, html: str) -> str:
"""HTML标签清洗,输出纯文本并截断超长内容"""
soup = BeautifulSoup(html, "html.parser")
return soup.get_text(separator=" ", strip=True)[:3000]
def _is_new_article(self, article: Article) -> bool:
"""基于URL MD5哈希实现链接级去重判断"""
if not article.url:
return False
url_fingerprint = hashlib.md5(article.url.encode(encoding="utf-8")).hexdigest()[:16]
if url_fingerprint in self.seen_urls:
return False
self.seen_urls.add(url_fingerprint)
return True
采集层补充说明
1. 双层采集兜底机制:优先解析 RSS 内置摘要;若 RSS 仅提供标题无正文,自动调用fetch_html_fulltext抓取网页全文,解决国内资讯站点 RSS 内容缺失问题;
2. 网络优化:复用requests.Session长连接、固定通用 UA、设置 15s 请求超时,降低封禁风险;
3. 粗粒度去重:通过 URL 生成 16 位 MD5 指纹全局缓存,拦截重复链接请求,节省网络 IO。
(二)去重与质量过滤层:SimHash 文本指纹过滤引擎
在链接粗去重基础上,引入SimHash 局部敏感哈希算法实现语义级去重,解决「不同 URL 报道同一事件」的重复资讯问题;同步增加稿件质量前置过滤,拦截短文本无价值内容、标题党煽动类资讯,进一步减少无效大模型调用。 核心原理:对资讯正文生成 64 位文本指纹,通过计算指纹间海明距离判别重复;海明距离阈值默认配置为 3,距离≤3 判定为同事件重复稿件。相比单纯 URL 匹配,可识别换标题、换发布渠道但核心内容一致的同质化资讯。
python
运行
```import hashlib
from typing import List
@dataclass
class ContentDeduplicator:
"""SimHash语义去重 + 资讯质量过滤器"""
seen_text_signatures: set = set()
@staticmethod
def generate_simhash(text: str, slide_window: int = 3) -> int:
"""轻量化64位SimHash指纹生成,滑动窗口提取文本特征向量"""
# 截断超长文本,控制特征计算开销
word_list = text.split()[:200]
feature_vectors = []
# 滑动窗口提取连续词组特征
for idx in range(len(word_list) - slide_window + 1):
window_words = word_list[idx: idx + slide_window]
combined_key = "".join(window_words)
feature_hash = hash(combined_key) & 0xFFFF
feature_vectors.append(feature_hash)
# 64维向量加权聚合
weight_vec = [0] * 64
for h_val in feature_vectors:
for bit_idx in range(64):
bit_flag = (h_val >> bit_idx) & 1
weight_vec[bit_idx] += 1 if bit_flag else -1
# 生成最终64位二进制指纹
final_fingerprint = 0
for weight in weight_vec:
final_fingerprint = (final_fingerprint << 1) | (1 if weight > 0 else 0)
return final_fingerprint
def check_duplicate(self, article: Article, hamming_threshold: int = 3) -> bool:
"""海明距离计算,判断文本语义重复"""
target_text = article.clean_content if article.clean_content else article.raw_content
current_sig = self.generate_simhash(target_text)
# 比对历史指纹库
for history_sig in self.seen_text_signatures:
hamming_dist = bin(current_sig ^ history_sig).count("1")
if hamming_dist <= hamming_threshold:
return True
self.seen_text_signatures.add(current_sig)
return False
def batch_filter(self, articles: List[Article]) -> List[Article]:
"""批量执行三重过滤:短文本过滤、标题党拦截、SimHash语义去重"""
valid_articles = []
skip_count = 0
# 标题党风险关键词库
clickbait_keywords = [
"震惊", "刚刚", "内幕", "真相", "必看", "转发",
"突发", "刚刚宣布", "紧急", "惊天", "彻底爆发"
]
for article in articles:
text = article.clean_content or article.raw_content
article.word_count = len(text.split())
# 过滤100词以内无价值短资讯
if article.word_count < 100:
skip_count += 1
continue
# 拦截标题党煽动类稿件
if any(keyword in article.title for keyword in clickbait_keywords):
skip_count += 1
continue
# 语义去重校验
if self.check_duplicate(article):
skip_count += 1
continue
valid_articles.append(article)
print(f"过滤完成:共丢弃{skip_count}/{len(articles)}篇低质/重复资讯,留存有效稿件{len(valid_articles)}篇")
return valid_articles
调优说明
- 误杀规避:短文本资讯(100-200 字)特征稀疏,易出现 SimHash 误判,可动态下调海明阈值至 2;长深度稿件维持阈值 3 平衡召回与去重效果;
- 双重去重体系:URL 哈希粗过滤 + SimHash 语义细过滤,兼顾性能与去重精度。
(三)大模型结构化深度摘要层
基于 OpenAI 兼容接口封装批量分析引擎,通过精细化 Prompt 约束大模型输出标准化 JSON 结构化分析报告,不再输出自由文本摘要;内置 Token 计数、超长文本截断、批量限流、异常捕获、接口成本统计能力,适配批量资讯处理场景。 输出字段覆盖:精炼标题、事件背景、核心事实清单、深度趋势分析、行业标签、资讯质量打分、预估阅读时长、核心结论、内容潜在偏见标注,满足情报分析、行业简报业务需求。
python
运行
from openai import OpenAI
import tiktoken
import time
import json
from typing import List, Dict
class LLMDeepSummaryAnalyzer:
"""资讯深度结构化分析生成器,兼容OpenAI系列模型接口"""
def init(
self,
api_key: str,
base_url: str = "https://api.openai.com/v1",
model_name: str = "gpt-4o-mini",
single_max_output_tokens: int = 1200,
):
self.llm_client = OpenAI(api_key=api_key, base_url=base_url)
self.model = model_name
self.max_output_tokens = single_max_output_tokens
def single_article_analysis(self, article: Article) -> Dict:
"""单篇资讯调用LLM生成结构化深度分析"""
raw_text = (article.clean_content or article.raw_content)[:2500]
# 标准化约束Prompt,强制JSON输出规范
analysis_prompt = f"""你是专业行业情报编辑,严谨客观,仅基于原文内容输出分析,禁止杜撰信息。
待分析资讯元信息:
资讯来源:{article.source}
资讯标题:{article.title}
发布时间:{article.published}
正文原文:
{raw_text}
严格遵循以下JSON格式输出,禁止输出任何额外解释、markdown、说明文字,仅返回纯JSON:
{
{
"title": "20字以内精炼新闻标题",
"background": "50-100字事件行业背景梳理",
"core_facts": ["客观事实1", "客观事实2", "客观事实3"],
"analysis": "150-300字深度解读,包含事件成因、行业影响、短期发展预判",
"tags": ["行业标签1", "细分标签2", "热点标签3"],
"quality_score": 1-10区间资讯质量打分,信息越详实分数越高,
"read_time_minutes": 预估阅读时长(整数),
"key_conclusion": "30字以内全文核心总结",
"bias_signals": ["文章存在的立场偏向/信息片面点,无则填空数组"]
}}
"""
try:
# Token长度校验,超长自动截断上下文
token_encoder = tiktoken.encoding_for_model(self.model)
prompt_token_len = len(token_encoder.encode(analysis_prompt))
if prompt_token_len > 8000:
analysis_prompt = self._truncate_overlength_prompt(analysis_prompt, token_encoder)
# LLM接口调用,低温度保证输出稳定性
chat_resp = self.llm_client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "专业客观的行业情报分析师,严格按照指定JSON格式输出,不附加任何多余文字。"},
{"role": "user", "content": analysis_prompt}
],
temperature=0.3,
max_tokens=self.max_output_tokens,
)
response_content = chat_resp.choices[0].message.content.strip()
# 剥离代码块标记,兼容模型输出```json包裹场景
if "```json" in response_content:
response_content = response_content.split("```json")[1].split("```")[0]
elif "```" in response_content:
response_content = response_content.split("```")[1].split("```")[0]
# JSON反序列化
analysis_result = json.loads(response_content.strip())
# 补充业务元数据
analysis_result["source"] = article.source
analysis_result["original_url"] = article.url
analysis_result["word_count"] = article.word_count
analysis_result["token_used"] = chat_resp.usage.total_tokens if chat_resp.usage else 0
return analysis_result
except json.JSONDecodeError:
return {"error": "LLM输出JSON格式解析失败"}
except Exception as err:
return {"error": f"接口调用异常:{str(err)}"}
def _truncate_overlength_prompt(self, prompt: str, encoder) -> str:
"""超长上下文截断,控制输入Token上限7000"""
line_list = prompt.split("\n")
final_lines = []
total_token = 0
for line in line_list:
line_token = len(encoder.encode(line))
if total_token + line_token > 7000:
break
final_lines.append(line)
total_token += line_token
return "\n".join(final_lines)
def batch_analysis(self, article_list: List[Article], request_delay: float = 0.5) -> List[Dict]:
"""批量资讯分析,限流间隔防接口封禁,统计总Token与调用成本"""
analysis_reports = []
total_consume_tokens = 0
for idx, article in enumerate(article_list):
print(f"正在处理 {idx+1}/{len(article_list)} | 资讯标题:{article.title[:30]}...")
report = self.single_article_analysis(article)
if "error" not in report:
analysis_reports.append(report)
total_consume_tokens += report.get("token_used", 0)
print(f" 处理成功 | 消耗Token:{report['token_used']} | 资讯质量分:{report['quality_score']}")
else:
print(f" 处理失败 | 异常信息:{report['error'][:40]}")
time.sleep(request_delay)
# 接口成本概算(gpt-4o-mini 百万Token单价0.1元)
estimate_cost = total_consume_tokens / 1_000_000 * 0.1
print(f"\n批量分析完成:有效报告{len(analysis_reports)}份,总消耗Token {total_consume_tokens},预估调用成本 ¥{estimate_cost:.2f}")
return analysis_reports
Prompt 设计核心价值
- 结构化输出约束:强制 JSON 标准格式,省去后续文本解析、摘要拆分工作,直接对接报表导出;
- 多层情报维度:除基础摘要外,增加事实提取、偏见识别、质量打分,满足行业情报研判需求;
- 低温度参数(0.3):降低模型随机发散,保证同类型资讯输出结构、尺度统一。
(四)输出层:Markdown 标准化报表导出组件
将大模型生成的结构化分析报告批量渲染为可直接查阅的 Markdown 日报文档,按资讯质量分降序排序,区分高 / 中 / 低价值资讯标签,自动创建输出目录,适配日常情报归档、周报交付场景。
python
运行
import json
from datetime import datetime
import os
from typing import List, Dict
class DailyReportExporter:
"""结构化分析报告Markdown导出工具"""
@staticmethod
def export_markdown(report_list: List[Dict], output_path: str = "output/daily_report.md"):
# 文档头部元信息
md_content = [
f"# 行业资讯自动化情报日报 {datetime.now().strftime('%Y-%m-%d')}",
"",
f"> 本次有效分析资讯总量:{len(report_list)} 篇",
"",
]
# 按资讯质量分降序排序,高价值资讯前置
sorted_reports = sorted(report_list, key=lambda x: x.get("quality_score", 0), reverse=True)
for report in sorted_reports:
score = report.get("quality_score", 0)
# 分级标签标识
level_tag = "🔴高价值" if score >= 7 else "🟡中等价值" if score >= 5 else "⚪低价值"
md_content.append(f"## {level_tag} {report.get('title', '无标题资讯')}")
md_content.append("")
# 基础元数据
md_content.append(f"- 资讯来源:{report.get('source', '未知')} | 预估阅读时长:{report.get('read_time_minutes', 0)}分钟")
md_content.append(f"- 事件背景:{report.get('background', '')}")
md_content.append("")
# 核心事实清单
fact_list = report.get("core_facts", [])
if fact_list:
md_content.append("### 核心客观事实")
for fact in fact_list[:3]:
md_content.append(f"- {fact}")
md_content.append("")
# 深度分析文本
md_content.append(f"### 深度行业分析\n{report.get('analysis', '')}")
md_content.append("")
# 行业标签
tag_list = report.get("tags", [])
if tag_list:
md_content.append(f"**分类标签**:{', '.join(tag_list)}")
# 内容立场提示
bias_list = report.get("bias_signals", [])
if bias_list:
md_content.append(f"**内容立场提示**:{', '.join(bias_list)}")
# 分隔线
md_content.append("---")
md_content.append("")
# 自动创建输出目录
output_dir = os.path.dirname(output_path)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, parents=True, exist_ok=True)
# 写入本地文件,UTF-8编码兼容中文
with open(output_path, "w", encoding="utf-8") as f:
f.write("\n".join(md_content))
print(f"情报日报已导出至路径:{output_path}")
四、全链路流水线调度入口
封装一体化执行函数,串联采集、过滤、LLM 分析、报表导出全流程,支持批量 RSS 源批量订阅配置,可直接定时调度运行。
python
运行
def run_daily_intelligence_pipeline(
feed_configs: List[tuple[str, str]],
llm_api_key: str,
min_quality_threshold: int = 5,
) -> List[Dict]:
"""
每日资讯流水线统一调度函数
feed_configs: RSS源配置数组,格式[(rss订阅链接, 资讯来源名称)]
min_quality_threshold: 最终保留资讯最低质量分数
"""
# 1. 多源资讯采集
fetcher = ArticleFetcher()
raw_article_pool = []
for feed_url, source_name in feed_configs:
raw_article_pool.extend(fetcher.fetch_rss(feed_url, source_name))
time.sleep(1)
# 2. SimHash去重与低质资讯过滤
dedup_engine = ContentDeduplicator()
valid_articles = dedup_engine.batch_filter(raw_article_pool)
# 3. LLM批量结构化深度分析
llm_analyzer = LLMDeepSummaryAnalyzer(api_key=llm_api_key)
full_reports = llm_analyzer.batch_analysis(valid_articles, request_delay=0.5)
# 4. 过滤低于阈值的低质量分析稿件
final_reports = [r for r in full_reports if r.get("quality_score", 0) >= min_quality_threshold]
# 5. 导出Markdown日报
DailyReportExporter.export_markdown(final_reports, "output/daily_report.md")
return final_reports
if name == "main":
# 业务RSS订阅源配置示例
FEED_SOURCE_LIST = [
("https://feeds.feedburner.com/ruanyifeng", "阮一峰技术周刊"),
("https://www.36kr.com/feed", "36氪创投资讯"),
]
# 启动全链路流水线
result_reports = run_daily_intelligence_pipeline(
feed_configs=FEED_SOURCE_LIST,
llm_api_key="填入你的LLM接口密钥",
min_quality_threshold=5,
)
五、接口调用成本量化测算
以gpt-4o-mini模型、单批次处理 50 篇有效资讯为基准测算:
表格
指标项 数值
单篇资讯平均输入 Token 800
单篇资讯平均输出 Token 600
50 篇资讯总消耗 Token 70000
接口估算成本 ¥0.07
整体调用成本极低,系统主要开销来源于大模型 API 调用,无额外存储、算力成本;日均 500 篇资讯处理量级,单日 Token 成本仅约 0.7 元,具备规模化落地性价比。
六、工程落地常见问题与优化方案
- RSS 源仅返回摘要、无完整正文 解决方案:配套 HTML 全文采集兜底接口fetch_html_fulltext,当 RSS 文本长度不足时自动触发页面全量爬取,补全资讯正文。
- LLM 返回内容包含代码标记,JSON 解析失败 优化方案:增加字符串分割逻辑剥离 ```json 代码块标记,Prompt 强制约束仅输出纯 JSON 文本,双保险规避解析异常。
- 资讯质量打分浮动、跨模型不具备可比性 说明:质量分为 LLM 主观研判指标,仅用于内部稿件分级筛选,不适合作为客观量化指标;切换模型时建议同步调整过滤阈值。
- SimHash 短文本误判重复 调优方案:区分文本长度动态调整海明距离阈值,100-200 字短资讯阈值下调至 2,长深度资讯维持阈值 3。
七、方案能力边界说明 - 事实校验局限:LLM 仅基于原文文本做信息萃取与解读,无法独立完成事实真伪核查;涉及重大行业事件、数据类情报,必须溯源原始稿件人工复核。
- 成本线性增长:资讯处理量与 Token 消耗、调用成本呈线性正相关;日均千篇以上大规模场景,可切换低成本gpt-3.5-turbo或前置轻量摘要预过滤压缩文本长度。
- 时效性约束:资讯更新频率完全依赖 RSS 源站推送策略,多数资讯平台 RSS 仅每日更新 1 次;对高实时性情报需求,需补充关键词搜索引擎轮询采集链路。