数据源:
userid,addres,age,username 001,guangzhou,20,alex 002,shenzhen,34,jack 003,beijing,23,lili
创建mysql数据表
-- Person table. All columns are kept as VARCHAR to match the string-valued
-- sample inserts below (a real schema would use INT for age).
-- userid is the natural key of the source data, so enforce it as the primary key.
CREATE TABLE IF NOT EXISTS person (
    userid   VARCHAR(20) NOT NULL,
    addres   VARCHAR(20),
    age      VARCHAR(20),
    username VARCHAR(20),
    PRIMARY KEY (userid)
);
-- Seed the person table with the three sample rows from the source data.
INSERT INTO person (userid, addres, age, username)
VALUES
    ('001', 'guangzhou', '20', 'alex'),
    ('002', 'shenzhen', '34', 'jack'),
    ('003', 'beijing', '23', 'lili');
代码实现:
package com.kfk.spark.sql

import com.kfk.spark.common.CommSparkSessionScala
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.util.Properties

/**
 * Demonstrates two ways of reading a MySQL table through Spark SQL's JDBC
 * data source, and writing a DataFrame back to MySQL.
 *
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/11
 * @time : 2:11 PM
 */
object JDBCSpark {

    // Connection settings defined once so the URL and credentials are not
    // duplicated across the three JDBC call sites below.
    // NOTE(review): credentials are hard-coded for this tutorial; in real code
    // load them from configuration or a secrets store instead.
    private val JdbcUrl = "jdbc:mysql://bigdata-pro-m04/spark"
    private val JdbcUser = "root"
    private val JdbcPassword = "199911"

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        // Approach 1: configure the JDBC connection via DataFrameReader options.
        val jdbcDF = spark.read
          .format("jdbc")
          .option("url", JdbcUrl)
          .option("dbtable", "person")
          .option("user", JdbcUser)
          .option("password", JdbcPassword)
          .load()

        jdbcDF.show()

        getData(spark)
        writeData(jdbcDF)
    }

    /**
     * Reads the `person` table from MySQL.
     *
     * Approach 2: pass the credentials through a java.util.Properties object
     * to the `DataFrameReader.jdbc` convenience method.
     *
     * @param spark the active SparkSession
     */
    def getData(spark: SparkSession): Unit = {
        val connectionProperties = new Properties()
        connectionProperties.put("user", JdbcUser)
        connectionProperties.put("password", JdbcPassword)

        val jdbcDF2 = spark.read.jdbc(JdbcUrl, "person", connectionProperties)
        jdbcDF2.show()
    }

    /**
     * Writes the given DataFrame into the MySQL table `person_info`.
     *
     * Uses the default save mode (ErrorIfExists), so this call fails if the
     * target table already exists; add `.mode(...)` before `.save()` to
     * append or overwrite instead.
     *
     * @param jdbcDF the DataFrame to persist
     */
    def writeData(jdbcDF: DataFrame): Unit = {
        jdbcDF.write
          .format("jdbc")
          .option("url", JdbcUrl)
          .option("dbtable", "person_info")
          .option("user", JdbcUser)
          .option("password", JdbcPassword)
          .save()
    }
}
运行结果:
查看写入的数据
mysql> select * from person_info; +--------+-----------+------+----------+ | userid | addres | age | username | +--------+-----------+------+----------+ | 001 | guangzhou | 20 | alex | | 002 | shenzhen | 34 | jack | | 003 | beijing | 23 | lili | +--------+-----------+------+----------+