Python实现跨服务器数据库迁移

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Python实现跨服务器数据库迁移

注意:

  • 本文是基于Pyhon3.6的实现方式
  • 以每100000一查询一提交进行,可根据实际情况修改
  • 程序中,如传输某表时发现目标数据库中存在此表,认为该数据表是完整存在的,可优化
  • 数据库连接时注意编码方式,本文为 charset='latin1',可根据实际情况修改为utf8或其他方式
  • 如服务器中有多个数据库,通过修改cur_db和cur_local_db变量切换数据库
  • 本人使用后愚见,本程序迁移速度相比Navicat中的数据传输功能较慢一点,但Navicat中有时存在漏传(虽情况出现次数极少)

程序代码如下:

import pymysql
import warnings

warnings.filterwarnings("ignore")


class ConnectMysql(object):
    def __init__(self):
        #     这里设置分页查询, 每页查询多少数据
        self.page_size = 100000


    def getTable(self):
        #需迁移的数据库
        cur_db='****'
        #目标数据库
        cur_local_db='****'
        #需迁移服务器
        conn = pymysql.connect(
            host="***.***.***.***",
            user="*****",
            passwd="*****",
            db=cur_db,
            charset='latin1'
        )
        #目标服务器
        conn_local = pymysql.connect(
            host="***.***.***.***",
            user="*****",
            passwd="*****",
            db=cur_local_db,
            charset='latin1'
        )
        cur = conn.cursor()
        cur_local = conn_local.cursor()
        cur.execute('show tables')
        tables = cur.fetchall()

        for table in tables:
            # 需要迁移的数据库查询表的列数
            cur.execute(
                "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='"+cur_db+"' AND table_name='" + table[
                    0] + "'")
            table_col_count = cur.fetchone()
            # print("需要迁移数据列数",table_col_count)
            # print table_col_count[0]
            # 需要迁移的数据库查询表的结构
            cur.execute('show create table ' + table[0])
            result = cur.fetchall()
            create_sql = result[0][1]
            # 查询需要迁移的数据库表的数据条数
            cur.execute('select count(*) from ' + table[0])
            total = cur.fetchone()
            # print("迁移数据条数",total)
            page = total[0] / self.page_size
            page1 = total[0] % self.page_size
            if page1 != 0:
                page = page + 1

            # 数据库创建表
            cur_local.execute(
                "SELECT table_name FROM information_schema.`TABLES` WHERE TABLE_SCHEMA='"+cur_local_db+"' AND table_name='" + str(
                    table[0]) + "'")
            table_name = cur_local.fetchone()
            if table_name is None:
                cur_local.execute(create_sql)
                page = int(page)
                for p in range(0, page):
                    while True:
                        try:
                            print('开始', table[0], '的第', p + 1, '页查询')
                            if p == 0:
                                limit_param = ' limit ' + str(p * self.page_size) + ',' + str(self.page_size)
                            else:
                                limit_param = ' limit ' + str(p * self.page_size + 1) + ',' + str(self.page_size)
                            cur.execute('select * from ' + table[0] + limit_param)
                            inserts = cur.fetchall()
                            #print(inserts)
                            # print('查询成功')
                            param = ''
                            for i in range(0, table_col_count[0]):
                                param = param + '%s,'
                                # print(param)
                            # print('开始插入')
                            cur_local.executemany('replace into ' + table[0] + ' values (' + param[0:-1] + ')', inserts)
                          
                            conn_local.commit()
                            break
                        except Exception as e:
                            print(e)
                            # time.sleep(1)
                            cur = conn.cursor()
                            cur_local = conn_local.cursor()
                    print(table[0], ' 插入完成')
                 
            else:
                print(str(table[0]),"已存在")

        cur_local.close()
        conn_local.close()
        cur.close()
        conn.close()


if __name__ == '__main__':
    conn_mysql = ConnectMysql()
    conn_mysql.getTable()

最后希望本文对大家有所帮助,也希望大家多多指教!

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2天前
|
安全 Python
使用Python实现简单的Web服务器
使用Python实现简单的Web服务器
12 6
|
11天前
|
存储 Oracle 关系型数据库
Oracle同一台服务器创建多个数据库
【8月更文挑战第30天】在 Oracle 中,可在同一服务器上创建多个数据库。首先确保已安装 Oracle 软件并具有足够资源,然后使用 DBCA 工具按步骤创建,包括选择模板、配置存储及字符集等。重复此过程可创建多个数据库,需确保名称、SID 和存储位置唯一。创建后,可通过 Oracle Enterprise Manager 进行管理,注意服务器资源分配与规划。
26 10
|
9天前
|
SQL Java 数据库连接
数据库迁移不再难:Flyway 与 Liquibase 大比拼,哪个才是你的真命天子?
【9月更文挑战第3天】数据库迁移在软件开发中至关重要,尤其在使用 ORM 框架如 Hibernate 时。为确保部署时能顺利应用最新的数据库变更,开发者常使用自动化工具。Flyway 和 Liquibase 是当前流行的两种选择,均能有效管理数据库版本控制。Flyway 采用 SQL 脚本表示变更,简单易用;Liquibase 支持多种脚本格式,功能更强大,适合复杂项目。本文将对比这两种工具的特点,并通过示例展示各自的优缺点,帮助开发者根据项目需求做出合适的选择。
14 1
|
12天前
|
运维 监控 数据库
自动化运维:使用Python脚本实现服务器监控
【8月更文挑战第31天】在这篇文章中,我们将探索如何利用Python编写简单的脚本来实现对服务器的基本监控。通过学习和应用这些技术,你可以快速检测服务器的状态,包括CPU使用率、内存占用和磁盘空间等关键指标。这不仅有助于及时发现问题,还能提升运维效率。文章将逐步引导你理解监控的重要性,并展示如何从零开始构建自己的监控工具。
|
14天前
|
存储 前端开发 关系型数据库
秀啊,用Python快速开发在线数据库更新修改工具
秀啊,用Python快速开发在线数据库更新修改工具
|
14天前
|
前端开发 数据库 虚拟化
太6了!用Python快速开发数据库入库系统
太6了!用Python快速开发数据库入库系统
|
14天前
|
前端开发 数据库 Python
用Python轻松开发数据库取数下载工具
用Python轻松开发数据库取数下载工具
|
15天前
|
SQL API 数据库
原来Python自带了数据库,用起来真方便!
原来Python自带了数据库,用起来真方便!
|
15天前
|
缓存 NoSQL 数据库
Web服务器与数据库优化:提升系统性能的最佳实践
【8月更文第28天】在现代的Web应用中,Web服务器与后端数据库之间的交互是至关重要的部分。优化这些组件及其相互作用可以显著提高系统的响应速度、吞吐量和可扩展性。本文将探讨几种常见的优化策略,并提供一些具体的代码示例。
30 1
|
11天前
|
存储 运维 监控
自动化运维:使用Python脚本进行服务器监控
【8月更文挑战第31天】在数字化时代,服务器的稳定运行对于企业至关重要。本文将介绍如何使用Python编写一个简单的服务器监控脚本,帮助运维人员及时发现并解决潜在问题。我们将从基础的服务器资源监控开始,逐步深入到日志分析与报警机制的实现。通过实际代码示例和操作步骤,使读者能够快速掌握自动化监控的技能,提升工作效率。

热门文章

最新文章