开发者社区> qiaozhou> 正文

使用EMR来进行mysqlbinlog日志准实时传输

简介: 如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输
+关注继续查看

简介

本文将介绍如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输

基本架构

rds -> sls -> spark streaming -> spark hdfs

主要包含3个链路:

1. 怎么把rds的binlog收集到sls

2.怎么通过spark streaming将sls中的日志读取出来,进行分析

3.怎么把2中读取和处理过的日志,保存到spark hdfs中

环境准备

需要mysql类型数据库、开通sls服务并添加对应的project和logstore、然后创建一个emr集群。

具体步骤

准备好mysql数据库环境,添加用户权限

首先需要一个mysql类型数据库(使用mysql协议,例如RDS、DRDS等),
数据库开启binlog,且配置binlog类型为ROW模式(RDS默认开启)。

  1. 先检查下环境:

  ### 查看是否开启binlog
  mysql> show variables like "log_bin";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | log_bin       | ON    |
  +---------------+-------+
  1 row in set (0.02 sec)
  ### 查看binlog类型
  mysql> show variables like "binlog_format";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | binlog_format | ROW   |
  +---------------+-------+
  1 row in set (0.03 sec)

  2. 添加权限,当然也可以直接通过rds控制台添加:

  CREATE USER canal IDENTIFIED BY 'canal';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;

开通sls,添加对应的配置文件,并检查数据是否正常采集

需要开通sls,并在sls控制台添加对应的project和logstore,例如:我创建的project叫canaltest,logstore叫canal.
然后对sls进行配置,配置文件目录在:/etc/ilogtail下创建文件:user_local_config.json,具体配置文件如下:


 {
      "metrics": {
          "##1.0##canaltest$plugin-local": {
              "aliuid": "****",
              "enable": true,
              "category": "canal",
              "defaultEndpoint": "*******",
              "project_name": "canaltest",
              "region": "cn-hangzhou",
              "version": 2
              "log_type": "plugin",
              "plugin": {
                  "inputs": [
                      {
                          "type": "service_canal",
                          "detail": {
                              "Host": "*****",
                              "Password": "****",
                              "ServerID": ****,
                              "User" : "***",
                              "DataBases": [
                                  "yourdb"
                              ],
                              "IgnoreTables": [
                                  "\\S+_inner"
                              ],
                               "TextToString" : true
                          }
                      }
                  ],
                  "flushers": [
                      {
                          "type": "flusher_sls",
                          "detail": {}
                      }
                  ]
              }
          }
      }
  }

其中detail中的Host和Password等信息为MySQL数据库信息,User请用之前授权过的用户名。
aliUid、defaultEndpoint、project_name、category请根据自己的实际情况填写对应的用户和sls信息。
配置好后,我们过2分钟左右,通过sls控制台,看下数据有没有上来,具体如图:
sls日志.png
如果没有正常采集上来,请根据sls的提示,查看sls的采集日志进行排查。

准备好代码,编译成jar包,上传到oss

  • 代码准备

我们可以将emr的example代码通过git复制下来,修改下,具体命令如下:

git clone https://github.com/aliyun/aliyun-emapreduce-demo.git

example代码中已经有LoghubSample这个类,主要用于从sls采集数据,然后直接打印出来。
我们只要稍做修改,就可以达到我们的目的了。
以下是我修改后的代码:


package com.aliyun.emr.example

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}

  object LoghubSample {
    def main(args: Array[String]): Unit = {
      if (args.length < 7) {
        System.err.println(
          """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
            |            
            |           
          """.stripMargin)
        System.exit(1)
      }

      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)

      val conf = new SparkConf().setAppName("Mysql Sync")
  //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        1,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD(rdd =>
          rdd.saveAsTextFile("/mysqlbinlog")
      )

      ssc.start()
      ssc.awaitTermination()
    }
  }

其中主要改动是:



loghubStream.foreachRDD(rdd =>
       rdd.saveAsObjectFile("/mysqlbinlog")
)


这样在emr里运行时,就会把spark streaming中流出来的数据,保存到emr的hdfs中。

注意

  1. 由于如果要在本地运行起来,请在本地环境提前搭建hadoop集群。
  2. 由于emr的spark sdk做了升级,他的example code有点旧,不能直接在参数中传递oss的accessKeyId、accessKeySecret 而是需要通过sparkConf设置进来。

trait RunLocally {
  val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.mapreduce.job.run-local", "true")
  conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
  conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
  conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
  conf.set("spark.hadoop.job.runlocal", "true")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")


  val sc = new SparkContext(conf)

  def getAppName: String
}

  1. 在本地调试时,需要把


loghubStream.foreachRDD(rdd =>
rdd.saveAsObjectFile("/mysqlbinlog")
)

中的/mysqlbinlog修改成本地hdfs的地址

  • 代码编译

在本地调试完成后,我们可以通过如下命令进行打包编译:

mvn clean install

  • 上传jar包

通过oss控制台或oss的sdk将target/shaded目录下的examples-1.1-shaded.jar上传到oss上

我在oss上建立了bucket为qiaozhou-emr/jar的目录,这个上传后的jar包地址为oss://qiaozhou-emr/jar/examples-1.1-shaded.jar

注意,这个地址在后续会用上,如下图:

4e0bc88da76dab1f1660ead1e3f0fa233460da8f


搭建emr集群,创建任务,运行执行计划,查看结果

  1. 首先我们需要通过emr控制台创建一个emr集群,这个过程比较漫长,需要10分钟左右,请耐心等待
  2. 创建类型为spark的作业
    --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.emr.example.LoghubSample ossref://emr-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $sls_endpoint $sls_access_id $sls_secret_key 1
    
    

请根据你的具体配置将

slsendpointslsendpointsls_access_id $sls_secret_key替换成真实值
注意参数的顺序,否则会报错。
3. 创建执行计划,将作业和emr集群绑定后,开始运行
4. 找到master节点的ip,如图:
Master.png

通过ssh登录上去后,执行命令:

hadoop fs -ls /


就会看到mysqlbinlog开头的目录,再通过命令:

hadoop fs -ls /mysqlbinlog


查看mysqlbinlog文件,如图:
hdfs1.png

最后还可以通过命令:

hadoop fs -cat /mysqlbinlog/part-00000


如图:

查看文件内容,是不是很神奇呀?

错误排查

如果没有出来正常的结果,可以通过emr的运行记录,来进行问题排查,如图:
errortrace.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
大数据数据采集的数据迁移(同步/传输)的Sqoop之基本命令和使用的help
在大数据领域,数据迁移(同步/传输)也是非常重要的一环。Sqoop作为一个开源的数据迁移工具,可以帮助我们轻松地实现关系型数据库与Hadoop之间的数据迁移。本文将会对Sqoop的基本命令和使用进行详细介绍。
15 0
基于 Flink CDC 打通数据实时入湖
基于 Flink CDC 打通数据实时入湖
526 0
StarRocks X Flink CDC,打造端到端实时链路
StarRocks X Flink CDC,打造端到端实时链路
99 0
服务器日志快速入湖实践
阿里云提供的企业级数据湖解决方案,存储层基于阿里云对象存储 OSS 构建,本文主要介绍服务器日志如何快速入湖
234 0
数据湖实操讲解【数据迁移】第二讲:数据无忧 - 利用 checksum 迁移 HDFS 数据到 OSS
数据湖 JindoFS+OSS 实操干货 36讲 每周二16点准时直播!扫文章底部二维码入钉群,线上准时观看~ Github链接: https://github.com/aliyun/alibabacloud-jindofs/blob/master/docs/jindo_distcp/jindo_distcp_overview.md
464 0
基于实时ETL的日志存储与分析实践
我们正处于大数据、多样化数据(非结构化)的时代,实时的机器数据快速产生,做一家数据公司的核心之一是如何充分利用好大量日志数据。本文将为大家介绍在 SLS 上兼顾日志数据灵活性、经济性的存储策略与实践。
1957 0
DataWorks实时同步/实时ETL/批同步ETL灰度邀测中
DataWorks实时同步功能可以支持多种实时数据源(Kafka、MySQL Binlog,Oracle CDC等),可以将实时消息数据经过一些列处理后再写入目的数据源。同时在此前DataWorks数据集成强大EL(Extract-Load)能力基础之上,增加了数据处理能力(Transform),实现了完整了ETL链路。
6259 0
海量监控日志基于EMR Spark Streaming SQL进行实时聚合
从EMR-3.21.0 版本开始将提供Spark Streaming SQL的预览版功能,支持使用SQL来开发流式分析作业。结果数据可以实时写入Tablestore。 本文以LogHub为数据源,收集ECS上的日志数据,通过Spark Streaming SQL进行聚合后,将流计算结果数据实时写入Tablestore,展示一个简单的日志监控场景。
1370 0
基于Spark Streaming 进行 MySQL Binlog 日志准实时传输
基本架构 RDS -> SLS -> Spark Streaming -> Spark HDFS 上述链路主要包含3个过程: 如何把 RDS 的 binlog 收集到 SLS。 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
10589 0
大数据同步利器: 表格存储全增量一体消费通道
本文会给大家详细介绍表格存储重磅推出的一项新功能--全增量一体数据通道。文章会为大家阐述数据通道的主要使用场景,表格存储数据通道的功能优势,并带大家快速入门如何使用数据通道来消费数据。本文会给大家详细介绍表格存储重磅推出的一项新功能--全增量一体数据通道。
3972 0
+关注
qiaozhou
文章
问答
视频
相关课程
更多
相关电子书
更多
实时即未来
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载