淘宝一键上货发布软件,淘宝批量发布上架工具, 淘宝批量上架脚本【python】

简介: 这个Python脚本实现了以下功能:完整的淘宝API调用封装

文章附件下载:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:5035

这个Python脚本实现了以下功能:

完整的淘宝API调用封装,包括签名生成和错误处理
支持单商品上传和批量并发上传
包含商品基本信息、图片、SKU等完整属性设置
自动重试机制和频率控制
详细的日志记录功能
从CSV文件加载商品数据的工具函数
多线程并发处理提高上传效率

import requests
import json
import time
import hashlib
import random
import os
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

class TaobaoBatchUploader:
def init(self, app_key, app_secret, session_key):
self.app_key = app_key
self.app_secret = app_secret
self.session_key = session_key
self.api_url = "https://eco.taobao.com/router/rest"
self.max_retries = 3
self.retry_delay = 2
self.log_file = "taobao_upload.log"

def _generate_sign(self, params):
    """生成API签名"""
    param_str = ""
    for key in sorted(params.keys()):
        param_str += key + str(params[key])
    sign_str = self.app_secret + param_str + self.app_secret
    return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()

def _call_api(self, method, params):
    """调用淘宝API"""
    base_params = {
        'method': method,
        'app_key': self.app_key,
        'session': self.session_key,
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'format': 'json',
        'v': '2.0',
        'sign_method': 'md5',
        'partner_id': 'top-sdk-python-20200801'
    }
    params.update(base_params)
    params['sign'] = self._generate_sign(params)

    for attempt in range(self.max_retries):
        try:
            response = requests.post(self.api_url, data=params)
            result = response.json()

            if 'error_response' in result:
                error = result['error_response']
                self._log_error(f"API Error: {error.get('code')} - {error.get('msg')}")
                if error.get('code') == '27':  # 请求频繁
                    time.sleep(10)
                    continue
                return None
            return result

        except Exception as e:
            self._log_error(f"API Request Failed: {str(e)}")
            if attempt < self.max_retries - 1:
                time.sleep(self.retry_delay * (attempt + 1))
            continue
    return None

def _log_error(self, message):
    """记录错误日志"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_message = f"[{timestamp}] {message}\n"
    with open(self.log_file, 'a', encoding='utf-8') as f:
        f.write(log_message)

def upload_single_item(self, item_data):
    """上传单个商品"""
    params = {
        'num': item_data.get('quantity', 1),
        'price': item_data['price'],
        'type': 'fixed',
        'stuff_status': 'new',
        'title': item_data['title'],
        'desc': item_data.get('description', ''),
        'cid': item_data['category_id'],
        'location.state': item_data.get('province', ''),
        'location.city': item_data.get('city', ''),
        'postage_id': item_data.get('postage_id', '0'),
        'has_invoice': item_data.get('has_invoice', 'false'),
        'has_warranty': item_data.get('has_warranty', 'false'),
        'approve_status': 'onsale',
        'list_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

    # 处理图片
    if 'images' in item_data:
        params['image'] = item_data['images'][0]
        if len(item_data['images']) > 1:
            for i, img in enumerate(item_data['images'][1:6], start=1):
                params[f'image_{i}'] = img

    # 处理SKU
    if 'skus' in item_data:
        for i, sku in enumerate(item_data['skus'], start=1):
            params[f'sku_properties_{i}'] = sku['properties']
            params[f'sku_quantities_{i}'] = sku['quantity']
            params[f'sku_prices_{i}'] = sku['price']
            params[f'sku_outer_ids_{i}'] = sku.get('outer_id', '')

    result = self._call_api('taobao.item.add', params)
    if result and 'item_add_response' in result:
        item_id = result['item_add_response']['item']['item_id']
        self._log_error(f"Successfully uploaded item: {item_id}")
        return item_id
    return None

def batch_upload(self, items_data, max_workers=5):
    """批量上传商品"""
    success_count = 0
    failed_count = 0
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for item in items_data:
            futures.append(executor.submit(self.upload_single_item, item))

        for future in futures:
            try:
                result = future.result()
                if result:
                    success_count += 1
                    results.append(result)
                else:
                    failed_count += 1
            except Exception as e:
                self._log_error(f"Upload failed: {str(e)}")
                failed_count += 1

    summary = {
        'total': len(items_data),
        'success': success_count,
        'failed': failed_count,
        'item_ids': results
    }

    self._log_error(f"Batch upload completed. Success: {success_count}, Failed: {failed_count}")
    return summary

def load_item_data_from_csv(file_path):
"""从CSV文件加载商品数据"""
import csv
items = []

with open(file_path, 'r', encoding='utf-8') as f:
    reader = csv.DictReader(f)
    for row in reader:
        item = {
            'title': row['title'],
            'price': float(row['price']),
            'quantity': int(row.get('quantity', 1)),
            'category_id': row['category_id'],
            'description': row.get('description', ''),
            'images': row['images'].split('|') if 'images' in row else []
        }

        if 'skus' in row:
            skus = []
            for sku_str in row['skus'].split(';'):
                sku_parts = sku_str.split(':')
                if len(sku_parts) >= 3:
                    skus.append({
                        'properties': sku_parts[0],
                        'quantity': int(sku_parts[1]),
                        'price': float(sku_parts[2]),
                        'outer_id': sku_parts[3] if len(sku_parts) > 3 else ''
                    })
            item['skus'] = skus

        items.append(item)

return items

def main():

# 配置淘宝开放平台应用信息
app_key = "your_app_key"
app_secret = "your_app_secret"
session_key = "your_session_key"

# 初始化上传器
uploader = TaobaoBatchUploader(app_key, app_secret, session_key)

# 从CSV加载商品数据
try:
    items_data = load_item_data_from_csv("items_to_upload.csv")
    print(f"Loaded {len(items_data)} items for upload.")

    # 批量上传
    result = uploader.batch_upload(items_data)
    print(f"Upload completed. Success: {result['success']}, Failed: {result['failed']}")
    print(f"Uploaded item IDs: {result['item_ids']}")

except Exception as e:
    print(f"Error: {str(e)}")

if name == "main":
main()

相关文章
|
25天前
|
存储 缓存 测试技术
理解Python装饰器:简化代码的强大工具
理解Python装饰器:简化代码的强大工具
|
2月前
|
程序员 测试技术 开发者
Python装饰器:简化代码的强大工具
Python装饰器:简化代码的强大工具
158 92
|
14天前
|
机器学习/深度学习 编解码 Python
Python图片上采样工具 - RealESRGANer
Real-ESRGAN基于深度学习实现图像超分辨率放大,有效改善传统PIL缩放的模糊问题。支持多种模型版本,推荐使用魔搭社区提供的预训练模型,适用于将小图高质量放大至大图,放大倍率越低效果越佳。
|
2月前
|
人工智能 自然语言处理 安全
Python构建MCP服务器:从工具封装到AI集成的全流程实践
MCP协议为AI提供标准化工具调用接口,助力模型高效操作现实世界。
437 1
|
24天前
|
算法 安全 数据安全/隐私保护
Python随机数函数全解析:5个核心工具的实战指南
Python的random模块不仅包含基础的随机数生成函数,还提供了如randint()、choice()、shuffle()和sample()等实用工具,适用于游戏开发、密码学、统计模拟等多个领域。本文深入解析这些函数的用法、底层原理及最佳实践,帮助开发者高效利用随机数,提升代码质量与安全性。
119 0
|
12月前
|
Linux 区块链 Python
Python实用记录(十三):python脚本打包exe文件并运行
这篇文章介绍了如何使用PyInstaller将Python脚本打包成可执行文件(exe),并提供了详细的步骤和注意事项。
466 1
Python实用记录(十三):python脚本打包exe文件并运行
|
存储 Shell 区块链
怎么把Python脚本打包成可执行程序?
该文档介绍了如何将Python脚本及其运行环境打包成EXE可执行文件,以便在不具备Python环境的计算机上运行。首先确保Python脚本能够正常运行,然后通过安装PyInstaller并使用`--onefile`参数将脚本打包成独立的EXE文件。此外,还提供了去除命令行窗口和指定可执行文件图标的详细方法。这些步骤帮助用户轻松地将Python程序分发给最终用户。
158 3
怎么把Python脚本打包成可执行程序?
|
存储 区块链 Python
怎么把Python脚本打包成可执行程序?
最近根据用户提的需求用python做了一个小工具,但是在给客户使用的时候不能直接发送python文件,毕竟让客户去安装python环境,那就离了大谱了。所以这时候就需要把多个py文件带着运行环境打包成EXE可执行文件。
怎么把Python脚本打包成可执行程序?
|
区块链 Python
Python脚本打包 exe,auto-py-to-exe来帮你!
Python脚本打包 exe,auto-py-to-exe来帮你!
799 0
|
存储 区块链 Python
怎么把Python脚本打包成可执行程序?
【6月更文挑战第3天】最近根据用户提的需求用python做了一个小工具,但是在给客户使用的时候不能直接发送python文件,毕竟让客户去安装python环境,那就离了大谱了。所以这时候就需要把多个py文件带着运行环境打包成EXE可执行文件。
142 1

推荐镜像

更多