1. 导言:
Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将介绍如何使用Flink与MySQL数据库进行交互,以清洗股票数据为例。
2. 环境准备:
首先,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时需要提前创建好mysql表,一行source表,一张sink表。
CREATE TABLE `re_stock_code_price` ( `id` bigint NOT NULL AUTO_INCREMENT, `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码', `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称', `close` double DEFAULT NULL COMMENT '最新价', `change_percent` double DEFAULT NULL COMMENT '涨跌幅', `change` double DEFAULT NULL COMMENT '涨跌额', `volume` double DEFAULT NULL COMMENT '成交量(手)', `amount` double DEFAULT NULL COMMENT '成交额', `amplitude` double DEFAULT NULL COMMENT '振幅', `turnover_rate` double DEFAULT NULL COMMENT '换手率', `peration` double DEFAULT NULL COMMENT '市盈率', `volume_rate` double DEFAULT NULL COMMENT '量比', `hign` double DEFAULT NULL COMMENT '最高', `low` double DEFAULT NULL COMMENT '最低', `open` double DEFAULT NULL COMMENT '今开', `previous_close` double DEFAULT NULL COMMENT '昨收', `pb` double DEFAULT NULL COMMENT '市净率', `create_time` varchar(64) NOT NULL COMMENT '写入时间', `rise` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `t_stock_code_price` ( `id` bigint NOT NULL AUTO_INCREMENT, `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码', `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称', `close` double DEFAULT NULL COMMENT '最新价', `change_percent` double DEFAULT NULL COMMENT '涨跌幅', `change` double DEFAULT NULL COMMENT '涨跌额', `volume` double DEFAULT NULL COMMENT '成交量(手)', `amount` double DEFAULT NULL COMMENT '成交额', `amplitude` double DEFAULT NULL COMMENT '振幅', `turnover_rate` double DEFAULT NULL COMMENT '换手率', `peration` double DEFAULT NULL COMMENT '市盈率', `volume_rate` double DEFAULT NULL COMMENT '量比', `hign` double DEFAULT NULL COMMENT '最高', `low` double DEFAULT NULL COMMENT '最低', `open` double DEFAULT NULL COMMENT '今开', `previous_close` double DEFAULT NULL COMMENT '昨收', `pb` double DEFAULT NULL COMMENT '市净率', `create_time` varchar(64) NOT NULL COMMENT '写入时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
package org.east; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment; object TableETL { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment .setRuntimeMode(RuntimeExecutionMode.STREAMING) val tEnv = StreamTableEnvironment.create(senv) // 定义源表 val source_table = """ CREATE TEMPORARY TABLE t_stock_code_price ( id BIGINT NOT NULL, code STRING NOT NULL, -- 其他字段... create_time STRING NOT NULL, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydb', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = 't_stock_code_price', 'username' = 'root', 'password' = '12345678' ) """.stripMargin // 定义目标表 val sink_table = """ CREATE TEMPORARY TABLE re_stock_code_price ( id BIGINT NOT NULL, code STRING NOT NULL, -- 其他字段... create_time STRING NOT NULL, rise INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydb', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = 're_stock_code_price', 'username' = 'root', 'password' = '12345678' ) """.stripMargin tEnv.executeSql(source_table) tEnv.executeSql(sink_table)
在这段代码中,我们首先创建了Flink的流式执行环境和StreamTableEnvironment。然后,我们定义了两个临时表,用于存储原始股票数据和清洗后的数据。
3. 数据清洗:
接下来,我们执行数据清洗操作,并将结果写入目标表。
// 执行清洗操作,并将结果写入目标表 tEnv.executeSql("INSERT INTO re_stock_code_price " + "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")
在这里,我们计算了股票涨跌情况,并将结果写入到目标表中。在这个例子中,我们假设change_percent字段表示股票价格的变化百分比,rise字段为1表示股票上涨,为0表示股票下跌。
4. 结果展示:
最后,我们查询目标表并打印结果。
5. 完整代码:
下面是完整的代码:
package org.east; import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object TableETL { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment .setRuntimeMode(RuntimeExecutionMode.STREAMING) val tEnv = StreamTableEnvironment.create(senv) val source_table = """ |CREATE TEMPORARY TABLE t_stock_code_price ( | id BIGINT NOT NULL, | code STRING NOT NULL, | name STRING NOT NULL, | `close` DOUBLE, | change_percent DOUBLE, | change DOUBLE, | volume DOUBLE, | amount DOUBLE, | amplitude DOUBLE, | turnover_rate DOUBLE, | peration DOUBLE, | volume_rate DOUBLE, | hign DOUBLE, | low DOUBLE, | `open` DOUBLE, | previous_close DOUBLE, | pb DOUBLE, | create_time STRING NOT NULL, | PRIMARY KEY (id) NOT ENFORCED |) WITH ( | 'connector' = 'jdbc', | 'url' = 'jdbc:mysql://localhost:3306/mydb', | 'driver' = 'com.mysql.cj.jdbc.Driver', | 'table-name' = 't_stock_code_price', | 'username' = 'root', | 'password' = '12345678' |) |""".stripMargin val sink_table = """ |CREATE TEMPORARY TABLE re_stock_code_price ( | id BIGINT NOT NULL, | code STRING NOT NULL, | name STRING NOT NULL, | `close` DOUBLE, | change_percent DOUBLE, | change DOUBLE, | volume DOUBLE, | amount DOUBLE, | amplitude DOUBLE, | turnover_rate DOUBLE, | peration DOUBLE, | volume_rate DOUBLE, | hign DOUBLE, | low DOUBLE, | `open` DOUBLE, | previous_close DOUBLE, | pb DOUBLE, | create_time STRING NOT NULL, | rise int, | PRIMARY KEY (id) NOT ENFORCED |) WITH ( | 'connector' = 'jdbc', | 'url' = 'jdbc:mysql://localhost:3306/mydb', | 'driver' = 'com.mysql.cj.jdbc.Driver', | 'table-name' = 're_stock_code_price', | 'username' = 'root', | 'password' = '12345678' |) |""".stripMargin tEnv.executeSql(source_table) tEnv.executeSql(sink_table) tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price") val user_DS = tEnv.executeSql("select * from re_stock_code_price") user_DS.print() } }