Activating watermark and window in append mode - Q&A - Alibaba Cloud Developer Community - Alibaba Cloud


Activating watermark and window in append mode

社区小助手 2018-12-11 17:21:20 · 802 views

The Structured Streaming code below applies a watermark and windows the data with a 24-hour window and a 15-minute slide. In append mode the code produces only an empty Batch 0; in update mode the results display correctly. Append mode is required, because the S3 sink works only in append mode.

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
    .withWatermark(eventTimeCol, slideDuration)
    .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
            col(nameCol)).count();

sliding24h
    .writeStream()
    .format("console")
    .option("truncate", false)
    .option("numRows", 1000)
    .outputMode(OutputMode.Append())
    //.outputMode(OutputMode.Complete())
    .start()
    .awaitTermination();
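For context on what append mode waits for: a windowed aggregate row is emitted in append mode only once the watermark (the maximum event time seen so far minus the watermark delay) reaches the window's end, and the watermark a micro-batch uses is the one computed at the end of the previous batch. The following is a minimal plain-Java sketch of that arithmetic for the durations above; the class and helper names are illustrative, not Spark API:

```java
import java.time.Duration;
import java.time.Instant;

public class AppendModeSketch {
    // Watermark = max event time seen so far minus the configured delay.
    static Instant watermark(Instant maxEventTime, Duration delay) {
        return maxEventTime.minus(delay);
    }

    // In append mode a window [start, end) is emitted once the
    // watermark reaches or passes the window's end.
    static boolean windowClosed(Instant windowEnd, Instant wm) {
        return !wm.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMinutes(15); // withWatermark(eventTimeCol, "15 minutes")

        // The generated test data: 200 events at 5-minute steps,
        // first at epoch second 1512164314 (2017-12-01T21:38:34Z).
        Instant first = Instant.ofEpochSecond(1512164314L);
        Instant last = first.plus(Duration.ofMinutes(199 * 5));

        Instant wm = watermark(last, delay);
        System.out.println("watermark after all data: " + wm); // 2017-12-02T13:58:34Z

        // The latest 24-hour window containing the first event starts at the
        // 15-minute boundary at or before it (2017-12-01T21:30:00Z); its end
        // lies far beyond the watermark, so it never closes on this data.
        Instant windowEnd = Instant.parse("2017-12-01T21:30:00Z").plus(Duration.ofHours(24));
        System.out.println("closed? " + windowClosed(windowEnd, wm)); // false
    }
}
```

Note also that windows whose ends do fall before the watermark become eligible only in a micro-batch *after* the one that advanced the watermark, so a test that delivers all its data in a single batch sees an empty append-mode Batch 0.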

Here is the complete test code:

public static void main(String[] args) throws StreamingQueryException {

 SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

 // Generate 200 test events at 5-minute intervals, starting at
 // epoch second 1512164314 (2017-12-01T21:38:34Z), as "timestamp,name" strings.
 ArrayList<String> rl = new ArrayList<>();
 for (int i = 0; i < 200; ++i) {
     long t = 1512164314L + i * 5 * 60;
     rl.add(t + ",qwer");
 }

 String nameCol = "name";
 String eventTimeCol = "eventTime";
 String eventTimestampCol = "eventTimestamp";

 // In-memory test source (id 42); note that all 200 rows are added in a single batch.
 MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
 input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
 Dataset<Row> stream = input.toDF().selectExpr(
         "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
         "cast(split(value,'[,]')[1] as String) as " + nameCol);

 System.out.println("isStreaming: " +  stream.isStreaming());

 Column eventTime = functions.to_timestamp(col(eventTimestampCol));
 Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

 String windowDuration = "24 hours";
 String slideDuration = "15 minutes";
 Dataset<Row> sliding24h = rowData
         .withWatermark(eventTimeCol, slideDuration)
         .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                 col(nameCol)).count();

 sliding24h
         .writeStream()
         .format("console")
         .option("truncate", false)
         .option("numRows", 1000)
         .outputMode(OutputMode.Append())
         //.outputMode(OutputMode.Complete())
         .start()
         .awaitTermination();

}
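For reference, functions.window with a 24-hour duration and a 15-minute slide assigns every event to 96 overlapping windows, aligned to the epoch by default. A standalone sketch of that assignment, independent of Spark (the class and helper names are illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    // Enumerate the starts of the [start, start + window) windows of the
    // given duration and slide that contain the event time, mirroring
    // functions.window's default alignment to the epoch.
    static List<Instant> windowStarts(Instant eventTime, Duration window, Duration slide) {
        long eventMs = eventTime.toEpochMilli();
        long slideMs = slide.toMillis();
        long windowMs = window.toMillis();
        // Latest window start at or before the event, aligned to the slide:
        long lastStart = eventMs - Math.floorMod(eventMs, slideMs);
        List<Instant> starts = new ArrayList<>();
        for (long s = lastStart; s > eventMs - windowMs; s -= slideMs) {
            starts.add(Instant.ofEpochMilli(s));
        }
        return starts;
    }

    public static void main(String[] args) {
        Instant t = Instant.ofEpochSecond(1512164314L); // first generated event
        List<Instant> starts = windowStarts(t, Duration.ofHours(24), Duration.ofMinutes(15));
        System.out.println(starts.size() + " windows");       // 96 windows
        System.out.println("latest start: " + starts.get(0)); // 2017-12-01T21:30:00Z
    }
}
```

With 96 windows per event and a 24-hour span per window, state in the aggregation grows quickly, which is one reason the choice of watermark delay matters here.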

Tags: Distributed Computing · Data Security/Privacy Protection · Spark
All answers (1)
