开发者社区> 问答> 正文

官网文档和样例的不完整性和不严谨性的问题

Flink1.10的集群,用hdfs做backend

从flink最早的版本到flink 1.12,一直存在文档和样例不完整的问题,或者说相同的代码会因输入源不同而产生不同的结果。

比如说下面链接中的样例 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html

如果输入源分别为

  1. 一次性从内存中的List读取数据

  2. 一次性从文件目录读取数据

  3. 持续从文件目录读取数据

  4. 从socket流持续读取数据

以上4种输入源中,只有3和4在KeyedStream的process(…)中使用ValueState时,onTimer函数才会被触发调用;1和2则不会触发。

相信其他的算子也存在类似的问题

具体代码如下:


package com.xxx.data.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class KeyedStreamJob {
/**
 * Reproduction job for the timer question discussed around this listing.
 *
 * Builds four candidate input streams of (String, Integer) tuples — an in-memory list,
 * a one-shot text-file read, a continuously monitored directory and a socket — then keys
 * ONE of them by tuple position 0 and runs a KeyedProcessFunction that keeps a per-key
 * running sum in keyed ValueState. The first element seen for a key registers a
 * processing-time timer at (element timestamp + 3000 ms); onTimer emits (key, sum).
 *
 * The stream that is actually processed is chosen by editing the expression that starts
 * the keyBy/process chain near the end of this method (currently continuefileStream).
 *
 * NOTE(review): the three streams that are not fed into keyBy have no sink attached in
 * this method; whether Flink still schedules them as part of the job should be confirmed.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception anything thrown while building or executing the job
 */
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The job is configured for processing time and a parallelism of 4.
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(4);

// 1. Read the data from an in-memory list
Tuple2<String, Integer> item = null;
List<Tuple2<String, Integer>> items = new ArrayList<>();
item = new Tuple2<>("k1", 1);
items.add(item);
item = new Tuple2<>("k3", 3);
items.add(item);
item = new Tuple2<>("k1", 10);
items.add(item);
item = new Tuple2<>("k2", 2);
items.add(item);
item = new Tuple2<>("k1", 100);
items.add(item);
item = new Tuple2<>("k2", 20);
items.add(item);
DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
// Every stream below uses the same kind of assigner: the element timestamp is the wall
// clock at extraction time, and getCurrentWatermark() always returns null.
SingleOutputStreamOperator<Tuple2<String, Integer>> listStream = streamSource.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return null;
}

@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
// Debug marker printed once per element passing through this assigner.
System.out.println("---");
return System.currentTimeMillis();
}
});

// 2. Read the data from a directory once
// Each text line becomes the tuple (line, 1).
SingleOutputStreamOperator<Tuple2<String, Integer>> fileStream = env.readTextFile("D:\\data", "UTF-8").map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return null;
}

@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
return System.currentTimeMillis();
}
});

// 3. Read the data from a directory continuously
TypeInformation<String> typeInformation = BasicTypeInfo.STRING_TYPE_INFO;
TextInputFormat format = new TextInputFormat(new Path("D:\\data"));
format.setCharsetName("UTF-8");
// Enable nested (recursive) file enumeration
format.setNestedFileEnumeration(true);
// PROCESS_CONTINUOUSLY with 6000L passed as the interval argument
// (presumably the directory re-scan period in milliseconds — confirm against the readFile API).
SingleOutputStreamOperator<Tuple2<String, Integer>> continuefileStream = env.readFile(format, "D:\\data", FileProcessingMode.PROCESS_CONTINUOUSLY, 6000L, typeInformation).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return null;
}

@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
return System.currentTimeMillis();
}
});

// 4. Read the data from a socket continuously (localhost:9999)
SingleOutputStreamOperator<Tuple2<String, Integer>> socketStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return null;
}

@Override
public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
return System.currentTimeMillis();
}
});

// The ValueState timer behaviour was tested with each of sources 1-4 in turn. Observed:
// only the continuously reading sources (3. continuefileStream, 4. socketStream) trigger onTimer;
// the one-shot sources (1. listStream, 2. fileStream) do not trigger onTimer.
SingleOutputStreamOperator<Tuple2<String, Long>> sum = continuefileStream // alternatives: listStream fileStream socketStream
.keyBy(0)
.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>>() {
// Per-key accumulator (running sum plus timestamp of the last element).
private ValueState<SumWithTimeStamp> sum;
// Only referenced by the commented-out debug prints below.
private final SimpleDateFormat yyyyMMddHHmmss = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss.SSS");

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// The TTL config is built but has no effect: the enableTimeToLive call below is commented out.
StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(1L)).returnExpiredIfNotCleanedUp().updateTtlOnReadAndWrite().useProcessingTime().build();
ValueStateDescriptor<SumWithTimeStamp> valueStateDescriptor = new ValueStateDescriptor<SumWithTimeStamp>("sum", SumWithTimeStamp.class);
// valueStateDescriptor.enableTimeToLive(stateTtlConfig);

sum = getRuntimeContext().getState(valueStateDescriptor);
}

@Override
public void processElement(Tuple2<String, Integer> item, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
SumWithTimeStamp sumValue = sum.value();
if (sumValue == null) {
// First element for this key: create the accumulator and register the key's only timer.
sumValue = new SumWithTimeStamp();
sumValue.key = item.f0;
// Thread.sleep(1500L);
// Date cur = new Date();
// cur.setTime(ctx.timestamp());
// System.out.println("ini " + ctx.getCurrentKey().toString() + yyyyMMddHHmmss.format(cur));
sumValue.sum += item.f1.longValue();
sumValue.lastModified = ctx.timestamp();
sum.update(sumValue);
// Processing-time timer 3000 ms after this element's assigned timestamp.
ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 3*1000);
System.out.println("ini " + ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" + sum.value().sum);
} else {
// Later elements only update the sum and lastModified; no further timer is registered.
sumValue.sum += item.f1.longValue();
sumValue.lastModified = ctx.timestamp();
sum.update(sumValue);
// ctx.timerService().registerProcessingTimeTimer(sumValue.lastModified + 5*1000);
System.out.println("up " + ctx.getCurrentKey().toString() + " item:" + item.toString() + " sum:" + sum.value().sum);
}
// Date cur = new Date();
// cur.setTime(ctx.timestamp());
// System.out.println("up " + ctx.getCurrentKey().toString() + yyyyMMddHHmmss.format(cur));
// Thread.sleep(1500L);

}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// super.onTimer(timestamp, ctx, out);
System.out.println("-------" + ctx.getCurrentKey().toString());
// Emit (key, sum) when the firing time is within 5000 ms of the last update.
// NOTE(review): the only timer is registered at (first timestamp + 3000), so this
// condition looks like it holds whenever timestamps do not go backwards — confirm intent.
if (timestamp <= sum.value().lastModified + 5000) {
out.collect(new Tuple2<String, Long>(sum.value().key, sum.value().sum));
// sum.clear();
}
}
});

sum.print();

//continueSum(streamSource);
env.execute("keyedSteamJob");
// System.in.read();
}

/**
 * Keys the given stream by the tuple's String field {@code f0}, applies {@code sum(1)}
 * to the keyed stream, and prints the results, passing a row of '+' characters to
 * {@code print(...)}.
 *
 * No timestamp assigner and no window are applied in this pipeline; earlier experiments
 * with {@code IngestionTimeExtractor} and a 10 ms tumbling event-time window were
 * disabled by the original author.
 *
 * @param streamSource source stream of (key, value) tuples to aggregate
 */
public static void continueSum(DataStreamSource<Tuple2<String, Integer>> streamSource) {
    // Key extractor: groups records by their first tuple field.
    KeySelector<Tuple2<String, Integer>, String> byFirstField =
            new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> record) throws Exception {
                    return record.f0;
                }
            };

    streamSource.keyBy(byFirstField).sum(1).print("+++++++++++++++++++++++++++");
}

/**
 * Per-key accumulator stored in the ValueState of the KeyedProcessFunction built in
 * {@code main}: the running sum for a key together with the timestamp of the element
 * that last updated it.
 */
public static class SumWithTimeStamp {
// Set from item.f0 of the first element processed for the key.
public String key;
// Running total of item.f1 over the elements processed for the key.
public long sum;
// ctx.timestamp() of the most recently processed element for the key.
public long lastModified;
}
}

*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-06 11:50:20 604 0
1 条回答
写回答
取消 提交回答
  • Hi, xuefli.

    非常感谢你指出文档的问题!

    由于邮件中看代码比较吃力(没有语法高亮以及排版的问题),我只是粗略地看了下代码。

    你提到当输入源为一次性从内存中的List读取数据时,无法触发onTimer。在实际的例子中,我看到采用的是processing time,且延时 3s 触发。我怀疑是数据量太少,程序很快就结束了,导致timer没来得及触发,建议改成event time试试这种情况。

    Best, Shengkai

    来自志愿者整理的flink邮件归档

    2021-12-06 12:47:19
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载