
How do the Flink and Beam SDKs handle windowing, and which is more efficient?

I am comparing the Apache Beam SDK with the Flink SDK for stream processing, in order to establish the cost/benefit of using Beam as an additional framework.

I have a very simple setup where a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.

From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:

Using the Apache Beam SDK (running on Flink):

1.1. Create a Pipeline object.

1.2. Create a PCollection of Kafka records.

1.3. Apply the windowing function.

1.4. Transform the pipeline to key by window.

1.5. Group records by key (window).

1.6. Apply whatever function is needed to the windowed records.

Using the Flink SDK:

2.1. Create a DataStream from the Kafka source.

2.2. Transform it into a keyed stream by providing a key function.

2.3. Apply the windowing function.

2.4. Apply whatever function is needed to the windowed records.

While the Flink solution appears more concise programmatically, in my experience it is less efficient at high volumes of data. I can only imagine the overhead is introduced by the key extraction function, since Beam does not require this step.

My question is: are these processes equivalent? What could explain the Beam approach being more efficient, given that it uses Flink as a runner (and all other conditions being the same)?

Here is the code using the Beam SDK:

PipelineOptions options = PipelineOptionsFactory.create();

//Run with Flink
FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
flinkPipelineOptions.setRunner(FlinkRunner.class);
flinkPipelineOptions.setStreaming(true);
flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
// Create a PCollection of Kafka records (readBytes yields KafkaRecord<byte[], byte[]>,
// so the misleading <Long, String> type witness has been dropped)
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.readBytes()
        .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
        .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
        .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
        Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Transform the pipeline to key by window
PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
        windowedKafkaCollection.apply(
                ParDo.of(
                        new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                            @ProcessElement
                            public void processElement(ProcessContext context, IntervalWindow window) {
                                context.output(KV.of(window, context.element()));
                            }
                        }));
//Group records by key (window)
PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
        .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

Here is the code using the Flink SDK:

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);

DataStream<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())

    //Set the windowing function.
    .timeWindow(Time.seconds(5L), Time.seconds(1L))

    //Process Windowed Data
    .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

I thought I should add some metrics that may be relevant.

Network bytes received

Flink SDK:
taskmanager.1: 2827676598
taskmanager.2: 2644786446
taskmanager.3: 2645765232
taskmanager.4: 2428570491
taskmanager.5: 2431368644
taskmanager.6: 2422309148

Beam:
taskmanager.1: 4766399314
taskmanager.2: 4092154160
taskmanager.3: 4435132862
taskmanager.4: 4096576110
taskmanager.5: 4092849114
taskmanager.6: 4425190393

CPU utilisation (max)

Flink SDK:
taskmanager.1: 91.00%
taskmanager.2: 93.00%
taskmanager.3: 92.00%
taskmanager.4: 90.00%
taskmanager.5: 92.00%
taskmanager.6: 90.00%

Beam:
taskmanager.1: 72.0%
taskmanager.2: 52.0%
taskmanager.3: 71.0%
taskmanager.4: 56.0%
taskmanager.5: 26.0%
taskmanager.6: 40.0%
Beam seems to use considerably more network, whereas Flink uses significantly more CPU. Could this suggest that Beam is parallelising the processing in a more efficient way?

Edit No. 2

I'm fairly sure the PueCalculatorFn classes are comparable, but I'll share the code here to see whether there are any obvious discrepancies between the two processes.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@ProcessElement
public void processElement(ProcessContext c, BoundedWindow w) {

KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

//Calculate Pue
IPueResult result = calculatePue(element.getKey(), records);

//Create IntervalWindowResult object to return
DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
        formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

//Return Pue keyed by Window
c.output(KV.of(intervalWindowResult, result));

}

private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {

//Define accumulators to gather readings
final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

//Declare variable to store the result
BigDecimal pue = BigDecimal.ZERO;

//Initialise transient lists
realEnergyRecords = new ArrayList<>();
itEnergyRecords = new ArrayList<>();

//Transform the results into a stream
Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

//Iterate through each reading and add to the increment count
streamOfRecords
        .map(record -> {
            byte[] valueBytes = record.getKV().getValue();
            assert valueBytes != null;
            String valueString = new String(valueBytes);
            assert !valueString.isEmpty();
            return KV.of(record, valueString);
        }).map(kv -> {
    Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
    KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
    return KV.of(kv.getKey(), consumption);

}).forEach(consumptionRecord -> {
            switch (consumptionRecord.getKey().getTopic()) {
                case REAL_ENERGY_TOPIC:
                    totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                    realEnergyRecords.add(consumptionRecord.getValue());
                    break;
                case IT_ENERGY_TOPIC:
                    totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                    itEnergyRecords.add(consumptionRecord.getValue());
                    break;
            }
        }
);

assert totalRealIncrement.doubleValue() > 0.0;
assert totalItIncrement.doubleValue() > 0.0;

//Beware of division by zero
if (totalItIncrement.doubleValue() != 0.0) {
    //Calculate PUE
    pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
}

//Create a PueResult object to return
IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
return new PueResult(intervalWindow, pue.stripTrailingZeros());

}

@Override
protected void finalize() throws Throwable {

super.finalize();
RecordSenderFactory.closeSender();
WindowSenderFactory.closeSender();

}
}
Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair<IIntervalWindowResult, IPueResult>, Integer, TimeWindow> {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@Override
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair<IIntervalWindowResult, IPueResult>> collector) throws Exception {

Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
BigDecimal pue = calculatePue(iterable);

//Create IntervalWindowResult object to return
DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
        formatter.format(windowEnd), realEnergyRecords
        .stream()
        .map(e -> (IKafkaConsumption) e)
        .collect(Collectors.toList()), itEnergyRecords
        .stream()
        .map(e -> (IKafkaConsumption) e)
        .collect(Collectors.toList()));
//Create PueResult object to return
IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

//Collect result
collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));

}

protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {

//Define accumulators to gather readings
final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

//Declare variable to store the result
BigDecimal pue = BigDecimal.ZERO;

//Initialise transient lists
realEnergyRecords = new ArrayList<>();
itEnergyRecords = new ArrayList<>();

//Iterate through each reading and add to the increment count
StreamSupport.stream(iterable.spliterator(), false)
        .forEach(object -> {
            switch (object.get("topic").textValue()) {
                case REAL_ENERGY_TOPIC:
                    totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                    realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                    break;
                case IT_ENERGY_TOPIC:
                    totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                    itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                    break;
            }

        });

assert totalRealIncrement.doubleValue() > 0.0;
assert totalItIncrement.doubleValue() > 0.0;

//Beware of division by zero
if (totalItIncrement.doubleValue() != 0.0) {
    //Calculate PUE
    pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
}
return pue;

}

}
Here is the custom deserialiser I used in the Beam example.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {

if(jsonElement == null) {
    return null;
} else {
    JsonObject jsonObject = jsonElement.getAsJsonObject();
    JsonElement id = jsonObject.get("id");
    JsonElement energyConsumed = jsonObject.get("energyConsumed");
    Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
    Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
    JsonElement topic = jsonObject.get("topic");
    Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
    return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
}

}

}

flink小助手 2018-12-10 13:36:33
1 Answer

I'm not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as your Flink job. Similar to how windowing works in Flink, once you assign windows in Beam, all subsequent operations automatically take the windowing into account. You don't need to group by window.
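As a minimal illustration of this (assuming a hypothetical PCollection<Double> of numeric readings called windowedReadings, which is not part of your code), any aggregation applied after Window.into already fires once per window:

//Minimal sketch: after Window.into, a Combine aggregates per window
//automatically; no key-by-window or GroupByKey step is needed.
//windowedReadings is an assumed PCollection<Double>, not from your pipeline.
PCollection<Double> totalPerWindow = windowedReadings.apply(
        Combine.globally(Sum.ofDoubles()).withoutDefaults());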

Your Beam pipeline definition can be simplified as follows:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
        Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();
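Note that with the GroupByKey removed, PueCalculatorFn would receive individual records rather than an Iterable per window, so its declaration would need adjusting. A rough sketch (the body is elided):

//Sketch only: the DoFn now sees one KafkaRecord per call instead of an
//Iterable per window; window metadata can still be injected as a parameter.
public class PueCalculatorFn extends DoFn<KafkaRecord<byte[], byte[]>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
    @ProcessElement
    public void processElement(ProcessContext c, IntervalWindow window) {
        //Accumulate or emit per-record results here.
    }
}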
As for performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw improved performance with Beam on Flink.

Edit: To clarify further, you don't group by the JSON "id" field in your Beam pipeline, which you do in your Flink snippet.
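If you wanted the two jobs to be semantically equivalent, you could key the Beam pipeline by the same field. A rough sketch (extractId is an assumed helper that parses the record's JSON payload for its "id" field; it is not code from the question):

//Rough sketch: key the windowed collection by the JSON "id" field, mirroring
//Flink's keyBy. extractId is an assumed helper, not shown here.
PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById =
        windowedKafkaCollection.apply(
                WithKeys.of((KafkaRecord<byte[], byte[]> record) -> extractId(record))
                        .withKeyType(TypeDescriptors.integers()));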

2019-07-17 23:19:13