spark 计算结果写入mysql 案例及常见问题解决

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: package com.jxdimport org.apache.spark.SparkContextimport org.apache.spark.SparkConfimport java.sql.

package com.jxd

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.sql.Connection
import java.sql.DriverManager
object hello {
def main(args: Array[String]): Unit = {
var conf = new SparkConf().setAppName("Hello World")
var sc = new SparkContext(conf)
var input = sc.textFile("test/hello", 2)
var count = input.flatMap(name => name.split(" ")).map((_, 1)).reduceByKey(((a, b) => a + b))
count.foreachPartition(insertToMysql)

}
def insertToMysql(iterator: Iterator[(String, Int)]): Unit = {
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://192.168.10.58:3306/test"
val username = "root"
val password = "1"
var connectionMqcrm: Connection = null
Class.forName(driver)
connectionMqcrm = DriverManager.getConnection(url, username, password)
val sql = "INSERT INTO t_spark (`name`,`num`) VALUES (?,?)"
iterator.foreach(data => {
val statement = connectionMqcrm.prepareStatement(sql)
statement.setString(1, data._1)
statement.setInt(2, data._2)
var result = statement.executeUpdate()
if (result == 1) {
println("写入mysql成功.............")
}
})
}
}

 

Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.jxd.hello$.insertToMysql(hello.scala:22)
at com.jxd.hello$$anonfun$main$1.apply(hello.scala:13)
at com.jxd.hello$$anonfun$main$1.apply(hello.scala:13)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

 

spark1.4以前版本 在/spark/jars 加入mysql驱动 并不起作用需在提交任务指定mysql驱动包

例如:

spark-submit --master spark://192.168.10.160:7077 --driver-class-path /usr/spark/jars/mysql-connector-java-5.1.18-bin.jar --class com.jxd.hello /usr/spark/wc.jar /usr/spark/test/hello

 

高版本如 spark2.2已经修复此问题

注意:集群中每一个server都得加入mysql驱动包(建议先加一个 然后采用远程复制)

集群所有节点加完mysql驱动包后直接提交即可

park-submit --master spark://192.168.10.160:7077 --class com.jxd.hello /usr/spark/wc.jar /usr/spark/test/hello

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
关系型数据库 MySQL Linux
MySQL原理简介—6.简单的生产优化案例
本文介绍了数据库和存储系统的几个主题: 1. **MySQL日志的顺序写和数据文件的随机读指标**:解释了磁盘随机读和顺序写的原理及对数据库性能的影响。 2. **Linux存储系统软件层原理及IO调度优化原理**:解析了Linux存储系统的分层架构,包括VFS、Page Cache、IO调度等,并推荐使用deadline算法优化IO调度。 3. **数据库服务器使用的RAID存储架构**:介绍了RAID技术的基本概念及其如何通过多磁盘阵列提高存储容量和数据冗余性。 4. **数据库Too many connections故障定位**:分析了MySQL连接数限制问题的原因及解决方法。
165 23
|
3月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
9月前
|
SQL 关系型数据库 MySQL
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
591 0
案例剖析:MySQL唯一索引并发插入导致死锁!
|
5月前
|
存储 SQL 关系型数据库
服务器数据恢复—云服务器上mysql数据库数据恢复案例
某ECS网站服务器,linux操作系统+mysql数据库。mysql数据库采用innodb作为默认存储引擎。 在执行数据库版本更新测试时,操作人员误误将在本来应该在测试库执行的sql脚本在生产库上执行,导致生产库上部分表被truncate,还有部分表中少量数据被delete。
150 25
|
9月前
|
SQL 关系型数据库 MySQL
案例剖析,MySQL共享锁引发的死锁问题!
案例剖析,MySQL共享锁引发的死锁问题!
119 0
|
9月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
572 0
|
9月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
160 0
|
5月前
|
SQL 关系型数据库 MySQL
数据库数据恢复——MySQL简介和数据恢复案例
MySQL数据库数据恢复环境&故障: 本地服务器,安装的windows server操作系统。 操作系统上部署MySQL单实例,引擎类型为innodb,表空间类型为独立表空间。该MySQL数据库没有备份,未开启binlog。 人为误操作,在用Delete命令删除数据时未添加where子句进行筛选导致全表数据被删除,删除后未对该表进行任何操作。
|
5月前
|
SQL 关系型数据库 MySQL
MySQL原理简介—11.优化案例介绍
本文介绍了四个SQL性能优化案例,涵盖不同场景下的问题分析与解决方案: 1. 禁止或改写SQL避免自动半连接优化。 2. 指定索引避免按聚簇索引全表扫描大表。 3. 按聚簇索引扫描小表减少回表次数。 4. 避免产生长事务长时间执行。
|
7月前
|
存储 关系型数据库 MySQL
10个案例告诉你mysql不使用子查询的原因
大家好,我是V哥。上周与朋友讨论数据库子查询问题,深受启发。为此,我整理了10个案例,详细说明如何通过优化子查询提升MySQL性能。主要问题包括性能瓶颈、索引失效、查询优化器复杂度及数据传输开销等。解决方案涵盖使用EXISTS、JOIN、IN操作符、窗口函数、临时表及索引优化等。希望通过这些案例,帮助大家在实际开发中选择更高效的查询方式,提升系统性能。关注V哥,一起探讨技术,欢迎点赞支持!
416 5

推荐镜像

更多