在跨境电商和供应链管理中,1688 作为国内最大的 B2B 工厂货源平台,拥有海量商品数据。传统的人工选品方式效率低下、容易出错,而通过 API 接口实现自动化采集,可以彻底解放人力,实现"无需选品"的智能货源对接。本文将深入讲解如何使用 Python 对接 1688 商品采集 API,构建工厂货源自动化系统。
一、方案概述:为什么需要自动化采集?
1.1 传统选品的痛点
表格
痛点 影响 解决方案
手动翻页比价 耗时耗力,效率低下 API 批量采集
数据不及时 错过最佳拿货时机 实时监控更新
信息不完整 SKU、库存、代发价遗漏 全字段自动获取
多平台操作 重复劳动,容易出错 一键铺货对接
反爬限制 IP 封禁,数据中断 官方合规接口
1.2 自动化采集的核心优势
通过 API 接口实现 1688 商品采集,可以:
无需人工选品:设定规则后系统自动筛选符合条件的商品
实时数据同步:价格、库存、销量变动自动更新
全字段获取:标题、价格、SKU、主图、详情、销量、代发价一次性获取
多平台铺货:采集后直接上架到淘宝、拼多多、抖音、跨境平台
7×24 小时监控:自动发现爆款、价格预警、库存提醒
二、1688 商品采集 API 体系
1688 开放平台及第三方数据服务提供了丰富的商品采集接口:
表格
接口 功能 适用场景
item_get 获取单个商品详情 深度分析、SKU 监控
item_search 按关键词搜索商品 批量选品、类目采集
item_search_img 按图搜索商品(拍立淘) 找同款、竞品分析
item_search_shop 获取店铺所有商品 整店采集、供应商管理
item_search_suggest 获取搜索词推荐 SEO 优化、关键词挖掘
item_fee 获取商品快递费用 运费核算、利润计算
seller_info 获取店铺详情 供应商资质审核
item_password 淘口令解析 链接转换、推广分析
cat_get 获取商品分类 类目导航、数据归类
三、环境准备与依赖安装
bash
复制
安装必要的 Python 库
pip install requests
pip install pandas # 数据处理与导出
pip install openpyxl # Excel 导出支持
pip install schedule # 定时任务
pip install python-dotenv # 环境变量管理
pip install loguru # 日志记录
四、核心实现:商品采集 API 客户端
4.1 基础 API 客户端
Python
复制
-- coding: utf-8 --
"""
1688 商品采集 API 客户端
支持:商品详情获取、关键词搜索、店铺采集、图片搜索
"""
import requests
import hashlib
import time
import json
import os
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
from urllib.parse import urlencode
from loguru import logger
@dataclass
class ProductItem:
"""1688 商品数据结构"""
item_id: str
title: str
price: float
original_price: float
wholesale_price: float # 批发价
agent_price: float # 代发价
min_order: int # 最小起订量
unit: str
stock: int
sales_30d: int # 30天销量
total_sales: int
location: str # 发货地
supplier_name: str
supplier_id: str
supplier_level: str # 诚信通等级
is_support_mix: bool # 是否支持混批
is_support_agent: bool # 是否支持一件代发
main_image: str
images: List[str]
detail_url: str
category: str
attributes: Dict[str, str] # 商品属性
skus: List[Dict] # SKU 规格
create_time: str
update_time: str
class Alibaba1688Collector:
"""
1688 商品采集器
支持官方 API 和第三方数据服务
"""
def __init__(self, api_key: str, api_secret: str, base_url: Optional[str] = None):
"""
初始化采集器
Args:
api_key: API 调用 Key
api_secret: API 调用密钥
base_url: API 基础地址(默认使用第三方服务地址)
"""
self.api_key = api_key
self.api_secret = api_secret
self.base_url = base_url or "https://api.openclaw.com/1688"
self.session = requests.Session()
self.session.headers.update({
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8"
})
# 请求统计
self.request_count = 0
self.success_count = 0
logger.info(f"1688 采集器初始化完成 | 接口地址: {self.base_url}")
def _generate_sign(self, params: dict) -> str:
"""
生成 API 签名(MD5)
规则:参数按 key 排序后拼接 + secret,整体 MD5 大写
"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
param_str = "".join([f"{k}{v}" for k, v in sorted_params])
sign_str = f"{self.api_secret}{param_str}{self.api_secret}"
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _request(self, api_name: str, params: dict, timeout: int = 30) -> dict:
"""
发送 API 请求
Args:
api_name: API 接口名称(如 item_get, item_search)
params: 业务参数
timeout: 请求超时时间
Returns:
API 响应字典
"""
# 构建公共参数
common_params = {
"key": self.api_key,
"api_name": api_name,
"timestamp": int(time.time()),
"format": "json",
"lang": "cn"
}
common_params.update(params)
# 生成签名
common_params["sign"] = self._generate_sign(common_params)
url = f"{self.base_url}/{api_name}"
self.request_count += 1
try:
response = self.session.get(url, params=common_params, timeout=timeout)
response.raise_for_status()
result = response.json()
if result.get("code") == 0 or result.get("code") == 200:
self.success_count += 1
logger.debug(f"请求成功 | API: {api_name} | 耗时: {response.elapsed.total_seconds():.2f}s")
return result.get("data", result)
else:
logger.warning(f"请求失败 | API: {api_name} | 错误: {result.get('msg', '未知错误')}")
return {"error": result.get("msg", "请求失败"), "code": result.get("code")}
except requests.exceptions.Timeout:
logger.error(f"请求超时 | API: {api_name}")
return {"error": "请求超时", "code": -1}
except requests.exceptions.RequestException as e:
logger.error(f"请求异常 | API: {api_name} | {str(e)}")
return {"error": str(e), "code": -2}
def get_item_detail(self, num_iid: str, get_sales: bool = True,
get_agent: bool = True) -> Optional[ProductItem]:
"""
获取单个商品详情
Args:
num_iid: 1688 商品数字 ID
get_sales: 是否获取销量数据
get_agent: 是否获取代发价格
Returns:
ProductItem 对象
"""
params = {
"num_iid": num_iid,
"sales_data": 1 if get_sales else 0,
"agent": 1 if get_agent else 0
}
data = self._request("item_get", params)
if "error" in data:
return None
return self._parse_product(data)
def search_by_keyword(self, keyword: str, page: int = 1, page_size: int = 20,
sort: str = "sales", price_start: Optional[float] = None,
price_end: Optional[float] = None) -> List[ProductItem]:
"""
关键词搜索商品
Args:
keyword: 搜索关键词
page: 页码
page_size: 每页数量(最大 50)
sort: 排序方式(sales:销量, price_asc:价格升序, price_desc:价格降序, credit:信用)
price_start: 价格下限
price_end: 价格上限
Returns:
ProductItem 列表
"""
params = {
"q": keyword,
"page": page,
"page_size": page_size,
"sort": sort
}
if price_start is not None:
params["start_price"] = price_start
if price_end is not None:
params["end_price"] = price_end
data = self._request("item_search", params)
if "error" in data or "items" not in data:
return []
return [self._parse_product(item) for item in data.get("items", [])]
def search_by_image(self, image_url: str, page: int = 1) -> List[ProductItem]:
"""
按图搜索商品(拍立淘)
Args:
image_url: 图片 URL
page: 页码
Returns:
ProductItem 列表
"""
params = {
"imgid": image_url,
"page": page
}
data = self._request("item_search_img", params)
if "error" in data:
return []
return [self._parse_product(item) for item in data.get("items", [])]
def get_shop_items(self, seller_id: str, page: int = 1,
page_size: int = 20) -> List[ProductItem]:
"""
获取店铺所有商品
Args:
seller_id: 卖家 ID(供应商 ID)
page: 页码
page_size: 每页数量
Returns:
ProductItem 列表
"""
params = {
"seller_id": seller_id,
"page": page,
"page_size": page_size
}
data = self._request("item_search_shop", params)
if "error" in data:
return []
return [self._parse_product(item) for item in data.get("items", [])]
def _parse_product(self, data: dict) -> ProductItem:
"""解析商品数据为结构化对象"""
return ProductItem(
item_id=str(data.get("num_iid", "")),
title=data.get("title", ""),
price=float(data.get("price", 0) or 0),
original_price=float(data.get("original_price", 0) or 0),
wholesale_price=float(data.get("wholesale_price", 0) or 0),
agent_price=float(data.get("agent_price", 0) or 0),
min_order=int(data.get("min_num", 1) or 1),
unit=data.get("unit", "件"),
stock=int(data.get("stock", 0) or 0),
sales_30d=int(data.get("sales", 0) or 0),
total_sales=int(data.get("total_sales", 0) or 0),
location=data.get("location", ""),
supplier_name=data.get("supplier", ""),
supplier_id=str(data.get("seller_id", "")),
supplier_level=data.get("supplier_level", ""),
is_support_mix=data.get("is_support_mix", False),
is_support_agent=data.get("is_support_agent", False),
main_image=data.get("pic_url", ""),
images=data.get("item_imgs", []),
detail_url=data.get("detail_url", ""),
category=data.get("category", ""),
attributes=data.get("props", {}),
skus=data.get("skus", []),
create_time=data.get("created_time", ""),
update_time=data.get("modified_time", "")
)
def get_stats(self) -> dict:
"""获取采集统计"""
return {
"total_requests": self.request_count,
"success_requests": self.success_count,
"success_rate": f"{(self.success_count / max(self.request_count, 1) * 100):.1f}%"
}
==================== 使用示例 ====================
if name == "main":
# 初始化采集器(替换为真实凭证)
collector = Alibaba1688Collector(
api_key="your_api_key",
api_secret="your_api_secret"
)
# 示例 1:获取单个商品详情
print("=" * 60)
print("示例 1:获取商品详情")
print("=" * 60)
item_id = "702356889901" # 替换为真实商品 ID
product = collector.get_item_detail(item_id)
if product:
print(f"商品ID: {product.item_id}")
print(f"标题: {product.title}")
print(f"售价: ¥{product.price}")
print(f"批发价: ¥{product.wholesale_price}")
print(f"代发价: ¥{product.agent_price}")
print(f"起订量: {product.min_order} {product.unit}")
print(f"库存: {product.stock}")
print(f"30天销量: {product.sales_30d}")
print(f"发货地: {product.location}")
print(f"供应商: {product.supplier_name}")
print(f"支持代发: {'是' if product.is_support_agent else '否'}")
print(f"SKU数量: {len(product.skus)}")
else:
print("获取商品详情失败")
五、自动化选品引擎:无需人工筛选
5.1 智能选品规则引擎
Python
复制
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class SelectionRule:
"""选品规则配置"""
name: str # 规则名称
min_price: float = 0 # 最低价格
max_price: float = float('inf') # 最高价格
min_sales_30d: int = 0 # 最低月销量
min_stock: int = 0 # 最低库存
must_support_agent: bool = False # 必须支持一件代发
must_support_mix: bool = False # 必须支持混批
supplier_levels: List[str] = None # 供应商等级要求
exclude_keywords: List[str] = None # 排除关键词
include_keywords: List[str] = None # 必须包含关键词
min_profit_margin: float = 0.3 # 最低利润率(基于代发价)
class AutoSelector:
"""
自动化选品器
根据预设规则自动筛选符合条件的商品
"""
def __init__(self, collector: Alibaba1688Collector):
self.collector = collector
self.rules: List[SelectionRule] = []
self.selected_products: List[ProductItem] = []
def add_rule(self, rule: SelectionRule):
"""添加选品规则"""
self.rules.append(rule)
logger.info(f"添加选品规则: {rule.name}")
def evaluate_product(self, product: ProductItem, rule: SelectionRule) -> bool:
"""
评估商品是否符合规则
Returns:
bool: 是否符合所有条件
"""
# 价格区间检查
if not (rule.min_price <= product.price <= rule.max_price):
return False
# 销量检查
if product.sales_30d < rule.min_sales_30d:
return False
# 库存检查
if product.stock < rule.min_stock:
return False
# 一件代发检查
if rule.must_support_agent and not product.is_support_agent:
return False
# 混批检查
if rule.must_support_mix and not product.is_support_mix:
return False
# 供应商等级检查
if rule.supplier_levels and product.supplier_level not in rule.supplier_levels:
return False
# 关键词排除
if rule.exclude_keywords:
for kw in rule.exclude_keywords:
if kw.lower() in product.title.lower():
return False
# 关键词必须包含
if rule.include_keywords:
for kw in rule.include_keywords:
if kw.lower() not in product.title.lower():
return False
# 利润率估算(简化计算)
if rule.min_profit_margin > 0 and product.agent_price > 0:
estimated_retail = product.price * 1.5 # 假设零售价为售价的 1.5 倍
profit_margin = (estimated_retail - product.agent_price) / estimated_retail
if profit_margin < rule.min_profit_margin:
return False
return True
def select_by_keyword(self, keyword: str, max_pages: int = 5) -> List[ProductItem]:
"""
根据关键词自动选品
Args:
keyword: 搜索关键词
max_pages: 最大搜索页数
Returns:
符合规则的商品列表
"""
all_selected = []
for page in range(1, max_pages + 1):
logger.info(f"正在搜索第 {page} 页 | 关键词: {keyword}")
products = self.collector.search_by_keyword(
keyword=keyword,
page=page,
page_size=50,
sort="sales" # 按销量排序,优先爆款
)
if not products:
break
for product in products:
# 应用所有规则
for rule in self.rules:
if self.evaluate_product(product, rule):
all_selected.append(product)
logger.info(f"✅ 选中商品: {product.title[:30]}... | "
f"销量:{product.sales_30d} | 价格:¥{product.price}")
break # 符合任一规则即可
# 限流控制
time.sleep(1)
self.selected_products.extend(all_selected)
logger.info(f"选品完成 | 共选中 {len(all_selected)} 个商品")
return all_selected
def select_by_supplier(self, seller_id: str) -> List[ProductItem]:
"""
整店采集并筛选
Args:
seller_id: 供应商店铺 ID
Returns:
符合规则的商品列表
"""
all_selected = []
page = 1
while True:
products = self.collector.get_shop_items(seller_id, page=page, page_size=50)
if not products:
break
for product in products:
for rule in self.rules:
if self.evaluate_product(product, rule):
all_selected.append(product)
break
page += 1
time.sleep(1)
self.selected_products.extend(all_selected)
return all_selected
def export_to_excel(self, filename: str = "selected_products.xlsx"):
"""导出选品结果到 Excel"""
import pandas as pd
data = []
for p in self.selected_products:
data.append({
"商品ID": p.item_id,
"标题": p.title,
"售价": p.price,
"批发价": p.wholesale_price,
"代发价": p.agent_price,
"起订量": p.min_order,
"库存": p.stock,
"30天销量": p.sales_30d,
"发货地": p.location,
"供应商": p.supplier_name,
"支持代发": "是" if p.is_support_agent else "否",
"链接": p.detail_url
})
df = pd.DataFrame(data)
df.to_excel(filename, index=False, engine='openpyxl')
logger.info(f"选品结果已导出: {filename} | 共 {len(data)} 条记录")
==================== 自动化选品示例 ====================
if name == "main":
collector = Alibaba1688Collector(
api_key="your_api_key",
api_secret="your_api_secret"
)
selector = AutoSelector(collector)
# 配置选品规则:寻找适合一件代发的低价爆款
selector.add_rule(SelectionRule(
name="一件代发爆款",
min_price=10,
max_price=100,
min_sales_30d=100,
min_stock=50,
must_support_agent=True,
supplier_levels=["实力商家", "超级工厂"],
min_profit_margin=0.4
))
# 配置规则:寻找高利润混批商品
selector.add_rule(SelectionRule(
name="高利润混批",
min_price=50,
max_price=500,
min_sales_30d=50,
must_support_mix=True,
include_keywords=["定制", "批发"],
min_profit_margin=0.5
))
# 自动选品
results = selector.select_by_keyword("蓝牙耳机", max_pages=3)
# 导出结果
selector.export_to_excel("bluetooth_earphones_selected.xlsx")
六、定时监控与价格预警系统
Python
复制
import schedule
import time
from datetime import datetime
from typing import Dict, Callable
class PriceMonitor:
"""
价格监控与预警系统
自动监控商品价格、库存变动,触发告警
"""
def __init__(self, collector: Alibaba1688Collector):
self.collector = collector
self.monitored_items: Dict[str, dict] = {} # 监控中的商品
self.alert_handlers: List[Callable] = []
self.price_history: Dict[str, List[tuple]] = {} # 价格历史
def add_item(self, item_id: str, target_price: Optional[float] = None):
"""
添加监控商品
Args:
item_id: 商品 ID
target_price: 目标价格(低于此价格触发提醒)
"""
product = self.collector.get_item_detail(item_id)
if product:
self.monitored_items[item_id] = {
"product": product,
"target_price": target_price,
"last_price": product.price,
"last_stock": product.stock,
"added_at": datetime.now()
}
self.price_history[item_id] = [(datetime.now(), product.price)]
logger.info(f"添加监控: {product.title[:30]}... | 当前价: ¥{product.price}")
def add_alert_handler(self, handler: Callable):
"""添加告警处理器"""
self.alert_handlers.append(handler)
def _check_changes(self, item_id: str):
"""检查商品变动"""
monitor = self.monitored_items[item_id]
old_product = monitor["product"]
# 重新获取最新数据
new_product = self.collector.get_item_detail(item_id)
if not new_product:
return
monitor["product"] = new_product
alerts = []
# 价格变动检测
if abs(new_product.price - monitor["last_price"]) > 0.01:
change_pct = (new_product.price - monitor["last_price"]) / monitor["last_price"] * 100
direction = "上涨" if change_pct > 0 else "下降"
alerts.append({
"type": "price_change",
"title": f"价格{direction} {abs(change_pct):.1f}%",
"content": f"{new_product.title[:30]}... | "
f"¥{monitor['last_price']} → ¥{new_product.price}",
"item_id": item_id,
"old_price": monitor["last_price"],
"new_price": new_product.price
})
monitor["last_price"] = new_product.price
self.price_history[item_id].append((datetime.now(), new_product.price))
# 库存变动检测
if old_product.stock == 0 and new_product.stock > 0:
alerts.append({
"type": "restock",
"title": "商品补货",
"content": f"{new_product.title[:30]}... | 库存: {new_product.stock}",
"item_id": item_id
})
elif old_product.stock > 0 and new_product.stock == 0:
alerts.append({
"type": "out_of_stock",
"title": "商品缺货",
"content": f"{new_product.title[:30]}...",
"item_id": item_id
})
# 目标价格达成
if monitor["target_price"] and new_product.price <= monitor["target_price"]:
alerts.append({
"type": "target_price",
"title": "达到目标价格",
"content": f"{new_product.title[:30]}... | "
f"当前¥{new_product.price} ≤ 目标¥{monitor['target_price']}",
"item_id": item_id,
"target_price": monitor["target_price"]
})
# 销量激增检测(日销量 > 100)
daily_sales = new_product.sales_30d - old_product.sales_30d
if daily_sales > 100:
alerts.append({
"type": "sales_surge",
"title": "销量激增",
"content": f"{new_product.title[:30]}... | 日增 {daily_sales} 单",
"item_id": item_id,
"daily_sales": daily_sales
})
# 触发告警
for alert in alerts:
self._trigger_alert(alert)
def _trigger_alert(self, alert: dict):
"""触发告警"""
logger.warning(f"【{alert['title']}】{alert['content']}")
for handler in self.alert_handlers:
try:
handler(alert)
except Exception as e:
logger.error(f"告警处理器异常: {e}")
def check_all(self):
"""检查所有监控商品"""
logger.info(f"开始批量检查 | 共 {len(self.monitored_items)} 个商品")
for item_id in list(self.monitored_items.keys()):
self._check_changes(item_id)
time.sleep(2) # 限流
def start_monitoring(self, interval_minutes: int = 30):
"""
启动定时监控
Args:
interval_minutes: 检查间隔(分钟)
"""
schedule.every(interval_minutes).minutes.do(self.check_all)
logger.info(f"定时监控已启动 | 间隔: {interval_minutes} 分钟")
while True:
schedule.run_pending()
time.sleep(1)
==================== 告警处理器示例 ====================
def dingtalk_alert(alert: dict):
"""钉钉机器人告警"""
# 实现钉钉 Webhook 发送
pass
def email_alert(alert: dict):
"""邮件告警"""
# 实现邮件发送
pass
def wechat_alert(alert: dict):
"""企业微信告警"""
# 实现企业微信发送
pass
==================== 监控示例 ====================
if name == "main":
collector = Alibaba1688Collector(
api_key="your_api_key",
api_secret="your_api_secret"
)
monitor = PriceMonitor(collector)
# 添加监控商品
monitor.add_item("702356889901", target_price=15.0)
monitor.add_item("702356889902", target_price=20.0)
# 添加告警处理器
monitor.add_alert_handler(dingtalk_alert)
monitor.add_alert_handler(wechat_alert)
# 启动监控(每 30 分钟检查一次)
monitor.start_monitoring(interval_minutes=30)
七、多平台铺货对接
Python
复制
import json
class PlatformPublisher:
"""
多平台铺货发布器
将采集的 1688 商品信息转换为各平台格式并发布
"""
def __init__(self):
self.platforms = {}
def register_platform(self, name: str, publisher: Callable):
"""注册平台发布器"""
self.platforms[name] = publisher
def publish(self, product: ProductItem, platforms: List[str]) -> Dict[str, bool]:
"""
发布商品到指定平台
Args:
product: 商品信息
platforms: 目标平台列表
Returns:
各平台发布结果
"""
results = {}
for platform in platforms:
if platform in self.platforms:
try:
self.platforms[platform](product)
results[platform] = True
logger.info(f"发布成功 | 平台: {platform} | 商品: {product.title[:30]}")
except Exception as e:
results[platform] = False
logger.error(f"发布失败 | 平台: {platform} | {e}")
else:
results[platform] = False
logger.warning(f"未注册平台: {platform}")
return results
def generate_taobao_format(self, product: ProductItem) -> dict:
"""生成淘宝上架格式"""
return {
"title": product.title,
"price": product.price * 1.5, # 加价 50%
"original_price": product.price * 2,
"quantity": min(product.stock, 100),
"category": self._map_category(product.category, "taobao"),
"images": product.images[:5],
"detail": self._generate_detail_html(product),
"express_fee": 0 # 包邮
}
def generate_pdd_format(self, product: ProductItem) -> dict:
"""生成拼多多上架格式"""
return {
"goods_name": product.title,
"market_price": int(product.price * 2),
"price": int(product.price * 1.3),
"quantity": min(product.stock, 1000),
"carousel_gallery": product.images[:10],
"detail_gallery": product.images,
"category_id": self._map_category(product.category, "pdd")
}
def generate_douyin_format(self, product: ProductItem) -> dict:
"""生成抖音小店上架格式"""
return {
"name": product.title[:30], # 抖音标题限制 30 字
"pic": product.main_image,
"description": product.title,
"price": int(product.price * 1.4),
"stock_num": product.stock,
"category_id": self._map_category(product.category, "douyin"),
"specs": self._convert_skus(product.skus)
}
def _map_category(self, category: str, platform: str) -> str:
"""类目映射(简化版,实际需维护映射表)"""
mappings = {
"taobao": {"数码": "14", "服装": "16"},
"pdd": {"数码": "8424", "服装": "12896"},
"douyin": {"数码": "20001", "服装": "20005"}
}
return mappings.get(platform, {}).get(category, "")
def _generate_detail_html(self, product: ProductItem) -> str:
"""生成详情页 HTML"""
html = f"<h1>{product.title}</h1>"
html += f"<p>发货地: {product.location}</p>"
html += f"<p>供应商: {product.supplier_name}</p>"
for img in product.images:
html += f'<img src="{img}" />'
return html
def _convert_skus(self, skus: List[dict]) -> List[dict]:
"""转换 SKU 格式"""
return [{
"spec_id": s.get("sku_id", ""),
"spec_name": s.get("properties", ""),
"price": s.get("price", 0),
"stock_num": s.get("quantity", 0)
} for s in skus]
==================== 铺货示例 ====================
if name == "main":
publisher = PlatformPublisher()
# 注册各平台发布器(实际需实现具体 API 调用)
publisher.register_platform("taobao", lambda p: print(f"发布到淘宝: {p.title}"))
publisher.register_platform("pdd", lambda p: print(f"发布到拼多多: {p.title}"))
publisher.register_platform("douyin", lambda p: print(f"发布到抖音: {p.title}"))
# 模拟商品
product = ProductItem(
item_id="123456",
title="测试商品",
price=10.0,
original_price=20.0,
wholesale_price=8.0,
agent_price=12.0,
min_order=1,
unit="件",
stock=100,
sales_30d=50,
total_sales=500,
location="义乌",
supplier_name="测试供应商",
supplier_id="789",
supplier_level="实力商家",
is_support_mix=True,
is_support_agent=True,
main_image="http://example.com/1.jpg",
images=["http://example.com/1.jpg", "http://example.com/2.jpg"],
detail_url="http://1688.com/item/123456",
category="数码",
attributes={},
skus=[],
create_time="",
update_time=""
)
# 发布到多平台
results = publisher.publish(product, ["taobao", "pdd", "douyin"])
print(f"发布结果: {results}")
八、完整自动化流程整合
Python
复制
class AutoSourcingPipeline:
"""
自动化货源对接流水线
实现从采集、选品、监控到铺货的全流程自动化
"""
def __init__(self, api_key: str, api_secret: str):
self.collector = Alibaba1688Collector(api_key, api_secret)
self.selector = AutoSelector(self.collector)
self.monitor = PriceMonitor(self.collector)
self.publisher = PlatformPublisher()
def run_full_pipeline(
self,
keywords: List[str],
rules: List[SelectionRule],
target_platforms: List[str],
monitor_interval: int = 30
):
"""
运行完整自动化流程
Args:
keywords: 搜索关键词列表
rules: 选品规则列表
target_platforms: 目标铺货平台
monitor_interval: 监控间隔(分钟)
"""
logger.info("=" * 60)
logger.info("启动 1688 自动化货源对接流水线")
logger.info("=" * 60)
# 步骤 1:配置选品规则
for rule in rules:
self.selector.add_rule(rule)
# 步骤 2:批量采集与选品
all_selected = []
for keyword in keywords:
logger.info(f"正在采集关键词: {keyword}")
selected = self.selector.select_by_keyword(keyword, max_pages=5)
all_selected.extend(selected)
# 去重
unique_items = {p.item_id: p for p in all_selected}
all_selected = list(unique_items.values())
logger.info(f"选品完成 | 共 {len(all_selected)} 个唯一商品")
# 步骤 3:添加到监控系统
for product in all_selected:
self.monitor.add_item(product.item_id)
# 步骤 4:铺货到目标平台
publish_results = []
for product in all_selected:
results = self.publisher.publish(product, target_platforms)
publish_results.append({
"item_id": product.item_id,
"title": product.title,
"results": results
})
# 步骤 5:启动价格监控
self.monitor.start_monitoring(monitor_interval)
return {
"total_selected": len(all_selected),
"publish_results": publish_results
}
==================== 完整运行示例 ====================
if name == "main":
# 初始化流水线
pipeline = AutoSourcingPipeline(
api_key="your_api_key",
api_secret="your_api_secret"
)
# 配置选品规则
rules = [
SelectionRule(
name="一件代发低价爆款",
min_price=5,
max_price=50,
min_sales_30d=200,
must_support_agent=True,
min_profit_margin=0.4
),
SelectionRule(
name="高利润定制款",
min_price=100,
max_price=500,
min_sales_30d=50,
include_keywords=["定制", "logo"],
min_profit_margin=0.5
)
]
# 运行完整流程
result = pipeline.run_full_pipeline(
keywords=["蓝牙耳机", "手机壳", "数据线"],
rules=rules,
target_platforms=["taobao", "pdd"],
monitor_interval=30
)
print(f"\n流水线运行完成 | 选中 {result['total_selected']} 个商品")
九、关键注意事项
表格
注意事项 说明
API 凭证安全 api_secret 严禁硬编码,使用环境变量或密钥管理服务
请求频率控制 默认限流 50 次/分钟,超出会返回 429 错误
数据合规使用 仅用于自有业务分析,不得转售或恶意爬取
商品信息更新 1688 商品信息可能实时变动,建议定时刷新
供应商沟通 一件代发需与供应商确认库存和发货时效
多语言支持 跨境场景可设置 lang=en 获取英文数据
十、总结
功能模块 核心能力 实现方式
商品采集 详情/搜索/店铺/图片搜索 item_get / item_search / item_search_shop
智能选品 自动筛选符合条件的商品 AutoSelector 规则引擎
价格监控 实时价格/库存变动告警 PriceMonitor 定时检查
多平台铺货 一键上架到淘宝/拼多多/抖音 PlatformPublisher 格式转换
数据导出 Excel/CSV 批量导出 pandas + openpyxl
通过这套 Python 自动化方案,你可以实现真正的"无需选品"——设定规则后,系统自动从 1688 工厂货源中筛选、监控、铺货,大幅降低人工成本,提升电商运营效率。
如遇任何疑问或有进一步的需求,请随时与我私信或者评论联系。