下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:1133
这段代码实现了一个淘宝私信群发工具,包含登录、发送单条消息和批量发送功能。使用时需要将 USERNAME、PASSWORD 和 RECEIVER_IDS 替换为实际值,并按需修改 MESSAGES 中的消息内容。代码中添加了随机等待时间模拟人工操作,避免被平台检测为机器人。
import os
import random
import time

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class TaobaoMassMessenger:
    """Drive a Chrome session that logs in to Taobao and sends private messages.

    NOTE(review): automated bulk messaging may violate the platform's terms
    of service; confirm you are authorized to contact the recipients before
    running this.
    """

    def __init__(self, username, password):
        """Store the credentials and start a Chrome browser session.

        Args:
            username: Account name typed into the login form.
            password: Password typed into the login form.
        """
        self.username = username
        self.password = password
        self.driver = webdriver.Chrome()
        # One shared explicit wait: each lookup below gets up to 20 seconds.
        self.wait = WebDriverWait(self.driver, 20)

    def login(self):
        """Log in to the Taobao account through the web login form.

        Raises:
            TimeoutException: If a form field or the login button does not
                appear in time, or the browser never leaves the login page.
        """
        self.driver.get("https://login.taobao.com/")
        # Switch to password login.
        try:
            self.wait.until(EC.element_to_be_clickable(
                (By.XPATH, '//*[@id="login"]/div[1]/i'))).click()
        except TimeoutException:
            # The toggle never became clickable; carry on with the form as
            # it is currently displayed.
            pass
        # Enter the user name and the password.
        self.wait.until(EC.presence_of_element_located(
            (By.ID, "fm-login-id"))).send_keys(self.username)
        self.wait.until(EC.presence_of_element_located(
            (By.ID, "fm-login-password"))).send_keys(self.password)
        # Click the login button.
        self.wait.until(EC.element_to_be_clickable(
            (By.CLASS_NAME, "password-login"))).click()
        # The login page URL already contains "taobao.com", so waiting for
        # that substring would succeed immediately. Wait until the browser
        # has navigated away from the login host instead.
        self.wait.until(
            lambda drv: "login.taobao.com" not in drv.current_url)

    def send_message(self, receiver_id, message):
        """Send one private message to the given user.

        Args:
            receiver_id: Identifier placed in the ``uid`` query parameter.
            message: Text typed into the message box.

        Returns:
            True if the message box was found and the send button clicked,
            False if an element was missing or the wait timed out.
        """
        # Open the private-message page.
        self.driver.get(f"https://amos.alicdn.com/msg.aw?v=2&uid={receiver_id}")
        try:
            # The wait returns the located element, so no second lookup is
            # needed once the message box has loaded.
            textarea = self.wait.until(EC.presence_of_element_located(
                (By.ID, "J_Textarea")))
            textarea.clear()
            textarea.send_keys(message)
            # Click the send button.
            self.driver.find_element(By.ID, "J_SendBtn").click()
            # Pause for a random 1-3 seconds before returning.
            time.sleep(random.uniform(1, 3))
            return True
        except (NoSuchElementException, TimeoutException) as e:
            print(f"发送给 {receiver_id} 失败: {str(e)}")
            return False

    def batch_send(self, receiver_ids, messages):
        """Send a message to every receiver and report the success count.

        Args:
            receiver_ids: Sequence of receiver identifiers.
            messages: Either a single message string, or a list/tuple of
                messages from which one is picked at random per receiver.

        Returns:
            The number of receivers for which ``send_message`` returned True.
        """
        total = len(receiver_ids)
        success_count = 0
        for i, receiver_id in enumerate(receiver_ids):
            if isinstance(messages, (list, tuple)):
                message = random.choice(messages)
            else:
                message = messages
            print(f"正在发送第 {i+1}/{total} 条消息给 {receiver_id}")
            if self.send_message(receiver_id, message):
                success_count += 1
            # Rest 5-10 seconds after every 10 messages, but not after the
            # final one, where there is nothing left to send.
            if (i + 1) % 10 == 0 and i + 1 < total:
                sleep_time = random.uniform(5, 10)
                print(f"已发送10条,休息{sleep_time:.1f}秒...")
                time.sleep(sleep_time)
        print(f"发送完成,成功 {success_count}/{total}")
        return success_count

    def close(self):
        """Quit the browser session."""
        self.driver.quit()
# Script entry point: the guard must compare the module's __name__ against
# "__main__" so the demo only runs when the file is executed directly.
if __name__ == "__main__":
    # Account settings; replace the placeholders with real values.
    USERNAME = "your_taobao_username"
    PASSWORD = "your_taobao_password"
    # Receiver IDs to message.
    RECEIVER_IDS = [
        "example_user1",
        "example_user2",
        "example_user3",
    ]
    # Message content: a single message or a list of messages.
    MESSAGES = [
        "您好,我是XX店铺的客服,我们店铺正在做活动,全场5折起,欢迎来看看哦!",
        "亲,您在我们店铺收藏的商品现在有优惠啦,点击查看:https://example.com",
        "尊敬的顾客,感谢您关注我们店铺,新用户首单立减10元!",
    ]
    # Create the messenger and run; the browser is always closed afterwards,
    # even when login or sending raises.
    messenger = TaobaoMassMessenger(USERNAME, PASSWORD)
    try:
        messenger.login()
        messenger.batch_send(RECEIVER_IDS, MESSAGES)
    finally:
        messenger.close()
import logging
from datetime import datetime
def setup_logger():
    """Return the 'taobao_messenger' logger, configured for file and console.

    The logger is set to INFO. On the first call a dated file handler
    (``logs/taobao_msg_YYYYMMDD.log``) and a console handler are attached;
    later calls return the same logger without attaching more handlers.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger('taobao_messenger')
    logger.setLevel(logging.INFO)
    # getLogger returns the same object for the same name, so attaching
    # handlers on every call would duplicate each log line.
    if logger.handlers:
        return logger
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s')
    # File log: create the directory first, FileHandler does not do it.
    os.makedirs('logs', exist_ok=True)
    file_handler = logging.FileHandler(
        f'logs/taobao_msg_{datetime.now().strftime("%Y%m%d")}.log',
        encoding='utf-8')
    file_handler.setFormatter(formatter)
    # Console log.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
import random
import time
import configparser
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class ConfigLoader:
    """Load INI-style settings with ``configparser``."""

    @staticmethod
    def load_config(path='config.ini', encoding=None):
        """Parse an INI file and return the resulting parser.

        Args:
            path: File to read. Defaults to ``'config.ini'`` relative to the
                current working directory.
            encoding: Text encoding passed to ``ConfigParser.read``; ``None``
                keeps the platform default.

        Returns:
            A ``configparser.ConfigParser``. ``ConfigParser.read`` skips
            files it cannot open, so a missing file yields a parser with no
            sections rather than an error.
        """
        config = configparser.ConfigParser()
        config.read(path, encoding=encoding)
        return config
class BrowserUtils:
    """Static helpers that add randomized pauses around browser actions."""

    @staticmethod
    def random_wait(driver, element_locator, max_wait=10):
        """Wait for an element to be present, then pause for a random moment.

        Args:
            driver: WebDriver instance handed to ``WebDriverWait``.
            element_locator: Locator passed to
                ``EC.presence_of_element_located``.
            max_wait: Upper bound, in seconds, for the presence wait.

        Returns:
            True once the element was located and the 0.5-2 second pause
            finished; False if anything in that sequence raised.
        """
        try:
            waiter = WebDriverWait(driver, max_wait)
            waiter.until(EC.presence_of_element_located(element_locator))
            pause = random.uniform(0.5, 2)
            time.sleep(pause)
        except Exception:
            # Best effort: every failure is reported as "not found".
            return False
        return True

    @staticmethod
    def human_like_typing(element, text):
        """Send ``text`` to ``element`` one character at a time.

        A random 0.1-0.3 second pause follows each character.
        """
        for ch in text:
            element.send_keys(ch)
            delay = random.uniform(0.1, 0.3)
            time.sleep(delay)
import csv
import random
from typing import List
class ReceiverManager:
    """Static helpers for loading, saving and reordering receiver ID lists."""

    @staticmethod
    def load_receivers_from_file(filename: str) -> List[str]:
        """Read receiver IDs from a text file, one per line.

        Surrounding whitespace is stripped and blank lines are skipped.

        Args:
            filename: Path of the UTF-8 text file to read.

        Returns:
            The non-empty, stripped lines in file order.
        """
        # 'utf-8-sig' also accepts plain UTF-8, but drops a leading byte
        # order mark; with plain 'utf-8' the BOM would stay attached to the
        # first ID because str.strip() does not treat it as whitespace.
        with open(filename, 'r', encoding='utf-8-sig') as f:
            return [line.strip() for line in f if line.strip()]

    @staticmethod
    def save_receivers_to_file(filename: str, receivers: List[str]):
        """Write the receiver IDs to a UTF-8 text file, one per line.

        Args:
            filename: Path of the file to create or overwrite.
            receivers: IDs to write, joined with newlines.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            f.write('\n'.join(receivers))

    @staticmethod
    def shuffle_receivers(receivers: List[str]) -> List[str]:
        """Return a new list with the same IDs in random order.

        The input list is left unmodified.
        """
        return random.sample(receivers, len(receivers))

    @staticmethod
    def filter_active_receivers(receivers: List[str]) -> List[str]:
        """Return the receivers unchanged.

        Placeholder hook: logic for dropping invalid receivers can be added
        here.
        """
        return receivers