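The Structured Streaming code below applies a watermark and windows the data using a 24-hour window with a 15-minute slide. In append mode the code only produces an empty Batch 0. In update mode the results are displayed correctly. I need append mode because the S3 sink only works in append mode.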
String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
    .withWatermark(eventTimeCol, slideDuration)
    .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
        col(nameCol)).count();
sliding24h
    .writeStream()
    .format("console")
    .option("truncate", false)
    .option("numRows", 1000)
    .outputMode(OutputMode.Append())
    //.outputMode(OutputMode.Complete())
    .start()
    .awaitTermination();
Here is the complete test code:
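import static org.apache.spark.sql.functions.col;
import java.util.ArrayList;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.JavaConversions;

public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    // 200 events, 5 minutes apart, each encoded as "epochSeconds,name".
    ArrayList<String> rl = new ArrayList<>();
    for (int i = 0; i < 200; ++i) {
        long t = 1512164314L + i * 5 * 60;
        rl.add(t + ",qwer");
    }
    String nameCol = "name";
    String eventTimeCol = "eventTime";
    String eventTimestampCol = "eventTimestamp";
    // All rows are added up front, so the stream has data for a single batch only.
    MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
    input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
    // Split each "epochSeconds,name" line into typed columns.
    Dataset<Row> stream = input.toDF().selectExpr(
        "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
        "cast(split(value,'[,]')[1] as String) as " + nameCol);
    System.out.println("isStreaming: " + stream.isStreaming());
    Column eventTime = functions.to_timestamp(col(eventTimestampCol));
    Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
    String windowDuration = "24 hours";
    String slideDuration = "15 minutes";
    // The watermark delay equals the slide duration (15 minutes).
    Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
            col(nameCol)).count();
    sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();
}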
This is a bug that was resolved in Spark 2.4.0. See: https://issues.apache.org/jira/browse/SPARK-26167 and https://issues.apache.org/jira/browse/SPARK-24156
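To make the symptom easier to reason about, here is a rough, Spark-free sketch of the watermark arithmetic for the test data in the question. It assumes the usual append-mode rule (a window is emitted once the watermark is at or past the window's end), that the watermark is the maximum event time seen minus the delay, and that sliding windows are aligned to the epoch. The class and method names (`WatermarkSketch`, `eventTimes`, `watermark`, `earliestWindowEnd`, `closedWindowCount`) are made up for illustration and are not part of Spark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkSketch {
    static final long WINDOW = 24 * 60 * 60; // window length in seconds
    static final long SLIDE = 15 * 60;       // slide interval in seconds
    static final long DELAY = 15 * 60;       // watermark delay in seconds

    // Same timestamps as the test code: 200 events, 5 minutes apart.
    static List<Long> eventTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
            times.add(1512164314L + i * 5 * 60);
        }
        return times;
    }

    // Assumed rule: watermark = max event time seen so far - delay.
    static long watermark(List<Long> events) {
        return Collections.max(events) - DELAY;
    }

    // End of the earliest window [end - WINDOW, end) that contains t,
    // assuming window boundaries fall on multiples of SLIDE since the epoch.
    static long earliestWindowEnd(long t) {
        return (t / SLIDE + 1) * SLIDE;
    }

    // Counts non-empty windows whose end is <= the watermark, i.e. windows
    // that would be eligible for output in append mode under the assumed rule.
    static int closedWindowCount(List<Long> events, long watermark) {
        int count = 0;
        long firstEnd = earliestWindowEnd(Collections.min(events));
        for (long end = firstEnd; end <= watermark; end += SLIDE) {
            long start = end - WINDOW;
            boolean nonEmpty = false;
            for (long t : events) {
                if (t >= start && t < end) {
                    nonEmpty = true;
                    break;
                }
            }
            if (nonEmpty) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Long> events = eventTimes();
        long wm = watermark(events);
        System.out.println("watermark after first batch: " + wm);
        System.out.println("earliest window end: " + earliestWindowEnd(events.get(0)));
        System.out.println("windows already closed: " + closedWindowCount(events, wm));
    }
}
```

Under these assumptions, a number of windows are already closed by the watermark computed from the first batch, so the empty output is not caused by the data itself. As described in SPARK-24156, the updated watermark is only applied in the next batch, and before 2.4.0 a next batch ran only when new data arrived. Since the test adds all rows up front, no second batch is triggered and nothing is emitted in append mode.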