使用EMR来进行mysqlbinlog日志准实时传输

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输

简介

本文将介绍如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输

基本架构

rds -> sls -> spark streaming -> spark hdfs

主要包含3个链路:

1. 怎么把rds的binlog收集到sls

2.怎么通过spark streaming将sls中的日志读取出来,进行分析

3.怎么把2中读取和处理过的日志,保存到spark hdfs中

环境准备

需要mysql类型数据库、开通sls服务并添加对应的project和logstore、然后创建一个emr集群。

具体步骤

准备好mysql数据库环境,添加用户权限

首先需要一个mysql类型数据库(使用mysql协议,例如RDS、DRDS等),
数据库开启binlog,且配置binlog类型为ROW模式(RDS默认开启)。

  1. 先检查下环境:

  ### 查看是否开启binlog
  mysql> show variables like "log_bin";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | log_bin       | ON    |
  +---------------+-------+
  1 row in set (0.02 sec)
  ### 查看binlog类型
  mysql> show variables like "binlog_format";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | binlog_format | ROW   |
  +---------------+-------+
  1 row in set (0.03 sec)

  2. 添加权限,当然也可以直接通过rds控制台添加:

  CREATE USER canal IDENTIFIED BY 'canal';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;

开通sls,添加对应的配置文件,并检查数据是否正常采集

需要开通sls,并在sls控制台添加对应的project和logstore,例如:我创建的project叫canaltest,logstore叫canal.
然后对sls进行配置,配置文件目录在:/etc/ilogtail下创建文件:user_local_config.json,具体配置文件如下:


 {
      "metrics": {
          "##1.0##canaltest$plugin-local": {
              "aliuid": "****",
              "enable": true,
              "category": "canal",
              "defaultEndpoint": "*******",
              "project_name": "canaltest",
              "region": "cn-hangzhou",
              "version": 2
              "log_type": "plugin",
              "plugin": {
                  "inputs": [
                      {
                          "type": "service_canal",
                          "detail": {
                              "Host": "*****",
                              "Password": "****",
                              "ServerID": ****,
                              "User" : "***",
                              "DataBases": [
                                  "yourdb"
                              ],
                              "IgnoreTables": [
                                  "\\S+_inner"
                              ],
                               "TextToString" : true
                          }
                      }
                  ],
                  "flushers": [
                      {
                          "type": "flusher_sls",
                          "detail": {}
                      }
                  ]
              }
          }
      }
  }

其中detail中的Host和Password等信息为MySQL数据库信息,User请用之前授权过的用户名。
aliUid、defaultEndpoint、project_name、category请根据自己的实际情况填写对应的用户和sls信息。
配置好后,我们过2分钟左右,通过sls控制台,看下数据有没有上来,具体如图:
sls日志.png
如果没有正常采集上来,请根据sls的提示,查看sls的采集日志进行排查。

准备好代码,编译成jar包,上传到oss

  • 代码准备

我们可以将emr的example代码通过git复制下来,修改下,具体命令如下:

git clone https://github.com/aliyun/aliyun-emapreduce-demo.git

example代码中已经有LoghubSample这个类,主要用于从sls采集数据,然后直接打印出来。
我们只要稍做修改,就可以达到我们的目的了。
以下是我修改后的代码:

 
package com.aliyun.emr.example

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}

  object LoghubSample {
    def main(args: Array[String]): Unit = {
      if (args.length < 7) {
        System.err.println(
          """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
            |            
            |           
          """.stripMargin)
        System.exit(1)
      }

      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)

      val conf = new SparkConf().setAppName("Mysql Sync")
  //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        1,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD(rdd =>
          rdd.saveAsTextFile("/mysqlbinlog")
      )

      ssc.start()
      ssc.awaitTermination()
    }
  }

其中主要改动是:



loghubStream.foreachRDD(rdd =>
       rdd.saveAsObjectFile("/mysqlbinlog")
)


这样在emr里运行时,就会把spark streaming中流出来的数据,保存到emr的hdfs中。

注意

  1. 由于如果要在本地运行起来,请在本地环境提前搭建hadoop集群。
  2. 由于emr的spark sdk做了升级,他的example code有点旧,不能直接在参数中传递oss的accessKeyId、accessKeySecret 而是需要通过sparkConf设置进来。

trait RunLocally {
  val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.mapreduce.job.run-local", "true")
  conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
  conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
  conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
  conf.set("spark.hadoop.job.runlocal", "true")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")


  val sc = new SparkContext(conf)

  def getAppName: String
}

  1. 在本地调试时,需要把


loghubStream.foreachRDD(rdd =>
rdd.saveAsObjectFile("/mysqlbinlog")
)

中的/mysqlbinlog修改成本地hdfs的地址

  • 代码编译

在本地调试完成后,我们可以通过如下命令进行打包编译:

mvn clean install

  • 上传jar包

通过oss控制台或oss的sdk将target/shaded目录下的examples-1.1-shaded.jar上传到oss上

我在oss上建立了bucket为qiaozhou-emr/jar的目录,这个上传后的jar包地址为oss://qiaozhou-emr/jar/examples-1.1-shaded.jar

注意,这个地址在后续会用上,如下图:

4e0bc88da76dab1f1660ead1e3f0fa233460da8f


搭建emr集群,创建任务,运行执行计划,查看结果

  1. 首先我们需要通过emr控制台创建一个emr集群,这个过程比较漫长,需要10分钟左右,请耐心等待
  2. 创建类型为spark的作业
    --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.emr.example.LoghubSample ossref://emr-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $sls_endpoint $sls_access_id $sls_secret_key 1
    
    

请根据你的具体配置将

slsendpointslsendpointsls_access_id $sls_secret_key替换成真实值
注意参数的顺序,否则会报错。
3. 创建执行计划,将作业和emr集群绑定后,开始运行
4. 找到master节点的ip,如图:
Master.png

通过ssh登录上去后,执行命令:

hadoop fs -ls /


就会看到mysqlbinlog开头的目录,再通过命令:

hadoop fs -ls /mysqlbinlog


查看mysqlbinlog文件,如图:
hdfs1.png

最后还可以通过命令:

hadoop fs -cat /mysqlbinlog/part-00000


如图:

查看文件内容,是不是很神奇呀?

错误排查

如果没有出来正常的结果,可以通过emr的运行记录,来进行问题排查,如图:
errortrace.png

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
运维
SAP ABAP 系统记录传输请求处理过程的日志存放目录
SAP ABAP 系统记录传输请求处理过程的日志存放目录
62 0
|
分布式计算 监控 关系型数据库
基于Spark Streaming 进行 MySQL Binlog 日志准实时传输
基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
10897 0
|
13天前
|
SQL 关系型数据库 MySQL
MySQL技能完整学习列表11、日志和备份——1、查看日志——2、数据备份和恢复(mysqldump, mysqlbinlog)
MySQL技能完整学习列表11、日志和备份——1、查看日志——2、数据备份和恢复(mysqldump, mysqlbinlog)
56 0
|
10月前
|
消息中间件 存储 Kafka
MQ 学习日志(六) 保证消息的可靠性传输
消息的可靠性传输 简述
79 0
|
12月前
|
Oracle 关系型数据库 数据库
使用日志传输的方法在两个数据库之间同步数据
源 oracle18:oracle18c-standby 192.168.17.26 目标 oracle18-2:oracle18c-primary 192.168.17.109
115 0
|
12月前
处理Dataguard日志传输gap一例
制造gap 在主库上停止向备库传输日志
|
SQL 分布式计算 监控
阿里云EMR自定义日志投递与使用实践分享
EMR目前支持了日志管理,即日志客户SLS投递的功能,基于此功能,客户可以将需要的各种大数据组件日志收集到自身SLS中,做查询和分析。基于此功能,客户可以自定义日志路径、规则,对集群设备上的日志自行接收和消费。本文以采集指标文件为例,帮助您快速上手自定义日志投递与使用。
355 0
阿里云EMR自定义日志投递与使用实践分享
|
Go 数据库
|
分布式计算 监控 NoSQL
海量监控日志基于EMR Spark Streaming SQL进行实时聚合
从EMR-3.21.0 版本开始将提供Spark Streaming SQL的预览版功能,支持使用SQL来开发流式分析作业。结果数据可以实时写入Tablestore。 本文以LogHub为数据源,收集ECS上的日志数据,通过Spark Streaming SQL进行聚合后,将流计算结果数据实时写入Tablestore,展示一个简单的日志监控场景。

热门文章

最新文章