下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:3311
该框架包含以下核心技术点:
- 使用 Selenium 实现自动化操作流程
- 多线程订单处理,提升效率
- 自动重试机制,保障成功率
- 通过配置文件管理敏感信息
- 购物车批量添加功能
扩展功能建议:
- 集成验证码识别模块
- 添加订单状态监控
- 实现自动评价功能
- 增加数据统计报表
- 开发图形化操作界面
注意事项:
- 需配合 ChromeDriver 使用
- 操作频率需模拟人工操作,避免封号
- 建议使用独享的小号账号
import configparser
import json
import threading
import time
from queue import Empty, Queue

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class PddBatchOrder:
    """Batch-order helper that drives a Chrome session via Selenium.

    The instance reads its settings from ``config.ini``, attaches to a
    Chrome browser, adds products to the cart and then submits one order
    per product on ``mobile.yangkeduo.com`` using a pool of worker threads
    that drain a shared queue.
    """

    def __init__(self):
        """Load the configuration, create the order queue and the driver."""
        self.config = self.load_config()
        self.order_queue = Queue()
        # Every worker thread shares the single WebDriver session below, so
        # checkout attempts must not interleave their page navigations.
        self._driver_lock = threading.Lock()
        self.driver = self.init_driver()
        self.login_status = False

    def load_config(self):
        """Read ``config.ini`` from the working directory.

        Returns:
            dict with the keys ``username``, ``password``, ``chrome_path``
            (strings) and ``max_threads``, ``retry_times`` (ints).

        Raises:
            configparser.Error: a required section or option is missing
                (this is also what happens when the file does not exist).
            ValueError: a numeric setting is not a valid integer.
        """
        config = configparser.ConfigParser()
        config.read('config.ini')
        return {
            'username': config.get('AUTH', 'USERNAME'),
            'password': config.get('AUTH', 'PASSWORD'),
            'chrome_path': config.get('PATH', 'CHROME_DRIVER'),
            'max_threads': config.getint('SETTINGS', 'MAX_THREADS'),
            'retry_times': config.getint('SETTINGS', 'RETRY_TIMES'),
        }

    def init_driver(self):
        """Create the Chrome WebDriver used by every other method.

        The options attach to a browser listening on the debugger address
        ``127.0.0.1:9222`` and disable the ``AutomationControlled`` blink
        feature and the ``enable-automation`` switch.

        Returns:
            The ``webdriver.Chrome`` instance.
        """
        options = webdriver.ChromeOptions()
        options.add_argument("--disable-blink-features=AutomationControlled")
        options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        # NOTE(review): recent Selenium 4 releases replaced the
        # `executable_path` keyword with a Service object -- confirm against
        # the Selenium version this project pins before upgrading.
        return webdriver.Chrome(
            executable_path=self.config['chrome_path'],
            options=options
        )

    def qr_login(self):
        """Open the login page and wait for the user to finish logging in.

        Waits up to 60 seconds for an element whose text contains "首页" to
        appear, then marks the session as logged in. If the element never
        appears, the timeout error from ``WebDriverWait`` propagates and
        ``login_status`` stays ``False``.
        """
        self.driver.get("https://mobile.yangkeduo.com/login.html")
        wait = WebDriverWait(self.driver, 60)
        wait.until(EC.presence_of_element_located(
            (By.XPATH, '//div[contains(text(),"首页")]')
        ))
        self.login_status = True

    def batch_add_cart(self, product_links):
        """Visit each product URL and click its add-to-cart button.

        Args:
            product_links: iterable of product page URLs (strings).

        Failures are best-effort: a product whose button cannot be clicked
        within 10 seconds is reported on stdout and skipped.
        """
        for link in product_links:
            self.driver.get(link)
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable(
                        (By.XPATH, '//button[contains(text(),"加入购物车")]')
                    )
                ).click()
                # Short pause between products to avoid hammering the site.
                time.sleep(1)
            except Exception as e:
                print(f"添加商品失败: {e}")
                continue

    def create_order_task(self):
        """Worker loop: process queued orders until the queue is drained.

        Uses a non-blocking ``get`` so that a worker which loses the race
        for the last item exits instead of blocking forever (an
        ``empty()`` check followed by a blocking ``get()`` can hang).
        """
        while True:
            try:
                order_data = self.order_queue.get_nowait()
            except Empty:
                return
            try:
                self.process_single_order(order_data)
            finally:
                # Always balance the get() so Queue.join() cannot hang.
                self.order_queue.task_done()

    def process_single_order(self, order_data):
        """Try to submit one order, retrying up to ``retry_times`` times.

        Args:
            order_data: dict describing the order; ``product_id`` is used
                for the success message.

        Returns:
            True as soon as one attempt submits the order, False when every
            attempt failed (or ``retry_times`` is 0).
        """
        attempts = self.config['retry_times']
        for attempt in range(attempts):
            try:
                # Hold the lock for the whole attempt: the shared browser
                # can only be on one page at a time.
                with self._driver_lock:
                    self.driver.get("https://mobile.yangkeduo.com/order_checkout.html")
                    self.fill_order_info(order_data)
                    submitted = self.submit_order()
                if submitted:
                    # .get() so a missing key cannot raise after a
                    # successful submit and trigger a duplicate order.
                    print(f"订单提交成功: {order_data.get('product_id')}")
                    return True
            except Exception as e:
                print(f"订单处理失败: {e}")
            # Back off before the next attempt, but not after the last one.
            if attempt + 1 < attempts:
                time.sleep(3)
        return False

    def fill_order_info(self, order_data):
        """Placeholder for address selection, coupon usage and similar steps.

        Currently does nothing.
        """
        pass

    def submit_order(self):
        """Click the submit-order button on the current page.

        Returns:
            True if the button became clickable within 10 seconds and was
            clicked, False otherwise (the reason is printed).
        """
        try:
            WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable(
                    (By.XPATH, '//button[contains(text(),"提交订单")]')
                )
            ).click()
            return True
        except Exception as e:
            # `Exception` rather than a bare except, so KeyboardInterrupt
            # and SystemExit are no longer swallowed.
            print(f"提交订单失败: {e}")
            return False

    def start_batch_order(self, product_list, address_info):
        """Log in if needed, fill the cart and submit an order per product.

        Args:
            product_list: list of dicts with the keys ``id``, ``link`` and
                ``quantity``.
            address_info: address payload stored with every queued order.
        """
        if not self.login_status:
            self.qr_login()
        # batch_add_cart expects URLs, not the product dicts themselves.
        self.batch_add_cart([product['link'] for product in product_list])

        # Build the order task queue.
        for product in product_list:
            self.order_queue.put({
                'product_id': product['id'],
                'quantity': product['quantity'],
                'address': address_info
            })

        # Start the workers; never more workers than queued orders, and at
        # least one when there is work even if max_threads is misconfigured.
        worker_count = min(
            max(1, self.config['max_threads']),
            self.order_queue.qsize(),
        )
        threads = [
            threading.Thread(target=self.create_order_task)
            for _ in range(worker_count)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
def main():
    """Run a demo batch order with two placeholder products."""
    order_tool = PddBatchOrder()
    products = [
        {"id": "100001", "link": "https://xxx.com", "quantity": 1},
        {"id": "100002", "link": "https://xxx.com", "quantity": 2}
    ]
    address = {"name": "张三", "phone": "13800138000", "detail": "北京市海淀区"}
    order_tool.start_batch_order(products, address)


# Only run the demo when executed as a script, not when imported.
if __name__ == "__main__":
    main()