Output Modes
There are several output modes (a short example of selecting the mode follows this list):
- Append mode (default) - This is the default mode, in which only the new rows appended to the Result Table since the last trigger are output to the sink. It applies only to queries where rows added to the Result Table never change, so this mode guarantees that each row is output exactly once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, and so on support Append mode.
- Complete mode - After every trigger, the entire Result Table is output to the sink. This is supported for aggregation queries.
- Update mode - (available since Spark 2.1.1) Only the rows of the Result Table that have been updated since the last trigger are output to the sink.
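A minimal sketch of how the mode is selected; the streaming lines DataFrame, the wordCounts aggregation, and the console sink are assumptions used only for illustration:

// Hypothetical streaming aggregation, used only to show how the mode is chosen
val wordCounts = lines.groupBy("word").count()

wordCounts.writeStream
  .outputMode("complete")   // or "append" / "update", subject to the rules above
  .format("console")
  .start()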
Output Sinks
1. File sink - stores the output in a directory.
writeStream.format("parquet") // 可以是"orc", "json", "csv"等等格式 .option("path", "path/to/destination/dir").start()
2. Kafka sink - sends the output to one or more topics in Kafka (see the note after the snippet below on the columns the sink expects).
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
3. Foreach sink - runs arbitrary computation on the records in the output.
wordCount.writeStream .outputMode("update") .foreach(new ForeachWriter[Row] { //插入数据, 当有重复的 key 的时候更新, val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"; var conn: Connection = null // open 一般用于打开链接,返回false时则表示跳过该区的数据 override def open(partitionId: Long, epochId: Long): Boolean = { Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection("jdbc:mysql://c702:3306/ss", "root", "Root.123") //如果conn不为空,而且conn 没有关,则可以往外写 conn != null && !conn.isClosed } //把数据写入到连接中 override def process(value: Row): Unit = { val ps: PreparedStatement = conn.prepareStatement(sql) ps.setString(1,value.getString(0)) ps.setLong(2,value.getLong(1)) ps.setString(3,value.getString(0)) ps.setLong(4,value.getLong(1)) ps.execute() ps.close() } //关闭连接 override def close(errorOrNull: Throwable): Unit = { if (conn != null && !conn.isClosed) conn.close() } }) .start() query.awaitTermination()
4. foreachBatch sink - lets you write each micro-batch with batch APIs, and write the output to multiple arbitrary locations.
- By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId passed to the function to deduplicate the output and obtain exactly-once guarantees (a sketch of this idea follows the example below).
wordCount.writeStream .outputMode("complete") .foreachBatch((df,batchId) => { df.persist()//持久化 df.write.mode("overwrite")//如果有则覆盖 .jdbc("jdbc:mysql://c701:3306/ss","word_count",props)//写到jdbc df.write.mode("overwrite").json("./foreachBatch")//写到json df.unpersist() }) .start() query.awaitTermination() }
5. Memory sink (also for debugging) - the output is stored in memory as an in-memory table; both the Append and Complete output modes are supported. Since the entire output is collected and stored in the driver's memory, it should only be used for debugging on small amounts of data, so use it with caution. The resulting table can then be queried by name, as shown below.
writeStream .format("memory") .queryName("tableName") .start()
- Event time must be defined on a window or a timestamp column.
- Sorting is not supported on streaming DataFrames/Datasets, unless it is on an aggregated DataFrame/Dataset in Complete output mode.
- The output mode must be append or update. Complete mode (which requires aggregation) outputs all aggregation results on every trigger, while the purpose of a watermark is to drop stale aggregation state; therefore a watermark has no effect, and makes no sense, in complete mode.
- In append mode, a watermark must be set before aggregation can be used. In fact, the watermark defines when the aggregation result (state) is emitted in append mode and when expired state is cleaned up.
- In update mode, the watermark is mainly used to filter out late data and to clean up expired state in a timely fashion.
- The watermark is updated while the current batch is being processed and takes effect when the next batch is processed. If a node fails, however, it may take several batches before the new watermark takes effect.
- withWatermark must be applied to the same column as the timestamp column used in the aggregation: df.withWatermark("time", "1 min").groupBy("time2").count() is invalid.
- withWatermark must be called before the aggregation: df.groupBy("time").count().withWatermark("time", "1 min") is invalid (see the sketch below for the valid form).
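A minimal sketch of the valid pattern implied by the last two rules, assuming a streaming df with an event-time column time and a word column: the watermark is declared before the aggregation and on the same column used in the grouping.

import org.apache.spark.sql.functions.{col, window}

val windowedCounts = df
  .withWatermark("time", "1 min")                          // declared before the aggregation
  .groupBy(window(col("time"), "5 minutes"), col("word"))  // grouped on the same "time" column
  .count()

windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()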
Structured Streaming write-to-Neo4j demo
1: Simple data flow: Kafka -> Structured Streaming (foreach sink) -> Neo4j
2: Add the related dependencies to pom.xml
<dependency>
  <groupId>org.neo4j</groupId>
  <artifactId>neo4j</artifactId>
  <version>3.4.18</version>
</dependency>
<dependency>
  <groupId>org.neo4j</groupId>
  <artifactId>neo4j-spatial</artifactId>
  <version>0.25.5-neo4j-3.4.1</version>
</dependency>
<dependency>
  <groupId>neo4j-contrib</groupId>
  <artifactId>neo4j-spark-connector</artifactId>
  <version>2.4.1-M1</version>
</dependency>
3: Sample data:
{"change":[{"kind":"insert","schema":"public","table":"sample","columnnames":["xzqhbm","qylx","ybbq","ybbqdm","ybpssj","fbl","dtyxmc","whq","zjzt","sfyh","sfzd","sfyy","area","length","xcenter","ycenter","ybzxyxq","ybzdyxq","dem","slope","geometry"],"columntypes":["character varying(255)","character varying(255)","character varying(255)","character varying(255)","date","numeric","character varying(255)","bigint","bigint","bigint","bigint","bigint","double precision","double precision","numeric","numeric","date","date","bigint","bigint","public.geometry"],"columnvalues":["370303","平原","乔灌果园","YD010100","2020-05-12",1,"GF-2",0,0,0,0,0,0.00940095134656,0,0,0,null,null,0,0,"0106000000010000000103000000010000001500000090138B24A9265E4090596931FC454240881F4397B5265E400021E1D4F745424060C2D3F1B9265E402057AE1AF845424068E585DEBC265E4058B9B4DAF6454240A09FEB60BC265E4050131A81EB454240E84044A7BD269E4028CB3343EB45424020BFC614BD265E406809D346E1454240B06AA148BD265E4000F9D6EADF454240807380ACBC265E40B8E9315FD8454240C8F9D769B9265E40A8ADA1D9D745424068382007BA265E4038AD4E1ADF454240380E5AE7B6265E4040A64984E1454240F0AB13E6A9265E403877A5E6E445424050FC8C36A7265E40D871155BE645424068D89587A6265E40680C1980E745424000C0390FA6265E40482D6882EA45424008A88F11A7265E40B0963FA2F345424048CE2A82AB265E4098FEDC3FF2454240E80FFDE2AB265E40F026E831F545424018FC146CA8265E40580A4D21F645424090138B24A9265E4090596931FC454290","kafka2neo4j"]}]}
4: The core code is as follows:
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.json.{JSONArray, JSONObject}
import org.neo4j.driver.v1.{AuthTokens, Driver, GraphDatabase, Session}

/**
 * @Description kafka -> Structured Streaming -> Neo4j
 * @Date 2020/10/21 10:16
 * @Created by mdz
 */
object GetDataForeach {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("kafka2Delta")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Read the change records from Kafka
    val kafkaDF: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.0.219:9092,192.168.0.220:9092,192.168.0.221:9092")
      .option("subscribe", "db02_topic")
      .option("startingOffsets", "latest")   // earliest or latest
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .load()

    // Keep only the insert records for the kafka2neo4j table
    val resDF: DataFrame = kafkaDF
      .select(col("value").cast("String").as("json"))
      .filter(col("json").contains("kafka2neo4j"))
      .filter(col("json").contains("insert"))
      .distinct()

    val query = resDF.writeStream
      .foreach(new ForeachWriter[Row] {
        var leaderPath: String = _
        var driver: Driver = _
        var session: Session = _

        // Open the Neo4j connection for this partition/epoch
        override def open(partitionId: Long, epochId: Long): Boolean = {
          println("Opening connection.....")
          leaderPath = "bolt://localhost:27687"
          driver = GraphDatabase.driver(leaderPath, AuthTokens.basic("neo4j", "neo4j"))
          session = driver.session()
          driver != null && session != null
        }

        // Parse each record and create a node in Neo4j
        override def process(value: Row): Unit = {
          println("Processing record....")
          val jsonData: JSONObject = new JSONObject(value.mkString)
          val jsonArray: JSONArray = jsonData.getJSONArray("change")
          val valuedata = jsonArray.get(0).toString
          val json = new JSONObject(valuedata)
          val columntypes: JSONArray = json.getJSONArray("columntypes")
          val columnvalues: JSONArray = json.getJSONArray("columnvalues")
          val columnnames: JSONArray = json.getJSONArray("columnnames")

          // Build the CREATE statement: quote character/geometry/date columns, leave numeric values unquoted
          var cql = "CREATE (n {"
          for (i <- 0 until columnnames.length()) {
            if (columntypes.get(i).toString.contains("character") ||
                columntypes.get(i).toString.contains("geometry") ||
                columntypes.get(i).toString.contains("date")) {
              cql = cql + columnnames.get(i) + ":'" + columnvalues.get(i) + "',"
            } else {
              cql = cql + columnnames.get(i) + ":" + columnvalues.get(i) + ","
            }
          }
          cql = cql.substring(0, cql.length - 1) + "})"
          println(cql)
          session.run(cql)
        }

        // Close the connection
        override def close(errorOrNull: Throwable): Unit = {
          println("Closing connection.....")
          session.close()
          driver.close()
        }
      })
      .start()

    query.awaitTermination()
  }
}