Environment Preparation
- RDS MySQL: an instance of the rds.mysql.s2.large class (2 cores, 4 GB RAM) with 100 GB of storage
- ECS: an instance with at least 4 cores and 8 GB RAM is recommended
Load Test Script
Point DB_CONFIG at the endpoint and credentials of your RDS MySQL instance. After roughly five minutes of load, a CPU event notification appears on the RDS MySQL instance details page; click the event to run a one-click diagnosis.
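Before the five-minute run, it is worth a quick check that the ECS host can reach the instance with the configured account; failing fast here beats debugging a silently stuck load script. A minimal connectivity sketch using the same pymysql driver as the main script (host, user, and password below are placeholders to replace):

```python
import pymysql

# Placeholders: replace with your RDS MySQL endpoint and credentials
conn = pymysql.connect(
    host='rm-xxx.mysql.rds.aliyuncs.com',
    port=3306,
    user='your_username',
    password='your_password',
    charset='utf8mb4',
)
try:
    with conn.cursor() as cursor:
        # A successful round trip confirms the network path, account, and password
        cursor.execute("SELECT VERSION()")
        print("Connected, MySQL version:", cursor.fetchone()[0])
finally:
    conn.close()
```

The full load-test script follows.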
```python
import math
import pymysql
import random
import time
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import signal
import logging
import os
from logging.handlers import RotatingFileHandler
import multiprocessing


# ==================== Logging setup ====================
def setup_logger():
    """Configure the logging system."""
    # Create the log directory
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Log file name
    log_filename = f"{log_dir}/cpu_high_qps_cause_and_slow_log.log"

    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)  # DEBUG level so every message is captured

    # Create the formatter
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler: rotating log, at most 10 MB per file, 5 backups kept
    file_handler = RotatingFileHandler(
        log_filename,
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return log_filename


# ==================== Database configuration ====================
DB_CONFIG = {
    'host': 'rm-xxx.mysql.rds.aliyuncs.com',
    'port': 3306,
    'user': 'your_username',
    'password': 'your_password',
    'charset': 'utf8mb4',
    'autocommit': True,
}

DATABASE_NAME = 'test_db'


def get_connection(use_database=True):
    config = DB_CONFIG.copy()
    if use_database:
        config['database'] = DATABASE_NAME
    return pymysql.connect(**config)


def create_database_if_not_exists():
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                # The database already exists: drop it first
                logging.info(f"Database {DATABASE_NAME} already exists, dropping it...")
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"Database {DATABASE_NAME} dropped")
            # Create a fresh database
            cursor.execute(f"CREATE DATABASE {DATABASE_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
            logging.info(f"Database {DATABASE_NAME} created")
        conn.commit()
    except Exception as e:
        logging.error(f"Failed to check/create the database: {e}")
        conn.rollback()
        # Optionally retry here instead of re-raising
        raise e
    finally:
        conn.close()


def drop_database_if_exists():
    """
    Drop the database (if it exists) to restore the original state.
    Warning: this permanently deletes all data in the database. Use with care.
    """
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"Database {DATABASE_NAME} dropped")
            else:
                logging.info(f"Database {DATABASE_NAME} does not exist, nothing to drop")
        conn.commit()
    except Exception as e:
        logging.error(f"Failed to drop the database: {e}")
        conn.rollback()
    finally:
        conn.close()


def create_tables():
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(100) NOT NULL UNIQUE,
                    email VARCHAR(255) NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    last_login TIMESTAMP NULL,
                    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
                    INDEX idx_username (username),
                    INDEX idx_email (email),
                    INDEX idx_status (status),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    description TEXT,
                    price DECIMAL(10, 2) NOT NULL,
                    category_id INT NOT NULL,
                    stock_quantity INT DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT TRUE,
                    FULLTEXT INDEX idx_product_name (name),
                    INDEX idx_category (category_id),
                    INDEX idx_price (price),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    shipping_address TEXT,
                    payment_method ENUM('credit_card', 'paypal', 'bank_transfer') DEFAULT 'credit_card',
                    INDEX idx_user_id (user_id)
                    -- created_at / total_amount indexes intentionally omitted so range queries slow down under high QPS
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    order_id INT NOT NULL,
                    product_id INT NOT NULL,
                    quantity INT NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    subtotal DECIMAL(10, 2) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_order_id (order_id),
                    INDEX idx_product_id (product_id),
                    INDEX idx_created_at (created_at),
                    FOREIGN KEY (order_id) REFERENCES orders(id) ON DELETE CASCADE,
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_activity_logs (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    activity_type VARCHAR(100) NOT NULL,
                    description TEXT,
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_user_id (user_id),
                    INDEX idx_activity_type (activity_type)
                    -- created_at index intentionally omitted so time-range queries slow down under high QPS
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS product_reviews (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    product_id INT NOT NULL,
                    user_id INT NOT NULL,
                    rating TINYINT NOT NULL CHECK (rating >= 1 AND rating <= 5),
                    title VARCHAR(255),
                    comment TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_approved BOOLEAN DEFAULT FALSE,
                    INDEX idx_product_id (product_id),
                    INDEX idx_user_id (user_id),
                    INDEX idx_rating (rating),
                    INDEX idx_created_at (created_at),
                    FULLTEXT INDEX idx_comment (comment),
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS categories (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    parent_id INT DEFAULT NULL,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_parent_id (parent_id),
                    FOREIGN KEY (parent_id) REFERENCES categories(id) ON DELETE SET NULL
                ) ENGINE=InnoDB
            """)

            logging.info("All tables created")
        conn.commit()
    except Exception as e:
        logging.error(f"Failed to create tables: {e}")
        conn.rollback()
    finally:
        conn.close()


# ==================== Data population (large volumes) ====================
def generate_users(num_users=50000):
    """Generate a large number of users."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            batch_size = 5000
            for i in range(0, num_users, batch_size):
                values = [
                    (f"user{i + j}", f"user{i + j}@example.com",
                     f"hash{random.randint(100000, 999999)}")
                    for j in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT IGNORE INTO users (username, email, password_hash) VALUES (%s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {min(i + batch_size, num_users)}/{num_users} users")
        logging.info("User data inserted")
    except Exception as e:
        logging.error(f"Failed to insert users: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_categories():
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            categories = [
                ('Electronics', None, 'Electronic devices and accessories'),
                ('Computers', None, 'Computers and laptops'),
                ('Smartphones', 1, 'Smartphones and mobile devices'),
                ('Laptops', 2, 'Laptops and notebooks'),
                ('Tablets', 1, 'Tablets and iPads'),
                ('Cameras', 1, 'Digital cameras and accessories'),
                ('TVs', 1, 'Televisions and home theater'),
                ('Books', None, 'Books and magazines'),
                ('Fiction', 8, 'Fiction books'),
                ('Non-Fiction', 8, 'Non-fiction books'),
                ('Clothing', None, 'Clothing and apparel'),
                ('Men', 11, "Men's clothing"),
                ('Women', 11, "Women's clothing"),
                ('Shoes', 11, 'Shoes and footwear'),
                ('Home', None, 'Home and kitchen'),
                ('Furniture', 15, 'Furniture'),
                ('Kitchen', 15, 'Kitchen appliances'),
                ('Garden', 15, 'Garden and outdoor'),
            ]
            cursor.executemany(
                "INSERT IGNORE INTO categories (name, parent_id, description) VALUES (%s, %s, %s)",
                categories
            )
        conn.commit()
        logging.info("Category data inserted")
    except Exception as e:
        logging.error(f"Failed to insert categories: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_products(num_products=20000):
    """Generate a large number of products."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM categories")
            category_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 2000
            product_names = ["Pro", "Elite", "Premium", "Advanced", "Standard", "Basic", "Deluxe", "Ultimate"]
            product_types = ["Laptop", "Smartphone", "Tablet", "Camera", "TV", "Monitor", "Headphones", "Speaker"]
            for i in range(0, num_products, batch_size):
                values = []
                for _ in range(batch_size):
                    name = f"{random.choice(product_names)} {random.choice(product_types)} {random.randint(1, 10000)}"
                    price = round(random.uniform(10, 2000), 2)
                    values.append((name, f"Description of {name}", price,
                                   random.choice(category_ids), random.randint(0, 1000)))
                cursor.executemany(
                    "INSERT IGNORE INTO products (name, description, price, category_id, stock_quantity) VALUES (%s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {min(i + batch_size, num_products)}/{num_products} products")
        logging.info("Product data inserted")
    except Exception as e:
        logging.error(f"Failed to insert products: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_orders(num_orders=100_000):
    """Generate a large number of orders."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            product_ids = list(products.keys())
            batch_size = 10000
            start_date = datetime.now() - timedelta(days=365)
            for i in range(0, num_orders, batch_size):
                values = []
                for _ in range(batch_size):
                    user_id = random.choice(user_ids)
                    total = round(sum(products[random.choice(product_ids)] * random.randint(1, 3)
                                      for _ in range(3)), 2)
                    created_at = start_date + timedelta(
                        days=random.randint(0, 365),
                        hours=random.randint(0, 23),
                        minutes=random.randint(0, 59)
                    )
                    values.append((user_id, total,
                                   random.choice(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
                                   created_at, created_at, 'Address', 'credit_card'))
                cursor.executemany(
                    "INSERT INTO orders (user_id, total_amount, status, created_at, updated_at, shipping_address, payment_method) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {min(i + batch_size, num_orders)}/{num_orders} orders")
        logging.info("Order data inserted")
    except Exception as e:
        logging.error(f"Failed to insert orders: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_order_items():
    """Generate order items."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM orders")
            order_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            batch_size = 50000
            values = []
            for order_id in order_ids:
                for _ in range(random.randint(1, 5)):
                    product_id = random.choice(list(products.keys()))
                    qty = random.randint(1, 3)
                    price = products[product_id]
                    values.append((order_id, product_id, qty, price, qty * price, datetime.now()))
                    if len(values) >= batch_size:
                        cursor.executemany(
                            "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                            values
                        )
                        conn.commit()
                        logging.info(f"Inserted {len(values)} order items")
                        values = []
            if values:
                cursor.executemany(
                    "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {len(values)} order items")
        logging.info("Order item data inserted")
    except Exception as e:
        logging.error(f"Failed to insert order items: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_product_reviews(num_reviews=200000):
    """Generate a large number of reviews."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id FROM products")
            product_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 10000
            titles = ["Great!", "Good", "Ok", "Bad", "Excellent"]
            comments = ["Good product.", "Nice.", "Could be better.", "Excellent quality."]
            for i in range(0, num_reviews, batch_size):
                values = [
                    (
                        random.choice(product_ids),
                        random.choice(user_ids),
                        random.randint(1, 5),
                        random.choice(titles),
                        random.choice(comments),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        random.random() > 0.2
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO product_reviews (product_id, user_id, rating, title, comment, created_at, updated_at, is_approved) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {min(i + batch_size, num_reviews)}/{num_reviews} reviews")
        logging.info("Product review data inserted")
    except Exception as e:
        logging.error(f"Failed to insert reviews: {e}")
        conn.rollback()
    finally:
        conn.close()


def generate_user_activity_logs(num_logs=500000):
    """Generate a large number of activity logs."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 50000
            types = ['login', 'view_product', 'add_to_cart', 'purchase']
            ips = ['192.168.1.1', '10.0.0.1', '172.16.0.1']
            agents = ['Mozilla/5.0', 'iPhone', 'Android']
            for i in range(0, num_logs, batch_size):
                values = [
                    (
                        random.choice(user_ids),
                        random.choice(types),
                        f"Action: {random.choice(types)}",
                        random.choice(ips),
                        random.choice(agents),
                        datetime.now() - timedelta(days=random.randint(0, 90))
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO user_activity_logs (user_id, activity_type, description, ip_address, user_agent, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"Inserted {min(i + batch_size, num_logs)}/{num_logs} activity logs")
        logging.info("Activity log data inserted")
    except Exception as e:
        logging.exception(f"Failed to insert activity logs: {e}")
        conn.rollback()
    finally:
        conn.close()


def populate_database():
    """Populate all tables."""
    logging.info("Starting data population...")
    generate_users(50_000)
    generate_categories()
    generate_products(20_000)
    generate_orders(100_000)
    generate_order_items()
    generate_product_reviews(200_000)
    generate_user_activity_logs(500_000)
    logging.info("Data population complete")


# ==================== Query functions (fast queries that generate high QPS) ====================
def normal_queries(query_type):
    """Fast queries that generate high QPS and increase CPU pressure."""
    # Keep it simple: one long-lived connection per thread
    conn = get_connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # Map the thread index onto a valid query type in [1, 10]
        query_type = (query_type % 10) + 1
        while True:
            start_time = time.time()
            try:
                if query_type == 1:
                    user_id = random.randint(1, 50000)
                    cursor.execute("SELECT id, username, status FROM users WHERE id = %s", (user_id,))
                    cursor.fetchone()
                elif query_type == 2:
                    cursor.execute("SELECT id, username FROM users WHERE status = %s LIMIT 10",
                                   (random.choice(['active', 'inactive']),))
                    cursor.fetchall()
                elif query_type == 3:
                    order_id = random.randint(1, 100000)
                    cursor.execute(
                        "SELECT o.id, o.total_amount, o.status, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = %s",
                        (order_id,))
                    cursor.fetchone()
                elif query_type == 4:
                    keyword = random.choice(['Pro', 'Smart', 'Laptop'])
                    cursor.execute(
                        "SELECT id, name, price FROM products WHERE MATCH(name) AGAINST (%s IN NATURAL LANGUAGE MODE) LIMIT 5",
                        (keyword,))
                    cursor.fetchall()
                elif query_type == 5:
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price, category_id FROM products WHERE id = %s",
                                   (product_id,))
                    cursor.fetchone()
                elif query_type == 6:
                    username = f"user{random.randint(0, 49999)}"
                    cursor.execute("SELECT id, username, email FROM users WHERE username = %s", (username,))
                    cursor.fetchone()
                elif query_type == 7:
                    order_item_id = random.randint(1, 300000)
                    cursor.execute("SELECT id, order_id, product_id, quantity FROM order_items WHERE id = %s",
                                   (order_item_id,))
                    cursor.fetchone()
                elif query_type == 8:
                    # Simple JOIN query to increase CPU pressure
                    user_id = random.randint(1, 50000)
                    cursor.execute(
                        "SELECT u.id, u.username, COUNT(o.id) as order_count FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.id = %s GROUP BY u.id, u.username",
                        (user_id,))
                    cursor.fetchone()
                elif query_type == 9:
                    # Simple aggregation query to increase CPU pressure
                    cursor.execute(
                        "SELECT status, COUNT(*) as cnt FROM orders WHERE status IN ('cancelled') GROUP BY status LIMIT 5")
                    cursor.fetchall()
                elif query_type == 10:
                    # Simple range query to increase CPU pressure
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price FROM products WHERE id BETWEEN %s AND %s LIMIT 10",
                                   (product_id, product_id + 100))
                    cursor.fetchall()
            except Exception:
                try:
                    conn.rollback()
                except Exception:
                    pass
                continue
            finally:
                end_time = time.time()
                if end_time - start_time > 2:
                    logging.info(f"Query type: {query_type}, elapsed: {end_time - start_time:.2f}s")
    finally:
        cursor.close()
        conn.close()


def start_worker_process(concurrent_tasks):
    """Worker process: spawn the requested number of query threads."""
    logging.info(f"Worker process started, creating {concurrent_tasks} threads")
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
        # Submit one normal_queries task per thread
        futures = [executor.submit(normal_queries, i) for i in range(concurrent_tasks)]
        # Wait for all tasks to finish
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logging.error(f"Task failed: {e}")


# ==================== Main program ====================
if __name__ == "__main__":
    # Set up logging
    log_file = setup_logger()
    logging.info(f"Log file created: {log_file}")
    logging.info(f"Use 'tail -f {log_file}' to follow the log in real time")

    exit_signal = False

    def signal_handler(signum, frame):
        global exit_signal
        logging.info("Exit signal received, the program will terminate...")
        exit_signal = True

    signal.signal(signal.SIGINT, signal_handler)

    # Set to 'y' to (re)create and populate the database before generating load
    is_create = 'y'
    if is_create == 'y':
        logging.info("########## Creating database... ##########")
        create_database_if_not_exists()
        # Always create the tables to make sure they exist
        logging.info("########## Creating tables... ##########")
        create_tables()
        logging.info("########## Populating data... ##########")
        populate_database()  # large data volumes

    logging.info("########## Preparing the load scenario... ##########")
    # Compute the number of worker processes.
    # On rds.mysql.s2.large, 400 concurrent workers = roughly 61% CPU.
    cpu_usage_ratio = 0.9
    total_concurrent = math.ceil(cpu_usage_ratio / (0.61 / 400))
    concurrency_per_process = 200
    # Round up
    num_processes = (total_concurrent + concurrency_per_process - 1) // concurrency_per_process
    logging.info(f"Starting {num_processes} processes, {concurrency_per_process} concurrent tasks each, "
                 f"target CPU usage {cpu_usage_ratio * 100}%")

    # Create and start the worker processes
    processes = []
    for i in range(num_processes):
        # Concurrency assigned to this process
        current_concurrency = min(concurrency_per_process,
                                  total_concurrent - i * concurrency_per_process)
        p = multiprocessing.Process(target=start_worker_process, args=(current_concurrency,))
        p.start()
        processes.append(p)
        logging.info(f"Started process {p.pid} with {current_concurrency} concurrent tasks")

    try:
        # Monitor the running processes
        while not exit_signal:
            logging.info(f"Load generation in progress, current time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
            time.sleep(5)
        # Terminate all processes
        for p in processes:
            p.terminate()
            p.join()
        logging.info("All processes terminated, exiting")
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received, terminating all processes...")
        for p in processes:
            p.terminate()
            p.join()
        logging.info("All processes terminated")
```
👉 Join the discussion: search for DingTalk group ID 106730017609
👉 Get started now: log in to the ApsaraDB RDS console
👉 More documentation: RDS AI Assistant
👉 The RDS AI Assistant is now in free public preview; we welcome your feedback: https://survey.aliyun.com/apps/zhiliao/m3RVhe0m4