针对单个foreachRDD操作可以mysql的事务处理,多个的情况怎么做呀?有遇到这个问题吗?
本问题来自阿里云开发者社区的【11大垂直技术领域开发者社群】。点击链接欢迎加入感兴趣的技术领域群
import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object ForeachRDDApp { def main(args: Array[String]) { val sparkConf = new SparkConf() .setAppName("ForeachRDDApp") .setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(10)) val lines = ssc.socketTextStream("hadoop000",9997) val results = lines.flatMap(.split(",")).map((,1)).reduceByKey(+) // TODO... 将results写入到MySQL中 // results.foreachRDD(rdd => { // rdd.foreach(x => { // val connection = createConnection() // val word = x._1 // val count = x._2.toInt // val sql = s"insert into wc(word, c) values ('$word', $count)" // connection.createStatement().execute(sql) // }) // }) // 最佳实践 results.foreachRDD(rdd => { rdd.foreachPartition(partition => { val connection = createConnection() partition.foreach(x => { val word = x._1 val count = x._2.toInt val sql = s"insert into wc(word, c) values ('$word', $count)" connection.createStatement().execute(sql) }) connection.close() }) rdd.foreach(x => { val connection = createConnection() val word = x._1 val count = x._2.toInt val sql = s"insert into wc(word, c) values ('$word', $count)" connection.createStatement().execute(sql) }) }) ssc.start() // 一定要写 // lines.print() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://hadoop000:3306/ss2","root","root") } } 答案来源网络,供参考 原文链接:https://blog.csdn.net/qq_15300683/article/details/80689998
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。