MySQL上云同步脚本-Python3

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 最近在做本地同步至云的脚本 使用kettle的话,几百个表的同步要一一设置,实在是蛋疼的紧 还好python可以解决一部分问题,所以写了个转换

最近在做本地同步至云的脚本
使用kettle的话,几百个表的同步要一一设置,实在是蛋疼的紧
还好python可以解决一部分问题,所以写了个转换
由于5.6版本的mysql对null的处理不是很好,因此全部转为varchar型
blob和longblob,text都单独处理

#-*- coding: UTF-8 -*-
#获取对比列表
#表结构同步到云上mysql
#实现功能:oracle-mysql列转换/写入脚本/传输脚本
#待实现功能:执行脚本
#james.peng 20170905

import os 
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
import cx_Oracle
import pymysql
import datetime
import time

Start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
date_nyr = time.strftime('%Y%m%d', time.localtime(time.time()))
#存放结果的txt
os_dir=os.chdir('/a/e/p/y')
remote_loc='/a/e/p/y/'+date_nyr+'/'
try:
    os.mkdir(date_nyr)
    linux_shell='chmod 777 '+date_nyr
    os.popen(linux_shell)
except:
    print('folder_existed!')
os_dir='/a/e/p/y/'+date_nyr
os.chdir(os_dir)

try:
    f=open(os_dir+'/create_y_script.txt','w')
    f.truncate()
    f.write("use "+Mysql_schema+";\n")
except:
    print('exception!')
print(os.getcwd())

#存放结果的list
create_yrs_table_list=[]

#parameters--参数设置
jump_server_host=
jump_server_destination=

Mysql_yrs_ip_port=
Mysql_yrs_username=
Mysql_yrs_passwd=
Mysql_yrs_db=
Mysql_yrs_port=
Mysql_yrs_schema=

Mysql_ip_port=
Mysql_username=
Mysql_passwd=
Mysql_db=
Mysql_port=
Mysql_schema=


#connection strings--连接信息
mysql_yrs_info = pymysql.connect(Mysql_yrs_ip_port,Mysql_yrs_username,Mysql_yrs_passwd,Mysql_yrs_db,Mysql_yrs_port,charset='utf8')
mysql_yrs_cursor = mysql_yrs_info.cursor()
mysql_info = pymysql.connect(Mysql_ip_port,Mysql_username,Mysql_passwd,Mysql_db,Mysql_port)
mysql_cursor = mysql_info.cursor()


#开始连接,
#<<<<步骤一>>>>先获取要同步的表,按表名顺序排列
mysql_yrs_cursor.execute("SET group_concat_max_len=10000;")
Mysql_yrs_table_name_sql="select table_name from information_schema.tables where "
Mysql_yrs_table_name_sql+="TABLE_SCHEMA='"+Mysql_yrs_schema+"' order by table_name asc ;"
mysql_yrs_cursor.execute(Mysql_yrs_table_name_sql)
Mysql_yrs_table_name = mysql_yrs_cursor.fetchall()
#print(Mysql_yrs_table_name)

for i_sync_list in Mysql_yrs_table_name:
    i_sync_list=str(i_sync_list).replace('(','').replace(',)','').replace('\'','')
    print(i_sync_list)
    drop_table_sql="drop table  "+Mysql_schema+"."+i_sync_list+';'
    """
    construct_table_sql="select  concat('create table test320.account_credit(',"
    construct_table_sql+=" group_concat(concat(column_name,' varchar(' ,"
    construct_table_sql+=" case when data_type in ('int','decimal','bigint','tinyint','double','bit') then numeric_precision*3"
    construct_table_sql+=" when data_type in ('varchar','char') then round(character_maximum_length*1.2) "
    construct_table_sql+=" when data_type in ('date','datetime','datetime','timestamp') then '50' "
    construct_table_sql+=" else data_type end ,')'))) from information_schema.columns "
    construct_table_sql+=" where TABLE_SCHEMA='test320'and table_name='account_credit'"
    """
    construct_table_sql="select  concat('create table "+Mysql_schema+"."+i_sync_list+"(',"
    construct_table_sql+=" group_concat(concat(column_name,' varchar(' ,"
    construct_table_sql+=" case when data_type in ('int','decimal','bigint','tinyint','double','bit') then round(numeric_precision*1.2)"
    construct_table_sql+=" when data_type in ('varchar','char') then round(character_maximum_length*1.2) "
    construct_table_sql+=" when data_type in ('date','datetime','datetime','timestamp') then '50' "
    construct_table_sql+=" else data_type end ,')'))) from information_schema.columns "
    construct_table_sql+=" where TABLE_SCHEMA='"+Mysql_yrs_schema+"'and table_name='"+i_sync_list+"'"    
    
    #print("构建")
    #print(construct_table_sql)
    #print("构建完成")
    mysql_yrs_cursor.execute(construct_table_sql)
    construct_table_sql_rslt=str(mysql_yrs_cursor.fetchall()).replace("(('","").replace("'))",");")
    construct_table_sql_rslt=construct_table_sql_rslt.replace("varchar(longtext)","longtext").replace("varchar(blob)","blob")
    construct_table_sql_rslt=construct_table_sql_rslt.replace("varchar(text)","text").replace("varchar(longblob)","longblob")
    #print("行:\n")
    #print(drop_table_sql)
    #print("行:\n")
    #print(construct_table_sql_rslt)
    f.write(drop_table_sql)
    f.write('\n')
    f.write(construct_table_sql_rslt)
    f.write('\n')
    #print(mysql_create_sql_build)
f.close()




linux_shell="/usr/bin/scp -P 20 create_yrs_script.txt rds@aliyunsql:/h/r/a/i/y/"
linux_shell_result=os.popen(linux_shell).read()
print(linux_shell)
print(linux_shell_result)


mysql_yrs_cursor.close()
mysql_cursor.close()
End_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('开始时间', Start_time)
print('完成时间', End_time)
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
21天前
|
Linux Shell Python
Linux执行Python脚本
Linux执行Python脚本
26 1
|
22天前
|
缓存 NoSQL 关系型数据库
在Python Web开发过程中:数据库与缓存,MySQL和NoSQL数据库的主要差异是什么?
MySQL是关系型DB,依赖预定义的表格结构,适合结构化数据和复杂查询,但扩展性有限。NoSQL提供灵活的非结构化数据存储(如JSON),无统一查询语言,但能横向扩展,适用于大规模、高并发场景。选择取决于应用需求和扩展策略。
112 1
|
23天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
20 0
|
30天前
|
XML 关系型数据库 MySQL
python将word(doc或docx)的内容导入mysql数据库
用python先把doc文件转换成docx文件(这一步也可以不要后续会说明),然后读取docx的文件并另存为htm格式的文件(上一步可以直接把doc文件另存为htm),python根据bs4获取p标签里的内容,如果段落中有图片则保存图片。(图片在word文档中的位置可以很好的还原到生成的数据库内容) 我见网上有把docx压缩后解压获取图片的,然后根据在根据xml来读取图片的位置,我觉得比较繁琐。用docx模块读取段落的时候还需要是不是判断段落中有分页等,然而转成htm之后就不用判断那么多直接判断段落里的样式或者图片等就可以了。
21 1
|
1月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks常见问题之dataworks同步Rds任务失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
SQL 关系型数据库 MySQL
Python怎么操作Mysql数据库
Python怎么操作Mysql数据库
49 0
|
30天前
|
SQL 关系型数据库 MySQL
python在mysql中插入或者更新null空值
这段代码是Python操作MySQL数据库的示例。它执行SQL查询从表`a_kuakao_school`中选取`id`,`university_id`和`grade`,当`university_id`大于0时按升序排列。然后遍历结果,根据`row[4]`的值决定`grade`是否为`NULL`。若不为空,`grade`被格式化为字符串;否则,设为`NULL`。接着构造UPDATE语句更新`university`表中对应`id`的`grade`值,并提交事务。重要的是,字符串`NULL`不应加引号,否则更新会失败。
19 2
|
11天前
|
JSON 测试技术 持续交付
自动化测试与脚本编写:Python实践指南
【4月更文挑战第9天】本文探讨了Python在自动化测试中的应用,强调其作为热门选择的原因。Python拥有丰富的测试框架(如unittest、pytest、nose)以支持自动化测试,简化测试用例的编写与维护。示例展示了使用unittest进行单元测试的基本步骤。此外,Python还适用于集成测试、系统测试等,提供模拟外部系统行为的工具。在脚本编写实践中,Python的灵活语法和强大库(如os、shutil、sqlite3、json)助力执行复杂测试任务。同时,Python支持并发、分布式执行及与Jenkins、Travis CI等持续集成工具的集成,提升测试效率和质量。
|
17天前
|
存储 监控 异构计算
【Python】GPU内存监控脚本
【Python】GPU内存监控脚本