基本架构
RDS -> SLS -> Spark Streaming -> Spark HDFS
上述链路主要包含3个过程:
- 如何把 RDS 的 binlog 收集到 SLS。
- 如何通过 Spark Streaming 将 SLS 中的日志读取出来,进行分析。
- 如何把链路 2 中读取和处理过的日志,保存到 Spark HDFS中。
环境准备
- 安装一个 MySQL 类型的数据库(使用 MySQL 协议,例如 RDS、DRDS 等),开启 log-bin 功能,且配置 binlog 类型为 ROW 模式(RDS默认开启)。
- 开通 SLS 服务。
操作步骤
-
检查 MySQL 数据库环境。
- 查看是否开启 log-bin 功能。
mysql> show variables like "log_bin";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.02 sec)
- 查看 binlog 类型
mysql> show variables like "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.03 sec)
-
添加用户权限。(也可以直接通过RDS控制台添加)
CREATE USER canal IDENTIFIED BY ‘canal’;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;FLUSH PRIVILEGES;
-
为 SLS 服务添加对应的配置文件,并检查数据是否正常采集。
- 在 SLS 控制台添加对应的 project 和 logstore,例如:创建一个名称为 canaltest 的 project,然后创建一个名称为 canal 的 logstore。
- 对 SLS 进行配置:在 /etc/ilogtail 目录下创建文件user_local_config.json,具体配置如下:
{
"metrics": {
"##1.0##canaltest$plugin-local": {
"aliuid": "****",
"enable": true,
"category": "canal",
"defaultEndpoint": "*******",
"project_name": "canaltest",
"region": "cn-hangzhou",
"version": 2
"log_type": "plugin",
"plugin": {
"inputs": [
{
"type": "service_canal",
"detail": {
"Host": "*****",
"Password": "****",
"ServerID": ****,
"User" : "***",
"DataBases": [
"yourdb"
],
"IgnoreTables": [
"\\S+_inner"
],
"TextToString" : true
}
}
],
"flushers": [
{
"type": "flusher_sls",
"detail": {}
}
]
}
}
}
}
其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息,User 信息为之前授权过的用户名。aliUid、defaultEndpoint、project_name、category 请根据自己的实际情况填写对应的用户和 SLS 信息。
- 等待约 2 分钟,通过 SLS 控制台查看日志数据是否上传成功,具体如图所示。
如果日志数据没有采集成功,请根据SLS的提示,查看SLS的采集日志进行排查。
-
准备代码,将代码编译成 jar 包,然后上传到 OSS。
- 将 EMR 的示例代码通过 git 复制下来,然后进行修改,具体命令为:
git clone https://github.com/aliyun/aliyun-emapreduce-demo.git。
示例代码中已经有 LoghubSample 类,该类主要用于从 SLS 采集数据并打印。以下是修改后的代码,供参考:
package com.aliyun.emr.example
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object LoghubSample {
def main(args: Array[String]): Unit = {
if (args.length < 7) {
System.err.println(
"""Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
|
|
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val loghubGroupName = args(2)
val endpoint = args(3)
val accessKeyId = args(4)
val accessKeySecret = args(5)
val batchInterval = Milliseconds(args(6).toInt * 1000)
val conf = new SparkConf().setAppName("Mysql Sync")
// conf.setMaster("local[4]");
val ssc = new StreamingContext(conf, batchInterval)
val loghubStream = LoghubUtils.createStream(
ssc,
loghubProject,
logStore,
loghubGroupName,
endpoint,
1,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK)
loghubStream.foreachRDD(rdd =>
rdd.saveAsTextFile("/mysqlbinlog")
)
ssc.start()
ssc.awaitTermination()
}
}
其中的主要改动是:
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) )
这样在 EMR 集群中运行时,就会把Spark Streaming 中流出来的数据,保存到 EMR 的 HDFS 中。
- 说明
由于如果要在本地运行,请在本地环境提前搭建 Hadoop 集群。
由于 EMR 的 Spark SDK 做了升级,其示例代码比较旧,不能直接在参数中传递 OSS 的 AccessKeyId、AccessKeySecret, 而是需要通过 SparkConf 进行设置,如下所示。
trait RunLocally {
val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.mapreduce.job.run-local", "true")
conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
conf.set("spark.hadoop.job.runlocal", "true")
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
val sc = new SparkContext(conf)
def getAppName: String
}
在本地调试时,需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile(“/mysqlbinlog”) ) 中的 /mysqlbinlog 修改成本地 HDFS的地址。
代码编译。
在本地调试完成后,我们可以通过如下命令进行打包编译:
clean install
- 上传 jar 包。
请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar的目录,然后通过OSS 控制台或 OSS 的 SDK 将 /target/shaded目录下的 examples-1.1-shaded.jar上传到 OSS 的这个目录下。上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar,这个地址在后面会用上,如下图所示:

-
搭建 EMR 集群,创建任务并运行执行计划。
- 通过 EMR 控制台创建一个 EMR 集群,大约需要 10 分钟左右,请耐心等待。
- 创建一个类型为 Spark 的作业。
请根据您具体的配置将 SLS_endpoint $SLS_access_id $SLS_secret_key
替换成真实值。请注意参数的顺序,否则可能会报错。
—master yarn —deploy-mode client —driver-memory 4g —executor-memory 2g —executor-cores 2 —class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
运行以上的命令
- 查询 Master 节点的IP
通过 SSH 登录后,执行以下命令:
fs -ls /
可以看到 mysqlbinlog 开头的目录,再通过以下命令查看 mysqlbinlog 文件:
fs -ls /mysqlbinlog
还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容。
- 错误排查。
如果没有看到正常的结果,可以登陆节点,查看对应的作业的错误情况。