mysql连接池的实现

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: mysql连接池的实现

概要:本文介绍mysql连接池的实现,要求读者了解线程池

一、为什么需要mysql连接池?

资源复用 :不使用连接池,每次数据库请求都新建一条连接,将耗费系 统资源。 流程如下:

  1. 通过三次握手建立 TCP 连接
  2. MySQL 认证
  3. SQL 执行
  4. 通过四次挥手断开 TCP 连接

更快的系统响应速度

1.一次连接建立和销毁,可复用同一条连接多次执行 SQL 语句。

2.统一的连接管理,避免数据库连接泄露

二、mysql连接池运行原理

三、代码实现

1.结构体定义
typedef struct task_t {
    struct task_t *next; // 指向下一个任务节点
    int clientfd; // 客户端fd
    char SQL[MAX_SQL_LENGTH]; // SQL语句缓冲区
} task_t;
typedef struct task_queue_t { // task队列
    task_t *head; // 指向队列的第一个task节点
    task_t *tail; // 指向队列的最后一个task节点
    int block; // 阻塞标志
    pthread_spinlock_t lock; // 自旋锁变量
    pthread_mutex_t mutex; // 互斥锁变量
    pthread_cond_t cond; // 条件变量
} task_queue_t;
typedef struct argc {
    MYSQL *mysql;
    task_queue_t *queue;
} argc;
2.资源创建

a.任务队列

task_queue_t *task_queue_create() { // 创建一个任务队列
    int ret;
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
    if (queue) {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0) {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0) {
                pthread_spin_init(&queue->lock, 0);
                queue->head = NULL;
                queue->tail = NULL;
                queue->block = 1;
                return queue;
            }
        }
        free(queue);
    }
    return NULL;
}

b.mysql连接句柄

void mysql_conn_init(MYSQL* mysql) {
    
    mysql_init(mysql); // 初始化mysql句柄
    // 连接到MySQL数据库
    mysql_real_connect(mysql, MYSQL_SERVER_IP, MYSQL_SERVER_USERNAME, MYSQL_SERVER_PASSWORD, 
    MYSQL_SERVER_DEFAULT_DB, MYSQL_SERVER_PORT, NULL, 0);
}
2.sql任务的添加、执行

a.push、pop

void add_task(task_queue_t *queue, task_t *task) { // 向任务队列中添加一个task
    pthread_spin_lock(&queue->lock);
    if (!queue->tail) {
        queue->tail->next = task;
        queue->tail = task;
    }
    else {
        queue->head = task;
        queue->tail = task;
    }
    pthread_spin_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond);
}
void *pop_task(task_queue_t *queue) { // 从任务队列中取出一个任务
    pthread_spin_lock(&queue->lock);
    if (queue->head == NULL) {
        pthread_spin_unlock(&queue->lock);
        return NULL;
    }
    // 取出队列中第一个任务
    task_t *task;
    task = queue->head;
    queue->head = task->next;
    
    //判断队列是否为空
    if (queue->head == NULL) {
        queue->tail = queue->head;
    }
    pthread_spin_unlock(&queue->lock);
    return task;
}
task_t *get_task(task_queue_t *queue) { // 原子地从队列中取出一个任务
    task_t *task;
    while ((task = pop_task(queue)) == NULL) {
        pthread_mutex_lock(&queue->mutex);
        if (queue->block == 0) {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;
        }
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

b.执行任务并将mysql服务器的回复信息转发给客户端

void *mysql_conn_thrd_worker(void *argc) {
    
    task_t *task;
    struct argc *arg = (struct argc*)argc;
    task_queue_t *queue = arg->queue;
    MYSQL *mysql = arg->mysql;
    while (!destroy_pool) {
        task = get_task(queue);
        if(!task) break;
        // 执行其中的SQL语句
        mysql_real_query(mysql, task->SQL, strlen(task->SQL)); // 注入sql语句
        MYSQL_RES *res = mysql_store_result(mysql); // 存储mysql返回信息
        char response[64];
        // 将mysql回复结果cpoy进response
        if (res) {
            MYSQL_ROW row;
            row = mysql_fetch_row(res);
            if (row) {
                snprintf(response, sizeof(response), "%s", row[0]); // 假设结果为字符串类型,仅复制第一列数据
            } else {
                snprintf(response, sizeof(response), "No result found");
            }
            mysql_free_result(res); // 释放结果集
        } else {
            snprintf(response, sizeof(response), "Error retrieving result");
        }
        // 发送回复信息
        send(task->clientfd, response, 64, 0);
        // 销毁任务
        free(task);
    }
}

3.主线程接收客户端连接、sql请求

int tcp_server(task_queue_t *queue) {
    //  初始化服务器套接字
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("create sockfd fail\n");
        return -1;
    }
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    
    addr.sin_family = AF_INET;
    addr.sin_port = htons(2024);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (-1 == bind(sockfd, (struct sockaddr*)&addr, sizeof(addr))) {
        perror("bind fail\n");
        return -2;
    }
    
    if (-1 == listen(sockfd, 5)) {
        perror("listen fail\n");
        return -3;
    }
   
    //IO多路复用
    int epfd = epoll_create(1);
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
    struct epoll_event events[1024] = {0}; 
    while (1) {
        int ret = epoll_wait(epfd, events, 1024, -1);
        
        if (ret == -1) {
            perror("epoll_wait fail");
            break;
        }
        
        int i = 0;
        for (i = 0; i < ret; i++) {
            if (sockfd == events[i].data.fd) { 
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);
                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);
                fcntl(clientfd, F_SETFL, SOCK_NONBLOCK);
            
                ev.events = EPOLLIN;
                ev.data.fd = clientfd;
                
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
            } else if (events[i].events & EPOLLIN){
                while (1) {
                    char buffer[256] = {0};
                    int count = recv(events[i].data.fd, buffer, 10, 0);
                    if (count < 0) {//读取完毕或当前没有数据可读或者出错
                        if( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {//读取完毕
                        printf("recv finished\n");
                        break;
                        }
                        //recv出错
                        close(events[i].data.fd);//关闭事件的套接字
                        break;
                    } 
                    else if (count == 0) {//对方发送fin断开连接
                        epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);//移除该事件
                        close(events[i].data.fd);//关闭事件的套接字
                        break;
                    }
                    else {//接收到数据
                        task_t *task;  // 将clientfd和buffer包装进task
                        init_task(task); // 初始化task的next指针
                        strcpy(task->SQL, buffer); // 装入sql请求
                        task->clientfd = events[i].data.fd; 
                        
                        add_task(queue, task); // 将此task添加到任务队列
                    } 
                }
            }
        }
    }
    close(sockfd);
    return 0;
}

4.main函数

int main() {
    // 工作队列
    task_queue_t *queue = task_queue_create();
    if (!queue) exit(1);
    
    // 创建MySQL连接
    MYSQL mysqls[NUM_MYSQL_CONNECTION] = {0};
    int i;
    for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {
        mysql_conn_init(&mysqls[i]);
    }
    
    // 创建工作线程
    pthread_t threadid[NUM_MYSQL_CONNECTION];
    for (i = 0; i < NUM_MYSQL_CONNECTION; i++) {
        struct argc *argc = (struct argc *)malloc(sizeof(struct argc));
        argc->mysql = &mysqls[i];
        argc->queue = queue;
        pthread_create(&threadid[i], NULL, mysql_conn_thrd_worker, argc);
        free(argc);
    }
    tcp_server(queue); // Tcp 服务器,接收客户端连接,包装SQL请求信息并添加到工作队列
    return 0;
}

推荐学习 https://xxetb.xetslk.com/s/p5Ibb

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
8月前
|
SQL 关系型数据库 MySQL
池式组件-Mysql连接池的原理与实现
池式组件-Mysql连接池的原理与实现
133 0
|
8月前
|
NoSQL Java 关系型数据库
mysql和redis连接池
mysql和redis连接池
64 0
|
8月前
|
SQL 关系型数据库 MySQL
Mysql连接池详解——原理部分
Mysql连接池详解——原理部分
|
8月前
|
关系型数据库 MySQL Java
mysql连接池和redis连接池的实现
mysql连接池和redis连接池的实现
92 0
|
8月前
|
关系型数据库 MySQL Java
mysql连接池和redis连接池
mysql连接池和redis连接池
201 0
|
8月前
|
SQL 关系型数据库 MySQL
Spring_jdbc数据连接池(mysql实现增、删、改、查)
Spring_jdbc数据连接池(mysql实现增、删、改、查)
57 0
|
6月前
|
SQL 监控 druid
MySQL连接池DataSource怎样使用?
**摘要:** 本文深入讨论了数据库连接池的重要性,特别是DruidDataSource,它是阿里巴巴的高性能Java数据库连接池。DruidDataSource不仅提供连接管理,还包括SQL监控和性能优化功能。文中通过代码示例展示了如何配置和使用DruidDataSource,包括在Java应用和Spring Boot中的集成,并提到了SQL执行监控和连接池参数的合理设置,强调了定期监控和使用内置监控工具以优化应用性能。
MySQL连接池DataSource怎样使用?
|
6月前
|
SQL 监控 druid
MySQL连接池DataSource是什么?
**摘要:** 本文探讨了数据库连接池在高并发Web应用中的重要性,聚焦于DruidDataSource,一个高效的Java数据库连接池组件。DruidDataSource提供连接池管理、SQL监控及性能优化功能。文中通过代码示例展示了如何配置和使用DruidDataSource,包括在Java应用中的直接配置和在Spring Boot中的集成。此外,还提到了使用技巧,如合理设置连接池参数、定期监控调整以及利用Druid的内置监控工具优化性能。
164 3
|
6月前
|
SQL 监控 druid
MySQL连接池DataSource怎么使用?
**摘要:** 本文探讨了数据库连接池在高并发Web应用中的重要性,特别聚焦于阿里巴巴的DruidDataSource。DruidDataSource是一个高效的Java数据库连接池,包含监控、SQL防护和日志功能。文中通过示例展示了如何配置和使用DruidDataSource,包括在Java应用中的直接配置和在Spring Boot中的集成,并提到了启用SQL监控。此外,还分享了设置连接池参数的技巧,如合理设定初始、最大和最小连接数,并强调了定期监控和使用内置监控工具优化性能的重要性。
281 0
|
7月前
|
关系型数据库 MySQL 分布式数据库
PolarDB操作报错合集之无法创建mysql的连接池什么导致的
在使用阿里云的PolarDB(包括PolarDB-X)时,用户可能会遇到各种操作报错。下面汇总了一些常见的报错情况及其可能的原因和解决办法:1.安装PolarDB-X报错、2.PolarDB安装后无法连接、3.PolarDB-X 使用rpm安装启动卡顿、4.PolarDB执行UPDATE/INSERT报错、5.DDL操作提示“Lock conflict”、6.数据集成时联通PolarDB报错、7.编译DN报错(RockyLinux)、8.CheckStorage报错(源数据库实例被删除)、9.嵌套事务错误(TDDL-4604)。