基于Spark Streaming 进行 MySQL Binlog 日志准实时传输

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。

基本架构

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述链路主要包含3个过程:

  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
  3. 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。

环境准备

  1. 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
  2. 开通 SLS 服务。

操作步骤

  1. 检查 MySQL 数据库环境。

    1. 查看是否开启 log-bin 功能。
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.02 sec)
    AI 代码解读
    1. 查看 binlog 类型
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.03 sec)  
    AI 代码解读
  2. 添加用户权限。(也可以直接通过RDS控制台添加)

    CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
    AI 代码解读
  3. 为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。

    1. 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
    2. 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
    {
    "metrics": {
     "##1.0##canaltest$plugin-local": {
         "aliuid": "****",
         "enable": true,
         "category": "canal",
         "defaultEndpoint": "*******",
         "project_name": "canaltest",
         "region": "cn-hangzhou",
         "version": 2
         "log_type": "plugin",
         "plugin": {
             "inputs": [
                 {
                     "type": "service_canal",
                     "detail": {
                         "Host": "*****",
                         "Password": "****",
                         "ServerID": ****,
                         "User" : "***",
                         "DataBases": [
                             "yourdb"
                         ],
                         "IgnoreTables": [
                             "\\S+_inner"
                         ],
                          "TextToString" : true
                     }
                 }
             ],
             "flushers": [
                 {
                     "type": "flusher_sls",
                     "detail": {}
                 }
             ]
         }
     }
    }
    }
    AI 代码解读

    其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。

    1. 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
      image

如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。

  1. 准备代码,将代码编译成 jar 包,然后上传到 OSS。

    1. 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
    git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。
    AI 代码解读

    示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:

    package com.aliyun.emr.example
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object LoghubSample {
    def main(args: Array[String]): Unit = {
    if (args.length < 7) {
     System.err.println(
       """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
         |            
         |           
       """.stripMargin)
     System.exit(1)
    }
    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val conf = new SparkConf().setAppName("Mysql Sync")
    //    conf.setMaster("local[4]");
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
     ssc,
     loghubProject,
     logStore,
     loghubGroupName,
     endpoint,
     1,
     accessKeyId,
     accessKeySecret,
     StorageLevel.MEMORY_AND_DISK)
    loghubStream.foreachRDD(rdd =>
       rdd.saveAsTextFile("/mysqlbinlog")
    )
    ssc.start()
    ssc.awaitTermination()
    }
    }
    AI 代码解读

其中的主要改动是:

loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )
AI 代码解读

这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。

  1. 说明
    由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。

由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。

trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}
AI 代码解读

在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。

  1. 代码编译。
    在本地调试完成后,我们可以通过如下命令进行打包编译:

  2. clean install

  3. 上传 jar 包。
    请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

image

  1. 搭建 EMR 集群,创建任务并运行执行计划。

    1. 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
    2. 创建一个类型为 Spark 的作业。
      请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key 替换成真实值。请注意参数的顺序,否则可能会报错。
    —master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
    AI 代码解读

运行以上的命令

  1. 查询 Master 节点的IP
  2. 通过 SSH 登录后,执行以下命令:

  3. fs -ls /

  4. 可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:

  5. fs -ls /mysqlbinlog

还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。

  1. 错误排查。
    如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
打赏
0
0
0
0
3055
分享
相关文章
MySQL原理简介—7.redo日志的底层原理
本文介绍了MySQL中redo日志和undo日志的主要内容: 1. redo日志的意义:确保事务提交后数据不丢失,通过记录修改操作并在系统宕机后重做日志恢复数据。 2. redo日志文件构成:记录表空间号、数据页号、偏移量及修改内容。 3. redo日志写入机制:redo日志先写入Redo Log Buffer,再批量刷入磁盘文件,减少随机写以提高性能。 4. Redo Log Buffer解析:描述Redo Log Buffer的内存结构及刷盘时机,如事务提交、Buffer过半或后台线程定时刷新。 5. undo日志原理:用于事务回滚,记录插入、删除和更新前的数据状态,确保事务可完整回滚。
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
Mysql、Oracle审计日志的开启
通过上述步骤,可以在 MySQL 和 Oracle 数据库中启用和配置审计日志。这些日志对于监控数据库操作、提高安全性和满足合规性要求非常重要。确保正确配置审计参数和策略,定期查看和分析审计日志,有助于及时发现并处理潜在的安全问题。
40 11
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
117 2
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
125 3
Mysql并发控制和日志
通过深入理解和应用 MySQL 的并发控制和日志管理技术,您可以显著提升数据库系统的效率和稳定性。
174 10
MySQL进阶突击系列(02)一条更新SQL执行过程 | 讲透undoLog、redoLog、binLog日志三宝
本文详细介绍了MySQL中update SQL执行过程涉及的undoLog、redoLog和binLog三种日志的作用及其工作原理,包括它们如何确保数据的一致性和完整性,以及在事务提交过程中各自的角色。同时,文章还探讨了这些日志在故障恢复中的重要性,强调了合理配置相关参数对于提高系统稳定性的必要性。
mysql 的ReLog和BinLog区别
MySQL中的重做日志和二进制日志是确保数据库稳定性和可靠性的关键组件。重做日志主要用于事务的持久性和原子性,通过记录数据页的物理修改信息来恢复未提交的事务;而二进制日志记录SQL语句的逻辑变化,支持数据复制、恢复和审计。两者在写入时机、存储方式及配置参数等方面存在显著差异。
【赵渝强老师】MySQL的binlog日志文件
MySQL的binlog日志记录了所有对数据库的更改操作(不包括SELECT和SHOW),主要用于主从复制和数据恢复。binlog有三种模式,可通过设置binlog_format参数选择。示例展示了如何启用binlog、设置格式、查看日志文件及记录的信息。
382 6