文章附件下载:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:5035
这个 Python 脚本实现了以下功能:
- 完整的淘宝 API 调用封装,包括签名生成和错误处理;
- 支持单商品上传和批量并发上传;
- 包含商品基本信息、图片、SKU 等完整属性设置;
- 自动重试机制和频率控制;
- 详细的日志记录功能;
- 从 CSV 文件加载商品数据的工具函数;
- 多线程并发处理,提高上传效率。
import requests
import json
import time
import hashlib
import random
import os
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
class TaobaoBatchUploader:
    """Upload items through the Taobao Open Platform REST router.

    The class bundles request signing, a retrying HTTP call, append-only file
    logging, single-item upload via ``taobao.item.add`` and a thread-pooled
    batch upload.
    """

    def __init__(self, app_key, app_secret, session_key):
        """Store the credentials and the default client settings.

        Args:
            app_key: Value sent as the ``app_key`` request parameter.
            app_secret: Secret wrapped around the parameter string when signing.
            session_key: Value sent as the ``session`` request parameter.
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.session_key = session_key
        self.api_url = "https://eco.taobao.com/router/rest"
        self.max_retries = 3
        # Base delay in seconds; multiplied by the attempt number on retry.
        self.retry_delay = 2
        # Seconds before a stalled HTTP request is abandoned (and retried).
        self.request_timeout = 30
        self.log_file = "taobao_upload.log"

    def _generate_sign(self, params):
        """Return the upper-case hex MD5 signature for *params*.

        Keys are visited in sorted order, each key is concatenated with
        ``str(value)``, and the joined string is wrapped in the app secret on
        both sides before hashing.
        """
        param_str = "".join(f"{key}{params[key]}" for key in sorted(params))
        sign_str = f"{self.app_secret}{param_str}{self.app_secret}"
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()

    def _call_api(self, method, params):
        """POST a signed request and return the decoded JSON response.

        Args:
            method: API method name, sent as the ``method`` parameter.
            params: Method-specific parameters. The mapping is copied, so the
                caller's dict is left untouched.

        Returns:
            The decoded JSON body on success, or ``None`` when the API returns
            an ``error_response`` or every attempt fails.
        """
        # Work on a copy: the system parameters and signature must not leak
        # back into the caller's dict.
        request_params = dict(params)
        request_params.update({
            'method': method,
            'app_key': self.app_key,
            'session': self.session_key,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'format': 'json',
            'v': '2.0',
            'sign_method': 'md5',
            'partner_id': 'top-sdk-python-20200801',
        })
        request_params['sign'] = self._generate_sign(request_params)

        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    self.api_url,
                    data=request_params,
                    timeout=self.request_timeout,
                )
                result = response.json()
                if 'error_response' in result:
                    error = result['error_response']
                    self._log_error(f"API Error: {error.get('code')} - {error.get('msg')}")
                    # Treated as "too many requests": back off, then retry.
                    # NOTE(review): confirm against the platform error-code
                    # table that 27 is the rate-limit code and that the code
                    # is delivered as a string rather than an integer.
                    if error.get('code') == '27':
                        time.sleep(10)
                        continue
                    return None
                return result
            except Exception as e:
                # Best-effort boundary: log, wait longer each time, retry.
                self._log_error(f"API Request Failed: {str(e)}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (attempt + 1))
                continue
        return None

    def _log_error(self, message):
        """Append a timestamped line containing *message* to ``self.log_file``.

        Despite the name, the class also uses this for success and summary
        messages.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_message = f"[{timestamp}] {message}\n"
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(log_message)

    def upload_single_item(self, item_data):
        """Upload one item and return its id.

        Args:
            item_data: Mapping with required keys ``price``, ``title`` and
                ``category_id``. Optional keys: ``quantity``, ``description``,
                ``province``, ``city``, ``postage_id``, ``has_invoice``,
                ``has_warranty``, ``images`` (list) and ``skus`` (list of
                dicts with ``properties``, ``quantity``, ``price`` and
                optional ``outer_id``).

        Returns:
            The ``item_id`` from the ``item_add_response``, or ``None`` when
            the call fails.

        Raises:
            KeyError: If a required key is missing from *item_data*.
        """
        params = {
            'num': item_data.get('quantity', 1),
            'price': item_data['price'],
            'type': 'fixed',
            'stuff_status': 'new',
            'title': item_data['title'],
            'desc': item_data.get('description', ''),
            'cid': item_data['category_id'],
            'location.state': item_data.get('province', ''),
            'location.city': item_data.get('city', ''),
            'postage_id': item_data.get('postage_id', '0'),
            'has_invoice': item_data.get('has_invoice', 'false'),
            'has_warranty': item_data.get('has_warranty', 'false'),
            'approve_status': 'onsale',
            'list_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

        # Images: the first one is the main image, up to five more follow as
        # image_1..image_5. An empty or missing list simply adds nothing.
        images = item_data.get('images') or []
        if images:
            params['image'] = images[0]
            for i, img in enumerate(images[1:6], start=1):
                params[f'image_{i}'] = img

        # SKUs: one numbered group of parameters per SKU.
        for i, sku in enumerate(item_data.get('skus') or [], start=1):
            params[f'sku_properties_{i}'] = sku['properties']
            params[f'sku_quantities_{i}'] = sku['quantity']
            params[f'sku_prices_{i}'] = sku['price']
            params[f'sku_outer_ids_{i}'] = sku.get('outer_id', '')

        result = self._call_api('taobao.item.add', params)
        if result and 'item_add_response' in result:
            item_id = result['item_add_response']['item']['item_id']
            self._log_error(f"Successfully uploaded item: {item_id}")
            return item_id
        return None

    def batch_upload(self, items_data, max_workers=5):
        """Upload many items concurrently and summarize the outcome.

        Args:
            items_data: Sequence of item mappings accepted by
                ``upload_single_item``.
            max_workers: Size of the thread pool.

        Returns:
            A dict with ``total``, ``success``, ``failed`` and ``item_ids``
            (ids of successful uploads, in submission order).
        """
        success_count = 0
        failed_count = 0
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.upload_single_item, item)
                for item in items_data
            ]
            for future in futures:
                try:
                    result = future.result()
                except Exception as e:
                    # One bad item must not abort the rest of the batch.
                    self._log_error(f"Upload failed: {str(e)}")
                    failed_count += 1
                    continue
                if result:
                    success_count += 1
                    results.append(result)
                else:
                    failed_count += 1

        summary = {
            'total': len(items_data),
            'success': success_count,
            'failed': failed_count,
            'item_ids': results,
        }
        self._log_error(f"Batch upload completed. Success: {success_count}, Failed: {failed_count}")
        return summary
def load_item_data_from_csv(file_path):
    """Load item dictionaries from a CSV file with a header row.

    Required columns are ``title``, ``price`` and ``category_id``. Optional
    columns are ``quantity`` (defaults to 1 when absent or empty),
    ``description``, ``images`` (values separated by ``|``) and ``skus``
    (entries separated by ``;``, each ``properties:quantity:price[:outer_id]``).

    Args:
        file_path: Path of the UTF-8 encoded CSV file.

    Returns:
        A list of dicts with keys ``title``, ``price`` (float), ``quantity``
        (int), ``category_id``, ``description`` and ``images`` (list of
        non-empty strings), plus ``skus`` (list of dicts) when the file has a
        ``skus`` column.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If a numeric field cannot be converted.
    """
    import csv

    items = []
    # utf-8-sig also reads plain UTF-8 but drops a leading BOM, which would
    # otherwise become part of the first header name. newline='' lets the csv
    # module handle line endings itself.
    with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            # Cells may be '' (empty) or None (row shorter than the header),
            # so every optional field falls back with `or`.
            raw_images = row.get('images') or ''
            item = {
                'title': row['title'],
                'price': float(row['price']),
                'quantity': int(row.get('quantity') or 1),
                'category_id': row['category_id'],
                'description': row.get('description') or '',
                'images': [part for part in raw_images.split('|') if part],
            }
            if 'skus' in row:
                skus = []
                for sku_str in (row['skus'] or '').split(';'):
                    sku_parts = sku_str.split(':')
                    # Entries without properties, quantity and price are skipped.
                    if len(sku_parts) >= 3:
                        skus.append({
                            'properties': sku_parts[0],
                            'quantity': int(sku_parts[1]),
                            'price': float(sku_parts[2]),
                            'outer_id': sku_parts[3] if len(sku_parts) > 3 else '',
                        })
                item['skus'] = skus
            items.append(item)
    return items
def main():
    """Load items from ``items_to_upload.csv`` and upload them in one batch.

    Progress and the final summary are printed to stdout. Any exception raised
    while loading or uploading is caught and printed instead of propagating.
    """
    # Placeholder Taobao Open Platform credentials: key, secret, session key.
    credentials = ("your_app_key", "your_app_secret", "your_session_key")
    uploader = TaobaoBatchUploader(*credentials)

    try:
        # Read the items to upload from the CSV file.
        pending = load_item_data_from_csv("items_to_upload.csv")
        print(f"Loaded {len(pending)} items for upload.")

        # Upload everything and report the outcome.
        outcome = uploader.batch_upload(pending)
        uploaded, failed = outcome['success'], outcome['failed']
        print(f"Upload completed. Success: {uploaded}, Failed: {failed}")
        print(f"Uploaded item IDs: {outcome['item_ids']}")
    except Exception as exc:
        print(f"Error: {exc!s}")
# Run the uploader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()