MySQL MGR集群单主模式的自动搭建和自动化故障修复

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: MySQL MGR集群单主模式的自动搭建和自动化故障修复/*the waiting game:尽管人生如此艰难,不要放弃;不要妥协;不要失去希望*/随着MySQL MGR的版本的升级以及技术成熟,在把MHA拉下神坛之后, MGR越来越成为MySQL高可用的首选方案。

MySQL MGR集群单主模式的自动搭建和自动化故障修复
/*
the waiting game:尽管人生如此艰难,不要放弃;不要妥协;不要失去希望
*/

随着MySQL MGR的版本的升级以及技术成熟,在把MHA拉下神坛之后, MGR越来越成为MySQL高可用的首选方案。
MGR的搭建并不算很复杂,但是有一系列手工操作步骤,为了简便MGR的搭建和故障诊断,这里完成了一个自动化的脚本,来实现MGR的自动化搭建,自动化故障诊断以及修复。

MGR自动化搭建
为了简便起见,这里以单机多实例的模式进行测试,
先装好三个MySQL实例,端口号分别是7001,7002,7003,其中7001作为写节点,其余两个节点作为读节,8000节点是笔者的另外一个测试节点,请忽略。
在指明主从节点的情况下,如下为mgr_tool.py一键搭建MGR集群的测试demo

MGR故障模拟1

MGR节点故障自动监测和自愈实现,如下是搭建完成后的MGR集群,目前集群处于完全正常的状态中。

主观造成主从节点间binlog的丢失

在主节点上对于对于从节点丢失的数据操作,GTID无法找到对应的数据,组复制立马熄火

非写入节点出现错误

看下errorlog

如果是手动解决的话,还是GTID跳过错误事物的套路,master上的GTID信息

尝试跳过最新的一个事物ID,然后重新连接到组,可以正常连接到组,另外一个节点仍旧处于error状态

stop group_replication;
SET GTID_NEXT='6c81c118-e67c-4416-9cb0-2d573d178c1d:13';
BEGIN; COMMIT;
set gtid_next='automatic';

另外一个节点类似,依次解决。

MGR故障模拟2

从节点脱离Group

这种情况倒是比较简单,重新开始组复制即可,start group_replication

MGR故障自动检测和修复

对于如上的两种情况,
1,如果是从节点丢失主节点的事物,尝试在从节点上跳过GTID,重新开始复制即可
2,如果是从节点非丢失主节点事物,尝试在从节点重新开始组复制即可

实现代码如下

复制代码
def auto_fix_mgr_error(conn_master_dict,conn_slave_dict):

group_replication_status = get_group_replication_status(conn_slave_dict)
if(group_replication_status[0]["MEMBER_STATE"]=="ERROR" or group_replication_status[0]["MEMBER_STATE"] == "OFFLINE"):
    print(conn_slave_dict["host"]+str(conn_slave_dict["port"])+'------>'+group_replication_status[0]["MEMBER_STATE"])
    print("auto fixing......")
    while 1 > 0:
        master_gtid_list = get_gtid(conn_master_dict)
        slave_gtid_list = get_gtid(conn_slave_dict)
        master_executed_gtid_value = int((master_gtid_list[-1]["Executed_Gtid_Set"]).split("-")[-1])
        slave_executed_gtid_value = int(slave_gtid_list[-1]["Executed_Gtid_Set"].split("-")[-1])
        slave_executed_gtid_prefix = slave_gtid_list[-1]["Executed_Gtid_Set"].split(":")[0]
        slave_executed_skiped_gtid = slave_executed_gtid_value + 1
        if (master_executed_gtid_value > slave_executed_gtid_value):
            print("skip gtid and restart group replication,skiped gtid is "
                  + slave_gtid_list[-1]["Executed_Gtid_Set"].split(":")[-1].split("-")[0]
                  + ":"+str(slave_executed_skiped_gtid))
            slave_executed_skiped_gtid = slave_executed_gtid_prefix+":"+str(slave_executed_skiped_gtid)
            skip_gtid_on_slave(conn_slave_dict,slave_executed_skiped_gtid)
            time.sleep(10)
            start_group_replication(conn_slave_dict)
            if(get_group_replication_status(conn_slave_dict)[0]["MEMBER_STATE"]=="ONLINE"):
                print("mgr cluster fixed,back to normal")
                break
        else:
            start_group_replication(conn_slave_dict)
            if(get_group_replication_status(conn_slave_dict)[0]["MEMBER_STATE"]=="ONLINE"):
                print("mgr cluster fixed,back to normal")
            break
elif (group_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    print("mgr cluster is normal,nothing to do")
    check_replication_group_members(conn_slave_dict)

复制代码

对于故障类型1,GTID事物不一致的自动化修复

对于故障类型2从节点offline的自动化修复

完整的实现代码

该过程要求MySQL实例必须满足MGR的基本条件,如果环境本身无法满足MGR,一切都无从谈起,因此要非常清楚MGR环境的最基本要求

完成的实现代码如下,花了一个下午写的,目前来说存在以下不足
1,创建复制用户的时候,没有指定具体的slave机器,目前直接指定的%:create user repl@'%' identified by repl
2,对于slave的修复,目前无法整体修复,只能一台一台修复,其实就是少了一个循环slave机器判断的过程
3,目前搭建之前都会reset master(不管主从,主要是清理可能的残留GTID),因此只适合新环境的搭建
4,目前只支持offline和gtid事物冲突的错误类型修复,无法支持其他MGR错误类型的修复
5,开发环境是单机多实例模式测试,没有在多机单实例模式下充分测试
以上都会逐步改善&加强。

复制代码

-- coding: utf-8 --

import pymysql
import logging
import time
import decimal

def execute_query(conn_dict,sql):

conn = pymysql.connect(host=conn_dict['host'],
                       port=conn_dict['port'],
                       user=conn_dict['user'],
                       passwd=conn_dict['password'],
                       db=conn_dict['db'])
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql)
list = cursor.fetchall()
cursor.close()
conn.close()
return list

def execute_noquery(conn_dict,sql):

conn = pymysql.connect(host=conn_dict['host'],
                       port=conn_dict['port'],
                       user=conn_dict['user'],
                       passwd=conn_dict['password'],
                       db=conn_dict['db'])
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
cursor.close()
conn.close()
return list

def get_gtid(conn_dict):

sql = "show master status;"
list = execute_query(conn_dict,sql)
return list

def skip_gtid_on_slave(conn_dict,gtid):

sql_1 = 'stop group_replication;'
sql_2 = '''set gtid_next='{0}';'''.format(gtid)
sql_3 = 'begin;'
sql_4 = 'commit;'
sql_5 = '''set gtid_next='automatic';'''

try:
    execute_noquery(conn_dict, sql_1)
    execute_noquery(conn_dict, sql_2)
    execute_noquery(conn_dict, sql_3)
    execute_noquery(conn_dict, sql_4)
    execute_noquery(conn_dict, sql_5)
except:
    raise

def get_group_replication_status(conn_dict):

sql = '''select MEMBER_STATE from performance_schema.replication_group_members 
        where (MEMBER_HOST = '{0}' or ifnull(MEMBER_HOST,'') = '')   
        AND (MEMBER_PORT={1} or ifnull(MEMBER_PORT,'') ='') ; '''.format(conn_dict["host"], conn_dict["port"])
result = execute_query(conn_dict,sql)
if result:
    return result
else:
    return None

def check_replication_group_members(conn_dict):

print('-------------------------------------------------------')
result = execute_query(conn_dict, " select * from performance_schema.replication_group_members; ")
if result:
    column = result[0].keys()
    current_row = ''
    for key in column:
        current_row += str(key) + "    "
    print(current_row)

    for row in result:
        current_row = ''
        for key in row.values():
            current_row += str(key) + "    "
        print(current_row)
print('-------------------------------------------------------')

def auto_fix_mgr_error(conn_master_dict,conn_slave_dict):

group_replication_status = get_group_replication_status(conn_slave_dict)
if(group_replication_status[0]["MEMBER_STATE"]=="ERROR" or group_replication_status[0]["MEMBER_STATE"] == "OFFLINE"):
    print(conn_slave_dict["host"]+str(conn_slave_dict["port"])+'------>'+group_replication_status[0]["MEMBER_STATE"])
    print("auto fixing......")
    while 1 > 0:
        master_gtid_list = get_gtid(conn_master_dict)
        slave_gtid_list = get_gtid(conn_slave_dict)
        master_executed_gtid_value = int((master_gtid_list[-1]["Executed_Gtid_Set"]).split("-")[-1])
        slave_executed_gtid_value = int(slave_gtid_list[-1]["Executed_Gtid_Set"].split("-")[-1])
        slave_executed_gtid_prefix = slave_gtid_list[-1]["Executed_Gtid_Set"].split(":")[0]
        slave_executed_skiped_gtid = slave_executed_gtid_value + 1
        if (master_executed_gtid_value > slave_executed_gtid_value):
            print("skip gtid and restart group replication,skiped gtid is "
                  + slave_gtid_list[-1]["Executed_Gtid_Set"].split(":")[-1].split("-")[0]
                  + ":"+str(slave_executed_skiped_gtid))
            slave_executed_skiped_gtid = slave_executed_gtid_prefix+":"+str(slave_executed_skiped_gtid)
            skip_gtid_on_slave(conn_slave_dict,slave_executed_skiped_gtid)
            time.sleep(10)
            start_group_replication(conn_slave_dict)
            if(get_group_replication_status(conn_slave_dict)[0]["MEMBER_STATE"]=="ONLINE"):
                print("mgr cluster fixed,back to normal")
                break
        else:
            start_group_replication(conn_slave_dict)
            if(get_group_replication_status(conn_slave_dict)[0]["MEMBER_STATE"]=="ONLINE"):
                print("mgr cluster fixed,back to normal")
            break
elif (group_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    print("mgr cluster is normal,nothing to do")
    check_replication_group_members(conn_slave_dict)

'''
reset master
'''
def reset_master(conn_dict):

try:
    execute_noquery(conn_dict, "reset master;")
except:
    raise

def install_group_replication_plugin(conn_dict):

get_plugin_sql = "SELECT name,dl FROM mysql.plugin WHERE name = 'group_replication';"
install_plugin_sql = '''install plugin group_replication soname 'group_replication.so'; '''
try:
    result = execute_query(conn_dict, get_plugin_sql)
    if not result:
        execute_noquery(conn_dict, install_plugin_sql)
except:
    raise

def create_mgr_repl_user(conn_master_dict,user,password):

try:
    reset_master(conn_master_dict)
    sql_exists_user = '''select user from mysql.user where user = '{0}'; '''.format(user)
    user_list = execute_query(conn_master_dict,sql_exists_user)
    if not user_list:
        create_user_sql = '''create user {0}@'%' identified by '{1}'; '''.format(user,password)
        grant_privilege_sql = '''grant replication slave on *.* to {0}@'%';'''.format(user)
        execute_noquery(conn_master_dict,create_user_sql)
        execute_noquery(conn_master_dict, grant_privilege_sql)
        execute_noquery(conn_master_dict, "flush privileges;")
except:
    raise

def set_super_read_only_off(conn_dict):

super_read_only_off = '''set global super_read_only = 0;'''
execute_noquery(conn_dict, super_read_only_off)

def open_group_replication_bootstrap_group(conn_dict):

sql = '''select variable_name,variable_value from performance_schema.global_variables where variable_name = 'group_replication_bootstrap_group';'''
result = execute_query(conn_dict, sql)
open_bootstrap_group_sql = '''set @@global.group_replication_bootstrap_group=on;'''
if result and result[0]['variable_value']=="OFF":
    execute_noquery(conn_dict, open_bootstrap_group_sql)

def close_group_replication_bootstrap_group(conn_dict):

sql = '''select variable_name,variable_value from performance_schema.global_variables where variable_name = 'group_replication_bootstrap_group';'''
result = execute_query(conn_dict, sql)
close_bootstrap_group_sql = '''set @@global.group_replication_bootstrap_group=off;'''
if result and result[0]['variable_value'] == "ON":
    execute_noquery(conn_dict, close_bootstrap_group_sql)

def start_group_replication(conn_dict):

start_group_replication = '''start group_replication;'''
group_replication_status = get_group_replication_status(conn_dict)
if not (group_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    execute_noquery(conn_dict, start_group_replication)

def connect_to_group(conn_dict,repl_user,repl_password):

connect_to_group_sql = '''change master to
                                master_user='{0}',
                                master_password='{1}'
                                for channel 'group_replication_recovery'; '''.format(repl_user,repl_password)
try:
    execute_noquery(conn_dict, connect_to_group_sql)
except:
    raise

def start_mgr_on_master(conn_master_dict,repl_user,repl_password):

try:
    set_super_read_only_off(conn_master_dict)
    reset_master(conn_master_dict)
    create_mgr_repl_user(conn_master_dict,repl_user,repl_password)
    connect_to_group(conn_master_dict,repl_user,repl_password)

    open_group_replication_bootstrap_group(conn_master_dict)
    start_group_replication(conn_master_dict)
    close_group_replication_bootstrap_group(conn_master_dict)

    group_replication_status = get_group_replication_status(conn_master_dict)
    if (group_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
        print("master added in mgr and run successfully")
        return True
except:
    raise
    print("############start master mgr error################")
    exit(1)

def start_mgr_on_slave(conn_slave_dict,repl_user,repl_password):

try:
    set_super_read_only_off(conn_slave_dict)
    reset_master(conn_slave_dict)
    connect_to_group(conn_slave_dict,repl_user,repl_password)
    start_group_replication(conn_slave_dict)
    # wait for 10
    time.sleep(10)
    # then check mgr status
    group_replication_status = get_group_replication_status(conn_slave_dict)
    if (group_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
        print("slave added in mgr and run successfully")
    if (group_replication_status[0]['MEMBER_STATE'] == 'RECOVERING'):
        print("slave is recovering")
except:
    print("############start slave mgr error################")
    exit(1)

def auto_mgr(conn_master,conn_slave_1,conn_slave_2,repl_user,repl_password):

install_group_replication_plugin(conn_master)
master_replication_status = get_group_replication_status(conn_master)

if not (master_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    start_mgr_on_master(conn_master,repl_user,repl_password)

slave1_replication_status = get_group_replication_status(conn_slave_1)
if not (slave1_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    install_group_replication_plugin(conn_slave_1)
    start_mgr_on_slave(conn_slave_1, repl_user, repl_user)

slave2_replication_status = get_group_replication_status(conn_slave_2)
if not (slave2_replication_status[0]['MEMBER_STATE'] == 'ONLINE'):
    install_group_replication_plugin(conn_slave_2)
    start_mgr_on_slave(conn_slave_2, repl_user, repl_user)

check_replication_group_members(conn_master)

if name == '__main__':

conn_master  = {'host': '127.0.0.1', 'port': 7001, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
conn_slave_1 = {'host': '127.0.0.1', 'port': 7002, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
conn_slave_2 = {'host': '127.0.0.1', 'port': 7003, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
repl_user = "repl"
repl_password = "repl"
#auto_mgr(conn_master,conn_slave_1,conn_slave_2,repl_user,repl_password)

auto_fix_mgr_error(conn_master,conn_slave_1)
check_replication_group_members(conn_master)

复制代码
原文地址https://www.cnblogs.com/wy123/p/11391292.html

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
16天前
|
机器学习/深度学习 数据采集 运维
智能化运维:机器学习在故障预测和自动化响应中的应用
智能化运维:机器学习在故障预测和自动化响应中的应用
41 4
|
1月前
|
Java 关系型数据库 MySQL
自动化测试项目实战笔记(一):JDK、Tomcat、MySQL、Jpress环境安装和搭建
这篇文章是关于自动化测试项目实战笔记,涵盖了JDK、Tomcat、MySQL、Jpress环境的安装和搭建过程,以及测试用例和常见问题总结。
49 1
自动化测试项目实战笔记(一):JDK、Tomcat、MySQL、Jpress环境安装和搭建
|
1月前
|
机器学习/深度学习 数据采集 运维
智能化运维:机器学习在故障预测和自动化响应中的应用
【10月更文挑战第1天】智能化运维:机器学习在故障预测和自动化响应中的应用
67 3
|
2月前
|
存储 弹性计算 运维
自动化监控和响应ECS系统事件
阿里云提供的ECS系统事件用于记录云资源信息,如实例启停、到期通知等。为实现自动化运维,如故障处理与动态调度,可使用云助手插件`ecs-tool-event`。该插件定时获取并转化ECS事件为日志存储,便于监控与响应,无需额外开发,适用于大规模集群管理。详情及示例可见链接文档。
|
3月前
|
SQL 关系型数据库 MySQL
MySQL的match WITH QUERY EXPANSION 模式是什么?如何使用?
【8月更文挑战第29天】MySQL的match WITH QUERY EXPANSION 模式是什么?如何使用?
61 4
|
3月前
|
机器学习/深度学习 运维 监控
智能化运维:机器学习在故障预测和自动化响应中的应用
【8月更文挑战第2天】 本文探讨了将机器学习技术应用于IT运维领域,特别是在故障预测和自动化响应方面的潜力与挑战。通过分析机器学习如何优化传统运维流程,我们揭示了数据驱动的决策制定对提升系统稳定性和效率的影响。文章进一步讨论了实施机器学习模型时可能遇到的技术和非技术性问题,并提出了相应的解决策略。最后,我们反思了这一转变对IT专业人员技能要求的影响,以及如何在不断变化的技术环境中维持竞争力。
71 4
|
3月前
|
机器学习/深度学习 数据采集 运维
预见未来:机器学习引领运维革命——故障预测与自动化响应的新篇章
【8月更文挑战第2天】智能化运维:机器学习在故障预测和自动化响应中的应用
59 1
|
3月前
|
关系型数据库 MySQL 机器人
【MySQL】两个脚本自动化搞定 MySQL 备份恢复--XtraBackup
【MySQL】两个脚本自动化搞定 MySQL 备份恢复--XtraBackup
|
4月前
|
机器学习/深度学习 运维 监控
智能化运维:机器学习在故障预测和自动化修复中的应用
随着信息技术的迅猛发展,企业对运维工作的效率和准确性要求越来越高。传统的运维模式已难以应对日益复杂的系统环境和数据量。本文将探讨如何利用机器学习技术提升运维工作的智能化水平,实现故障的早期预测和自动化修复,从而减少系统停机时间,提高企业运营效率。通过分析机器学习在运维领域的应用实例,揭示其在实际工作中的有效性和潜力。
73 0
|
4月前
|
运维 中间件 PHP
深入理解PHP中的中间件模式自动化运维之脚本编程实践##
【7月更文挑战第31天】在PHP开发中,中间件模式是一种强大的设计模式,它允许开发者在请求处理流程中注入自定义的处理逻辑。本文将通过实际代码示例来探讨如何在PHP项目中实现和使用中间件,以及这种模式如何提升应用程序的可维护性和扩展性。 【7月更文挑战第31天】 在现代IT运维管理中,自动化不再是可选项,而是提高生产效率、确保服务质量的必需品。本文将通过Python脚本编程的角度,探讨如何利用代码简化日常运维任务,提升工作效率。我们将从实际案例出发,逐步剖析自动化脚本的设计思路、实现过程及其带来的益处。 ##
29 0

热门文章

最新文章