Structured Streaming 读取kafka 写入Neo4j

简介: Structured Streaming 读取kafka 写入Neo4j

Output Modes

有几种输出模式:

  • Append mode (default) - 这是默认模式,其中只有添加到结果表(因为最后一个触发器)的新行将输出到接收器。这只适用于那些添加到结果表的行永远不会改变的查询。因此,此模式保证每个行只输出一次(假设容错接收器)。例如,只有select、where、map、flatMap、filter、join等查询将支持Append模式。
  • Complete mode - 每次触发后,整个结果表将输出到接收器。这对于聚合查询是受支持的。
  • Update mode - (从Spark 2.1.1起可用)只有结果表中自最后一个触发器以来更新的行将输出到接收器。

OutputSinks

1.文件接收器-----将输出存储到目录中

writeStream.format("parquet")        // 可以是"orc", "json", "csv"等等格式
    .option("path", "path/to/destination/dir").start()

2.Kafka 接收器——将输出发送到Kafka中的一个或多个Topic。

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

3.Foreach接收器——在输出中的记录上运行任意计算

wordCount.writeStream
      .outputMode("update")
      .foreach(new ForeachWriter[Row] {
        //插入数据, 当有重复的 key 的时候更新,
        val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?";
        var conn: Connection = null
        // open 一般用于打开链接,返回false时则表示跳过该区的数据
        override def open(partitionId: Long, epochId: Long): Boolean = {
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection("jdbc:mysql://c702:3306/ss", "root", "Root.123")
          //如果conn不为空,而且conn 没有关,则可以往外写
          conn != null && !conn.isClosed
        }
        //把数据写入到连接中
        override def process(value: Row): Unit = {
          val ps: PreparedStatement = conn.prepareStatement(sql)
          ps.setString(1,value.getString(0))
          ps.setLong(2,value.getLong(1))
          ps.setString(3,value.getString(0))
          ps.setLong(4,value.getLong(1))
          ps.execute()
          ps.close()
        }
        //关闭连接
        override def close(errorOrNull: Throwable): Unit = {
          if (conn != null && !conn.isClosed) conn.close()
        }
      })
      .start()
    query.awaitTermination()

4. ForeachSink接收器——可以实现批量写入或者实现写入多个任意位置。

  • 默认情况下,foreachBatch仅提供至少一次写入保证。但是,您可以使用提供给函数的 batchId 作为对输出进行重复数据删除并获得精确一次保证的方法。
wordCount.writeStream
      .outputMode("complete")
      .foreachBatch((df,batchId) => {
        df.persist()//持久化
        df.write.mode("overwrite")//如果有则覆盖
          .jdbc("jdbc:mysql://c701:3306/ss","word_count",props)//写到jdbc
        df.write.mode("overwrite").json("./foreachBatch")//写到json
        df.unpersist()
      })
      .start()
    query.awaitTermination()
  }

5.内存接收器(同样用于调试)——输出作为内存中的表存储在内存中。这两种模式都支持Append和Complete输出模式。整个输出被收集并存储在Driver程序的内存中,这应该用于在低数据量上进行调试。因此,请谨慎使用。

writeStream
    .format("memory")
    .queryName("tableName")
    .start()
  • Event time must be defined on a window or a timestamp 时间时间必须为时间戳

  • Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;排序在流式处理的时候不支

  • 输出模式必须是append或update. 在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果. 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义

  • 在输出模式是append时, 必须设置 watermask 才能使用聚合操作. 其实, watermask 定义了 append 模式中何时输出聚合聚合结果(状态), 并清理过期状态

  • 在输出模式是update时, watermask 主要用于过滤过期数据并及时清理过期状态.

  • watermask 会在处理当前批次数据时更新, 并且会在处理下一个批次数据时生效使用. 但如果节点发送故障, 则可能延迟若干批次生效

  • withWatermark 必须使用与聚合操作中的时间戳列是同一列.df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() 无效

  • withWatermark 必须在聚合之前调用f.groupBy(“time”).count().withWatermark(“time”, “1 min”) 无效

Structured Streaming 写入Neo4j demo

1: 简单数据流向

60a6bcefe26f4b118e50f46e4d0afd1d.png

2: 引入pom.xml 相关依赖

<dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j</artifactId>
    <version>3.4.18</version>
</dependency>
<dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j-spatial</artifactId>
    <version>0.25.5-neo4j-3.4.1</version>
</dependency>
<dependency>
    <groupId>neo4j-contrib</groupId>
    <artifactId>neo4j-spark-connector</artifactId>
    <version>2.4.1-M1</version>
</dependency>

3: 样例数据:

{"change":[{"kind":"insert","schema":"public","table":"sample","columnnames":["xzqhbm","qylx","ybbq","ybbqdm","ybpssj","fbl","dtyxmc","whq","zjzt","sfyh","sfzd","sfyy","area","length","xcenter","ycenter","ybzxyxq","ybzdyxq","dem","slope","geometry"],"columntypes":["character varying(255)","character varying(255)","character varying(255)","character varying(255)","date","numeric","character varying(255)","bigint","bigint","bigint","bigint","bigint","double precision","double precision","numeric","numeric","date","date","bigint","bigint","public.geometry"],"columnvalues":["370303","平原","乔灌果园","YD010100","2020-05-12",1,"GF-2",0,0,0,0,0,0.00940095134656,0,0,0,null,null,0,0,"0106000000010000000103000000010000001500000090138B24A9265E4090596931FC454240881F4397B5265E400021E1D4F745424060C2D3F1B9265E402057AE1AF845424068E585DEBC265E4058B9B4DAF6454240A09FEB60BC265E4050131A81EB454240E84044A7BD269E4028CB3343EB45424020BFC614BD265E406809D346E1454240B06AA148BD265E4000F9D6EADF454240807380ACBC265E40B8E9315FD8454240C8F9D769B9265E40A8ADA1D9D745424068382007BA265E4038AD4E1ADF454240380E5AE7B6265E4040A64984E1454240F0AB13E6A9265E403877A5E6E445424050FC8C36A7265E40D871155BE645424068D89587A6265E40680C1980E745424000C0390FA6265E40482D6882EA45424008A88F11A7265E40B0963FA2F345424048CE2A82AB265E4098FEDC3FF2454240E80FFDE2AB265E40F026E831F545424018FC146CA8265E40580A4D21F645424090138B24A9265E4090596931FC454290","kafka2neo4j"]}]}

    4: 核心代码如下:

    import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
    import org.json.{JSONArray, JSONObject}
    import org.neo4j.driver.v1.{AuthTokens, Driver, GraphDatabase, Session}
    /**
     * @Description TODO
     * @Data 2020/10/21 10:16
     * @Created by mdz
     */
    object GetDataForeach {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .appName("kafka2Delta")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._
        import org.apache.spark.sql.functions._
        val kafkaDF: DataFrame =
          spark
            .readStream //readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "192.168.0.219:9092,192.168.0.220:9092,192.168.0.221:9092")
            .option("subscribe", "db02_topic")
            .option("startingOffsets", "latest") //earliest  //latest
            .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            .load()
        val resDF: DataFrame = kafkaDF
          .select(col("value")
            .cast("String").as("json"))
          .filter(col("json").contains("kafka2neo4j"))
          .filter(col("json").contains("insert"))
          .distinct()
        val query = resDF.writeStream
          .foreach(new ForeachWriter[Row] {
            var leaderPath: String = _
            var driver: Driver = _
            var session: Session = _
            override def open(partitionId: Long, epochId: Long): Boolean = {
              println("開始創建連接.....")
              leaderPath = "bolt://localhost:27687"
              driver = GraphDatabase.driver(leaderPath, AuthTokens.basic("neo4j", "neo4j"))
              session = driver.session()
              driver != null && session != null
            }
            override def process(value: Row): Unit = {
              println("開始執行任務....")
              val jsonData: JSONObject =new JSONObject(value.mkString)
              val jsonArray: JSONArray = jsonData.getJSONArray("change")
              val valuedata = jsonArray.get(0).toString
              val json = new JSONObject(valuedata)
              val columntypes: JSONArray = json.getJSONArray("columntypes")
              val columnvalues: JSONArray = json.getJSONArray("columnvalues")
              val columnnames: JSONArray = json.getJSONArray("columnnames")
              var cql = "CREATE (n {"
              for(i <- 0 until columnnames.length()){
                if(columntypes.get(i).toString.contains("character")||columntypes.get(i).toString.contains("geometry")||columntypes.get(i).toString.contains("date")){
                  cql = cql+columnnames.get(i) + ":'" +columnvalues.get(i) + "',"
                }else{
                  cql = cql+columnnames.get(i) + ":" +columnvalues.get(i) + ","
                }
              }
              cql  =cql.substring(0,cql.length-1) +  "})"
              println(cql)
              session.run(cql)
            }
            override def close(errorOrNull: Throwable): Unit = {
              println("關閉連接.....")
              session.close()
              driver.close()
            }
          }).start()
        query.awaitTermination()
      }
    }
    相关文章
    |
    8月前
    |
    消息中间件 分布式计算 Kafka
    195 Spark Streaming整合Kafka完成网站点击流实时统计
    195 Spark Streaming整合Kafka完成网站点击流实时统计
    50 0
    |
    2月前
    |
    消息中间件 分布式计算 Kafka
    使用Kafka与Spark Streaming进行流数据集成
    使用Kafka与Spark Streaming进行流数据集成
    |
    10月前
    |
    消息中间件 分布式计算 Kafka
    大数据Spark Structured Streaming集成 Kafka
    大数据Spark Structured Streaming集成 Kafka
    87 0
    |
    10月前
    |
    消息中间件 分布式计算 Kafka
    大数据Spark Streaming集成Kafka
    大数据Spark Streaming集成Kafka
    92 0
    |
    消息中间件 存储 运维
    如何用Know Streaming来查询Kafka的消息
    如何用Know Streaming来查询Kafka的消息
    如何用Know Streaming来查询Kafka的消息
    |
    消息中间件 分布式计算 Kafka
    Spark Streaming实时流处理项目实战笔记——使用KafkaSInk将Flume收集到的数据输出到Kafka
    Spark Streaming实时流处理项目实战笔记——使用KafkaSInk将Flume收集到的数据输出到Kafka
    |
    消息中间件 分布式计算 Java
    Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程
    Spark Streaming实时流处理项目实战笔记——Kafka Consumer Java API编程
    |
    18天前
    |
    消息中间件 存储 Kafka
    实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    18天前
    |
    消息中间件 存储 Kafka
    实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    17天前
    |
    消息中间件 Java 关系型数据库
    实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    774 0

    热门文章

    最新文章