安装MySQL
sudo apt-get install mysql-server //mysql服务器端
sudo apt-get install mysql-client //mysql客户端--方便操作
sudo apt-get install libmysqlclient-dev //mysql开发的库
数据库连接池概述
1.如果不使用连接池,每次server端与数据库的交互都需要创建线程和销毁线程。大量的创建线程和销毁线程是消耗系统资源的
2.mysql数据库与服务器端的是通过TCP进行通信的,按照TCP的通信规则,需要建立连接和断开连接,这一过程相对比较慢和繁琐
3.每次连接都需要进行身份验证
通过预先创建一定数量的连接,放到一个池子。当客户端有请求时,服务器端需要与mysql进行交互,那么只需要从池子里取出一个连接,当操作完成再将连接放到连接池中。如此以来避免了频繁的创建和销毁线程。
池子只需要一个就够用了,一个池子里有很多连接了。所以对于线程池这个类而言使用单例模式最为合适。
创建线程池的步骤
1.首先我们需要有连接,即我们需要提前准备好池子里的连接。如何实现这些连接呢?
很简单,对libmysqlclient-dev所提供的API进行封装。根据一般需求的话,一个连接应该具备的功能:连接、更新、查询、事务处理、连接的状态(空闲时长)
优化:当数据库使用一段时间后,连接的数目是趋于稳定的,所以我们设计的连接池具备自动管理连接数的功能
2.设计一个池子来存放和管理这些连接
使用队列:
1).队列的特点-->先进先出
2).我们不妨把每次更新后的连接插入到队列的尾部。那么队头部分的连接一定是空闲时长比较久的,如果超过了限定的最大空闲时长,我们就销毁该线程。
3).也可以使用小顶堆,不过没用必要,实现起来还特别麻烦
涉及的技术点
1.多线程编程
2.线程同步(互斥锁、条件变量)
3.chrono库(处理时间)
4.智能指针
5.lambda表达式
6.单例模式
7.对MySQL数据操作的API进行封装
8.STL容器
9.生产者消费者模型
10.Jsoncpp库
技术细节
对API的封装 ===> 可以理解为连接池里的一个连接
1.初始化,获得一个操作数据库实例的对象(面向对象编程)
MYSm_conn = mysql_init(nullptr); //返回一个MYSQL对象
mysql_set_character_set(m_conn,"utf8"); //设置编码
2.连接指定的数据库:数据库名、IP、用户名、密码等信息 ===> 用户提供的参数
MYSQL *mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, const char *db, unsigned int port, const char *unix_socket, unsigned long clientflag)
3.数据库相关操作
更新:增加、删除、插入 int mysql_query(MYSQL *mysql, const char *q)
查询:执行mysql语句 int mysql_query(MYSQL *mysql, const char *q)
得到结果集 MYSQL_RES *mysql_store_result(MYSQL *mysql)
获取一行的内容 MYSQL_ROW mysql_fetch_row(MYSQL_RES *result)
事务处理:事务的操作、事务的回滚、事务提交
my_bool mysql_autocommit(MYSQL *mysql, my_bool auto_mode)
my_bool mysql_commit(MYSQL *mysql)
my_bool mysql_rollback(MYSQL *mysql)
设计连接池 ==> 管理上面封装好的连接
1.因为池子只需要一个,所以采用单例模式
何为单例模式:单例模式属于创建型模式,它提供了一种创建对象的最佳方式。这种模式涉及到一个单一的类,该类负责创建自己的唯一的对象。该对象外界可以获取到,但是不能创建和拷贝。
通俗的讲:单例模式就是说每一个类只有单一的一个对象,并且这个对象由该对象自己创建。如果让外部调用构造函数或者拷贝构造函数等,很难保证外部只创建了一个实例对象。为了让外部获取到
2.使用生产者消费者模型:三要素(生产者、消费者、容器)
因为多线程并且存在资源竞争关系,肯定会涉及到线程同步的问题
解决线程同步问题:
只使用互斥锁
使用条件变量+互斥锁:有效使用条件变量可以避免当前队列为空时,消费者之间无意义的竞争,当队列满时生产者之间无意义的竞争。
3.通过配置文件来读取连接MySQL数据库的信息
XXXconfig.json ==> 使用jsoncpp库解析json文件
具体步骤:
1).安装Jsoncpp第三方库:sudo apt-install Jsoncpp-dev
2).需要配合 <fstream> 使用
3).读配置文件
定义 Reader对象 rd
定义 Value对象 root
将ifs指向的json文件中的内容存放到root中,root是一个二维数组
判断是否构建成功: root.isobject()
root[key] 获取对应的值 eg: m_username = root["username"];
4.对外提供一个获取连接的接口
外部需要从连接池中获取连接,并通过该连接对数据库进行操作
如何设计该对外接口:
使用智能指针作为返回值,因为获取连接的是地址,通过指针传递可以节省时间
智能指针可以帮助我们自动释放连接,这个连接可能被多个工作线程获取到,使用shared_ptr会维护一个引用计数,当引用计数为0时,会自动释放指针指向的内存空间。防止使用者忘记释放内存,导致内存泄漏。
函数内部实现: 其实是消费者从队列中取产品的过程
c++11新特性,使用模板unique_lock来包装mutex,自动加锁和解锁。创建时加锁,析构时解锁。一般是局部变量,所以会及时析构
c++11新特性,使用wait_for等待一定时长,如果等待一定时长后队列还是为空,则不再阻塞等待,再次判断队列是否为空(轮询的方式)
使用lambda返回获取的连接,并在函数体内部更新该连接的空闲时间的起始点
5.连接池实现自动管理连接数(不够时创建连接,销毁空闲时间较长的连接)
通过两个子线程,因为主线程不能阻塞在这个地方,所以通过两个子线程取完成,并设置线程分离
不够时自动创建连接:
线程池维护了两个数据,一个最小连接数,一个最大连接数
销毁空闲时间较长的连接:
从队头取出连接,并将该连接的空闲时长与设定的最大空闲时长比较,如果大于等于就从队列中弹出并销毁
代码
#pragma one #include <mysql/mysql.h> #include <string> #include <chrono> using namespace std::chrono_literals; using namespace std; using namespace chrono; class MysqlConn { public: //创建一个MYSQL实例对象并设置字符集 MysqlConn(); //是放在资源 ~MysqlConn(); //连接指定的数据库 bool connect(string ip,string user,string passwd,string bdName,unsigned int port); //更新:增加、删除、修改 bool undate(string sql); //查询 bool query(string sql); //遍历得到的结果集 bool next(); //获取结果集里的值 string value(int index); //事务处理提交方式 bool transaction(); //事务提交 bool commit(); //事务回滚 bool rollback(); //更新空闲时间点 void refreshAliveTime(); //计算连接空闲时长 long long getAliveTime(); private: //每次搜索都需要更新结果集 void freeResult(); MYSQL *m_conn = nullptr; MYSQL_RES *m_result = nullptr; MYSQL_ROW m_row; steady_clock::time_point m_aliveTime; };
#include "MysqlConn.h" MysqlConn::MysqlConn() { //获取一个MYSQL句柄 m_conn = mysql_init(nullptr); //设置字符集 mysql_set_character_set(m_conn,"utf8"); } MysqlConn:: ~MysqlConn() { if(m_conn != nullptr) { mysql_close(m_conn); } freeResult(); } bool MysqlConn::connect(string ip,string user,string passwd,string dbName,unsigned int port) { /* MYSQL *mysql_real_connect(MYSQL *mysql, const char *host, const char *user, const char *passwd, const char *db, unsigned int port, const char *unix_socket, unsigned long clientflag) */ MYSQL *p = mysql_real_connect(m_conn,ip.c_str(),user.c_str(),passwd.c_str(),dbName.c_str(),port,nullptr,0); return p != nullptr; } bool MysqlConn::undate(string sql) { if(mysql_query(m_conn,sql.c_str())) { return false; } return true; } bool MysqlConn::query(string sql) { freeResult(); if(mysql_query(m_conn,sql.c_str())) { return false; } //得到结果集 m_result = mysql_store_result(m_conn); return true; } bool MysqlConn::next() { if(m_result != nullptr) { m_row = mysql_fetch_row(m_result); //获取一行 if(m_row != nullptr) { return true; } } return false; } string MysqlConn::value(int index) { int rowCount = mysql_num_fields(m_result); //返回结果集中字段数目 if(index >= rowCount || index < 0) { return string(); } char* ans = m_row[index]; unsigned long length = mysql_fetch_lengths(m_result)[index]; return string(ans,length); } bool MysqlConn::transaction() { return mysql_autocommit(m_conn,false); //自动提交改为自动提交 } bool MysqlConn::commit() { return mysql_commit(m_conn); } bool MysqlConn::rollback() { return mysql_rollback(m_conn); } void MysqlConn::freeResult() { if(m_result) { mysql_free_result(m_result); m_result = nullptr; } } void MysqlConn::refreshAliveTime() { m_aliveTime = steady_clock::now(); } //计算连接空闲时长 long long MysqlConn::getAliveTime() { nanoseconds res = steady_clock::now() - m_aliveTime; //nanosecods 纳秒 milliseconds mil = duration_cast<microseconds>(res); //将纳秒转成微妙 return mil.count(); }
#include "MysqlConn.h" #include <mutex> #include <condition_variable> #include <queue> class ConnectionPool { public: //对外接口,获取线程池 static ConnectionPool *getConnectPool(); //静态局部变量是线程安全的 //获取线程池中的连接 shared_ptr<MysqlConn> getConnection(); //防止外界通过拷贝构造函数和移动拷贝构造函数 ConnectionPool(const ConnectionPool &obj) = delete; ConnectionPool& operator=(const ConnectionPool& obj) = delete; ~ConnectionPool(); private: //构造函数私有化 ConnectionPool(); //解析配置文件 bool parseJsonFile(); //任务函数 void produceConnection(); //生产数据库连接 void recycleConnection(); //销毁数据库连接 void addConnect(); //添加连接 private: string m_user; string m_passwd; string m_ip; string m_dbName; unsigned short m_port; //连接的上限和下限,自动维护线程池的连接数 int m_minSize; int m_maxSize; //连接的超时时长 int m_timeout; int m_maxIdleTime; //线程同步 mutex m_mutexQ; //互斥锁 condition_variable m_cond; //条件变量 queue<MysqlConn *>m_connectionQ; //共享资源 };
#include "ConnectionPool.h" #include <fstream> #include <thread> #include <jsoncpp/json/json.h> bool ConnectionPool::parseJsonFile() { //获取配置文件 fstream ifs("../dbconf.json"); //读配置文件 Json::Reader rd; Json::Value root; rd.parse(ifs,root); //解析配置文件 if(root.isObject()) { m_ip = root["ip"].asString(); m_user = root["username"].asString(); m_passwd = root["password"].asString(); m_dbName = root["dbName"].asString(); m_port = root["port"].asInt(); m_minSize = root["minSize"].asInt(); m_maxSize = root["maxSize"].asInt(); m_timeout = root["timeout"].asInt(); m_maxIdleTime = root["maxTdleTime"].asInt(); return true; } return false; } void ConnectionPool::addConnect() { MysqlConn *conn = new MysqlConn; /* bool MysqlConn::connect(std::string ip, std::string user, std::string passwd, std::string bdName, unsigned int port) */ conn->connect(m_ip,m_user,m_passwd,m_dbName,m_port); conn->refreshAliveTime(); m_connectionQ.push(conn); } ConnectionPool::ConnectionPool() { //加载配置文件 if(!parseJsonFile()) { return; } //创建最少连接数 for(int i=0;i<m_minSize;++i) { addConnect(); } //创建子线程用于检测并创建新的连接 thread producer(&ConnectionPool::produceConnection,this); //销毁连接,检测并销毁连接 thread recycler(&ConnectionPool::recycleConnection,this); //设置线程分离 producer.detach(); recycler.detach(); } //子线程-->任务函数 void ConnectionPool::produceConnection() { //通过轮询的方式不断的去检测 while(true) { //操作共享资源,需要加锁 unique_lock<mutex> locker(m_mutexQ); //判断连接数是否达到容量,如果大于等于容量则需要阻塞一段时间 while (m_connectionQ.size() >= m_maxSize) { m_cond.wait(locker); } addConnect(); m_cond.notify_all(); //唤醒消费者 } } void ConnectionPool::recycleConnection() { while(true) { //休眠一定的时长 this_thread::sleep_for(chrono::milliseconds(500)); unique_lock<mutex> locker(m_mutexQ); //让线程池中最少保持用于 m_minSize个线程 while(m_connectionQ.size() > m_minSize) { MysqlConn *recyConn = m_connectionQ.front(); //如果超时则销毁 if(recyConn->getAliveTime() >= m_maxIdleTime) { m_connectionQ.pop(); delete recyConn; } else { break; } } } } ConnectionPool::~ConnectionPool() { while(!m_connectionQ.empty()) { MysqlConn *conn = m_connectionQ.front(); m_connectionQ.pop(); delete conn; } } //对外接口,获取线程池 ConnectionPool* ConnectionPool::getConnectPool() { static ConnectionPool pool; return &pool; } //获取线程池中的连接 shared_ptr<MysqlConn> ConnectionPool::getConnection() { //需要操作共享资源 unique_lock<mutex> locker(m_mutexQ); //判断连接池队列为空 while(m_connectionQ.empty()) { if(cv_status::timeout == m_cond.wait_for(locker,chrono::milliseconds(m_timeout))) { if(m_connectionQ.empty()) { continue; } } } //自定义shared_ptr析构方法,重新将连接放回到连接池中,而不是销毁 shared_ptr<MysqlConn> connptr(m_connectionQ.front(),[this](MysqlConn *conn){ unique_lock<mutex> locker(m_mutexQ); conn->refreshAliveTime(); m_connectionQ.push(conn); }); //弹出,放到了队尾 m_connectionQ.pop(); m_cond.notify_all(); return connptr; }