目背景
常见的MySQL、Oracle、SQLServer等数据库都是基于C/S架构设计的,即(客户端/服务器)架构,也就是说我们对数据库的操作相当于一个客户端,这个客户端使用既定的API把SQL语句通过网络发送给服务器端,MySQL Server执行完SQL语句后将结果通过网络返回客户端。通过网络通信的话就要涉及到TCP/IP协议里的“三次握手”、“四次挥手”等,大量访问时,每一个用户的请求都会对应一次“三次握手”、“四次挥手”的过程,这个性能的消耗是相当严重的;
对于数据库本质上是对磁盘的操作,如果对数据库的访问过多,即(I/O)操作过多,会出现访问瓶颈。
而常见的解决数据库访问瓶颈的方法有两种:
一、为减少磁盘 I/O的次数,在数据库和服务器的应用中间加一层 缓存数据库(例如:Redis、Memcache);
二、增加 连接池,来减少高并发情况下大量 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手 所耗费的性能。
mysqlconn.hpp 实现连接 增删改查操作
#include <mysql/mysql.h> #include <iostream> #include <string> #include <ctime> #include <chrono> #include <memory> #define INFO 1 #define WARNING 2 #define ERROR 3 #define FATAL 4 #define LOG(level, message) Log(#level, message, __FILE__, __LINE__) void Log(std::string level, std::string message, std::string file_name, int line) { std::cout<<"["<<level<<"]["<<time(nullptr)<<"]["<<message<<"]["<<file_name<<"]["<<line<<"]"<<std::endl; } class mysqlconn{ private: MYSQL *m_conn = nullptr; MYSQL_RES* m_res = nullptr;//查询结果集 MYSQL_ROW m_row;//记录结构体 void freeResult() { if(m_res) { mysql_free_result(m_res); m_res = nullptr; } } std::chrono::steady_clock::time_point m_aliveTime; public: mysqlconn() { //获取一个MYSQL句柄 m_conn = mysql_init(nullptr); //设置字符集 mysql_set_character_set(m_conn,"utf8"); } ~mysqlconn() { freeResult(); if(m_conn != nullptr) { mysql_close(m_conn); } } bool query(std::string sql){ freeResult(); if(mysql_query(m_conn, sql.c_str())){ return false; } m_res = mysql_store_result(m_conn); return true; } //更新 修改 删除 bool update(std::string sql){ return mysql_query(m_conn, sql.c_str()); } //连接指定的数据库 bool connect(std::string ip, std::string user, std::string passwd, std::string dbName, unsigned int port) { return mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port,nullptr,0) != nullptr; } //遍历得到的结果集 bool next() { if(m_res != nullptr) { m_row = mysql_fetch_row(m_res); //获取一行 if(m_row != nullptr) { return true; } } return false; } //获取结果集里的值 std::string value(int index){ int rowCount = mysql_num_fields(m_res); //返回结果集中字段数目 if(index >= rowCount || index < 0) { return std::string(); } char* ans = m_row[index]; unsigned long length = mysql_fetch_lengths(m_res)[index]; return std::string(ans,length); } //事务处理提交方式 bool transaction() { return mysql_autocommit(m_conn,false); } //事务提交 bool commit() { return mysql_commit(m_conn); } //事务回滚 bool rollback() { return mysql_rollback(m_conn); } //更新空闲时间点 void refreshAliveTime(){ m_aliveTime = std::chrono::steady_clock::now(); } //计算连接空闲时长 long long getAliveTime() { std::chrono::duration<double> diff = std::chrono::steady_clock::now() - m_aliveTime; //nanosecods 纳秒 return diff.count(); } };
connpool.hpp 连接池
#include <mutex> #include <condition_variable> #include <queue> #include <fstream> #include <thread> #include "mysqlconn.hpp" class ConnectionPool { private: std::string m_user; std::string m_passwd; std::string m_ip; std::string m_dbName; unsigned short m_port; //连接的上限和下限,自动维护线程池的连接数 int m_minSize; int m_maxSize; //连接的超时时长 int m_timeout; int m_maxIdleTime; //线程同步 std::mutex m_mutexQ; //互斥锁 std::condition_variable m_cond; //条件变量 std::queue<mysqlconn *> m_connectionQ; //共享资源 public: //对外接口,获取线程池 //静态局部变量是线程安全的 static ConnectionPool *getConnectPool() { static ConnectionPool pool; return &pool; } //获取线程池中的连接 std::shared_ptr<mysqlconn> getConnection() { //需要操作共享资源 std::unique_lock<std::mutex> locker(m_mutexQ); //判断连接池队列为空 while(m_connectionQ.empty()) { if(std::cv_status::timeout == m_cond.wait_for(locker, std::chrono::milliseconds(m_timeout))) { if(m_connectionQ.empty()) { continue; } } } //自定义shared_ptr析构方法,重新将连接放回到连接池中,而不是销毁 std::shared_ptr<mysqlconn> connptr(m_connectionQ.front(),[this](mysqlconn *conn){ std::unique_lock<std::mutex> locker(m_mutexQ); conn->refreshAliveTime(); m_connectionQ.push(conn); }); //弹出,放到了队尾 m_connectionQ.pop(); m_cond.notify_all(); return connptr; } //防止外界通过拷贝构造函数和移动拷贝构造函数 ConnectionPool(const ConnectionPool &obj) = delete; ConnectionPool& operator=(const ConnectionPool& obj) = delete; ~ConnectionPool() { while(!m_connectionQ.empty()) { mysqlconn *conn = m_connectionQ.front(); m_connectionQ.pop(); delete conn; } } private: //构造函数私有化 ConnectionPool() { //加载配置文件 if(!parseJsonFile()) { return; } //创建最少连接数 for(int i=0;i<m_minSize;++i) { addConnect(); } //创建子线程用于检测并创建新的连接 std::thread producer(&ConnectionPool::produceConnection,this); //销毁连接,检测并销毁连接 std::thread recycler(&ConnectionPool::recycleConnection,this); //设置线程分离 producer.detach(); recycler.detach(); } //解析配置文件 bool parseJsonFile(){ //可以通过配置文件配置数据 这里写死 m_ip = "127.0.0.1"; m_user = "pig"; m_passwd = "test1234"; m_dbName = "test"; m_port = 3306; m_minSize = 10; m_maxSize = 100; m_timeout = 10; m_maxIdleTime = 20; return true; } //任务函数 void produceConnection() //生产数据库连接 { //通过轮询的方式不断的去检测 while(true) { //操作共享资源,需要加锁 std::unique_lock<std::mutex> locker(m_mutexQ); //判断连接数是否达到容量,如果大于等于容量则需要阻塞一段时间 while (m_connectionQ.size() >= m_maxSize) { m_cond.wait(locker); } addConnect(); m_cond.notify_all(); //唤醒消费者 } } void recycleConnection() //销毁数据库连接 { while(true) { //休眠一定的时长 std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::unique_lock<std::mutex> locker(m_mutexQ); //让线程池中最少保持用于 m_minSize个线程 while(m_connectionQ.size() > m_minSize) { mysqlconn *recyConn = m_connectionQ.front(); //如果超时则销毁 if(recyConn->getAliveTime() >= m_maxIdleTime) { m_connectionQ.pop(); delete recyConn; } else { break; } } } } void addConnect() //添加连接 { mysqlconn *conn = new mysqlconn; conn->connect(m_ip,m_user,m_passwd,m_dbName,m_port); conn->refreshAliveTime(); m_connectionQ.push(conn); } };
main.cpp 测试主函数 单线程 连接池 多线程连接池
#include "connpool.hpp" void pthread1_no_pool() { clock_t begin = clock(); std::unique_ptr<mysqlconn> sp = std::make_unique<mysqlconn>(); bool connflag = sp->connect("127.0.0.1","pig","test1234", "test",3306); if(connflag == false) return; for (int i = 0; i < 4 * 1000; ++i) { sp->refreshAliveTime(); char sql[1024] = { 0 }; sprintf(sql, "insert into tb_file values('%d','%s','%s');", i, "pthread1_no_pool", "1.png"); auto upflag = sp->update(sql); } clock_t end = clock(); std::cout << "pthread1_no_pool:" << (end - begin) << "ms" << std::endl; } void pthread1_use_pool(){ ConnectionPool *cp = ConnectionPool::getConnectPool(); clock_t begin = clock(); std::shared_ptr<mysqlconn> sp = cp->getConnection(); for (int i = 0; i < 1000 * 4; ++i) { char sql[1024] = { 0 }; sprintf(sql, "insert into tb_file(id, name, file) values('%d','%s','%s');", i, "pthread1_use_pool", "1.png"); sp->update(sql); } clock_t end = clock(); std::cout <<"pthread1_use_pool:" << (end - begin) << "ms" << std::endl; } void pthread4_no_pool() { clock_t begin = clock(); std::thread tt[4]; for(int n = 0; n < 4; n++){ tt[n] = std::thread([=]{ std::unique_ptr<mysqlconn> sp = std::make_unique<mysqlconn>(); sp->connect("127.0.0.1","pig","test1234", "test",3306); for (int i = 0; i < 1000 * (n + 1); ++i) { sp->refreshAliveTime(); char sql[1024] = { 0 }; sprintf(sql, "insert into tb_file values('%d','%s','%s');", i, "pthread1_no_pool", "1.png"); sp->update(sql); } }); } for(int i = 0; i < 4; i++){ tt[i].join(); } clock_t end = clock(); std::cout <<"pthread4_no_pool:" << (end - begin) << "ms" << std::endl; } void work(ConnectionPool *cp , int l){ std::shared_ptr<mysqlconn> sp = cp->getConnection(); for (int i = l * 1000; i < 1000 * (l + 1); ++i) { char sql[1024] = { 0 }; sprintf(sql, "insert into tb_file values('%d','%s','%s');", i, "pthread1_use_pool", "1.png"); auto upflag = sp->update(sql); if(upflag != 0) { std::cout <<"pthread4_use_pool:" << upflag << sql << std::endl; continue; } } } void pthread4_use_pool() { ConnectionPool *cp = ConnectionPool::getConnectPool(); clock_t begin = clock(); std::thread tt[4]; for(int i = 0; i < 4; i++){ tt[i] = std::thread(work, cp, i); } for(int i = 0; i < 4; i++){ tt[i].join(); } clock_t end = clock(); std::cout <<"pthread4_use_pool:" << (end - begin) << "ms" << std::endl; } // g++ -o main main.cpp connpool.hpp mysqlconn.hpp -lmysqlclient -std=c++14 -lpthread int main() { /*单线程 不使用连接池*/ //LOG(INFO, "pthread1_no_pool test:"); //pthread1_no_pool(); /*单线程 使用连接池*/ //LOG(INFO, "pthread1_use_pool test:"); //pthread1_use_pool(); /*多线程 不使用连接池*/ LOG(INFO, "pthread4_no_pool test:"); pthread4_no_pool(); /*多线程 使用连接池*/ //LOG(INFO, "pthread4_use_pool test:"); //pthread4_use_pool(); return 0; }
单线程 无连接池 4000条数据插入
单线程 连接池 4000条数据插入
4线程 无连接池
4线程 连接池
测试结果 和预期一样 多线程下使用连接池中的连接 比重复建立连接快很多
![结果](https://ucc.alicdn.com/images/user-upload-01/direct/5b15db6b9ade48b5b6f65b061a45b200.png参考