概要:本文介绍mysql连接池的实现,要求读者了解线程池
一、为什么需要mysql连接池?
资源复用 :不使用连接池,每次数据库请求都新建一条连接,将耗费系 统资源。 流程如下:
- 通过三次握手建立 TCP 连接
- MySQL 认证
- SQL 执行
- 通过四次挥手断开 TCP 连接
更快的系统响应速度:
1.一次连接建立和销毁,可复用同一条连接多次执行 SQL 语句。
2.统一的连接管理,避免数据库连接泄露
二、mysql连接池运行原理
三、代码实现
1.结构体定义
typedef struct task_t { struct task_t *next; // 指向下一个任务节点 int clientfd; // 客户端fd char SQL[MAX_SQL_LENGTH]; // SQL语句缓冲区 } task_t; typedef struct task_queue_t { // task队列 task_t *head; // 指向队列的第一个task节点 task_t *tail; // 指向队列的最后一个task节点 int block; // 阻塞标志 pthread_spinlock_t lock; // 自旋锁变量 pthread_mutex_t mutex; // 互斥锁变量 pthread_cond_t cond; // 条件变量 } task_queue_t; typedef struct argc { MYSQL *mysql; task_queue_t *queue; } argc;
2.资源创建
a.任务队列
task_queue_t *task_queue_create() { // 创建一个任务队列 int ret; task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t)); if (queue) { ret = pthread_mutex_init(&queue->mutex, NULL); if (ret == 0) { ret = pthread_cond_init(&queue->cond, NULL); if (ret == 0) { pthread_spin_init(&queue->lock, 0); queue->head = NULL; queue->tail = NULL; queue->block = 1; return queue; } } free(queue); } return NULL; }
b.mysql连接句柄
void mysql_conn_init(MYSQL* mysql) { mysql_init(mysql); // 初始化mysql句柄 // 连接到MySQL数据库 mysql_real_connect(mysql, MYSQL_SERVER_IP, MYSQL_SERVER_USERNAME, MYSQL_SERVER_PASSWORD, MYSQL_SERVER_DEFAULT_DB, MYSQL_SERVER_PORT, NULL, 0); }
2.sql任务的添加、执行
a.push、pop
void add_task(task_queue_t *queue, task_t *task) { // 向任务队列中添加一个task pthread_spin_lock(&queue->lock); if (!queue->tail) { queue->tail->next = task; queue->tail = task; } else { queue->head = task; queue->tail = task; } pthread_spin_unlock(&queue->lock); pthread_cond_signal(&queue->cond); } void *pop_task(task_queue_t *queue) { // 从任务队列中取出一个任务 pthread_spin_lock(&queue->lock); if (queue->head == NULL) { pthread_spin_unlock(&queue->lock); return NULL; } // 取出队列中第一个任务 task_t *task; task = queue->head; queue->head = task->next; //判断队列是否为空 if (queue->head == NULL) { queue->tail = queue->head; } pthread_spin_unlock(&queue->lock); return task; } task_t *get_task(task_queue_t *queue) { // 原子地从队列中取出一个任务 task_t *task; while ((task = pop_task(queue)) == NULL) { pthread_mutex_lock(&queue->mutex); if (queue->block == 0) { pthread_mutex_unlock(&queue->mutex); return NULL; } pthread_cond_wait(&queue->cond, &queue->mutex); pthread_mutex_unlock(&queue->mutex); } return task; }
b.执行任务并将mysql服务器的回复信息转发给客户端
void *mysql_conn_thrd_worker(void *argc) { task_t *task; struct argc *arg = (struct argc*)argc; task_queue_t *queue = arg->queue; MYSQL *mysql = arg->mysql; while (!destroy_pool) { task = get_task(queue); if(!task) break; // 执行其中的SQL语句 mysql_real_query(mysql, task->SQL, strlen(task->SQL)); // 注入sql语句 MYSQL_RES *res = mysql_store_result(mysql); // 存储mysql返回信息 char response[64]; // 将mysql回复结果cpoy进response if (res) { MYSQL_ROW row; row = mysql_fetch_row(res); if (row) { snprintf(response, sizeof(response), "%s", row[0]); // 假设结果为字符串类型,仅复制第一列数据 } else { snprintf(response, sizeof(response), "No result found"); } mysql_free_result(res); // 释放结果集 } else { snprintf(response, sizeof(response), "Error retrieving result"); } // 发送回复信息 send(task->clientfd, response, 64, 0); // 销毁任务 free(task); } }
3.主线程接收客户端连接、sql请求
int tcp_server(task_queue_t *queue) { // 初始化服务器套接字 int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("create sockfd fail\n"); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = htons(2024); addr.sin_addr.s_addr = htonl(INADDR_ANY); if (-1 == bind(sockfd, (struct sockaddr*)&addr, sizeof(addr))) { perror("bind fail\n"); return -2; } if (-1 == listen(sockfd, 5)) { perror("listen fail\n"); return -3; } //IO多路复用 int epfd = epoll_create(1); struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = sockfd; epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev); struct epoll_event events[1024] = {0}; while (1) { int ret = epoll_wait(epfd, events, 1024, -1); if (ret == -1) { perror("epoll_wait fail"); break; } int i = 0; for (i = 0; i < ret; i++) { if (sockfd == events[i].data.fd) { struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr); int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len); fcntl(clientfd, F_SETFL, SOCK_NONBLOCK); ev.events = EPOLLIN; ev.data.fd = clientfd; epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev); } else if (events[i].events & EPOLLIN){ while (1) { char buffer[256] = {0}; int count = recv(events[i].data.fd, buffer, 10, 0); if (count < 0) {//读取完毕或当前没有数据可读或者出错 if( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {//读取完毕 printf("recv finished\n"); break; } //recv出错 close(events[i].data.fd);//关闭事件的套接字 break; } else if (count == 0) {//对方发送fin断开连接 epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);//移除该事件 close(events[i].data.fd);//关闭事件的套接字 break; } else {//接收到数据 task_t *task; // 将clientfd和buffer包装进task init_task(task); // 初始化task的next指针 strcpy(task->SQL, buffer); // 装入sql请求 task->clientfd = events[i].data.fd; add_task(queue, task); // 将此task添加到任务队列 } } } } } close(sockfd); return 0; }
4.main函数
int main() { // 工作队列 task_queue_t *queue = task_queue_create(); if (!queue) exit(1); // 创建MySQL连接 MYSQL mysqls[NUM_MYSQL_CONNECTION] = {0}; int i; for (i = 0; i < NUM_MYSQL_CONNECTION; i++) { mysql_conn_init(&mysqls[i]); } // 创建工作线程 pthread_t threadid[NUM_MYSQL_CONNECTION]; for (i = 0; i < NUM_MYSQL_CONNECTION; i++) { struct argc *argc = (struct argc *)malloc(sizeof(struct argc)); argc->mysql = &mysqls[i]; argc->queue = queue; pthread_create(&threadid[i], NULL, mysql_conn_thrd_worker, argc); free(argc); } tcp_server(queue); // Tcp 服务器,接收客户端连接,包装SQL请求信息并添加到工作队列 return 0; }