Python实现跨服务器数据库迁移

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: Python实现跨服务器数据库迁移

注意:

  • 本文是基于Pyhon3.6的实现方式
  • 以每100000一查询一提交进行,可根据实际情况修改
  • 程序中,如传输某表时发现目标数据库中存在此表,认为该数据表是完整存在的,可优化
  • 数据库连接时注意编码方式,本文为 charset='latin1',可根据实际情况修改为utf8或其他方式
  • 如服务器中有多个数据库,通过修改cur_db和cur_local_db变量切换数据库
  • 本人使用后愚见,本程序迁移速度相比Navicat中的数据传输功能较慢一点,但Navicat中有时存在漏传(虽情况出现次数极少)

程序代码如下:

import pymysql
import warnings

warnings.filterwarnings("ignore")


class ConnectMysql(object):
    def __init__(self):
        #     这里设置分页查询, 每页查询多少数据
        self.page_size = 100000


    def getTable(self):
        #需迁移的数据库
        cur_db='****'
        #目标数据库
        cur_local_db='****'
        #需迁移服务器
        conn = pymysql.connect(
            host="***.***.***.***",
            user="*****",
            passwd="*****",
            db=cur_db,
            charset='latin1'
        )
        #目标服务器
        conn_local = pymysql.connect(
            host="***.***.***.***",
            user="*****",
            passwd="*****",
            db=cur_local_db,
            charset='latin1'
        )
        cur = conn.cursor()
        cur_local = conn_local.cursor()
        cur.execute('show tables')
        tables = cur.fetchall()

        for table in tables:
            # 需要迁移的数据库查询表的列数
            cur.execute(
                "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='"+cur_db+"' AND table_name='" + table[
                    0] + "'")
            table_col_count = cur.fetchone()
            # print("需要迁移数据列数",table_col_count)
            # print table_col_count[0]
            # 需要迁移的数据库查询表的结构
            cur.execute('show create table ' + table[0])
            result = cur.fetchall()
            create_sql = result[0][1]
            # 查询需要迁移的数据库表的数据条数
            cur.execute('select count(*) from ' + table[0])
            total = cur.fetchone()
            # print("迁移数据条数",total)
            page = total[0] / self.page_size
            page1 = total[0] % self.page_size
            if page1 != 0:
                page = page + 1

            # 数据库创建表
            cur_local.execute(
                "SELECT table_name FROM information_schema.`TABLES` WHERE TABLE_SCHEMA='"+cur_local_db+"' AND table_name='" + str(
                    table[0]) + "'")
            table_name = cur_local.fetchone()
            if table_name is None:
                cur_local.execute(create_sql)
                page = int(page)
                for p in range(0, page):
                    while True:
                        try:
                            print('开始', table[0], '的第', p + 1, '页查询')
                            if p == 0:
                                limit_param = ' limit ' + str(p * self.page_size) + ',' + str(self.page_size)
                            else:
                                limit_param = ' limit ' + str(p * self.page_size + 1) + ',' + str(self.page_size)
                            cur.execute('select * from ' + table[0] + limit_param)
                            inserts = cur.fetchall()
                            #print(inserts)
                            # print('查询成功')
                            param = ''
                            for i in range(0, table_col_count[0]):
                                param = param + '%s,'
                                # print(param)
                            # print('开始插入')
                            cur_local.executemany('replace into ' + table[0] + ' values (' + param[0:-1] + ')', inserts)
                          
                            conn_local.commit()
                            break
                        except Exception as e:
                            print(e)
                            # time.sleep(1)
                            cur = conn.cursor()
                            cur_local = conn_local.cursor()
                    print(table[0], ' 插入完成')
                 
            else:
                print(str(table[0]),"已存在")

        cur_local.close()
        conn_local.close()
        cur.close()
        conn.close()


if __name__ == '__main__':
    conn_mysql = ConnectMysql()
    conn_mysql.getTable()

最后希望本文对大家有所帮助,也希望大家多多指教!

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3天前
|
SQL NoSQL 数据库
在Python中使用sqlalchemy来操作数据库的几个小总结
在探索使用 FastAPI, SQLAlchemy, Pydantic,Redis, JWT 构建的项目的时候,其中数据库访问采用SQLAlchemy,并采用异步方式。数据库操作和控制器操作,采用基类继承的方式减少重复代码,提高代码复用性。在这个过程中设计接口和测试的时候,对一些问题进行跟踪解决,并记录供参考。
|
3天前
|
网络协议 安全 Unix
6! 用Python脚本演示TCP 服务器与客户端通信过程!
6! 用Python脚本演示TCP 服务器与客户端通信过程!
|
11天前
|
SQL 关系型数据库 数据库
【python】python社交交友平台系统设计与实现(源码+数据库)【独一无二】
【python】python社交交友平台系统设计与实现(源码+数据库)【独一无二】
43 10
|
5天前
|
JSON NoSQL Ubuntu
在Ubuntu 14.04上如何备份、恢复和迁移MongoDB数据库
在Ubuntu 14.04上如何备份、恢复和迁移MongoDB数据库
15 1
|
6天前
|
弹性计算 JSON 开发工具
"一键玩转阿里云ECS!Python大神揭秘:如何自动化创建镜像并跨地域复制,让你的云资源部署秒变高效达人!"
【8月更文挑战第14天】本文介绍如何使用Python与阿里云SDK自动化管理ECS镜像,包括创建镜像及跨地域复制,以优化云资源部署。首先安装`aliyun-python-sdk-ecs`并配置阿里云凭证。接着,通过Python脚本实现镜像创建与复制功能,简化日常运维工作并增强灾难恢复能力。注意权限及费用问题。
19 2
|
6天前
|
关系型数据库 数据库 数据安全/隐私保护
"告别繁琐!Python大神揭秘:如何一键定制阿里云RDS备份策略,让数据安全与效率并肩飞,轻松玩转云端数据库!"
【8月更文挑战第14天】在云计算时代,数据库安全至关重要。阿里云RDS提供自动备份,但标准策略难以适应所有场景。传统手动备份灵活性差、管理成本高且恢复效率低。本文对比手动备份,介绍使用Python自定义阿里云RDS备份策略的方法,实现动态调整备份频率、集中管理和智能决策,提升备份效率与数据安全性。示例代码演示如何创建自动备份任务。通过自动化与智能化备份管理,支持企业数字化转型。
17 2
|
11天前
|
SQL 关系型数据库 MySQL
Python系列:教你使用PyMySQL操作MySQL数据库
Python系列:教你使用PyMySQL操作MySQL数据库
21 8
|
11天前
|
SQL 关系型数据库 MySQL
【python】python学生信息管理系统 ——数据库版(源码)【独一无二】
【python】python学生信息管理系统 ——数据库版(源码)【独一无二】
|
5天前
|
存储 开发框架 .NET
ASP.NET Web Api 使用 EF 6,DateTime 字段如何取数据库服务器当前时间
ASP.NET Web Api 使用 EF 6,DateTime 字段如何取数据库服务器当前时间
|
5天前
|
SQL Ubuntu 关系型数据库
如何在云服务器上创建和管理 MySQL 和 MariaDB 数据库
如何在云服务器上创建和管理 MySQL 和 MariaDB 数据库
8 0