微信 item_search - 关键词取文章列表接口对接全攻略:从入门到精通

简介: 本攻略详解基于搜狗微信搜索合规接口的item_search调用方法,涵盖接口认知、密钥获取、参数配置、签名生成、批量采集、异常处理及性能优化,结合Python实操示例,助力开发者高效实现微信文章列表的数据获取与舆情监测、内容聚合等应用,兼顾合规性与生产稳定性。

微信生态作为国内最大的内容分发平台之一,汇聚了海量公众号优质文章。item_search 接口(基于搜狗微信搜索合规能力封装)是通过关键词批量获取微信文章列表的核心工具,支持按关键词、发布时间、公众号类型、互动量等多维度筛选,返回文章标题、URL、发布时间、互动数据等核心信息,广泛应用于舆情监测、热点追踪、内容聚合、垂直领域文章采集等场景。
本攻略结合微信文章列表接口特性(数据覆盖广、筛选维度细、需严格遵守反爬与合规规范),从接口认知、前置准备、实操落地、调试优化到进阶技巧,全方位拆解对接流程,兼顾入门易用性与生产级稳定性,帮助开发者高效完成接口对接。
一、接口核心认知:先明确 “能做什么”“适配什么场景”

  1. 接口定位与核心价值
    核心功能:通过 item_search 接口,输入关键词(支持多关键词组合),筛选微信公众号文章列表,支持按发布时间、公众号认证类型、互动量、文章类型等条件过滤,返回分页文章列表数据(含文章基础信息与核心互动数据),可联动 item_get 接口获取文章详情;
    平台特性:
    数据覆盖:收录 90% 以上活跃微信公众号,支持近 5 年历史文章检索,新文章收录延迟 10-30 分钟;
    筛选能力:支持精确匹配、模糊匹配、多关键词组合(AND/OR 逻辑),时间范围精准到天;
    无需微信授权:依赖搜狗微信搜索合规采集能力,无需微信开放平台账号,仅需服务商提供的密钥;
    响应速度:单页请求响应时间 1-3 秒,批量分页请求支持并发优化;
    典型应用:
    舆情监测:按关键词(如 “政策名称”“企业品牌”“社会热点事件”)实时追踪相关文章传播;
    热点追踪:输入热点关键词(如 “AI 大模型”“民生政策”),批量获取最新相关文章列表;
    内容聚合:垂直领域(如 “医疗健康”“金融理财”)关键词采集,搭建领域资讯平台;
    学术研究:按主题关键词(如 “乡村振兴”“数字经济”)采集相关文章,作为研究素材;
    竞品监测:输入竞品品牌 / 产品关键词,追踪竞品公众号相关宣传、用户反馈文章。
  2. 核心参数与返回字段(关键词搜索场景适配版)
    (1)请求参数(必填 + 可选,按优先级排序)
    参数名称 类型 是否必填 说明 应用场景示例
    appkey string 是 接口调用密钥,由合规服务商分配(注册并通过审核后获取) sogou_wechat_search_abc123
    secret string 是 签名密钥,用于请求合法性校验(不可泄露,定期轮换) sogou_wechat_search_def456
    keyword string 是 搜索关键词(支持多关键词组合,AND 用空格分隔,OR 用 分隔) 数字经济 政策解读(AND)、AI 大模型 生成式 AI(OR)
    time_range string 否 发布时间范围筛选,默认 “all”(全部时间) all(全部)、1day(1 天内)、7days(7 天内)、30days(30 天内)、90days(90 天内)、1year(1 年内)、custom(自定义,需配合 start_date/end_date)
    start_date string 否 自定义时间范围开始日期(time_range=custom 时必填),格式 “YYYY-MM-DD” 2024-01-01
    end_date string 否 自定义时间范围结束日期(time_range=custom 时必填),格式 “YYYY-MM-DD” 2024-12-31
    verify_type string 否 公众号认证类型筛选,默认 “all”(全部) all(全部)、official(政府 / 事业单位认证)、enterprise(企业认证)、personal(个人认证)、none(未认证)
    sort_type string 否 排序方式,默认 “relevance”(相关度优先) relevance(相关度)、pub_time(发布时间倒序)、read_count(阅读量倒序)、like_count(在看数倒序)
    article_type string 否 文章类型筛选,默认 “all”(全部) all(全部)、original(原创文章)、reprint(转载文章)、video(视频文章)、image(图文文章)
    page_no int 否 页码,默认 1(支持分页,最大页码以服务商限制为准,通常≤100) 1、2、3
    page_size int 否 每页文章条数,默认 20(最大支持 50 条 / 页,企业版可申请提升至 100) 20、30、50
    fields string 否 需返回的字段集合,默认返回全部,按需筛选(英文逗号分隔) title,article_url,pub_time,official_account,read_count,like_count,summary,is_original
    match_type string 否 关键词匹配类型,默认 “fuzzy”(模糊匹配) fuzzy(模糊匹配,含关键词即可)、exact(精确匹配,标题 / 正文中完整包含关键词组合)
    timestamp long 是 请求时间戳(毫秒级,有效期 5 分钟,避免重复请求) 1735689600000
    sign string 是 签名值(按服务商规则加密生成,核心校验项) 32 位 MD5 大写串(如 A8F7C3D2E1B0967453120FEDCBA9876)
    注:
    关键词组合规则:多关键词用空格分隔表示 “AND”(同时包含),用|分隔表示 “OR”(包含任一),如 “数字经济 政策 | 法规” 表示 “包含数字经济且包含政策或法规”;
    时间范围优先级:time_range=custom 时,start_date 和 end_date 必须同时传入,且 end_date≥start_date,时间跨度最大支持 1 年;
    部分服务商支持高级筛选参数(如 “公众号名称”“文章标签”),需以实际文档为准。
    (2)返回核心字段(按业务场景分类,关键词搜索重点标注)
    文章基础信息:文章标题(title,含副标题)、文章 URL(article_url,微信原始链接)、搜狗文章 ID(article_id,唯一标识,可用于item_get接口)、摘要(summary,文章核心观点提炼)、发布时间(pub_time,精确到秒)、文章类型(type:original/reprint/video/image)、原创标识(is_original,true/false)、转载来源(reprint_source,非原创文章的首发公众号);
    公众号信息:公众号名称(official_account)、公众号 ID(wechat_id,微信官方唯一标识)、公众号认证类型(verify_type,如 “企业认证”“政府认证”)、公众号头像(avatar_url,部分服务商支持);
    互动数据:阅读量(read_count,微信公开显示的 “10 万 +” 或精准数值)、在看数(like_count,原 “赞” 数)、留言数(comment_count,部分服务商支持)、转发量(share_count,估算值,企业版可用);
    扩展信息:文章标签(tags,搜狗标注的内容分类,如 “财经”“科技”“民生”)、相关度评分(relevance_score,关键词匹配相关度,0-100 分)、是否违规(is_illegal,true/false,违规 / 删除文章标记)、收录时间(index_time,搜狗收录该文章的时间)。
  3. 接口限制与注意事项
    调用频率:普通用户 10-30 次 / 分钟,企业用户 50-200 次 / 分钟(不同服务商限制不同,需提前确认);
    数据缓存:搜索结果缓存 30 分钟 - 1 小时,热门关键词缓存可能缩短至 10 分钟,企业版支持refresh=1强制刷新(需申请权限);
    权限差异:
    权限类型 核心差异 适用场景
    普通用户 基础筛选(时间、认证类型)、基础字段、单页最大 20 条、低调用频率 个人研究、小型工具开发
    企业用户 高级筛选(自定义时间、文章类型、精确匹配)、完整字段(转发量、标签)、单页最大 100 条、高调用频率、强制刷新 舆情监测、商业数据分析、大规模内容采集
    内容限制:
    已删除 / 违规文章、隐私公众号文章、未被搜狗收录的文章无法返回;
    部分原创文章可能因版权限制,仅返回基础信息(标题、URL),正文需通过item_get接口进一步获取;
    合规要求:
    必须使用合规服务商接口,禁止自行爬取搜狗微信搜索结果(违反《反不正当竞争法》);
    数据仅可用于自身合规业务,禁止商业化售卖、恶意传播或侵犯公众号版权;
    引用需标注 “数据来源:搜狗微信搜索”,原创文章二次使用需获得公众号作者授权;
    不得用于批量抓取个人隐私、敏感信息或从事违法活动。
    二、对接前准备:3 步搞定前置条件
  4. 获取接口密钥(核心步骤)
    微信关键词文章列表接口无官方公开申请渠道,需通过合规第三方服务商(如聚合数据、极速数据、APISpace、阿里云 API 市场等)获取,步骤如下:
    选择合规服务商:优先选择有资质、口碑好、支持企业认证的服务商,要求提供《数据合规承诺书》《服务协议》,避免使用非法爬虫接口(存在账号封禁、法律风险);
    注册与认证:
    个人用户:注册账号,完成实名认证(提供身份证信息),获取基础接口权限;
    企业用户:注册账号,完成企业认证(提供营业执照、法人信息、业务用途说明),获取高级接口权限;
    申请接口权限:搜索 “微信文章关键词搜索接口” 或 “搜狗微信 item_search 接口”,选择对应接口,提交申请(需明确业务用途,如 “舆情监测系统开发”“垂直领域内容聚合”);
    获取密钥:审核通过后,在服务商后台 “应用管理” 中获取 appkey 和 secret(部分服务商按调用次数收费,需提前充值或购买套餐);
    查阅文档:下载服务商提供的接口文档,重点确认:
    参数格式(如time_range取值、关键词组合规则);
    签名生成规则(MD5/SHA256、参数排序要求);
    返回字段含义(不同服务商字段命名可能差异);
    调用频率、分页上限、权限限制等细节。
  5. 技术环境准备
    (1)支持语言与协议
    接口采用 HTTPS 协议,支持所有主流开发语言:Python、Java、PHP、Go、Node.js 等,无框架限制,推荐使用 Python(数据处理、批量请求效率高,适配关键词搜索的多条件筛选场景)。
    (2)必备工具与依赖
    调试工具:
    Postman(快速验证接口可用性,批量测试参数组合);
    curl(命令行调试,快速排查网络问题);
    浏览器开发者工具(分析搜狗微信搜索结果页结构,辅助字段映射);
    在线 MD5/SHA256 工具(验证签名生成正确性);
    开发依赖:
    网络请求:requests(Python)、OkHttp(Java)、axios(Node.js);
    加密工具:语言内置 MD5/SHA256 库(签名生成用,按服务商规则选择);
    数据处理:json(解析响应数据)、pandas(批量整理文章列表)、datetime(时间格式转换);
    辅助工具:
    日志库(如 logging,记录请求 / 响应 / 错误,便于追溯);
    Redis(缓存搜索结果,减少重复请求);
    定时任务框架(如 APScheduler,实时关键词监测);
    异常监控工具(如 Sentry,生产级报错追踪);
    多线程 / 异步库(如 aiohttp、concurrent.futures,提升批量分页请求效率)。
  6. 业务需求梳理
    关键词策略:明确核心关键词及组合规则(如舆情监测需 “品牌名 + 负面关键词” 组合),避免关键词过于宽泛导致结果冗余;
    筛选条件定义:
    时间范围:实时监测用 “1day/7days”,历史数据采集用 “custom” 自定义时间;
    公众号类型:政府政策监测选 “official”,企业品牌监测选 “enterprise”;
    文章类型:原创内容分析选 “original”,全面覆盖选 “all”;
    字段筛选:仅保留业务必需字段(如热点追踪需 “title、article_url、pub_time、read_count”,竞品监测需 “official_account、is_original、like_count”),减少数据传输量;
    分页与批量需求:确认是否需要获取全部搜索结果(如学术研究),或仅需前 N 页(如实时热点追踪),避免无效分页请求;
    异常场景预设:关键词无匹配结果、频率超限、网络波动、文章已删除等场景,需设计降级方案(如返回 “无相关文章” 提示、缓存历史搜索结果)。
    三、实操步骤:从调试到落地(Python 示例)
    步骤 1:理解请求流程
    拼接关键词、筛选条件等核心参数;
    按服务商规则生成签名(sign),确保请求合法性;
    发送 POST/GET 请求(多数服务商推荐 POST,参数传输更安全);
    接收响应数据,解析 JSON 格式结果,提取文章列表;
    数据标准化处理(时间格式统一、字段映射、去重);
    按需分页请求(遍历所有页码,获取完整结果);
    异常处理(签名错误、权限不足、无结果等)。
    步骤 2:签名生成规则(关键!不同服务商可能差异,以实际文档为准)
    多数合规服务商采用以下通用签名规则(若有差异,需按服务商文档调整):
    按参数名ASCII 升序排序所有请求参数(不含sign字段);
    将排序后的参数拼接为 “key1=value1&key2=value2&...” 格式(中文 / 特殊字符需 URL 编码);
    在拼接字符串末尾追加 &secret=你的secret;
    对拼接后的字符串进行MD5 加密(32 位大写) 或 SHA256 加密(64 位大写),结果即为sign。
    签名示例(参数排序与拼接)
    假设请求参数:
    appkey=sogou_wechat_search_abc123
    keyword = 数字经济 政策解读
    time_range=30days
    verify_type=all
    sort_type=pub_time
    page_no=1
    page_size=20
    timestamp=1735689600000
    secret=sogou_wechat_search_def456
    排序后参数:appkey、keyword、page_no、page_size、sort_type、time_range、timestamp、verify_type;
    拼接字符串(中文已 URL 编码):appkey=sogou_wechat_search_abc123&keyword=%E6%95%B0%E5%AD%97%E7%BB%8F%E6%B5%8E%20%E6%94%BF%E7%AD%96%E8%A7%A3%E8%AF%BB&page_no=1&page_size=20&sort_type=pub_time&time_range=30days×tamp=1735689600000&verify_type=all&secret=sogou_wechat_search_def456;
    MD5 加密后 sign:A8F7C3D2E1B0967453120FEDCBA9876543210ABCDEF(32 位大写)。
    步骤 3:完整代码实现(Python)
    (1)依赖安装
    bash
    运行
    pip install requests pandas beautifulsoup4 aiohttp apscheduler # requests:网络请求;pandas:数据整理;aiohttp:异步请求;APScheduler:定时任务
    (2)完整代码(含签名生成、接口调用、批量分页、数据保存)
    python
    import requests
    import hashlib
    import time
    import json
    import pandas as pd
    from urllib.parse import urlencode, unquote
    from typing import Dict, List, Optional
    import logging
    from apscheduler.schedulers.blocking import BlockingScheduler
    import aiohttp
    import asyncio

配置日志(记录接口调用、错误信息,便于合规追溯)

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("sogou_wechat_item_search.log"), logging.StreamHandler()]
)

接口核心配置(替换为服务商提供的appkey、secret、API_URL)

APP_KEY = "你的appkey"
SECRET = "你的secret"
API_URL = "https://api.sogou-wechat.com/item_search" # 服务商接口地址(以实际为准)
SAVE_PATH = "微信关键词文章列表.xlsx" # 数据保存路径
CACHE_KEY_PREFIX = "wechatsearch" # Redis缓存键前缀(如需缓存可启用)

def generate_sign(params: Dict) -> str:
"""生成接口签名(通用MD5版,若服务商要求SHA256可调整)"""

# 1. 按参数名ASCII升序排序(排除sign字段)
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 2. 拼接参数字符串(urlencode自动处理中文、特殊字符)
param_str = urlencode(sorted_params, encoding="utf-8") + f"&secret={SECRET}"
# 3. MD5加密(32位大写)
md5 = hashlib.md5()
md5.update(param_str.encode("utf-8"))
return md5.hexdigest().upper()

def standardize_data(raw_article: Dict) -> Dict:
"""标准化文章数据(统一字段格式,适配业务展示/分析)"""

# 解析互动数据
interact_data = raw_article.get("interact_data", {}) or raw_article  # 兼容不同服务商字段结构
# 标准化时间格式
pub_time = raw_article.get("pub_time", 0)
pub_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(pub_time/1000)) if pub_time else ""
# 处理阅读量(兼容“10万+”格式)
read_count = interact_data.get("read_count", 0)
if isinstance(read_count, str) and "万+" in read_count:
    read_count = int(float(read_count.replace("万+", "")) * 10000)

return {
    "请求时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    "关键词": raw_article.get("keyword", ""),
    "文章标题": raw_article.get("title", ""),
    "文章URL": raw_article.get("article_url", ""),
    "搜狗文章ID": raw_article.get("article_id", ""),
    "公众号名称": raw_article.get("official_account", ""),
    "公众号认证类型": raw_article.get("verify_type", ""),
    "发布时间": pub_time_str,
    "文章类型": "原创" if raw_article.get("is_original", False) else "转载",
    "转载来源": raw_article.get("reprint_source", ""),
    "阅读量": read_count,
    "在看数": interact_data.get("like_count", 0),
    "留言数": interact_data.get("comment_count", 0),
    "转发量(估算)": interact_data.get("share_count", 0),
    "摘要": raw_article.get("summary", ""),
    "相关度评分": raw_article.get("relevance_score", 0),
    "文章标签": ",".join(raw_article.get("tags", [])) if raw_article.get("tags") else "",
    "文章状态": "正常" if not raw_article.get("is_illegal", False) else "已删除/违规"
}

def get_article_list_sync(
keyword: str,
time_range: str = "all",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
verify_type: str = "all",
sort_type: str = "relevance",
article_type: str = "all",
page_no: int = 1,
page_size: int = 20,
match_type: str = "fuzzy"
) -> Dict:
"""
同步调用item_search接口,获取单页微信文章列表
:param keyword: 搜索关键词(支持多关键词组合)
:param time_range: 时间范围
:param start_date: 自定义开始日期(time_range=custom时必填)
:param end_date: 自定义结束日期(time_range=custom时必填)
:param verify_type: 公众号认证类型
:param sort_type: 排序方式
:param article_type: 文章类型
:param page_no: 页码
:param page_size: 每页条数
:param match_type: 关键词匹配类型
:return: 标准化后的文章列表结果
"""

# 1. 校验参数合法性
if time_range == "custom" and not (start_date and end_date):
    logging.error("time_range=custom时,start_date和end_date为必填参数")
    return {"success": False, "error_msg": "缺少自定义时间范围参数", "error_code": -1}

# 2. 构建基础参数(必填项)
params = {
    "appkey": APP_KEY,
    "keyword": keyword,
    "time_range": time_range,
    "verify_type": verify_type,
    "sort_type": sort_type,
    "article_type": article_type,
    "page_no": page_no,
    "page_size": page_size,
    "match_type": match_type,
    "timestamp": int(time.time() * 1000)
}

# 3. 补充可选参数
if time_range == "custom":
    params["start_date"] = start_date
    params["end_date"] = end_date

# 4. 生成签名
params["sign"] = generate_sign(params)

try:
    # 5. 发送POST请求(HTTPS协议,超时15秒)
    response = requests.post(
        url=API_URL,
        data=json.dumps(params),
        headers={"Content-Type": "application/json"},
        timeout=15,
        verify=True
    )
    response.raise_for_status()  # 抛出HTTP请求异常(如404、500)
    result = response.json()

    # 6. 处理响应(不同服务商返回格式可能不同,需按实际文档调整)
    # 假设服务商返回格式:{"code":200,"msg":"success","data":{"article_list":[],"total":0,"page_total":1}}
    if result.get("code") == 200:
        raw_data = result.get("data", {})
        article_list = raw_data.get("article_list", [])
        total = raw_data.get("total", 0)  # 匹配文章总条数
        page_total = raw_data.get("page_total", 1)  # 总页码

        # 标准化文章数据
        standard_articles = [standardize_data(article) for article in article_list]
        for article in standard_articles:
            article["关键词"] = keyword  # 补充关键词字段,便于多关键词筛选时区分

        return {
            "success": True,
            "data": standard_articles,
            "total": total,
            "page_no": page_no,
            "page_total": page_total,
            "error_msg": ""
        }
    else:
        error_msg = result.get("msg", "接口调用失败")
        error_code = result.get("code", -2)
        logging.error(f"接口返回错误:code={error_code}, msg={error_msg}(关键词:{keyword},页码:{page_no})")
        return {
            "success": False,
            "data": [],
            "total": 0,
            "page_no": page_no,
            "page_total": 0,
            "error_code": error_code,
            "error_msg": error_msg
        }
except requests.exceptions.RequestException as e:
    logging.error(f"网络异常(关键词:{keyword},页码:{page_no}):{str(e)}")
    return {
        "success": False,
        "data": [],
        "total": 0,
        "page_no": page_no,
        "page_total": 0,
        "error_code": -3,
        "error_msg": f"网络异常:{str(e)}"
    }
except Exception as e:
    logging.error(f"数据处理异常(关键词:{keyword},页码:{page_no}):{str(e)}")
    return {
        "success": False,
        "data": [],
        "total": 0,
        "page_no": page_no,
        "page_total": 0,
        "error_code": -4,
        "error_msg": f"处理异常:{str(e)}"
    }

async def get_article_list_async(session, keyword: str, **kwargs) -> Dict:
"""异步调用item_search接口,提升批量请求效率"""
page_no = kwargs.get("page_no", 1)

# 构建参数(逻辑同同步方法)
params = {
    "appkey": APP_KEY,
    "keyword": keyword,
    "time_range": kwargs.get("time_range", "all"),
    "verify_type": kwargs.get("verify_type", "all"),
    "sort_type": kwargs.get("sort_type", "relevance"),
    "article_type": kwargs.get("article_type", "all"),
    "page_no": page_no,
    "page_size": kwargs.get("page_size", 20),
    "match_type": kwargs.get("match_type", "fuzzy"),
    "timestamp": int(time.time() * 1000)
}
if params["time_range"] == "custom":
    params["start_date"] = kwargs.get("start_date")
    params["end_date"] = kwargs.get("end_date")
# 生成签名
params["sign"] = generate_sign(params)

try:
    async with session.post(
        API_URL,
        json=params,
        headers={"Content-Type": "application/json"},
        timeout=15
    ) as response:
        response.raise_for_status()
        result = await response.json()
        if result.get("code") == 200:
            raw_data = result.get("data", {})
            article_list = raw_data.get("article_list", [])
            standard_articles = [standardize_data(article) for article in article_list]
            for article in standard_articles:
                article["关键词"] = keyword
            return {
                "success": True,
                "data": standard_articles,
                "total": raw_data.get("total", 0),
                "page_no": page_no,
                "page_total": raw_data.get("page_total", 1),
                "error_msg": ""
            }
        else:
            error_msg = result.get("msg", "接口调用失败")
            logging.error(f"异步请求失败(关键词:{keyword},页码:{page_no}):{error_msg}")
            return {"success": False, "data": [], "error_msg": error_msg}
except Exception as e:
    logging.error(f"异步请求异常(关键词:{keyword},页码:{page_no}):{str(e)}")
    return {"success": False, "data": [], "error_msg": str(e)}

def batch_get_article_list_sync(
keyword: str,
max_page: int = 5, # 最大获取页码(避免无限制分页)
kwargs
) -> List[Dict]:
"""同步批量获取多页文章列表(遍历所有页码或指定max_page)"""
all_articles = []
page_no = 1
while True:
logging.info(f"正在获取关键词「{keyword}」第{page_no}页文章列表")
result = get_article_list_sync(keyword=keyword, page_no=page_no,
kwargs)
if not result["success"]:
logging.error(f"第{page_no}页获取失败:{result['error_msg']}")
break
page_articles = result["data"]
if not page_articles:
logging.info(f"第{page_no}页无匹配文章,批量获取结束")
break
all_articles.extend(page_articles)
logging.info(f"第{page_no}页获取成功,新增{len(page_articles)}条数据(累计{len(all_articles)}条)")

    # 终止条件:达到最大页码或总页码
    if page_no >= result["page_total"] or page_no >= max_page:
        break
    page_no += 1
    # 控制调用频率(普通用户10次/分钟,间隔6秒;企业用户50次/分钟,间隔1秒)
    time.sleep(6)
return all_articles

async def batch_get_article_list_async(
keyword: str,
max_page: int = 5,
**kwargs
) -> List[Dict]:
"""异步批量获取多页文章列表(并行请求,提升效率)"""
all_articles = []

# 先获取总页码
first_page_result = await get_article_list_async(
    session=None, keyword=keyword, page_no=1, **kwargs
)
if not first_page_result["success"]:
    logging.error(f"关键词「{keyword}」第1页获取失败:{first_page_result['error_msg']}")
    return all_articles
all_articles.extend(first_page_result["data"])
page_total = min(first_page_result["page_total"], max_page)
if page_total <= 1:
    logging.info(f"关键词「{keyword}」仅1页数据,批量获取结束")
    return all_articles

# 并行请求剩余页码
async with aiohttp.ClientSession() as session:
    tasks = []
    for page_no in range(2, page_total + 1):
        tasks.append(get_article_list_async(session, keyword=keyword, page_no=page_no, **kwargs))
    # 控制并发数(避免频率超限)
    semaphore = asyncio.Semaphore(5)  # 最大并发5个请求
    async def bounded_task(task):
        async with semaphore:
            return await task
    results = await asyncio.gather(*[bounded_task(task) for task in tasks])

    for result in results:
        if result["success"]:
            all_articles.extend(result["data"])
            logging.info(f"异步获取成功,新增{len(result['data'])}条数据")
logging.info(f"关键词「{keyword}」异步批量获取结束,累计{len(all_articles)}条数据")
return all_articles

def save_article_list(articles: List[Dict], save_path: str = SAVE_PATH):
"""将文章列表保存为Excel文件(便于归档/分析)"""
if not articles:
logging.warning("无文章数据可保存")
return

df = pd.DataFrame(articles)
# 筛选常用字段,优化Excel可读性
columns = [
    "请求时间", "关键词", "文章标题", "公众号名称", "公众号认证类型",
    "发布时间", "文章类型", "阅读量", "在看数", "留言数", "转发量(估算)",
    "相关度评分", "文章标签", "文章状态", "摘要", "文章URL", "搜狗文章ID"
]
df = df[columns].drop_duplicates(subset=["文章URL"])  # 按文章URL去重(避免重复保存)

# 增量保存(避免覆盖历史数据)
try:
    history_df = pd.read_excel(save_path, engine="openpyxl")
    df = pd.concat([history_df, df], ignore_index=True).drop_duplicates(subset=["文章URL"])
except FileNotFoundError:
    pass

df.to_excel(save_path, index=False, engine="openpyxl")
logging.info(f"文章列表已归档至:{save_path}(累计{len(df)}条数据)")

def keyword_monitor_task(keywords: List[str], **kwargs):
"""关键词定时监测任务(实时追踪相关文章)"""
logging.info("=== 开始执行关键词定时监测任务 ===")
all_articles = []
for keyword in keywords:
logging.info(f"=== 开始监测关键词:{keyword} ===")

    # 同步批量获取(如需高效可改用异步)
    articles = batch_get_article_list_sync(keyword=keyword, **kwargs)
    all_articles.extend(articles)
    logging.info(f"=== 关键词「{keyword}」监测完成,获取{len(articles)}条数据 ===")
# 保存所有关键词的文章列表
save_article_list(all_articles)
logging.info("=== 关键词定时监测任务执行完成 ===")

调用示例(支持单页/批量/异步/定时监测)

if name == "main":

# 模式1:获取单页文章列表(关键词“数字经济 政策解读”,30天内,按发布时间排序)
single_page_result = get_article_list_sync(
    keyword="数字经济 政策解读",
    time_range="30days",
    sort_type="pub_time",
    page_no=1,
    page_size=20
)
if single_page_result["success"]:
    print("="*80)
    print(f"关键词「数字经济 政策解读」第1页文章列表(共{len(single_page_result['data'])}条)")
    print("="*80)
    for idx, article in enumerate(single_page_result["data"][:10], 1):  # 打印前10条
        print(f"{idx:2d}. 标题:{article['文章标题']}")
        print(f"   公众号:{article['公众号名称']}({article['公众号认证类型']})")
        print(f"   发布时间:{article['发布时间']} | 阅读量:{article['阅读量']} | 在看数:{article['在看数']}")
        print(f"   相关度:{article['相关度评分']}分 | 状态:{article['文章状态']}")
        print(f"   URL:{article['文章URL']}")
        print("-"*80)
else:
    print(f"单页文章列表获取失败:{single_page_result['error_msg']}(错误码:{single_page_result['error_code']})")

# 模式2:同步批量获取多页文章列表(关键词“AI大模型”,7天内,最多获取5页)
# batch_articles = batch_get_article_list_sync(
#     keyword="AI大模型",
#     time_range="7days",
#     sort_type="read_count",
#     max_page=5,
#     page_size=30
# )
# save_article_list(batch_articles)

# 模式3:异步批量获取多页文章列表(关键词“乡村振兴”,自定义时间2024-01-01至2024-12-31)
# asyncio.run(batch_get_article_list_async(
#     keyword="乡村振兴",
#     time_range="custom",
#     start_date="2024-01-01",
#     end_date="2024-12-31",
#     verify_type="official",
#     max_page=10,
#     page_size=50
# ))

# 模式4:启动定时监测任务(每1小时监测一次关键词“数字经济”“AI大模型”)
# scheduler = BlockingScheduler()
# scheduler.add_job(
#     keyword_monitor_task,
#     'interval',
#     hours=1,
#     args=[["数字经济", "AI大模型"]],
#     kwargs={"time_range": "1day", "sort_type": "pub_time", "max_page": 3}
# )
# logging.info("关键词定时监测任务已启动,每1小时执行一次...")
# try:
#     scheduler.start()
# except (KeyboardInterrupt, SystemExit):
#     logging.info("关键词定时监测任务已停止")

四、调试与验证:快速定位问题

  1. 调试步骤(优先用 Postman 验证,排除代码干扰)
    手动拼接参数:在 Postman 中创建 POST 请求,填写appkey、keyword、time_range、timestamp等必填项,按业务需求补充可选参数;
    生成签名:按服务商规则手动计算sign(用在线 MD5 工具验证,输入拼接后的字符串);
    配置请求头:设置Content-Type: application/json,将参数以 JSON 格式传入请求体;
    发送请求:点击发送,查看响应结果;
    验证结果:
    若返回 200 且文章列表完整:接口正常,可对接代码;
    若返回 401(签名无效):检查参数排序、secret 是否正确、时间戳是否过期、中文参数是否编码;
    若返回 403(权限不足):确认账号类型(普通 / 企业)是否匹配筛选条件 / 字段需求,是否已付费或申请高级权限;
    若返回 400(参数错误):核对time_range/match_type等参数取值是否合法、必填参数是否缺失、自定义时间格式是否正确;
    若返回 429(频率超限):降低调用频率,或申请提升配额;
    若返回 404(接口不存在):核对 API_URL 是否正确,确认item_search接口已申请开通;
    若返回 500(服务器异常):记录日志,稍后重试(可能是服务商接口问题);
    若返回 503(服务不可用):联系服务商确认接口是否维护;
    若返回 “无匹配文章”:调整关键词(如扩大范围)、时间范围或筛选条件,确认是否有相关文章存在。
  2. 常见调试问题排查(关键词搜索场景高频问题)
    问题现象 常见原因 排查方案
    签名错误(401) 1. 参数排序错误;2. secret 错误;3. 时间戳过期;4. 中文关键词未编码;5. 服务商签名规则差异 1. 按 ASCII 升序排序参数并打印验证;2. 核对 secret 与服务商后台一致;3. 校准本地时间(误差≤5 分钟);4. 用urlencode处理中文关键词;5. 重新查看服务商文档,确认是否需要 SHA256 加密或额外参数(如nonce随机数)
    无匹配文章 1. 关键词过于精准 / 宽泛;2. 时间范围过窄;3. 筛选条件过于严格;4. 文章未被搜狗收录;5. 关键词组合逻辑错误 1. 调整关键词(如 “数字经济政策” 改为 “数字经济 政策”);2. 扩大时间范围(如从 1day 改为 7days);3. 放宽筛选条件(如verify_type从 “official” 改为 “all”);4. 直接在搜狗微信搜索官网验证关键词是否有结果;5. 确认多关键词组合是否正确(AND 用空格,OR 用
    权限不足(403) 1. 普通用户使用高级筛选(如article_type=original、精确匹配);2. 调用次数耗尽;3. 业务用途未通过审核 1. 移除高级筛选条件,或升级为企业用户;2. 查看服务商后台调用次数剩余量,及时充值;3. 补充业务合规说明(如 “仅用于内部舆情监测”),重新提交审核
    频率超限(429) 单 IP / 账号调用次数超过服务商限制 1. 批量分页时增加间隔(普通用户 6 秒 / 次,企业用户 1 秒 / 次);2. 异步请求控制并发数(≤5);3. 分时段调用,避免集中请求;4. 用ratelimit库控制调用频率(Python 示例:@limits(calls=10, period=60));5. 联系服务商申请提升配额
    参数错误(400) 1. time_range=custom未传start_date/end_date;2. 日期格式错误(非 “YYYY-MM-DD”);3. match_type/sort_type取值非法;4. 关键词为空 1. 确保自定义时间范围参数完整;2. 统一日期格式为 “YYYY-MM-DD”;3. 核对服务商文档,确认参数取值(如match_type仅支持 “fuzzy”“exact”);4. 确保关键词非空且符合格式要求
    文章列表重复 1. 分页时缓存不一致;2. 关键词多义性导致重复收录;3. 未去重处理 1. 批量获取时一次性请求所有页码,减少缓存影响;2. 优化关键词(如增加限定词);3. 按article_url去重(代码中已实现)
    文章状态为 “已删除 / 违规” 1. 文章已被公众号删除;2. 文章涉及违规内容被屏蔽;3. 搜狗收录后文章状态变更 1. 直接访问article_url确认文章是否存在;2. 过滤该类文章,避免影响数据质量;3. 企业版可申请 “实时状态同步” 权限,提升数据时效性
    响应速度慢 1. 关键词匹配结果过多;2. 分页page_size过大;3. 服务商服务器负载高;4. 网络波动 1. 优化关键词,缩小筛选范围;2. 减小page_size(如从 50 改为 20);3. 避开高峰时段调用;4. 增加超时时间(如从 10 秒改为 15 秒)
    五、进阶优化:提升效率与稳定性(生产级必备)
  3. 性能优化(批量 / 实时监测场景重点)
    (1)批量请求优化
    异步并发请求:多页码 / 多关键词批量获取时,用异步请求并行拉取(控制并发数≤5,避免频率超限),如代码中batch_get_article_list_async方法,效率比同步提升 3-5 倍;
    分页智能停止:获取第 1 页后,根据page_total计算总页码,仅请求有效页码,避免无效分页(如page_no超过page_total);
    字段筛选精准化:仅保留业务必需字段(如fields=title,article_url,pub_time,read_count),减少数据传输量和解析耗时。
    (2)缓存策略优化
    搜索结果缓存:用 Redis 缓存关键词 + 筛选条件组合对应的搜索结果(缓存 key = 关键词 + 时间范围 + 筛选条件,有效期 30 分钟 - 1 小时),避免重复请求相同关键词;
    缓存穿透防护:对无匹配结果的关键词,缓存空结果(有效期 10 分钟),避免频繁无效请求;
    热点关键词缓存:对高频监测的关键词(如实时舆情关键词),缩短缓存有效期(如 10 分钟),平衡实时性与效率。
    (3)关键词策略优化
    关键词分词与扩展:利用 jieba 分词对核心关键词进行扩展(如 “数字经济” 扩展为 “数字经济、数字产业化、产业数字化”),提升匹配覆盖率;
    多关键词优先级:按业务重要性划分关键词优先级(如核心关键词、次要关键词),核心关键词高频监测,次要关键词低频监测;
    关键词去重:多关键词监测时,避免关键词重复(如 “AI 大模型” 和 “大模型 AI”),减少冗余请求。
  4. 稳定性优化(生产级必备)
    异常重试机制:
    对 429(频率超限)、500(服务器异常)、503(服务不可用)错误,采用指数退避策略重试(5s→10s→20s);
    重试次数≤3 次,避免无效重试导致更多错误;
    对 401(签名错误)、403(权限不足)、400(参数错误)错误,直接返回并日志告警(需人工介入)。
    密钥与权限安全:
    定期轮换secret(每 3 个月更新 1 次),在服务商后台操作;
    生产环境将appkey和secret存储在环境变量或配置中心(如 Nacos),避免硬编码;
    限制接口调用 IP(部分服务商支持 IP 白名单),防止密钥泄露后被滥用;
    企业用户可申请 “子账号” 权限,按业务模块分配不同appkey,便于权限管控。
    日志与监控:
    详细记录每次请求的参数、签名、响应结果、错误信息、调用耗时,便于问题追溯;
    配置日志告警(如通过邮件 / 钉钉推送高频错误、频率超限、权限不足提示),实时监控接口状态;
    监控数据质量(如连续 10 次请求无结果、高频关键词响应时间超过 5 秒),及时触发告警;
    统计关键词匹配数量、文章状态分布,评估接口使用效果。
  5. 业务场景专属适配
    (1)舆情监测场景
    实时性优化:高频关键词监测(如每 15 分钟 / 1 小时),用 “1day” 时间范围 +“pub_time” 排序,快速捕获最新舆情;
    敏感内容识别:基于tags(文章标签)、summary(摘要)和关键词匹配(如 “投诉”“争议”“风险”),筛选敏感舆情文章,触发告警;
    多维度统计:按 “发布时间”“公众号认证类型”“互动量等级” 统计舆情文章分布,生成舆情简报;
    关联文章追踪:通过article_id联动item_get接口,获取敏感文章正文,深入分析舆情内容。
    (2)内容聚合场景
    文章质量筛选:按read_count(阅读量≥1000)、relevance_score(相关度≥80 分)、is_original(原创)筛选优质文章;
    分类导航:基于tags(文章标签)构建内容分类导航(如 “财经”“科技”“民生”),支持用户按需筛选;
    去重优化:按article_url+title组合去重,避免同一文章被多个关键词匹配重复收录;
    个性化推荐:基于用户关注的关键词和tags偏好,推荐相关优质文章。
    (3)学术研究场景
    历史数据批量采集:用time_range=custom自定义时间范围(如 1 年),批量获取所有匹配文章,归档至数据库;
    数据标准化:统一时间格式、互动量单位,提取title、summary、content(需item_get接口)进行文本分析;
    多关键词组合采集:按研究主题,组合多个相关关键词(如 “乡村振兴 | 农村发展 | 乡村治理”),确保数据覆盖全面;
    趋势分析:按 “月份”“季度” 统计文章发布数量、关键词出现频率,分析研究主题的时间趋势。
    六、避坑指南:常见问题与解决方案(关键词搜索场景高频)
  6. 签名错误(最高频问题)
    原因:参数排序错误、secret 错误、时间戳过期、中文关键词未编码、服务商签名规则差异;
    解决方案:
    严格按 ASCII 升序排序参数,打印sorted_params确认排序正确;
    用urlencode自动处理中文关键词和特殊字符,避免手动拼接编码错误;
    用在线 MD5/SHA256 工具验证签名(输入拼接后的字符串,对比代码生成结果);
    确保时间戳为毫秒级(int(time.time()*1000)),本地时间与服务器时间误差≤5 分钟;
    若仍报错,重新查阅服务商文档,确认是否需要添加额外参数(如nonce)或采用其他加密方式。
  7. 合规风险(核心注意事项)
    原因:使用非法爬虫接口、数据用于商业化售卖、侵犯公众号版权、抓取敏感信息;
    解决方案:
    仅使用合规服务商接口,要求服务商提供《数据合规承诺书》和《版权使用授权》;
    明确数据使用场景,禁止商业化售卖、恶意传播或用于违法活动;
    引用文章数据时,标注 “数据来源:搜狗微信搜索” 和 “文章版权归原公众号所有”;
    涉及原创文章的二次使用(如转载、节选),需提前获得公众号作者授权;
    避开敏感关键词(如涉密、违法违规关键词),不采集隐私公众号文章。
  8. 频率超限与调用成本控制
    原因:批量请求未控制频率、关键词监测过于频繁、无效分页请求过多;
    解决方案:
    按服务商限制控制调用频率(普通用户 10 次 / 分钟,企业用户 50 次 / 分钟),批量请求增加间隔;
    按关键词优先级调整监测频率(核心关键词每 1 小时,次要关键词每 6 小时);
    智能分页:仅请求前 N 页(如前 5 页),或根据total判断是否需要继续分页(如total≤100时请求全部);
    企业用户可申请 “按结果计费” 套餐,避免无效请求(如无匹配文章不扣费)。
  9. 数据质量优化
    原因:文章重复、状态异常(已删除 / 违规)、互动数据不准确、收录延迟;
    解决方案:
    按article_url去重,避免同一文章被多个关键词匹配重复收录;
    过滤 “已删除 / 违规” 文章,仅保留正常状态数据;
    互动数据中 “10 万 +” 转换为具体数值(如 100000),统一数据格式;
    新文章收录延迟 10-30 分钟,实时监测场景需预留缓冲时间;
    对比多个服务商的接口数据,选择数据准确性更高的服务商。
    七、上线前检查清单(生产级必查)
    密钥是否保密(未硬编码、未提交到代码仓库,用环境变量 / 配置中心存储);
    异常处理是否完整(覆盖 401/403/404/429/500/503 等所有常见错误码);
    频率控制是否到位(调用频率未超过服务商配额,批量请求有合理间隔 / 并发控制);
    权限与付费是否合规(账号类型匹配筛选条件 / 字段需求,调用次数充足);
    数据处理是否规范(时间格式统一、字段映射正确、去重处理、异常数据过滤);
    日志是否完善(记录请求参数、响应结果、错误
相关文章
|
1天前
|
云安全 人工智能 自然语言处理
|
9天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
648 56
Meta SAM3开源:让图像分割,听懂你的话
|
6天前
|
搜索推荐 编译器 Linux
一个可用于企业开发及通用跨平台的Makefile文件
一款适用于企业级开发的通用跨平台Makefile,支持C/C++混合编译、多目标输出(可执行文件、静态/动态库)、Release/Debug版本管理。配置简洁,仅需修改带`MF_CONFIGURE_`前缀的变量,支持脚本化配置与子Makefile管理,具备完善日志、错误提示和跨平台兼容性,附详细文档与示例,便于学习与集成。
319 116
|
5天前
|
人工智能 Java API
Java 正式进入 Agentic AI 时代:Spring AI Alibaba 1.1 发布背后的技术演进
Spring AI Alibaba 1.1 正式发布,提供极简方式构建企业级AI智能体。基于ReactAgent核心,支持多智能体协作、上下文工程与生产级管控,助力开发者快速打造可靠、可扩展的智能应用。
|
21天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
8天前
|
机器学习/深度学习 人工智能 自然语言处理
AgentEvolver:让智能体系统学会「自我进化」
AgentEvolver 是一个自进化智能体系统,通过自我任务生成、经验导航与反思归因三大机制,推动AI从“被动执行”迈向“主动学习”。它显著提升强化学习效率,在更少参数下实现更强性能,助力智能体持续自我迭代。开源地址:https://github.com/modelscope/AgentEvolver
440 32
|
4天前
|
弹性计算 人工智能 Cloud Native
阿里云无门槛和有门槛优惠券解析:学生券,满减券,补贴券等优惠券领取与使用介绍
为了回馈用户与助力更多用户节省上云成本,阿里云会经常推出各种优惠券相关的活动,包括无门槛优惠券和有门槛优惠券。本文将详细介绍阿里云无门槛优惠券的领取与使用方式,同时也会概述几种常见的有门槛优惠券,帮助用户更好地利用这些优惠,降低云服务的成本。
272 132

热门文章

最新文章