基本架构
RDS -> SLS -> Spark Streaming -> Spark HDFS
上述链路主要包含3个过程:
- 如何把 RDS 的 binlog 收集到 SLS。
- 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
- 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。
环境准备
- 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
- 开通 SLS 服务。
操作步骤
-
检查 MySQL 数据库环境。
- 查看是否开启 log-bin 功能。
mysql> show variables like "log_bin"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.02 sec)
- 查看 binlog 类型
mysql> show variables like "binlog_format"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ 1 row in set (0.03 sec)
-
添加用户权限。(也可以直接通过RDS控制台添加)
CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
-
为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。
- 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
- 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
{ "metrics": { "##1.0##canaltest$plugin-local": { "aliuid": "****", "enable": true, "category": "canal", "defaultEndpoint": "*******", "project_name": "canaltest", "region": "cn-hangzhou", "version": 2 "log_type": "plugin", "plugin": { "inputs": [ { "type": "service_canal", "detail": { "Host": "*****", "Password": "****", "ServerID": ****, "User" : "***", "DataBases": [ "yourdb" ], "IgnoreTables": [ "\\S+_inner" ], "TextToString" : true } } ], "flushers": [ { "type": "flusher_sls", "detail": {} } ] } } } }
其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。
- 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。
-
准备代码,将代码编译成 jar 包,然后上传到 OSS。
- 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。
示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:
package com.aliyun.emr.example import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} object LoghubSample { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar | | """.stripMargin) System.exit(1) } val loghubProject = args(0) val logStore = args(1) val loghubGroupName = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val conf = new SparkConf().setAppName("Mysql Sync") // conf.setMaster("local[4]"); val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, loghubProject, logStore, loghubGroupName, endpoint, 1, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog") ) ssc.start() ssc.awaitTermination() } }
其中的主要改动是:
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )
这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。
- 说明
由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。
由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。
trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}
在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。
代码编译。
在本地调试完成后,我们可以通过如下命令进行打包编译:clean install
- 上传 jar 包。
请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:
-
搭建 EMR 集群,创建任务并运行执行计划。
- 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
- 创建一个类型为 Spark 的作业。
请根据您具体的配置将SLS_endpoint $SLS_access_id $SLS_secret_key
替换成真实值。请注意参数的顺序,否则可能会报错。
—master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
运行以上的命令
- 查询 Master 节点的IP
通过 SSH 登录后,执行以下命令:
fs -ls /
可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:
fs -ls /mysqlbinlog
还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。
- 错误排查。
如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。