# 下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133
# 这个短信群发系统包含以下核心功能:
# - 随机手机号生成器(支持中国大陆号段)
# - 批量短信发送功能(使用 Twilio API)
# - 数据库管理模块(存储号码和发送记录)
# - 多线程支持(尚未实现:当前仅导入了 threading,短信为逐条串行发送,可按需扩展)
# 使用前需要注册 Twilio 账号并配置 ACCOUNT_SID、AUTH_TOKEN 和 FROM_NUMBER。系统会自动创建 SQLite 数据库来存储号码和发送记录。
import random
import time
import threading
import sqlite3
from twilio.rest import Client
class PhoneNumberGenerator:
    """Build random phone-number strings from a fixed table of 3-digit prefixes.

    Every generated value is ``country_code + prefix + 8 random digits``
    (no leading ``+``). Numbers are drawn independently, so duplicates are
    possible and nothing checks that a number is actually assigned.
    """

    def __init__(self, country_code="86"):
        """Store the country code and the prefix table.

        Args:
            country_code: Digits prepended to every generated number.
        """
        self.country_code = country_code
        self.operators = [
            "130", "131", "132", "133", "134", "135", "136", "137", "138", "139",
            "150", "151", "152", "153", "155", "156", "157", "158", "159",
            "180", "181", "182", "183", "184", "185", "186", "187", "188", "189",
        ]

    def generate_random_number(self, count=1):
        """Generate ``count`` random numbers.

        Args:
            count: How many numbers to produce.

        Returns:
            A single string when ``count == 1``, a list of strings when
            ``count > 1``, and an empty list when ``count < 1`` (previously
            this case raised ``IndexError``).
        """
        if count < 1:
            return []
        numbers = [self._generate_one() for _ in range(count)]
        # Historical contract: a single request yields a bare string.
        return numbers if count > 1 else numbers[0]

    def _generate_one(self):
        """Return one number: country code, a random prefix, 8 random digits."""
        prefix = random.choice(self.operators)
        suffix = "".join(random.choices("0123456789", k=8))
        return f"{self.country_code}{prefix}{suffix}"
class SMSSender:
    """Send text messages through a Twilio REST ``Client``."""

    def __init__(self, account_sid, auth_token, from_number):
        """Create the Twilio client and remember the sender number.

        Args:
            account_sid: Twilio account SID passed to ``Client``.
            auth_token: Twilio auth token passed to ``Client``.
            from_number: Number used as the ``from_`` field of every message.
        """
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number

    def send_single_sms(self, to_number, message):
        """Send one message and report the outcome as a dict.

        Args:
            to_number: Destination number.
            message: Text body to send.

        Returns:
            ``{"status": "success", "message_sid": ...}`` on success, or
            ``{"status": "failed", "error": str(exc)}`` if anything raised.
        """
        try:
            # Use a separate name so the ``message`` argument is not clobbered
            # by the API response object.
            sent = self.client.messages.create(
                body=message,
                from_=self.from_number,
                to=to_number,
            )
            return {"status": "success", "message_sid": sent.sid}
        except Exception as e:
            # Deliberate catch-all: one failed send must not abort a batch;
            # the error text is handed back to the caller for logging.
            return {"status": "failed", "error": str(e)}

    def send_bulk_sms(self, numbers, message, delay=1):
        """Send the same message to each number in turn.

        Args:
            numbers: Iterable of destination numbers.
            message: Text body to send.
            delay: Seconds to sleep after each send; values <= 0 skip the
                sleep instead of crashing ``time.sleep`` on a negative value.

        Returns:
            A list of ``{"number": ..., "result": ...}`` dicts, one per
            number, in input order.
        """
        results = []
        for number in numbers:
            outcome = self.send_single_sms(number, message)
            results.append({"number": number, "result": outcome})
            if delay > 0:
                time.sleep(delay)
        return results
class DatabaseManager:
    """SQLite storage for phone numbers and per-message send logs."""

    def __init__(self, db_name="sms_system.db"):
        """Open (or create) the database and make sure both tables exist.

        Args:
            db_name: Path handed to ``sqlite3.connect``.
        """
        self.conn = sqlite3.connect(db_name)
        self.create_tables()

    def create_tables(self):
        """Create the ``phone_numbers`` and ``sms_logs`` tables if missing."""
        # ``with self.conn`` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS phone_numbers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    number TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS sms_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    number TEXT,
                    message TEXT,
                    status TEXT,
                    message_sid TEXT,
                    error TEXT,
                    sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def save_numbers(self, numbers):
        """Insert numbers into ``phone_numbers``, skipping duplicates.

        Args:
            numbers: Iterable of number strings. A single bare string is
                treated as one number rather than iterated per character.
        """
        if isinstance(numbers, str):
            numbers = [numbers]
        with self.conn:
            # OR IGNORE drops rows that would violate the UNIQUE constraint,
            # matching the previous "skip on IntegrityError" behavior.
            self.conn.executemany(
                "INSERT OR IGNORE INTO phone_numbers (number) VALUES (?)",
                ((number,) for number in numbers),
            )

    def log_sms(self, number, message, status, message_sid=None, error=None):
        """Append one row to ``sms_logs``.

        Args:
            number: Destination number.
            message: Text that was sent.
            status: Outcome label stored as-is.
            message_sid: Provider message id, if any.
            error: Error text, if any.
        """
        with self.conn:
            self.conn.execute(
                """
                INSERT INTO sms_logs (number, message, status, message_sid, error)
                VALUES (?, ?, ?, ?, ?)
                """,
                (number, message, status, message_sid, error),
            )

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
class SMSSystem:
    """Wire together number generation, storage, and message sending.

    NOTE(review): numbers come from a random generator, so recipients have
    not opted in. Confirm that any real use complies with the messaging
    provider's acceptable-use policy and applicable anti-spam law.
    """

    def __init__(self, twilio_account_sid, twilio_auth_token, twilio_from_number):
        """Build the generator, sender, and database manager.

        Args:
            twilio_account_sid: Forwarded to ``SMSSender``.
            twilio_auth_token: Forwarded to ``SMSSender``.
            twilio_from_number: Forwarded to ``SMSSender``.
        """
        self.number_generator = PhoneNumberGenerator()
        self.sms_sender = SMSSender(twilio_account_sid, twilio_auth_token, twilio_from_number)
        self.db_manager = DatabaseManager()

    def generate_and_save_numbers(self, count=100):
        """Generate ``count`` numbers, store them, and return them as a list.

        Args:
            count: How many numbers to request from the generator.

        Returns:
            The generated numbers. Always a list, even for ``count == 1``,
            where the generator hands back a bare string.
        """
        numbers = self.number_generator.generate_random_number(count)
        if isinstance(numbers, str):
            # Without this, a single number would be saved one character
            # at a time and ``len()`` on the result would count digits.
            numbers = [numbers]
        self.db_manager.save_numbers(numbers)
        return numbers

    def send_bulk_messages(self, message, batch_size=50, delay=1, batch_delay=5):
        """Send ``message`` to every stored number, in batches, logging each result.

        Args:
            message: Text body to send.
            batch_size: Numbers per batch; must be at least 1.
            delay: Per-message delay forwarded to ``send_bulk_sms``.
            batch_delay: Seconds to pause between batches. No pause is made
                after the final batch.

        Returns:
            The concatenated per-number result dicts from every batch.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        cursor = self.db_manager.conn.cursor()
        cursor.execute("SELECT number FROM phone_numbers")
        all_numbers = [row[0] for row in cursor.fetchall()]

        results = []
        for start in range(0, len(all_numbers), batch_size):
            # Pause between batches only; ``start`` is 0 for the first one.
            if start and batch_delay > 0:
                time.sleep(batch_delay)
            batch = all_numbers[start:start + batch_size]
            batch_results = self.sms_sender.send_bulk_sms(batch, message, delay)
            for entry in batch_results:
                number = entry["number"]
                res = entry["result"]
                if res["status"] == "success":
                    self.db_manager.log_sms(number, message, "success", res["message_sid"])
                else:
                    self.db_manager.log_sms(number, message, "failed", error=res["error"])
            results.extend(batch_results)
        return results
if __name__ == "__main__":
    # Demo run: generate numbers, store them, send one message to all stored
    # numbers, then print a success/failure tally.
    #
    # NOTE(review): the targets are randomly generated numbers, i.e. people
    # who have not opted in. Confirm compliance with the provider's
    # acceptable-use policy and local anti-spam law before running this.
    import os

    # Twilio account settings. Environment variables take precedence so real
    # credentials need not be committed; the placeholders remain the fallback.
    ACCOUNT_SID = os.environ.get("TWILIO_ACCOUNT_SID", "your_account_sid")
    AUTH_TOKEN = os.environ.get("TWILIO_AUTH_TOKEN", "your_auth_token")
    FROM_NUMBER = os.environ.get("TWILIO_FROM_NUMBER", "+1234567890")  # your Twilio number

    system = SMSSystem(ACCOUNT_SID, AUTH_TOKEN, FROM_NUMBER)

    # Generate 100 random numbers and save them.
    print("正在生成随机号码...")
    numbers = system.generate_and_save_numbers(100)
    print(f"已生成并保存 {len(numbers)} 个号码")

    # Send the bulk message.
    print("开始发送批量短信...")
    message = "您好,这是测试短信。感谢您的关注!"
    results = system.send_bulk_messages(message, batch_size=10, delay=2)

    # Tally the results.
    success_count = sum(1 for r in results if r["result"]["status"] == "success")
    print(f"短信发送完成,成功: {success_count}, 失败: {len(results)-success_count}")