下载地址:https://www.pan38.com/share.php?code=pvvmX 提取码:1800
代码功能说明:
使用Selenium模拟浏览器操作实现淘宝/阿里旺旺自动登录和私信发送
支持从JSON文件加载消息模板和联系人列表
包含随机延迟机制避免被检测为自动化脚本
实现多级异常处理和重试机制提高稳定性
支持个性化消息模板,可插入联系人姓名等变量
提供详细的运行日志和统计信息
使用Chrome浏览器隐身模式避免cookie干扰
import time
import random
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
class TaobaoMassMessage:
def init(self):
self.driver = None
self.logged_in = False
self.message_templates = []
self.contact_list = []
self.config = {
'login_url': 'https://login.taobao.com/',
'message_url': 'https://msg.taobao.com/',
'wait_time': 15,
'delay_range': (3, 8),
'max_retries': 3
}
def init_driver(self):
options = webdriver.ChromeOptions()
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_argument('--start-maximized')
self.driver = webdriver.Chrome(options=options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
def login(self, username, password):
if not self.driver:
self.init_driver()
for attempt in range(self.config['max_retries']):
try:
self.driver.get(self.config['login_url'])
WebDriverWait(self.driver, self.config['wait_time']).until(
EC.presence_of_element_located((By.ID, 'fm-login-id'))
)
# 切换到账号密码登录
switch_btn = self.driver.find_element(By.XPATH, '//*[@id="login"]/div[1]/i')
switch_btn.click()
# 输入用户名
username_input = self.driver.find_element(By.ID, 'fm-login-id')
username_input.clear()
username_input.send_keys(username)
# 输入密码
password_input = self.driver.find_element(By.ID, 'fm-login-password')
password_input.clear()
password_input.send_keys(password)
# 点击登录
login_btn = self.driver.find_element(By.XPATH, '//*[@id="login-form"]/div[4]/button')
login_btn.click()
# 等待登录成功
WebDriverWait(self.driver, self.config['wait_time']).until(
lambda driver: 'taobao.com' in driver.current_url
)
self.logged_in = True
print("登录成功")
return True
except Exception as e:
print(f"登录尝试 {attempt + 1} 失败: {str(e)}")
if attempt == self.config['max_retries'] - 1:
print("达到最大重试次数,登录失败")
return False
time.sleep(5)
continue
def load_message_templates(self, file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.message_templates = json.load(f)
print(f"成功加载 {len(self.message_templates)} 条消息模板")
return True
except Exception as e:
print(f"加载消息模板失败: {str(e)}")
return False
def load_contact_list(self, file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
self.contact_list = json.load(f)
print(f"成功加载 {len(self.contact_list)} 个联系人")
return True
except Exception as e:
print(f"加载联系人列表失败: {str(e)}")
return False
def random_delay(self):
delay = random.uniform(*self.config['delay_range'])
time.sleep(delay)
def send_single_message(self, contact, message):
if not self.logged_in:
print("请先登录")
return False
for attempt in range(self.config['max_retries']):
try:
# 打开消息页面
self.driver.get(f"{self.config['message_url']}sendmsg.htm?toUserId={contact['user_id']}")
# 等待消息框加载
message_box = WebDriverWait(self.driver, self.config['wait_time']).until(
EC.presence_of_element_located((By.ID, 'J_Message'))
)
# 清空可能存在的默认内容
message_box.clear()
self.random_delay()
# 输入消息
message_box.send_keys(message)
self.random_delay()
# 点击发送
send_btn = WebDriverWait(self.driver, self.config['wait_time']).until(
EC.element_to_be_clickable((By.ID, 'J_Send'))
)
send_btn.click()
self.random_delay()
print(f"已发送消息给 {contact['name']}")
return True
except Exception as e:
print(f"发送消息给 {contact['name']} 失败 (尝试 {attempt + 1}): {str(e)}")
if attempt == self.config['max_retries'] - 1:
return False
time.sleep(5)
continue
def send_mass_messages(self):
if not self.message_templates or not self.contact_list:
print("请先加载消息模板和联系人列表")
return False
success_count = 0
failure_count = 0
for contact in self.contact_list:
# 随机选择一条消息模板
template = random.choice(self.message_templates)
message = template['content'].format(name=contact['name'])
if self.send_single_message(contact, message):
success_count += 1
else:
failure_count += 1
# 随机延迟,避免触发反爬
self.random_delay()
print(f"\n发送完成: 成功 {success_count} 条, 失败 {failure_count} 条")
return True
def close(self):
if self.driver:
self.driver.quit()
print("浏览器已关闭")
def main():
bot = TaobaoMassMessage()
try:
# 加载配置
if not bot.load_message_templates('message_templates.json'):
return
if not bot.load_contact_list('contact_list.json'):
return
# 登录
username = input("请输入淘宝账号: ")
password = input("请输入密码: ")
if not bot.login(username, password):
return
# 开始发送
print("\n开始批量发送消息...")
bot.send_mass_messages()
except KeyboardInterrupt:
print("\n用户中断操作")
except Exception as e:
print(f"程序运行出错: {str(e)}")
finally:
bot.close()
if name == "main":
main()