在电商开发这行摸爬滚打了八年,和淘宝评论 API 打交道的过程,简直就是一部 “血泪奋斗史”。从最初对接时被各种报错折磨得夜不能寐,到现在能熟练用它搭建起各种实用功能,其中的酸甜苦辣,今天就来给大伙好好唠唠,顺便分享些超实用的代码干货!
刚入行那会,接了个给电商商家做竞品分析工具的活儿,核心就是调用淘宝评论 API 抓取竞品评论。本以为按文档流程走,申请个接口权限,写几行代码调用就行,结果现实给了我狠狠一击。
申请接口权限就不顺利,淘宝开放平台对开发者资质审核严格,我把公司营业执照、应用使用场景说明改了又改,提交后还等了整整三个工作日才通过。好不容易拿到App Key和App Secret,调用接口时又卡在签名验证上。官方文档里签名算法写得晦涩难懂,参数排序、加密方式稍有差错,就返回40001签名错误。为了搞懂这算法,我对着文档研究了两天,还在 Stack Overflow 和国内技术论坛疯狂搜索,终于写出了正确的签名生成函数:
import hashlib import hmac import time import urllib.parse def generate_sign(params, app_secret): sorted_params = sorted(params.items(), key=lambda x: x[0]) sign_str = app_secret for k, v in sorted_params: sign_str += f"{k}{v}" sign_str += app_secret return hmac.new( app_secret.encode(), sign_str.encode(), hashlib.sha256 ).hexdigest().upper()
解决了签名问题,满心欢喜发送请求,结果又碰上接口调用频率限制。当时为了快速采集大量数据,没控制好请求频率,短时间内发送太多请求,直接被淘宝封了 IP,还收到警告邮件。无奈之下,只能研究淘宝的限流规则,用漏桶算法写了个频率控制类:
import time class LeakyBucket: def __init__(self, capacity, rate): self.capacity = capacity self.rate = rate self.tokens = capacity self.last_update = time.time() def consume(self, tokens=1): now = time.time() # 补充令牌 self.tokens = min( self.capacity, self.tokens + (now - self.last_update) * self.rate ) self.last_update = now if self.tokens >= tokens: self.tokens -= tokens return True return False # 使用示例 bucket = LeakyBucket(capacity=100, rate=20) # 容量100,每秒补充20个令牌 if bucket.consume(): # 调用API response = requests.get(api_url) else: time.sleep(0.1) # 等待令牌补充
数据到手后,也不是一帆风顺。淘宝评论数据格式复杂,有文字评论、图片评论、评分、追评等多种类型,不同类型数据结构差异大。就拿文字评论来说,有的用户会写一大段话,标点符号、表情符号混用,要准确提取关键信息,得用自然语言处理技术。为了处理这些数据,我引入了jieba分词库和TextBlob情感分析库,写了个数据清洗和情感分析函数:
import jieba from textblob import TextBlob def clean_and_analyze_comment(comment): # 分词 words = jieba.lcut(comment) # 去除停用词(可根据需求扩展停用词表) stopwords = {"的", "了", "是", "在"} clean_words = [word for word in words if word not in stopwords] clean_comment = " ".join(clean_words) # 情感分析 blob = TextBlob(clean_comment) sentiment = blob.sentiment.polarity return sentiment, clean_comment # 示例调用 comment = "这款手机拍照效果超棒,就是电池续航不太给力" sentiment, clean_comment = clean_and_analyze_comment(comment) print(f"情感倾向: {sentiment}, 清洗后评论: {clean_comment}")
有一回,客户要求实时监控自家商品评论,一有新评论就及时推送通知。这可难不倒我,通过设置合适的时间间隔,不断调用淘宝评论 API 获取最新评论。但很快又出现新问题,每次获取评论都要从第一页开始查,效率极低。后来我发现接口可以通过评论时间戳来筛选,只获取上次查询时间之后的评论,大大提高了效率:
import requests import time # 假设last_query_time是上次查询时间戳 def get_new_comments(last_query_time, app_key, app_secret, num_iid): params = { "method": "taobao.item.reviews.get", "app_key": app_key, "num_iid": num_iid, "start_date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_query_time)), "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "format": "json", "v": "2.0", } params["sign"] = generate_sign(params, app_secret) response = requests.get( "https://eco.taobao.com/router/rest", params=params ) data = response.json() if data["code"] == "200": new_comments = data["item_reviews_get_response"]["reviews"] return new_comments else: print(f"错误码: {data['code']}, 消息: {data.get('msg', '未知错误')}") return [] # 示例调用 last_query_time = 1690000000 # 假设初始时间戳 app_key = "your_app_key" app_secret = "your_app_secret" num_iid = "123456789" # 商品ID new_comments = get_new_comments(last_query_time, app_key, app_secret, num_iid) if new_comments: for comment in new_comments: print(f"用户: {comment['user_nick']}, 评论: {comment['rate_content']}")
还有一次,项目要做一个商品评论可视化大屏,展示不同商品好评率、差评关键词云图等信息。为了保证数据实时性和准确性,我用Flask框架搭建了一个后端服务,定时调用淘宝评论 API 更新数据,再通过Echarts在前端展示可视化图表。这过程中,数据缓存又成了难题,频繁调用 API 不仅浪费资源,还可能触发限流。于是引入Redis缓存,先从缓存里读取数据,如果缓存没有再调用 API 获取,获取后存入缓存:
import redis from flask import Flask, jsonify app = Flask(__name__) redis_client = redis.Redis(host="localhost", port=6379, db=0) @app.route("/product_comments/<num_iid>", methods=["GET"]) def get_product_comments(num_iid): cached_data = redis_client.get(num_iid) if cached_data: return jsonify(eval(cached_data.decode("utf-8"))) else: # 调用淘宝评论API获取数据,此处省略具体调用代码 api_data = get_comments_from_api(num_iid) redis_client.setex(num_iid, 3600, str(api_data)) # 缓存1小时 return jsonify(api_data) if __name__ == "__main__": app.run(debug=True)
这些年,靠着不断踩坑、填坑,在淘宝评论 API开发上也算积累了些经验。