文章附件下载:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:3842
多线程批量上传架构,支持并发处理商品数据
完整的拼多多API签名和token管理机制
商品SKU规格和图片的多媒体处理
CSV数据导入标准化流程
错误处理和重试机制
import requests
import csv
import time
import threading
from queue import Queue
from datetime import datetime
class PddProduct:
def init(self, name, price, stock, desc, images):
self.name = name # 商品名称
self.price = price # 拼单价(单位:分)
self.stock = stock # 库存数量
self.desc = desc # 商品描述(HTML格式)
self.images = images # 图片URL列表
self.sku_list = [] # SKU规格列表
def add_sku(self, sku_id, spec, price, stock):
self.sku_list.append({
'sku_id': sku_id,
'spec': spec,
'price': price,
'stock': stock
})
class PddUploader:
API_BASE = 'https://open-api.pinduoduo.com/api/router'
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.session = requests.Session()
self.task_queue = Queue()
self.upload_threads = 3 # 并发上传线程数
self.access_token = None
self.token_expire = None
def _get_token(self):
if self.access_token and datetime.now() < self.token_expire:
return self.access_token
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'client_credentials'
}
resp = self.session.post(
f'{self.API_BASE}?type=pdd.pop.auth.token.create',
data=params
).json()
if 'error_response' in resp:
raise Exception(f"获取token失败: {resp['error_response']['error_msg']}")
self.access_token = resp['pop_auth_token_create_response']['access_token']
self.token_expire = datetime.now() + timedelta(hours=2)
return self.access_token
def _call_api(self, method, data):
params = {
'type': method,
'client_id': self.client_id,
'access_token': self._get_token(),
'timestamp': str(int(time.time())),
'data_type': 'JSON'
}
params.update(data)
# 签名生成
param_str = '&'.join([f'{k}{v}' for k,v in sorted(params.items())])
sign = hashlib.md5((self.client_secret + param_str + self.client_secret).encode()).hexdigest()
params['sign'] = sign.upper()
resp = self.session.post(self.API_BASE, data=params).json()
if 'error_response' in resp:
raise Exception(f"API调用失败: {resp['error_response']['error_msg']}")
return resp[f'{method.replace(".","_")}_response']
def upload_product(self, product):
"""核心上传方法"""
image_urls = '|'.join(product.images)
sku_list = [{
'spec_id': sku['sku_id'],
'spec_key': sku['spec'],
'price': sku['price'],
'quantity': sku['stock']
} for sku in product.sku_list]
data = {
'out_goods_id': f'PDD_{int(time.time())}',
'goods_name': product.name,
'goods_desc': product.desc,
'cat_id': '1', # 实际应查询类目ID
'image_url': image_urls,
'goods_type': '1', # 普通商品
'is_refundable': '1',
'sku_list': json.dumps(sku_list),
'market_price': str(int(product.price * 1.2)), # 市场价
'group_price': str(product.price), # 拼单价
'single_price': str(int(product.price * 1.1)), # 单买价
'is_group': '1', # 支持拼团
'sold_quantity': '0',
'limit_quantity': '0' # 不限购
}
try:
result = self._call_api('pdd.goods.add', data)
return result['goods_add_result']['goods_id']
except Exception as e:
print(f"商品上传失败: {str(e)}")
return None
def batch_upload(self, product_list):
"""批量上传入口"""
def worker():
while True:
product = self.task_queue.get()
if product is None:
break
goods_id = self.upload_product(product)
if goods_id:
print(f"成功上传商品: {product.name}, ID: {goods_id}")
else:
print(f"上传失败: {product.name}")
self.task_queue.task_done()
# 启动工作线程
threads = []
for _ in range(self.upload_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
# 添加任务到队列
for product in product_list:
self.task_queue.put(product)
# 等待完成
self.task_queue.join()
# 停止工作线程
for _ in range(self.upload_threads):
self.task_queue.put(None)
for t in threads:
t.join()
class ProductManager:
def init(self, csv_file):
self.products = []
self.load_from_csv(csv_file)
def load_from_csv(self, file_path):
with open(file_path, newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
images = row['images'].split('|')
product = PddProduct(
name=row['name'],
price=int(float(row['price'])*100),
stock=int(row['stock']),
desc=row['desc'],
images=images
)
# 添加SKU
if 'sku' in row:
for sku in json.loads(row['sku']):
product.add_sku(
sku_id=sku['id'],
spec=sku['spec'],
price=int(float(sku['price'])*100),
stock=int(sku['stock'])
)
self.products.append(product)
if name == 'main':
# 示例用法
manager = ProductManager('products.csv')
uploader = PddUploader(
client_id='YOUR_CLIENT_ID',
client_secret='YOUR_CLIENT_SECRET'
)
uploader.batch_upload(manager.products)