使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>spark_project</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.version>2.12.12</scala.version> <spark.version>3.2.0</spark.version> <kafka.version>2.8.1</kafka.version> </properties> <dependencies> <!-- Spark dependencies --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>${spark.version}</version> </dependency> <!-- Kafka dependencies --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <!-- Scala library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> </dependencies> </project>
mysql中表结构:
2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:
package org.example.mysql2kafka2hdfs import org.apache.spark.sql.SparkSession import java.util.Properties object Mysql2Kafka { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("MySQLToKafka") .master("local[*]") .getOrCreate() // 设置 MySQL 连接属性 val mysqlProps = new Properties() mysqlProps.setProperty("user", "root") mysqlProps.setProperty("password", "12345678") mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver") // 从 MySQL 数据库中读取数据 val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps) // 将 DataFrame 转换为 JSON 字符串 val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value") // 将数据写入 Kafka jsonDF.show() jsonDF .write .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("topic", "comment") .save() // 停止 SparkSession spark.stop() } }
以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。
从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:
package org.example.mysql2kafka2hdfs import com.alibaba.fastjson.JSON import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} case class Comment(author_name:String, fans:String, comment_text:String, comment_time:String, location:String, user_gender:String) object kafka2Hdfs { def main(args: Array[String]): Unit = { // 设置 SparkConf val sparkConf = new SparkConf() .setAppName("KafkaToHDFS") .setMaster("local[*]") // 创建 StreamingContext,每秒处理一次 val ssc = new StreamingContext(sparkConf, Seconds(1)) // 设置 Kafka 相关参数 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", // Kafka broker 地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark-consumer-group", // Spark 消费者组 "auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费 "enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量 ) // 设置要订阅的 Kafka 主题 val topics = Array("comment") // 创建 Kafka Direct Stream val stream = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) // 从 Kafka 中读取消息,然后将其写入 HDFS stream.map({rdd=> val comment = JSON.parseObject(rdd.toString(), classOf[Comment]) comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender }).foreachRDD { rdd => if (!rdd.isEmpty()) { println(rdd) rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/") } } // 启动 Spark Streaming ssc.start() ssc.awaitTermination() } }
以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。