# 下载地址:https://www.pan38.com/dow/share.php?code=JCnzE  提取密码:1133
# 完整的拼多多自动化下单框架,包含登录、搜索商品、获取商品列表、下单等功能。代码使用 Selenium 进行网页自动化操作,并加入了随机延迟、多账号支持等功能。请注意:这仅用于技术学习。
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import json
import csv
import os
import logging
from multiprocessing import Pool
# Configure logging: records at INFO and above are written to
# 'pdd_automation.log' (relative to the current working directory).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='pdd_automation.log'
)
# Module-level logger named after this module. The bare identifier `name`
# used previously is undefined and raised NameError at import time.
logger = logging.getLogger(__name__)
class PDDAutomation:
    """Selenium wrapper that drives a Chrome session through login, search,
    listing scraping and order placement on the mobile site.

    Every public step catches its own failures, logs them through the
    module-level ``logger`` and reports success as a boolean (or a possibly
    partial list), so callers do not need their own ``try`` blocks.
    """

    # Base URL against which scraped product hrefs are resolved.
    BASE_URL = 'https://mobile.yangkeduo.com'

    def __init__(self, headless=True, proxy=None):
        """Start a Chrome driver.

        Args:
            headless: When true, pass ``--headless`` to Chrome.
            proxy: Optional value for Chrome's ``--proxy-server`` flag.
        """
        self.options = Options()
        if headless:
            self.options.add_argument('--headless')
        if proxy:
            self.options.add_argument(f'--proxy-server={proxy}')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')
        self.options.add_argument('--window-size=1920,1080')
        # Random User-Agent string supplied by fake_useragent.
        ua = UserAgent()
        self.options.add_argument(f'user-agent={ua.random}')
        self.driver = webdriver.Chrome(options=self.options)
        # Shared explicit wait with a 20 second timeout.
        self.wait = WebDriverWait(self.driver, 20)

    @classmethod
    def _absolute_link(cls, href):
        """Resolve a scraped ``href`` against ``BASE_URL``.

        Plain concatenation produced broken URLs for hrefs that were already
        absolute or that lacked a leading slash; ``urljoin`` handles both.
        """
        # Local import: urllib.parse is not part of the file's import block.
        from urllib.parse import urljoin
        return urljoin(cls.BASE_URL + '/', href)

    def _parse_product(self, item):
        """Build a product dict from one ``goods-item`` element.

        Raises AttributeError/TypeError/KeyError when an expected child
        element or the ``href`` attribute is missing.
        """
        sales_tag = item.find('span', class_='sales')
        return {
            'title': item.find('div', class_='goods-name').get_text(strip=True),
            'price': item.find('span', class_='price').get_text(strip=True),
            # Sales count is optional in the markup; default to '0'.
            'sales': sales_tag.get_text(strip=True) if sales_tag else '0',
            'link': self._absolute_link(item.find('a')['href']),
        }

    def login(self, username, password):
        """Log in through the phone-number / verification-code form.

        Args:
            username: Phone number typed into the ``tel`` input.
            password: Accepted for interface compatibility; this method never
                reads it because the flow below relies on a verification code.

        Returns:
            True when the user-center element appears after submitting,
            False on any failure (the error is logged).
        """
        try:
            self.driver.get('https://mobile.yangkeduo.com/login.html')
            # Wait for the login page to load.
            phone_input = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="tel"]'))
            )
            phone_input.send_keys(username)
            time.sleep(random.uniform(0.5, 1.5))
            # Request a verification code.
            get_code_btn = self.driver.find_element(
                By.XPATH, '//button[contains(text(),"获取验证码")]'
            )
            get_code_btn.click()
            time.sleep(random.uniform(2, 3))
            # The code itself is not handled here; an operator has to type it.
            logger.warning("需要手动处理验证码")
            time.sleep(30)  # window for entering the code by hand
            login_btn = self.driver.find_element(
                By.XPATH, '//button[contains(text(),"登录")]'
            )
            login_btn.click()
            # Treat the appearance of the user-center element as success.
            self.wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, '//div[contains(@class,"user-center")]')
                )
            )
            logger.info(f"登录成功: {username}")
            return True
        except Exception as e:
            logger.error(f"登录失败: {str(e)}")
            return False

    def search_product(self, keyword):
        """Type ``keyword`` into the page's search input and submit it.

        The method does not navigate anywhere itself; it expects the current
        page to already contain an ``input[type=search]`` element.

        Returns:
            True on success, False on any failure (the error is logged).
        """
        try:
            search_input = self.wait.until(
                EC.presence_of_element_located((By.XPATH, '//input[@type="search"]'))
            )
            search_input.clear()
            # Send the keyword one character at a time with short pauses.
            for char in keyword:
                search_input.send_keys(char)
                time.sleep(random.uniform(0.1, 0.3))
            search_input.send_keys(Keys.RETURN)
            time.sleep(random.uniform(2, 3))
            logger.info(f"成功搜索商品: {keyword}")
            return True
        except Exception as e:
            logger.error(f"搜索商品失败: {str(e)}")
            return False

    def get_product_list(self, pages=1):
        """Scrape up to ``pages`` pages of products from the current listing.

        Returns:
            A list of dicts with keys ``title``, ``price``, ``sales`` and
            ``link``. On failure the products collected so far are returned.
            Each link appears at most once.
        """
        products = []
        # The whole page source is re-parsed after every scroll, so items
        # already collected can show up again; de-duplicate by link.
        seen_links = set()
        try:
            for page in range(pages):
                # Scroll to the bottom so more products are loaded.
                self.driver.execute_script(
                    "window.scrollTo(0, document.body.scrollHeight);"
                )
                time.sleep(random.uniform(3, 5))
                soup = BeautifulSoup(self.driver.page_source, 'html.parser')
                for item in soup.find_all('div', class_='goods-item'):
                    try:
                        product = self._parse_product(item)
                    except (AttributeError, KeyError, TypeError) as e:
                        # A malformed item should not abort the whole page.
                        logger.warning(f"解析商品失败: {str(e)}")
                        continue
                    if product['link'] in seen_links:
                        continue
                    seen_links.add(product['link'])
                    products.append(product)
                logger.info(f"已获取第 {page+1} 页商品列表,共 {len(products)} 个商品")
                # Follow the "next page" link unless this was the last page.
                if page < pages - 1:
                    next_page = self.driver.find_elements(
                        By.XPATH, '//a[contains(text(),"下一页")]'
                    )
                    if next_page:
                        next_page[0].click()
                        time.sleep(random.uniform(3, 5))
        except Exception as e:
            logger.error(f"获取商品列表失败: {str(e)}")
        return products

    def place_order(self, product_url, quantity=1):
        """Open ``product_url``, pick a random spec, set quantity and submit.

        Returns:
            True when every step completed, False on any failure (logged).
        """
        try:
            self.driver.get(product_url)
            time.sleep(random.uniform(3, 5))
            # Open the spec chooser. Wait until the button is clickable, not
            # merely present in the DOM, before clicking it.
            spec_btn = self.wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, '//button[contains(text(),"选择规格")]')
                )
            )
            spec_btn.click()
            time.sleep(random.uniform(1, 2))
            # Pick one of the available spec options at random, if any.
            specs = self.driver.find_elements(
                By.XPATH, '//div[contains(@class,"spec-option")]'
            )
            if specs:
                random.choice(specs).click()
                time.sleep(random.uniform(0.5, 1.5))
            # Enter the purchase quantity.
            qty_input = self.driver.find_element(By.XPATH, '//input[@type="number"]')
            qty_input.clear()
            qty_input.send_keys(str(quantity))
            time.sleep(random.uniform(0.5, 1.5))
            # Confirm the spec selection.
            confirm_btn = self.driver.find_element(
                By.XPATH, '//button[contains(text(),"确定")]'
            )
            confirm_btn.click()
            time.sleep(random.uniform(1, 2))
            # Buy now.
            buy_btn = self.wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, '//button[contains(text(),"立即购买")]')
                )
            )
            buy_btn.click()
            time.sleep(random.uniform(2, 3))
            # Submit the order.
            submit_btn = self.wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, '//button[contains(text(),"提交订单")]')
                )
            )
            submit_btn.click()
            time.sleep(random.uniform(3, 5))
            logger.info(f"成功下单商品: {product_url}")
            return True
        except Exception as e:
            logger.error(f"下单失败: {str(e)}")
            return False

    def batch_order(self, keyword, quantity=1, pages=1, accounts=None, max_products=3):
        """Search once, then for each account log in and order a random sample.

        Args:
            keyword: Search term.
            quantity: Quantity passed to ``place_order`` for every product.
            pages: Number of listing pages to scrape.
            accounts: Iterable of ``(username, password)`` pairs; ``None``
                (the default) means no accounts.
            max_products: Upper bound on products ordered per account.

        Returns:
            A list of dicts with keys ``account``, ``product`` and ``status``;
            empty when the search or scrape step yields nothing.
        """
        # Avoid a shared mutable default argument.
        accounts = [] if accounts is None else accounts
        results = []
        # NOTE(review): the search runs before any page has been loaded and
        # before any login; search_product() does not navigate by itself.
        # Confirm the intended starting page with the caller.
        if not self.search_product(keyword):
            return results
        products = self.get_product_list(pages)
        if not products:
            return results
        sample_size = max(0, min(max_products, len(products)))
        for username, password in accounts:
            # NOTE(review): no logout happens between accounts - confirm that
            # loading the login page again is enough to switch users.
            if not self.login(username, password):
                continue
            for product in random.sample(products, sample_size):
                results.append({
                    'account': username,
                    'product': product['title'],
                    'status': self.place_order(product['link'], quantity),
                })
                # Random pause between consecutive orders.
                time.sleep(random.uniform(10, 30))
        return results

    def close(self):
        """Quit the browser and log that it was closed."""
        self.driver.quit()
        logger.info("浏览器已关闭")
def load_accounts(filename='accounts.csv'):
    """Load ``(username, password)`` pairs from a CSV file.

    The file must have a header row containing ``username`` and ``password``
    columns.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A list of ``(username, password)`` tuples. If the file cannot be read
        or a required column is missing, the error is logged and the rows
        collected so far (possibly none) are returned.
    """
    accounts = []
    try:
        # 'utf-8-sig' transparently drops a leading BOM (common in CSVs saved
        # by spreadsheet tools) that would otherwise corrupt the first header
        # name; newline='' is what the csv module requires of its input file.
        with open(filename, mode='r', encoding='utf-8-sig', newline='') as file:
            for row in csv.DictReader(file):
                accounts.append((row['username'], row['password']))
    except (OSError, KeyError, UnicodeError, csv.Error) as e:
        logger.error(f"加载账号失败: {str(e)}")
    return accounts
def save_results(results, filename='results.json'):
    """Write ``results`` to ``filename`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than ``\\u``-escaped.

    Args:
        results: JSON-serialisable object to store.
        filename: Destination path; an existing file is overwritten.

    Returns:
        None. I/O and serialisation failures are logged, not raised.
    """
    try:
        with open(filename, mode='w', encoding='utf-8') as file:
            json.dump(results, file, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError) as e:
        # OSError: the file could not be opened or written.
        # TypeError / ValueError: json.dump rejected the data
        # (unserialisable object or circular reference).
        logger.error(f"保存结果失败: {str(e)}")
# Script entry point. The guard must compare __name__ with '__main__'; the
# previous `name == 'main'` referenced an undefined variable and raised
# NameError as soon as the module was loaded.
if __name__ == '__main__':
    # Example usage.
    accounts = load_accounts()
    if not accounts:
        # Placeholder account used when no accounts file could be loaded.
        accounts = [('13800138000', 'password123')]
    automation = PDDAutomation(headless=False)
    try:
        results = automation.batch_order(
            keyword='手机',
            quantity=1,
            pages=2,
            accounts=accounts[:3]  # only the first three accounts for a trial run
        )
        save_results(results)
        logger.info(f"批量下单完成,共处理 {len(results)} 个订单")
    except Exception as e:
        logger.error(f"主程序出错: {str(e)}")
    finally:
        # Always shut the browser down, even if the batch failed.
        automation.close()