RDS AI助手深度测评-场景4:CPU压测指引

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 阿里云数据库RDS「RDS AI助手」正式上线啦!用聊天的方式,帮你搞定 信息查询、问题诊断、慢SQL优化、实例巡检 等一系列数据库运维任务,更能随你调遣,定制最贴近你业务的个性化Agent!现在免费公测中,欢迎留下您的诉求:https://survey.aliyun.com/apps/zhiliao/m3RVhe0m4

环境准备

  • RDS MySQL:开通rds.mysql.s2.large(2核4GB)的实例,存储空间100GB
  • ECS:推荐4核8GB以上规格

压测脚本

修改DB_CONFIG为RDS MySQL的链接地址和账密。压测约5分钟后,在RDS MySQL实例详情页面,能够观察到有CPU事件推送,点击事件进行一键诊断。

import math
import pymysql
import random
import time
import threading
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import signal
import logging
import os
from logging.handlers import RotatingFileHandler
import multiprocessing
# ==================== 日志配置 ====================
def setup_logger():
    """配置日志系统"""
    # 创建日志目录
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    # 创建日志文件名(基于当前时间)
    log_filename = f"{log_dir}/cpu_high_qps_cause_and_slow_log.log"
    # 配置根日志记录器
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)  # 设置为DEBUG级别,确保能看到所有日志
    # 创建格式化器
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # 文件处理器 - 滚动日志,每个文件最大10MB,保留5个备份
    file_handler = RotatingFileHandler(
        log_filename, maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return log_filename
# ==================== 数据库配置 ====================
DB_CONFIG = {
    'host': 'rm-xxx.mysql.rds.aliyuncs.com',
    'port': 3306,
    'user': 'you_username',
    'password': 'you_password',
    'charset': 'utf8mb4',
    'autocommit': True,
}
DATABASE_NAME = 'test_db'
def get_connection(use_database=True):
    config = DB_CONFIG.copy()
    if use_database:
        config['database'] = DATABASE_NAME
    return pymysql.connect(**config)
def create_database_if_not_exists():
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                # 数据库已存在,先删除
                logging.info(f"数据库 {DATABASE_NAME} 已存在,正在删除...")
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"数据库 {DATABASE_NAME} 已删除")
            # 创建新数据库
            cursor.execute(f"CREATE DATABASE {DATABASE_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
            logging.info(f"数据库 {DATABASE_NAME} 创建成功")
        conn.commit()
    except Exception as e:
        logging.error(f"检查/创建数据库失败: {e}")
        conn.rollback()
        # 可以选择在这里重新尝试或抛出异常
        raise e
    finally:
        conn.close()
def drop_database_if_exists():
    """
    删除数据库(如果存在),用于还原数据库状态
    注意:此操作会永久删除数据库中的所有数据,请谨慎使用
    """
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"数据库 {DATABASE_NAME} 已删除")
            else:
                logging.info(f"数据库 {DATABASE_NAME} 不存在,无需删除")
        conn.commit()
    except Exception as e:
        logging.error(f"删除数据库失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def create_tables():
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(100) NOT NULL UNIQUE,
                    email VARCHAR(255) NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    last_login TIMESTAMP NULL,
                    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
                    INDEX idx_username (username),
                    INDEX idx_email (email),
                    INDEX idx_status (status),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    description TEXT,
                    price DECIMAL(10, 2) NOT NULL,
                    category_id INT NOT NULL,
                    stock_quantity INT DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT TRUE,
                    FULLTEXT INDEX idx_product_name (name),
                    INDEX idx_category (category_id),
                    INDEX idx_price (price),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    shipping_address TEXT,
                    payment_method ENUM('credit_card', 'paypal', 'bank_transfer') DEFAULT 'credit_card',
                    INDEX idx_user_id (user_id)
                    -- 移除 created_at 和 total_amount 索引,让范围查询在高QPS时变慢
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    order_id INT NOT NULL,
                    product_id INT NOT NULL,
                    quantity INT NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    subtotal DECIMAL(10, 2) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_order_id (order_id),
                    INDEX idx_product_id (product_id),
                    INDEX idx_created_at (created_at),
                    FOREIGN KEY (order_id) REFERENCES orders(id) ON DELETE CASCADE,
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_activity_logs (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    activity_type VARCHAR(100) NOT NULL,
                    description TEXT,
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_user_id (user_id),
                    INDEX idx_activity_type (activity_type)
                    -- 移除 created_at 索引,让时间范围查询在高QPS时变慢
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS product_reviews (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    product_id INT NOT NULL,
                    user_id INT NOT NULL,
                    rating TINYINT NOT NULL CHECK (rating >= 1 AND rating <= 5),
                    title VARCHAR(255),
                    comment TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_approved BOOLEAN DEFAULT FALSE,
                    INDEX idx_product_id (product_id),
                    INDEX idx_user_id (user_id),
                    INDEX idx_rating (rating),
                    INDEX idx_created_at (created_at),
                    FULLTEXT INDEX idx_comment (comment),
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS categories (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    parent_id INT DEFAULT NULL,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_parent_id (parent_id),
                    FOREIGN KEY (parent_id) REFERENCES categories(id) ON DELETE SET NULL
                ) ENGINE=InnoDB
            """)
            logging.info("所有表创建成功")
        conn.commit()
    except Exception as e:
        logging.error(f"创建表失败: {e}")
        conn.rollback()
    finally:
        conn.close()
# ==================== 数据填充函数(大幅增加数据量)====================
def generate_users(num_users=50000):
    """生成更多用户数据"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            batch_size = 5000
            for i in range(0, num_users, batch_size):
                values = [
                    (f"user{i + j}", f"user{i + j}@example.com", f"hash{random.randint(100000, 999999)}")
                    for j in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT IGNORE INTO users (username, email, password_hash) VALUES (%s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_users)}/{num_users} 用户")
        logging.info("用户数据插入完成")
    except Exception as e:
        logging.error(f"插入用户数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_categories():
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            categories = [
                ('Electronics', None, 'Electronic devices and accessories'),
                ('Computers', None, 'Computers and laptops'),
                ('Smartphones', 1, 'Smartphones and mobile devices'),
                ('Laptops', 2, 'Laptops and notebooks'),
                ('Tablets', 1, 'Tablets and iPads'),
                ('Cameras', 1, 'Digital cameras and accessories'),
                ('TVs', 1, 'Televisions and home theater'),
                ('Books', None, 'Books and magazines'),
                ('Fiction', 8, 'Fiction books'),
                ('Non-Fiction', 8, 'Non-fiction books'),
                ('Clothing', None, 'Clothing and apparel'),
                ('Men', 11, "Men's clothing"),
                ('Women', 11, "Women's clothing"),
                ('Shoes', 11, 'Shoes and footwear'),
                ('Home', None, 'Home and kitchen'),
                ('Furniture', 15, 'Furniture'),
                ('Kitchen', 15, 'Kitchen appliances'),
                ('Garden', 15, 'Garden and outdoor'),
            ]
            cursor.executemany(
                "INSERT IGNORE INTO categories (name, parent_id, description) VALUES (%s, %s, %s)",
                categories
            )
            conn.commit()
            logging.info("分类数据插入完成")
    except Exception as e:
        logging.error(f"插入分类数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_products(num_products=20000):
    """生成更多产品数据"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM categories")
            category_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 2000
            product_names = ["Pro", "Elite", "Premium", "Advanced", "Standard", "Basic", "Deluxe", "Ultimate"]
            product_types = ["Laptop", "Smartphone", "Tablet", "Camera", "TV", "Monitor", "Headphones", "Speaker"]
            for i in range(0, num_products, batch_size):
                values = []
                for _ in range(batch_size):
                    name = f"{random.choice(product_names)} {random.choice(product_types)} {random.randint(1, 10000)}"
                    price = round(random.uniform(10, 2000), 2)
                    values.append(
                        (name, f"Description of {name}", price, random.choice(category_ids), random.randint(0, 1000)))
                cursor.executemany(
                    "INSERT IGNORE INTO products (name, description, price, category_id, stock_quantity) VALUES (%s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_products)}/{num_products} 产品")
        logging.info("产品数据插入完成")
    except Exception as e:
        logging.error(f"插入产品数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_orders(num_orders=100_000):
    """生成大量订单"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            product_ids = list(products.keys())
            batch_size = 10000
            start_date = datetime.now() - timedelta(days=365)
            for i in range(0, num_orders, batch_size):
                values = []
                for _ in range(batch_size):
                    user_id = random.choice(user_ids)
                    total = round(sum(products[random.choice(product_ids)] * random.randint(1, 3) for _ in range(3)), 2)
                    created_at = start_date + timedelta(
                        days=random.randint(0, 365),
                        hours=random.randint(0, 23),
                        minutes=random.randint(0, 59)
                    )
                    values.append((
                        user_id, total, random.choice(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
                        created_at, created_at, 'Address',
                        'credit_card'))
                cursor.executemany(
                    "INSERT INTO orders (user_id, total_amount, status, created_at, updated_at, shipping_address, payment_method) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_orders)}/{num_orders} 订单")
        logging.info("订单数据插入完成")
    except Exception as e:
        logging.error(f"插入订单数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_order_items():
    """生成订单项"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM orders")
            order_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            batch_size = 50000
            values = []
            for order_id in order_ids:
                for _ in range(random.randint(1, 5)):
                    product_id = random.choice(list(products.keys()))
                    qty = random.randint(1, 3)
                    price = products[product_id]
                    values.append((order_id, product_id, qty, price, qty * price, datetime.now()))
                    if len(values) >= batch_size:
                        cursor.executemany(
                            "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                            values
                        )
                        conn.commit()
                        logging.info(f"已插入 {len(values)} 订单项")
                        values = []
            if values:
                cursor.executemany(
                    "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {len(values)} 订单项")
        logging.info("订单项数据插入完成")
    except Exception as e:
        logging.error(f"插入订单项数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_product_reviews(num_reviews=200000):
    """生成大量评论"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id FROM products")
            product_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 10000
            titles = ["Great!", "Good", "Ok", "Bad", "Excellent"]
            comments = ["Good product.", "Nice.", "Could be better.", "Excellent quality."]
            for i in range(0, num_reviews, batch_size):
                values = [
                    (
                        random.choice(product_ids),
                        random.choice(user_ids),
                        random.randint(1, 5),
                        random.choice(titles),
                        random.choice(comments),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        random.random() > 0.2
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO product_reviews (product_id, user_id, rating, title, comment, created_at, updated_at, is_approved) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_reviews)}/{num_reviews} 评论")
        logging.info("产品评论数据插入完成")
    except Exception as e:
        logging.error(f"插入评论失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_user_activity_logs(num_logs=500000):
    """生成大量日志"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 50000
            types = ['login', 'view_product', 'add_to_cart', 'purchase']
            ips = ['192.168.1.1', '10.0.0.1', '172.16.0.1']
            agents = ['Mozilla/5.0', 'iPhone', 'Android']
            for i in range(0, num_logs, batch_size):
                values = [
                    (
                        random.choice(user_ids),
                        random.choice(types),
                        f"Action: {random.choice(types)}",
                        random.choice(ips),
                        random.choice(agents),
                        datetime.now() - timedelta(days=random.randint(0, 90))
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO user_activity_logs (user_id, activity_type, description, ip_address, user_agent, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_logs)}/{num_logs} 日志")
        logging.info("日志数据插入完成")
    except Exception as e:
        logging.exception(f"插入日志失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def populate_database():
    # 否则执行数据插入逻辑
    logging.info("开始填充数据...")
    generate_users(50_000)
    generate_categories()
    generate_products(20_000)
    generate_orders(100_000)
    generate_order_items()
    generate_product_reviews(200_000)
    generate_user_activity_logs(500_000)
    logging.info("数据填充完成")
# ==================== 查询函数(快速查询,产生高QPS)====================
def normal_queries(query_type):
    """快速查询,产生高QPS,增加CPU压力"""
    # 简化:每个线程一个长连接
    conn = get_connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        query_type = query_type if query_type <= 10 else query_type % 10
        while True:
            start_time = time.time()
            try:
                if query_type == 1:
                    user_id = random.randint(1, 50000)
                    cursor.execute("SELECT id, username, status FROM users WHERE id = %s", (user_id,))
                    cursor.fetchone()
                elif query_type == 2:
                    cursor.execute("SELECT id, username FROM users WHERE status = %s LIMIT 10",
                                   (random.choice(['active', 'inactive']),))
                    cursor.fetchall()
                elif query_type == 3:
                    order_id = random.randint(1, 100000)
                    cursor.execute(
                        "SELECT o.id, o.total_amount, o.status, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = %s",
                        (order_id,))
                    cursor.fetchone()
                elif query_type == 4:
                    keyword = random.choice(['Pro', 'Smart', 'Laptop'])
                    cursor.execute(
                        "SELECT id, name, price FROM products WHERE MATCH(name) AGAINST (%s IN NATURAL LANGUAGE MODE) LIMIT 5",
                        (keyword,))
                    cursor.fetchall()
                elif query_type == 5:
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price, category_id FROM products WHERE id = %s", (product_id,))
                    cursor.fetchone()
                elif query_type == 6:
                    username = f"user{random.randint(0, 49999)}"
                    cursor.execute("SELECT id, username, email FROM users WHERE username = %s", (username,))
                    cursor.fetchone()
                elif query_type == 7:
                    order_item_id = random.randint(1, 300000)
                    cursor.execute("SELECT id, order_id, product_id, quantity FROM order_items WHERE id = %s",
                                   (order_item_id,))
                    cursor.fetchone()
                elif query_type == 8:
                    # 简单的JOIN查询,增加CPU压力
                    user_id = random.randint(1, 50000)
                    cursor.execute(
                        "SELECT u.id, u.username, COUNT(o.id) as order_count FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.id = %s GROUP BY u.id, u.username",
                        (user_id,))
                    cursor.fetchone()
                elif query_type == 9:
                    # 简单的聚合查询,增加CPU压力
                    cursor.execute(
                        "SELECT status, COUNT(*) as cnt FROM orders WHERE status IN ('cancelled') GROUP BY status LIMIT 5")
                    cursor.fetchall()
                elif query_type == 10:
                    # 简单的范围查询,增加CPU压力
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price FROM products WHERE id BETWEEN %s AND %s LIMIT 10",
                                   (product_id, product_id + 100))
                    cursor.fetchall()
            except Exception as e:
                try:
                    conn.rollback()
                except:
                    pass
                continue
            finally:
                end_time = time.time()
                if end_time - start_time > 2:
                    logging.info(f"查询类型: {query_type}, 耗时: {end_time - start_time:.2f}s")
    finally:
        cursor.close()
        conn.close()
def start_worker_process(concurrent_tasks):
    """工作进程函数,创建指定数量的线程执行任务"""
    logging.info(f"工作进程启动,创建 {concurrent_tasks} 个线程")
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
        # 提交normal_queries任务
        futures = [executor.submit(normal_queries, i) for i in range(concurrent_tasks)]
        # 等待所有任务完成
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logging.error(f"任务执行出错: {e}")
# ==================== 主程序 ====================
if __name__ == "__main__":
    # 设置日志
    log_file = setup_logger()
    logging.info(f"日志文件已创建: {log_file}")
    logging.info("可以使用 'tail -f {log_file}' 命令实时查看日志")
    exit_signal = False
    def signal_handler(signum, frame):
        global exit_signal
        logging.info("检测到退出信号,程序将终止...")
        exit()
    signal.signal(signal.SIGINT, signal_handler)
    is_create = 'y'
    if is_create == 'y':
        logging.info("########## 开始创建数据库数据... ##########")
        create_database_if_not_exists()
        # 总是创建表,确保表存在
        logging.info("########## 开始创建表... ##########")
        create_tables()
        logging.info("########## 开始填充数据... ##########")
        populate_database()  # 填充大数据量
    logging.info("########## 开始准备进行场景构造... ##########")
    # 计算需要的进程数, mysql rds.mysql.s2.large规格,400并发 = 61%
    cpu_usage_ratio = 0.9
    total_concurrent = math.ceil(cpu_usage_ratio / (0.61/400))
    concurrency_per_process = 200
    num_processes = (total_concurrent + concurrency_per_process - 1) // concurrency_per_process  # 向上取整
    logging.info(f"将启动 {num_processes} 个进程,每个进程处理 {concurrency_per_process} 个并发任务,目标CPU使用率 {cpu_usage_ratio*100}%")
    # 创建并启动多个进程
    processes = []
    for i in range(num_processes):
        # 计算当前进程的并发数
        current_concurrency = min(concurrency_per_process, total_concurrent - i * concurrency_per_process)
        # 创建进程
        p = multiprocessing.Process(target=start_worker_process, args=(current_concurrency,))
        p.start()
        processes.append(p)
        logging.info(f"已启动进程 {p.pid},处理 {current_concurrency} 个并发任务")
    try:
        # 监控进程运行
        while not exit_signal:
            logging.info(f"加压进行中,当前时间是:{time.strftime('%Y-%m-%d %H:%M:%S')}")
            time.sleep(5)
        # 终止所有进程
        for p in processes:
            p.terminate()
            p.join()
        logging.info("所有进程已终止,程序结束")
    except KeyboardInterrupt:
        logging.info("收到键盘中断信号,正在终止所有进程...")
        for p in processes:
            p.terminate()
            p.join()
        logging.info("所有进程已终止")


👉欢迎 钉钉搜索群号:106730017609 入群交流

👉立即上手:登录云数据库RDS控制台

👉查看更多文档:RDS AI助手

👉「RDS AI助手」现在免费公测中,欢迎留下您的诉求:https://survey.aliyun.com/apps/zhiliao/m3RVhe0m4

相关文章
|
2月前
|
监控 网络协议 Ubuntu
Debian网络延迟排查指南(从零开始诊断网络卡顿问题)
本文介绍如何在Debian系统中诊断网络延迟问题,涵盖ping、traceroute、mtr等工具的使用方法,帮助用户定位延迟根源,优化网络性能。适合Linux新手快速掌握网络诊断技巧。
|
3月前
|
机器人 数据挖掘 API
一个销售数据分析机器人的诞生:看 Dify 如何在 DMS 助力下实现自动化闭环
Dify 作为一款低代码 AI 应用开发平台,凭借其直观的可视化工作流编排能力,极大降低了大模型应用的开发门槛。
511 22
一个销售数据分析机器人的诞生:看 Dify 如何在 DMS 助力下实现自动化闭环
|
3月前
|
关系型数据库 MySQL Java
开源PolarDB-X备份恢复操作实操
作者介绍: 付文革,航天壹进制(江苏)信息科技有限公司产品研发,专注于数据库备份,主攻MySQL相关数据库以及各种国产分布式数据库的备份恢复,主要使用Java 、Python、Shell等编程语言 航天壹进制(江苏)信息科技有限公司(简称航天壹进制)作为中国航天科工集团有限公司旗下上市公司航天工业发展股份有限公司的全资下属企业,专注于数据安全领域,自主研发并提供数据保护与业务连续性管理产品、解决方案及服务。
|
2月前
|
关系型数据库 分布式数据库 数据库
议程抢先看|2026阿里云PolarDB开发者大会,重磅来袭
2026年1月20日,阿里云PolarDB开发者大会将于上海五角场凯悦酒店举行!聚焦数据库前沿技术,1场主论坛+3场分论坛,探讨行业趋势与创新实践。议程精彩,报名从速!
|
3月前
|
人工智能 Java API
Java 正式进入 Agentic AI 时代:Spring AI Alibaba 1.1 发布背后的技术演进
Spring AI Alibaba 1.1 正式发布,提供极简方式构建企业级AI智能体。基于ReactAgent核心,支持多智能体协作、上下文工程与生产级管控,助力开发者快速打造可靠、可扩展的智能应用。
2955 43
|
3月前
|
存储 人工智能 关系型数据库
钉钉ONE选用阿里云PolarDB数据库,实现百亿级数据的高效向量检索
阿里云瑶池PolarDB PostgreSQL版作为钉钉ONE的底层数据库,凭借分布式架构与向量检索能力,支撑百亿级数据、高并发与AI智能推荐,助力钉钉实现“事找人”的办公新范式。
|
4月前
|
SQL 关系型数据库 MySQL
阿里云RDS云数据库全解析:产品功能、收费标准与活动参考
与云服务器ECS一样,关系型数据库RDS也是很多用户上云必买的热门云产品之一,阿里云的云数据库RDS主要包含RDS MySQL、RDS SQL Server、RDS PostgreSQL、RDS MariaDB等几个关系型数据库,并且提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,帮助您解决数据库运维的烦恼。本文为大家介绍阿里云的云数据库 RDS主要产品及计费方式、收费标准以及活动等相关情况,以供参考。