开发者社区> 问答> 正文

从Cassandra查询的数据不能再次在同一列上过滤(InvalidQueryException)

我试图从cassandra中按时间查询大块数据,然后使用spark数据集来一次处理较小的块,但是,应用程序失败并出现无效的查询异常:

WARN 2018-11-22 13:16:54 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 5, 192.168.1.212, executor 0): java.io.IOException: Exception during preparation of SELECT "userid", "event_time", "value" FROM "user_1234"."data" WHERE token("userid") > ? AND token("userid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ? ALLOW FILTERING: More than one restriction was found for the start bound on event_time

    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD

$$ fetchTokenRange(CassandraTableScanRDD.scala:339) at com.datastax.spark.connector.rdd.CassandraTableScanRDD $$

anonfun$17.apply(CassandraTableScanRDD.scala:366)

    at com.datastax.spark.connector.rdd.CassandraTableScanRDD

$$ anonfun$17.apply(CassandraTableScanRDD.scala:366) at scala.collection.Iterator $$

anon$12.nextCur(Iterator.scala:434)

    at scala.collection.Iterator

$$ anon$12.hasNext(Iterator.scala:440) at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12) at scala.collection.Iterator $$

anon$11.hasNext(Iterator.scala:408)

    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec

$$ anonfun$8 $$

anon$1.hasNext(WholeStageCodegenExec.scala:395)

    at org.apache.spark.sql.execution.SparkPlan

$$ anonfun$2.apply(SparkPlan.scala:234) at org.apache.spark.sql.execution.SparkPlan $$

anonfun$2.apply(SparkPlan.scala:228)

    at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitionsInternal$1 $$

anonfun$apply$25.apply(RDD.scala:827)

    at org.apache.spark.rdd.RDD

$$ anonfun$mapPartitionsInternal$1 $$

anonfun$apply$25.apply(RDD.scala:827)

    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: More than one restriction was found for the start bound on event_time

    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:41)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:28)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:108)
    at com.datastax.driver.dse.DefaultDseSession.prepare(DefaultDseSession.java:278)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)

这是我试图执行的代码段:

case class RawDataModel(userid: String, event_time: Long, value: Double)
var dtRangeEnd = System.currentTimeMillis()
var dtRangeStart = (dtRangeEnd - (60 60 1000).toLong)

val queryTimeRange = "SELECT * FROM user1234.datafile WHERE event_time >= " + dtRangeStart

val dataFrame = sparkSession.sql(queryTimeRange)

import sparkSession.implicits._
val dataSet: Dataset[RawDataModel] = dataFrame.as[RawDataModel]

dataSet.show(1)

dtRangeEnd = System.currentTimeMillis()
dtRangeStart = (dtRangeEnd - (15 60 1000).toLong)

val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))

dtRangeData.show(1)

展开
收起
社区小助手 2018-12-11 18:15:51 2565 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    spark合并sparkSession.sql(queryTimeRange)和dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))到CQL指令中,像这样:

    SELECT“sensorid”,“event_time”,“value”FROM“company_5a819ee2522e572c8a16a43a”。“data”WHERE token(“sensorid”)>?AND令牌(“sensorid”)<=?和“event_time”> =?和“event_time”> =?AND“event_time”<=?

    在那里你在同一个领域得到两个相同的限制"event_time" >= ?。

    如果你坚持filter之前执行dataFrame. Spark将与.filter分开计算dataFrame:

    val dataFrame = sparkSession.sql(queryTimeRange)
    dataFrame.persist
    dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))

    2019-07-17 23:19:58
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
基于HBase的海量数据查询与检索解析_游骐_202105_v3 立即下载
RowKey与索引设计:技巧与案例分析 立即下载
Phoenix 全局索引原理与实践 立即下载