基于Spark Streaming 进行 MySQL Binlog 日志准实时传输

简介: 基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。

基本架构

RDS -> SLS -> Spark Streaming -> Spark HDFS

上述链路主要包含3个过程:

  1. 如何把 RDS 的 binlog 收集到 SLS。
  2. 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
  3. 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。

环境准备

  1. 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
  2. 开通 SLS 服务。

操作步骤

  1. 检查 MySQL 数据库环境。

    1. 查看是否开启 log-bin 功能。
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.02 sec)
    1. 查看 binlog 类型
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.03 sec)  
  2. 添加用户权限。(也可以直接通过RDS控制台添加)

    CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
  3. 为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。

    1. 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
    2. 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
    {
    "metrics": {
     "##1.0##canaltest$plugin-local": {
         "aliuid": "****",
         "enable": true,
         "category": "canal",
         "defaultEndpoint": "*******",
         "project_name": "canaltest",
         "region": "cn-hangzhou",
         "version": 2
         "log_type": "plugin",
         "plugin": {
             "inputs": [
                 {
                     "type": "service_canal",
                     "detail": {
                         "Host": "*****",
                         "Password": "****",
                         "ServerID": ****,
                         "User" : "***",
                         "DataBases": [
                             "yourdb"
                         ],
                         "IgnoreTables": [
                             "\\S+_inner"
                         ],
                          "TextToString" : true
                     }
                 }
             ],
             "flushers": [
                 {
                     "type": "flusher_sls",
                     "detail": {}
                 }
             ]
         }
     }
    }
    }

    其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。

    1. 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
      image

如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。

  1. 准备代码,将代码编译成 jar 包,然后上传到 OSS。

    1. 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
    git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。

    示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:

    package com.aliyun.emr.example
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object LoghubSample {
    def main(args: Array[String]): Unit = {
    if (args.length < 7) {
     System.err.println(
       """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
         |            
         |           
       """.stripMargin)
     System.exit(1)
    }
    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val conf = new SparkConf().setAppName("Mysql Sync")
    //    conf.setMaster("local[4]");
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
     ssc,
     loghubProject,
     logStore,
     loghubGroupName,
     endpoint,
     1,
     accessKeyId,
     accessKeySecret,
     StorageLevel.MEMORY_AND_DISK)
    loghubStream.foreachRDD(rdd =>
       rdd.saveAsTextFile("/mysqlbinlog")
    )
    ssc.start()
    ssc.awaitTermination()
    }
    }

其中的主要改动是:

loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )

这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。

  1. 说明
    由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。

由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。

trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}

在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。

  1. 代码编译。
    在本地调试完成后,我们可以通过如下命令进行打包编译:

  2. clean install

  3. 上传 jar 包。
    请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

image

  1. 搭建 EMR 集群,创建任务并运行执行计划。

    1. 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
    2. 创建一个类型为 Spark 的作业。
      请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key 替换成真实值。请注意参数的顺序,否则可能会报错。
    —master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1

运行以上的命令

  1. 查询 Master 节点的IP
  2. 通过 SSH 登录后,执行以下命令:

  3. fs -ls /

  4. 可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:

  5. fs -ls /mysqlbinlog

还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。

  1. 错误排查。
    如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。
相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
21天前
|
SQL 关系型数据库 MySQL
Mysql 的binlog日志的原理【4月更文挑战第1天】
【4月更文挑战第1天】 MySQL的binlog(二进制日志)是一个记录数据库更改的日志文件,它主要用于复制和恢复操作。以下是binlog日志的工作原理的简要概述: **事件写入**:当MySQL服务器执行一个事务时,它会将该事务中所有对数据库的修改操作(如INSERT、UPDATE和DELETE等)记录为一个事件(event)。这些事件包含了修改操作的相关信息,如操作类型、涉及的表、修改的行等。
101 1
|
4天前
|
SQL 关系型数据库 MySQL
使用mysql数据库的binlog应对故障
【6月更文挑战第1天】本文介绍`mysql的 binlog`工具用于解析MySQL的二进制日志,转换为可执行的SQL语句,主要用于数据库主从复制和增量恢复。定期备份和binlog推送能实现故障时的数据恢复。
41 8
使用mysql数据库的binlog应对故障
|
8天前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
55892 1
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
13天前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何配置可以实现实时同步多张MySQL源表时只读取一次binlog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
18天前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
25 2
|
19天前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之在同步MySQL的时候卡在某个binlog文件处如何解决
|
19天前
|
Prometheus Cloud Native 关系型数据库
实时计算 Flink版产品使用合集之binlog被清理掉的问题,并且binlog有备份,有什么方法来恢复到RDS
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
19天前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之如果MySQL的binlog保存时间只有三天,那么三天之前的数据是不是会通过Doris的外表手动插入
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之在DataWorks中,如何通过PolarDB for MySQL来查看binlog日志
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
40 1
|
5天前
|
存储 NoSQL 关系型数据库
mysql 数据库 基本介绍
mysql 数据库 基本介绍