数据迁移到 Django 模型表:详尽指南

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 数据迁移是许多应用程序开发过程中必不可少的一部分。在这篇文章中,我们将详细分析和总结如何通过一个定制的 Django 管理命令,将数据从 MySQL 数据库迁移到 Django 模型表中。这种方法可以确保数据在多个数据库之间有效且安全地迁移,同时避免了手动操作的繁琐和错误。

数据迁移是许多应用程序开发过程中必不可少的一部分。在这篇文章中,我们将详细分析和总结如何通过一个定制的 Django 管理命令,将数据从 MySQL 数据库迁移到 Django 模型表中。这种方法可以确保数据在多个数据库之间有效且安全地迁移,同时避免了手动操作的繁琐和错误。

项目概览

我们将实现一个 Django 管理命令,该命令将从 MySQL 数据库中提取数据并批量插入到 Django 模型表中。这个过程将使用事务处理来确保数据一致性,并通过记录偏移量来支持断点续传。

代码详解

首先,我们需要定义一个 Django 管理命令。以下是完整的代码:

import os
import traceback
import mysql.connector
from django.db import transaction
from mysql.connector import Error
from django.core.management.base import BaseCommand
from myapp.models import HotSearchTermsReportABA

db_config = {
    # MySQL 数据库配置
}

class Command(BaseCommand):
    help = '数据迁移到 Django 模型表'

    def handle(self, *args, **kwargs):
        try:
            db_conn = mysql.connector.connect(**db_config)
            db_cursor = db_conn.cursor()
            self.stdout.write(self.style.SUCCESS("正在连接数据库"))
        except Error as e:
            self.stdout.write(self.style.ERROR(f"连接过程中出现异常:{e}"))
            self.stdout.write(self.style.ERROR(str(traceback.format_exc())))
            return
        period = '最新'
        fetch_sql = f"""
            SELECT search_rank, search_term FROM hot_terms_table 
            WHERE period = '{period}' 
            LIMIT %s OFFSET %s;"""
        # 批量大小
        batch_size = 5000
        # 读取偏移量
        offset = self.get_last_offset()
        total_rows_transferred = 0

        try:
            while True:
                db_cursor.execute(fetch_sql, (batch_size, offset))
                batch_data = db_cursor.fetchall()
                if not batch_data:
                    break  # 如果没有更多的数据,退出循环

                # 将batch_data转换为HotSearchTermsReportABA对象列表
                objects = [
                    HotSearchTermsReportABA(
                        search_rank=row[0],
                        search_term=row[1]
                    ) for row in batch_data
                ]

                with transaction.atomic():  # 开启事务
                    # 在Django中批量创建对象
                    HotSearchTermsReportABA.objects.bulk_create(objects)

                    # 更新偏移量和总条数
                    offset += batch_size
                    total_rows_transferred += len(batch_data)
                    self.stdout.write(self.style.SUCCESS(f"{len(batch_data)} 行数据已在此批中转移。"))
                    self.stdout.write(self.style.SUCCESS(f"总共完成将 {total_rows_transferred} 行数据转移。"))

                    # 更新文件中的偏移量
                    self.update_last_offset(offset)

        except Error as e:
            self.stdout.write(self.style.ERROR(f"传输过程中出现异常:{e}"))
            self.stdout.write(self.style.ERROR(str(traceback.format_exc())))

        finally:
            # 关闭所有连接和游标
            if db_cursor:
                db_cursor.close()
            if db_conn:
                db_conn.close()

    def get_last_offset(self):
        # 从文件中读取偏移量
        offset_file = 'migration_offset.txt'
        if os.path.exists(offset_file):
            with open(offset_file, 'r') as file:
                return int(file.read().strip())
        return 0

    def update_last_offset(self, offset):
        # 将偏移量写入文件
        offset_file = 'migration_offset.txt'
        with open(offset_file, 'w') as file:
            file.write(str(offset))

# python manage.py migrate_data

代码分析

数据库连接

首先,代码尝试连接到 MySQL 数据库。如果连接失败,会捕获异常并输出错误信息。

try:
    db_conn = mysql.connector.connect(**db_config)
    db_cursor = db_conn.cursor()
    self.stdout.write(self.style.SUCCESS("正在连接数据库"))
except Error as e:
    self.stdout.write(self.style.ERROR(f"连接过程中出现异常:{e}"))
    self.stdout.write(self.style.ERROR(str(traceback.format_exc())))
    return

SQL 查询与数据提取

接下来,代码定义了一个 SQL 查询语句,用于从 hot_search_terms_report 表中获取数据。使用 LIMITOFFSET 实现分页读取数据。

period = '最新'
fetch_sql = f"""
    SELECT search_rank, search_term FROM hot_terms_table 
    WHERE period = '{period}' 
    LIMIT %s OFFSET %s;
"""
batch_size = 5000
offset = self.get_last_offset()
total_rows_transferred = 0

数据迁移与事务处理

代码使用一个循环来分页读取数据,并将数据转换为 Django 模型对象,然后使用事务处理将数据批量插入到 Django 数据库中。事务处理确保数据的一致性,即使在插入过程中发生错误,也能回滚事务。

try:
    while True:
        db_cursor.execute(fetch_sql, (batch_size, offset))
        batch_data = db_cursor.fetchall()
        if not batch_data:
            break  # 如果没有更多的数据,退出循环

        objects = [
            HotSearchTermsReportABA(
                search_rank=row[0],
                search_term=row[1]
            ) for row in batch_data
        ]

        with transaction.atomic():
            HotSearchTermsReportABA.objects.bulk_create(objects)

            offset += batch_size
            total_rows_transferred += len(batch_data)
            self.stdout.write(self.style.SUCCESS(f"{len(batch_data)} 行数据已在此批中转移。"))
            self.stdout.write(self.style.SUCCESS(f"总共完成了将 {total_rows_transferred} 行数据转移。"))

            self.update_last_offset(offset)

偏移量管理

为了支持断点续传,代码会将每次读取的数据偏移量存储在一个文件中。下次运行时,会从该文件读取偏移量,继续上次未完成的迁移任务。

def get_last_offset(self):
    offset_file = 'migration_offset.txt'
    if os.path.exists(offset_file):
        with open(offset_file, 'r') as file:
            return int(file.read().strip())
    return 0

def update_last_offset(self, offset):
    offset_file = 'migration_offset.txt'
    with open(offset_file, 'w') as file:
        file.write(str(offset))

使用方法

  1. 配置数据库连接: 在 db_config 中填写你的 MySQL 数据库连接配置。
  2. 创建 Django 管理命令: 将上述代码保存为 management/commands/migrate_data.py 文件。
  3. 运行命令: 使用以下命令运行数据迁移:
python manage.py migrate_data

总结

通过这种方法,我们可以实现从 MySQL 数据库到 Django 模型表的高效、安全的数据迁移。事务处理和偏移量管理的引入,不仅确保了数据的一致性和完整性,还为大规模数据迁移提供了良好的支持。这种方法同样适用于其他类似的数据迁移任务,具有很高的通用性和实用性。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
14小时前
|
关系型数据库 MySQL 数据库
数据迁移脚本优化过程:从 MySQL 到 Django 模型表
在大规模的数据迁移过程中,性能问题往往是开发者面临的主要挑战之一。本文将分析一个数据迁移脚本的优化过程,展示如何从 MySQL 数据库迁移数据到 Django 模型表,并探讨优化前后的性能差异。
|
1天前
|
JSON API 数据格式
Django REST framework序列化器详解:普通序列化器与模型序列化器的选择与运用
Django REST framework序列化器详解:普通序列化器与模型序列化器的选择与运用
|
1天前
|
存储 SQL 数据处理
Django ORM实战:模型字段与元选项配置,以及链式过滤与QF查询详解
Django ORM实战:模型字段与元选项配置,以及链式过滤与QF查询详解
|
1天前
|
数据库 开发者 Python
Django ORM入门指南:从概念到实践,掌握模型创建、迁移与视图操作
Django ORM入门指南:从概念到实践,掌握模型创建、迁移与视图操作
|
20天前
|
数据采集 存储 数据库
优化 Django 模型设计:解决重复爬虫任务和商品数据
在开发数据采集(爬虫)应用时,我们常常面临这样一个问题:不同用户提交了相同的采集任务。为了避免在数据库中存储重复的 URL 和商品数据,我们需要优化模型设计。本文将介绍如何设计 Django 模型,以解决这个问题,并确保数据的一致性和完整性。
|
1月前
|
Python
使用Django时,如何设计模型关系(一对一、一对多、多对多)?
Django支持三种模型关联:ForeignKey(一对多),OneToOneField(一对一)和ManyToManyField(多对多)。ForeignKey示例:`Article`有一个指向`Author`的外键。OneToOneField示例:`UserProfile`与`User`一对一关联。ManyToManyField示例:`Student`和`Course`之间多对多关系。这些关联字段便于反向查询,如`article.author`获取作者,`author.article_set.all()`获取作者所有文章。
26 1
|
1月前
|
SQL 数据库 索引
Django MTV - 模型层 - (专题)知识要点与实战案例
Django MTV - 模型层 - (专题)知识要点与实战案例
42 0
|
1月前
|
存储 安全 网络协议
Python 教程之 Django(9)对模型中的字段进行验证
Python 教程之 Django(9)对模型中的字段进行验证
37 0
Python 教程之 Django(9)对模型中的字段进行验证
|
1月前
|
API 数据库 Python
Python 教程之 Django(8)在 Django 管理界面中渲染模型
Python 教程之 Django(8)在 Django 管理界面中渲染模型
29 0
Python 教程之 Django(8)在 Django 管理界面中渲染模型
|
1月前
|
SQL 存储 API
Python 教程之 Django(7)Django 模型
Python 教程之 Django(7)Django 模型
59 1
Python 教程之 Django(7)Django 模型