• 关于

    streaming sql

    的搜索结果

问题

如何使用spark将kafka主题中的writeStream数据写入hdfs?

我一直试图让这段代码工作几个小时:val spark = SparkSession.builder() .appName("Consumer") .getOrCreate() spark.readStream .format("kafka"...
社区小助手 2019-12-01 19:29:59 1407 浏览量 回答数 2

回答

入湖引擎使用了阿里云 EMR 团队自研的 Spark Streaming SQL 以及 EMR Spark 引擎,Streaming SQL 基于 Spark Structured Streaming,提供了相对完善的 Streaming SQL 语法,极大简化了实时计算的开发成本。对于实时增量模板,上层入湖 模板部分将入湖模板翻译成 Streaming SQL,然后提交 Spark 集群运行。我们在 Streaming SQL 里面扩展了 Merge Into 语法来支持 update、delete 操作。对于 RDS 等全量模板,则直接翻译成 Spark SQL 运行。
Lee_tianbai 2021-01-07 16:15:44 0 浏览量 回答数 0

回答

Spark Streaming SQL 在 Spark Structured Streaming 之上提供了 SQL 能力, 降低了实时业务开发的门槛,使得离线业务实时化更简单方便。 Spark Streaming SQL 支持的语法如下: 下面以实时消费 SLS 为例: # 创建 loghub 源表 spark-sql> CREATE TABLE loghub_intput_tbl(content string) > USING loghub > OPTIONS > (...) # 创建 delta 目标表 spark-sql> CREATE TABLE delta_output_tbl(content string) > USING delta > OPTIONS > (...); # 创建流式 SCAN spark-sql> CREATE SCAN loghub_table_intput_test_stream > ON loghub_intput_tbl > USING STREAM; # 将 loghub 源表数据插入 delta 目标表 spark-sql> INSERT INTO delta_output_tbl SELECT content FROM loghub_table_i ntput_test_stream;
Lee_tianbai 2021-01-07 16:30:36 0 浏览量 回答数 0

问题

Spark Streaming SQL是什么?

Spark Streaming SQL是什么?...
Lee_tianbai 2021-01-07 16:29:39 1 浏览量 回答数 1

问题

如何使用Spark Streaming SQL进行 PV/UV统计?

如何使用Spark Streaming SQL进行 PV/UV统计? 求大佬解答...
爱吃鱼的程序员 2020-12-28 11:38:30 0 浏览量 回答数 1

问题

使用Spark Streaming SQL进行 PV/UV统计的准备工作?

使用Spark Streaming SQL进行 PV/UV统计的准备工作? 求大佬解答...
爱吃鱼的程序员 2020-12-28 11:39:00 0 浏览量 回答数 1

问题

Blink streaming 不支持print(), 那为什么又有这样的api呢 val sql1 = "SELECT a, c FROM sourceTable" tEnv.sqlQuery(sql1).print() // org.apache.flink.table.api.TableException: collect is not supported.

转自钉钉群21789141:Blink streaming 不支持print(), 那为什么又有这样的api呢 val sql1 = "SELECT a, c FROM sourceTable" tEnv.sqlQuery(sql1).pr...
赵慧 2019-12-01 19:33:21 542 浏览量 回答数 1

问题

Spark Structured Streaming error读取字段'topic_metadata'时出错

我正在运行spark 2.4.0和Kafka 0.10.2 var streamingInputDF = spark.readStream .format("kafka") .option("kafka.bootstrap.servers...
社区小助手 2019-12-01 19:23:55 778 浏览量 回答数 1

问题

使用spark streaming连接loghub报错,是什么问题

"main" java.lang.ClassNotFoundException: Failed to find data source: loghub. Please find packages at http://spark.apache...
游客lplm6xso3kx3e 2019-12-01 19:41:16 462 浏览量 回答数 1

问题

Pyspark - 打印来自Kafka的消息

我建立了一个带有生产者和消费者的kafka系统,作为消息流式传输json文件的行。 使用pyspark,我需要分析不同流媒体窗口的数据。为此,我需要查看pyspark流式传输的数据......我该怎么做? 要运行代码,我使用了Yannael...
社区小助手 2019-12-01 19:25:44 646 浏览量 回答数 1

回答

使用.explain运算符,它将逻辑和(启用扩展标志)物理计划打印到控制台。 val records = spark. readStream. format("rate"). load scala> records.explain == Physical Plan ==StreamingRelation rate, [timestamp#0, value#1L] scala> records.explain(extended = true) == Parsed Logical Plan ==StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4071aa13,rate,List(),None,List(),None,Map(),None), rate, [timestamp#0, value#1L] == Analyzed Logical Plan ==timestamp: timestamp, value: bigintStreamingRelation DataSource(org.apache.spark.sql.SparkSession@4071aa13,rate,List(),None,List(),None,Map(),None), rate, [timestamp#0, value#1L] == Optimized Logical Plan ==StreamingRelation DataSource(org.apache.spark.sql.SparkSession@4071aa13,rate,List(),None,List(),None,Map(),None), rate, [timestamp#0, value#1L] == Physical Plan ==StreamingRelation rate, [timestamp#0, value#1L]物理计划是DAG的转换,因此它可以帮助您。 这个链接可能会有所帮助: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-Dataset-explain.html
社区小助手 2019-12-02 01:46:16 0 浏览量 回答数 0

问题

mysql5.7编译安装报错,环境centos7.2

报错见下: [ 70%] Building CXX object libmysqld/CMakeFiles/sql_embedded.dir/__/sql/filesort_utils.cc.o [ 70%] Buildin...
dusk-x 2019-12-01 20:57:24 2081 浏览量 回答数 1

问题

spark2.1.0的kafka链接失败了:报错

in thread "main" java.lang.IllegalArgumentException: 'path' is not specified     at org.apache.spark.sql...
kun坤 2020-06-14 13:44:30 63 浏览量 回答数 1

回答

目前MaxCompute Spark支持以下使用场景: Java/Scala所有离线场景,GraphX、Mllib、RDD、Spark-SQL、PySpark等。 读写MaxCompute Table。 OSS非结构化存储支持。 暂不支持以下场景: 读写VPC环境下的服务,如RDS、Redis、ECS上部署的服务等。 Streaming场景。 交互式类需求,Spark-Shell、Spark-SQL-Shell、PySpark-Shell等。 按量计费开发者版资源仅支持MaxCompute SQL(支持使用UDF)、PyODPS作业任务,暂不支持MapReduce、Spark等其它任务。
LiuWH 2020-03-19 23:04:37 0 浏览量 回答数 0

问题

推荐一个flink-sql 客户端 图形化

https://github.com/zhp8341/flink-streaming-platform-web...
西湖 2021-03-18 00:02:44 2 浏览量 回答数 0

问题

Spark写入流到IBM Cloud对象存储失败,“Access KEY为空。请提供有效的访问密钥“

我目前正在使用Apache Spark 2.3.2并创建一个管道来从文件系统中读取流csv文件,然后将其写入IBM Cloud对象存储。 我正在使用Stocator连接器。通过以下配置,对IBM COS的常规读取和写入工作正常。但是,读写流...
社区小助手 2019-12-01 19:28:45 652 浏览量 回答数 1

问题

在本地执行flink是报错TypeDeserializerAdapter not found

本地执行代码如下 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build...
游客3yybxutymqvrm 2020-10-07 07:02:44 483 浏览量 回答数 1

问题

Flink 的 Scala API 怎么指定 时间字段

代码如下 import com.alibaba.fastjson.JSON import com.alibaba.fastjson.serializer.SerializeFilter import org.apache.flink.api...
冷丰 2019-12-01 19:39:10 819 浏览量 回答数 1

问题

实时计算(流计算)热门回答01

1、在flink集群模式下,能不能指定某个节点的solt来执行一个task? https://developer.aliyun.com/ask/136420 2、是否可以将flink部署在运行在JDK1.7的...
问问小秘 2019-12-01 19:51:57 18 浏览量 回答数 0

问题

惠州开诊断证明-rxq

惠州开诊断证明-rxq(微)电〗【186-6605-3854〗在sbt项目中引入reactive-influx: libraryDependencies ++= Seq( "com.pygmalios...
游客5k2abgdj3m2ti 2019-12-01 22:09:10 3 浏览量 回答数 0

回答

Spark提供了一系列面向不同应用需求的组件,主要有Spark Core、Spark SQL、Spark Streaming、MLlib、GraphX。
bigbigtree 2020-03-19 19:18:58 0 浏览量 回答数 0

回答

首先监控是延迟的,关于监控报警,开源的spark做的还是不够好的。主要通过以下两种方式进行1、struct Streaming可以直接在driver端通过接口,获取监控信息eg:query.recentProgress // an array of the most recent progress updates for this query2、spark streaming(DStream)除了UI,另外的方法配置StreamingSource把监控信息输出到ganglia、文件、或者开发下输出到你自己的某个监控系统在其他文章中也有使用 Prometheus 和 Grafana 监控 Spark 应用,参考资料如下:https://blog.csdn.net/lsshlsw/article/details/82670508Grafana主要还是展示,spark服务端数据怎么吐出来目前社区版本还是需要加强的。只是监控不报警的话用ganglia就好。目前阿里云HBase支持的Spark服务是把streaming的核心指标对接到云监控,然后用户可以对指标订阅报警,比如latency超过了40ms,就会发钉钉、短信、电话等https://help.aliyun.com/document_detail/95995.html?spm=a2c4g.11186623.6.605.489b2fc04QZj4Ehttp://spark.apache.org/releases/spark-release-2-4-0.html#core-and-spark-sql
hbase小助手 2019-12-02 01:43:21 0 浏览量 回答数 0

问题

本机执行flink,遇到TypeDeserializerAdapter notfound

在本地maven工程是1.11.1 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMod...
游客3yybxutymqvrm 2020-10-07 07:02:55 150 浏览量 回答数 1

问题

【百问百答】Apache Spark 中文实战攻略(上册)

1、Apache Spark 3.0 是什么? 2、Apache Spark 3.0 与性能相关的新功能主要有哪些? 3、 Spark 3.0 的Adaptive Query Execution是什么࿱...
爱吃鱼的程序员 2020-12-28 12:07:15 1 浏览量 回答数 0

回答

spark是基于DAG,有cache的管理,原生就长在内存计算上的,其上支持 内存计算、流式计算、图计算、SQL等功能,这些又是在一套core上,互相之间可以交叉使用。还包含了丰富的API,RDD api、dataframe、dataset等。支持java、scala、python、R语言。是数据分析处理的一大利器。hadoop mr是基于map-reduce的,相对spark开发较早,稳定性较好,做数据清洗时能获取比较大的吞吐量。hadoop tez是基于DAG的,比spark应该晚点,以后作为hadoop hive的可选引擎之一。所以:ETL:hadoop mr/tez机器学习:spark mllib流式计算(s以上):spark streaming流式计算(s以下):storm图分析:spark graphx需要cache数据的,使用spark使用hive:则hadoop/tez使用SQL,可以尝试用spark sql,使用hive相对稳定一些更加宏观的可以参考文章:https://yq.aliyun.com/articles/15306?spm=0.0.0.0.v2fm6G
封神 2019-12-02 01:46:15 0 浏览量 回答数 0

问题

Apache flink 1.52 Rowtime时间戳为空

我正在使用以下代码进行一些查询: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Row> ds = SourceHelp.bu...
flink小助手 2019-12-01 19:24:30 938 浏览量 回答数 1

问题

postgresql standby server执行部分sql不响应,如count整个表

我使用hot_standby做的备库,给线上主库做负载(pgpool)。但是发现,备库在执行某些sql的时候一直的时候,响应时间很长,甚至几个小时没结果返回也不中断,,sy_bs_package這张表目前数据量在1000w+级别,数据更新很...
dynz 2019-12-01 19:26:23 1308 浏览量 回答数 1

问题

Spark结构化流媒体从Cassandra中丰富

我使用结构化流式传输来自Kafka的数据 val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .o...
社区小助手 2019-12-01 19:25:50 737 浏览量 回答数 1

回答

需要的pom依赖: <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>${flink.version}</version> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8 --> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.8_2.11</artifactId> <version>${flink.version}</version> 从kafka消费数据,转换为table,然后进行sql查询。用scala开发,需要导入的包,不要漏掉,否则会有问题。 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08import org.apache.flink.streaming.util.serialization.SimpleStringSchemaimport org.apache.flink.table.api.TableEnvironmentimport org.apache.flink.table.api.scala._123456下面是完整代码:package com.ddxygq.bigdata.flink.sql import java.util.Properties import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08import org.apache.flink.streaming.util.serialization.SimpleStringSchemaimport org.apache.flink.table.api.TableEnvironmentimport org.apache.flink.table.api.scala._ /** @ Author: keguang @ Date: 2019/2/22 16:13 @ version: v1.0.0 @ description: */ object TableDemo { def main(args: Array[String]): Unit = { demo } def demo2(): Unit ={ val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val input:DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) val input2:DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1)) val table = input.toTable(tEnv, 'word, 'frequency) val table2 = input2.toTable(tEnv, 'word2, 'frequency2) val result = table.join(table2).where('word == 'word2).select('word, 'frequency) result.toDataSet[(String, Long)].print() } def demo: Unit ={ val sEnv = StreamExecutionEnvironment.getExecutionEnvironment val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // 连接kafka val ZOOKEEPER_HOST = "qcloud-test-hadoop01:2181" val KAFKA_BROKERS = "qcloud-test-hadoop01:9092,qcloud-test-hadoop02:9092,qcloud-test-hadoop03:9092" val TRANSACTION_GROUP = "transaction" val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect",ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS) kafkaProps.setProperty("group.id",TRANSACTION_GROUP) val input = sEnv.addSource( new FlinkKafkaConsumer08[String]("flink-test", new SimpleStringSchema(), kafkaProps) ) .flatMap(x => x.split(" ")) .map(x => (x, 1L)) val table = sTableEnv.registerDataStream("Words", input, 'word, 'frequency) val result = sTableEnv .scan("Words") .groupBy("word") .select('word, 'frequency.sum as 'cnt) sTableEnv.toRetractStream[(String, Long)](result).print() sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print() sEnv.execute("TableDemo") }} 这里有两个地方:1、这里举例用了table的算子,和标准的sql查询语法,为了演示table的基本用法。 val result = sTableEnv .scan("Words") .groupBy("word") .select('word, 'frequency.sum as 'cnt) 这个分组聚合统计其实可以替换成: val result = sTableEnv.sqlQuery("select word,sum(frequency) as cnt from Words group by word") // 打印到控制台sTableEnv.toRetractStream(String, Long).print()那么这个与下面的查询结果有什么区别呢? sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print() 区别很明显,这里消费kafka的实时数据,那么Words表是一个动态的流表,数据在不断append,一个是group by的分组聚合,结果需要不断更新,比如当前是(hello,4),这时候又来了一个词语hello,就需要update结果为(hello,5),如果有新词,还需要insert,而后者是select * from Words,只是追加结果。 所以,这里只是展示打印到控制台的写法不同,前者调用的是toRetractStream方法,而后者是调用toAppendStream。
小六码奴 2019-12-02 02:02:15 0 浏览量 回答数 0

回答

理想情况是,实时与离线使用同一套SQL,同一套计算逻辑,同一个数据源,这样随时可以用离线脚本重跑历史数据。但是现实是没有哪个框架支持。所谓流批一体,都是在引擎层面,例如Spark的streaming和SQL都是batch的方式,流只是更小的批。而Flink则希望用流的方式去处理批数据,批只是有边界的流。针对高阶的SQLAPI,流批都有很大的区别。基于DeltaLake的分区表,将dw层的实时数据按时间分区,这样可以随时用离线作业恢复历史分区的数据。而DW之上的汇总因为数据量相对较小,恢复之后可以用流作业从头消费。
爱吃鱼的程序员 2020-12-28 11:28:49 0 浏览量 回答数 0

云产品推荐

上海奇点人才服务相关的云产品 小程序定制 上海微企信息技术相关的云产品 国内短信套餐包 ECS云服务器安全配置相关的云产品 开发者问答 阿里云建站 自然场景识别相关的云产品 万网 小程序开发制作 视频内容分析 视频集锦 代理记账服务 阿里云AIoT