克服Flink SQL限制的混合API方法

简介: 本文探讨了在 Apache Flink 中结合使用 Flink SQL 与 DataStream API 的混合编程模式,以克服 SQL 在处理 Kafka 数据时的两大局限:缺乏对坏记录的容错机制(如死信队列)和 Avro Enum、TimestampMicros 等类型映射缺失。通过 DataStream API 实现数据预处理与后处理,可在保障数据完整性的同时,发挥 SQL 的易用性,构建更健壮、灵活的流式数据管道。

作者:Gal Krispel

翻译:黄鹏程 阿里云实时计算 Flink 版产品负责人

阅读时间:11分钟 · 2025年10月19日

译者注:

本博客文章探讨了 Apache Flink 中的混合 API 方法如何帮助克服 Flink SQL 的一些固有限制,特别是在与 Apache Kafka 集成时。文章深入探讨了两个常见挑战:

  1. 对格式错误记录的有效错误处理;

  2. Avro 的 Enum 和 TimestampMicros 类型在数据类型映射方面的限制。

DataStream API 和 ProcessFunction API 凭借其更底层的控制能力,可用于强大的模式验证和死信队列(DLQ)实现。这种预处理步骤通过优雅地处理损坏的记录而不导致应用程序重启,保护了输入 Flink SQL 的数据完整性。

此外,Flink SQL 的数据类型映射问题可以通过将 Flink Table 转换回DataStream 并应用自定义 RichMapFunction 来缓解。这允许对序列化进行精确控制,从而在将数据写回 Kafka 时正确处理 Avro Enum 和 TimestampMicros 类型。

虽然 Flink SQL 提供了高度声明性和用户友好的接口,但将其与 DataStream API 的细粒度控制相结合,为复杂的现实世界流式挑战提供了强大而灵活的解决方案。鼓励 Flink 用户考虑如何在应用程序中战略性地切换这些 API,以解锁更大的健壮性、灵活性和控制力,从而构建更具弹性和功能丰富的数据管道。

Apache Flink 是一个强大的数据处理框架,它提供了一个高吞吐量、低延迟的运行时环境,能够以统一的方式处理无界流数据和有界批数据。Flink 提供了多种 API,从类似 SQL 的声明式接口到基于操作符的底层接口,使团队能够构建从实时分析管道到事件驱动应用程序的各种应用,并精确控制所需的功能级别。

Apache Flink 提供了多种实现作业的接口。最受欢迎的接口是 DataStream API,这是一种用于流作业的命令式、底层接口,可对操作符、状态、序列化和错误处理提供强有力的控制。它还提供了插入自定义连接器的能力,以进一步提高灵活性,从而扩展了社区支持的开源连接器的种类。

第二个流行的接口是 Flink SQL(由 Table API 驱动),这是一种更高级别的声明式语法,允许用户使用 ANSI 标准 SQL语法定义数据处理管道。众多连接器支持 Table API,大大简化了编写数据处理作业复杂代码的开销。这一特性被技术背景较弱的用户或不需要掌握 Apache Flink 专业知识的自助服务场景广泛采用。假设每个业务利益相关者都对 Apache Flink 有超出基本水平的理解,那么 Flink SQL 可能就不会像今天这样被广泛采用。然而,现实中并非如此,其声明式语法和易用性使其对没有软件工程背景的数据分析专业人员特别有吸引力。如前所述,它也可用于自助服务场景。

使用Flink SQL的隐藏陷阱

对于流式用例,一个流行的选择是将 Table API 与 Apache Kafka 结合使用,并采用 avro-confluent 格式以确保严格的模式保证。Confluent Schema Registry 确保了减少的有效载荷大小、模式一致性和兼容性,以及用于与注册表交互的简单 REST 接口。

Flink SQL 为原生流式 Kafka Consumer & Producer 流程或 Kafka Streams 应用程序提供了一种简单的替代方案。这些应用程序从主题读取流数据,通过连接、过滤、聚合和窗口等数据操作对其进行处理,然后将输出生成到另一个流中进行进一步处理。这些应用程序带有技术和操作开销——它们需要对 Apache Kafka 有深入的理解,并处理应用程序状态优化。

与编写完整的 Kafka Streams 应用程序相比,Flink SQL 接口允许您以声明方式定义管道,就像使用 SQL 一样。虽然这听起来很理想,但生产用例往往会达不到这种期望。

将 Flink SQL 与 Apache Kafka 结合使用的两个常见挑战是:

1. 缺乏有效的错误处理模式,例如死信队列(DLQ)。在 Apache Kafka 主题中处理模式不兼容记录的情况并不少见,因为模式兼容性没有被强制执行,或者仅仅是因为生产者应用程序的 bug 导致为主题生成了错误的模式。当 Table API 消费者遇到由于故障记录导致的反序列化错误时,它会立即使用快速失败策略并重启应用程序。不幸的是,这种策略在处理坏记录时是无效的,因为它在设置"avro"或"confluent-avro"格式时无法跳过故障记录。应用程序将陷入重启模式,手动干预是唯一能从这种情况中恢复的方法。由于 Kafka 连接器中的 Avro 反序列化不支持 skip-on-error 标志,Flink SQL 缺乏任何真正的错误处理选项。

2. 数据类型映射限制,因为 Flink SQL 类型可能无法精确表示原生 Avro 类型。Flink SQL 的一个显著限制是 Enum 类型处理——所有 Enum 字段都被解释为字符串。从处理端来看,Enum 字段与 String 字段无法区分是有道理的,但在尝试将该字段作为 Enum 类型重新生成到 Apache Kafka 时会产生序列化错误,因为 Flink SQL 中没有官方支持 Enum 数据类型。Avro LogicalType 的 TimestampMicros 也存在类似问题,因为 Flink SQL 不支持按原样读写 timestamp 字段,而只能作为 BIGINT。

混合接口方法

如前所述,由于 DataStream API 和 ProcessFunction API 的底层特性,它们比 Flink SQL 接口具有更丰富的功能集。同时,Flink SQL 接口对于没有深入了解 Apache Flink 的用户来说更加友好和易于使用。在 Apache Flink 运行时中,可以在两种 API 之间切换,这在通用应用程序中是有意义的。

想象一个提供自助服务的应用程序,该应用程序的用户输入将是基于 Table API 的 SQL 进行数据处理,而接收器连接器将使用 DataStream API 或 ProcessFunction API 进行优化写入目标。甚至可能存在来回切换API的用例。

在下面的部分中,将提供一个如何将 Table API 与底层 API 的更深层次功能相结合的演练,例如使用 RichMapFunction 实时转换记录以克服 Enum 序列化问题,以及使用 ProcessFunction 处理坏记录并将其转移到侧输出。

在Flink SQL之前验证模式:DLQ的底层API方法

Flink SQL 允许用户以声明方式处理数据。虽然运行时数据处理错误不是典型场景,但在 SQL 处理步骤之前的模式反序列化阶段经常会遇到格式错误的记录。初步的 Kafka 源流将确保正确编码数据的已验证流,其中坏记录不会被跳过,而是被转移到侧 Kafka 主题,这是流应用程序中众所周知的错误处理模式。正确编码记录的已验证流将被转换为 Table API 进行 SQL 处理。同时,DLQ Kafka 主题将用于存放不兼容或故障记录,而不会对主处理流造成任何运行时异常。

第一步是为验证流设置 Kafka 源连接器。它应该以最原始的形式接收记录,即字节数组,无需事先反序列化。

package com.example.flink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class KafkaSourceFactory {
   
    public static KafkaSource<ConsumerRecord<byte[], byte[]>> createRawBytesSource(
            String bootstrapServers, String topic, String groupId) {
   
        return KafkaSource.<ConsumerRecord<byte[], byte[]>>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setDeserializer(new RawBytesDeserializer())
                .build();
    }
}

RawBytesDeserializer 的实现非常简单,如下所示:

public class RawBytesDeserializer implements KafkaRecordDeserializationSchema<ConsumerRecord<byte[], byte[]>> {
   
    @Override
    public void deserialize(org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> record, Collector<ConsumerRecord<byte[], byte[]>> out) {
   
        out.collect(record);
    }
    @Override
    public TypeInformation<ConsumerRecord<byte[], byte[]>> getProducedType() {
   
        return TypeInformation.of(new TypeHint<ConsumerRecord<byte[], byte[]>>(){
   });
    }
}

此外,必须为坏记录的 DLQ 输出设置 Kafka 接收器连接器。

package com.example.flink;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class DLQKafkaSinkFactory {
   
    public static KafkaSink<ConsumerRecord<byte[], byte[]>> createDLQSink(
            String bootstrapServers, String dlqTopic) {
   
        return KafkaSink.<ConsumerRecord<byte[], byte[]>>builder()
                .setBootstrapServers(bootstrapServers)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(new DlqRecordSerializer(dlqTopic))
                .build();
    }
}

DlqRecordSerializer 是一个简单的 Kafka 记录序列化器,它将坏记录作为原始字节数组生成到 DLQ 主题。

public class DlqRecordSerializer implements KafkaRecordSerializationSchema<ConsumerRecord<byte[], byte[]>> {
   
    private final String topic;

    public DlqRecordSerializer(String topic) {
   
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ConsumerRecord<byte[], byte[]> element, KafkaSinkContext context, Long timestamp) {
   
        return new ProducerRecord<>(topic, null, null, element.key(), element.value(), element.headers());
    }
}

ProcessFunction 将确定记录是否正确编码。如果验证失败,故障记录将被标记并转移到 DLQ 输出接收器。

package com.example.flink;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SchemaValidationFunction extends ProcessFunction<ConsumerRecord<byte[], byte[]>, Tuple2<String, GenericRecord>> {
    private static final OutputTag<ConsumerRecord<byte[], byte[]>> DLQ_TAG = new OutputTag<>("dlq-output") {};

    private final boolean dlqEnabled;
    private final ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer;

    public SchemaValidationFunction(
            boolean dlqEnabled,
            ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer) {
        this.dlqEnabled = dlqEnabled;
        this.deserializer = deserializer;
    }

    @Override
    public void processElement(
            ConsumerRecord<byte[], byte[]> record,
            Context ctx,
            Collector<Tuple2<String, GenericRecord>> out) throws Exception {
        try {
            GenericRecord value = deserializer.deserialize(record.value());
            if (value == null) throw new RuntimeException("null after deserialization");

            String key = record.key() == null ? null : new String(record.key());
            out.collect(Tuple2.of(key, value));
        } catch (Exception e) {
            if (dlqEnabled) {
                ctx.output(DLQ_TAG, record);
            } else {
                throw e;
            }
        }
    }

    public static OutputTag<ConsumerRecord<byte[], byte[]>> getDlqTag() {
        return DLQ_TAG;
    }
}

上述资源将在主流作业中初始化。下面是一个示例应用程序,它使用 DataStream 读取 Kafka 主题,验证 Avro 编码,然后将已验证的记录发送到下游进行进一步的 SQL 处理。故障记录将生成到 DLQ 主题。

// 获取输入Avro模式并设置反序列化器和转换器
org.apache.avro.Schema avroSchema = schemaManager.fetchSchema(subject, version);
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer = 
    ConfluentRegistryAvroDeserializationSchema.forGeneric(avroSchema, registryUrl);
RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(avroSchema.toString()).getLogicalType();
AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createRowConverter(rowType);

// 使用DataStream API读取原始消息
KafkaSource<ConsumerRecord<byte[], byte[]>> kafkaSource = KafkaSourceFactory.createRawBytesSource(props);
DataStream<ConsumerRecord<byte[], byte[]>> rawStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

TypeInformation<Tuple2<String, GenericRecord>> sourceTupleTypeInfo = 
    new TupleTypeInfo<>(TypeInformation.of(String.class), new GenericRecordAvroTypeInfo(avroSchema));

// 为已验证记录设置已验证流
SingleOutputStreamOperator<Tuple2<String, GenericRecord>> validatedRecordStream = rawStream
    .process(new SchemaValidationFunction(dlqEnabled, deserializer))
    .returns(sourceTupleTypeInfo);

// 如果启用,则创建DLQ接收器
if (dlqEnabled && !dlqTopic.isEmpty()) {
    DataStream<ConsumerRecord<byte[], byte[]>> dlqStream = validatedRecordStream.getSideOutput(SchemaValidationFunction.getDlqTag());
    KafkaSink<ConsumerRecord<byte[], byte[]>> dlqSink = DLQKafkaSinkFactory.createDLQSink(props, dlqTopic);
    dlqStream.sinkTo(dlqSink);
}

// 准备表环境基础表结构
RowType originalRowType = (RowType) AvroSchemaConverter.convertToDataType(avroSchema.toString()).getLogicalType();

// 操作已验证流以包含元数据,保留键以供上下文使用(当前未使用)
DataStream<RowData> rowStreamWithMetadata = validatedRecordStream
    .map(tuple -> (RowData) converter.convert(tuple.f1))
    .returns(InternalTypeInfo.of(originalRowType));

// 定义表的模式
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.fromRowDataType(AvroSchemaConverter.convertToDataType(inputAvroSchema.toString()));

// 将DataStream注册为具有avro模式的表
Table t = tableEnv.fromDataStream(rowStreamWithMetadata, schemaBuilder.build());
tableEnv.createTemporaryView("InputTable", t);

// 处理SQL查询
tableEnv.sqlQuery(sqlQuery);

提供的代码片段展示了如何有效地结合底层 API 进行强大的模式验证和 DLQ 错误处理,以及 Flink SQL 的声明式优势进行后续数据处理。

未来工作

使用 DataStream API 克服 Flink SQL 数据类型映射限制

Table API 可以实现用户的 Flink SQL 查询并将其接收到底层各种接收器连接器中,包括 Apache Kafka。然而,如上所述,它在数据类型映射方面有一些限制:

  • Enum 类型被转换为 String
  • Flink SQL 不支持精度高于3(毫秒)的 TIMESTAMP

这可能会非常令人沮丧。通常,用户的意图是让 Apache Flink 使用现有模式,而不是让它自己注册模式。如果有技术要求阻止 Apache Flink 注册模式,那么所有带有 TimestampMicros 或 Enum 的模式都无法写入 Apache Kafka 接收器主题,这使得它们在 Flink SQL 中实际上不受支持。

虽然 Enum 数据类型和 TimestampMicros Avro 类型在 Flink SQL 中不受支持,但它们在 DataStream API 中完全受支持,当将 GenericRecord 或 SpecificRecord 写回 Kafka 时。

之前展示了一个如何将 DataStream 转换为 Flink Table 以确保有效数据在 Flink SQL 中处理的示例。要克服这个问题,必须进行相反的转换——将 Flink Table 转换回 DataStream 并将其接收至 Kafka。

从 TableAPI 切换到 DataStream 很简单,可以通过 toDataStream 完成。然而,这种转换将产生 DataStream,这是一种与将 Avro 编码记录写入 Kafka 不兼容的格式。需要应用用户定义函数来执行从 DataStream 到 DataStream 的转换。在该函数中,将应用自定义代码将数据写回 Kafka,包括 Flink SQL 不支持的字段,如 Enum 或 TimestampMicros。

下面是如何为原始类型、Enum 和 Timestamp 编写从 Row 到 GenericRecord 的映射器的示例(扩展映射器以支持您的模式选择;下面的示例处理特定情况和一般基本原语)。

class AvroRowMapper extends RichMapFunction<Row, GenericRecord> {
   
    private final String schemaString;

    AvroRowMapper(String schemaString) {
   
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord map(Row row) {
   
        org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(schemaString);
        GenericRecord rec = new GenericData.Record(schema);

        for (org.apache.avro.Schema.Field f : schema.getFields()) {
   
            String name = f.name();
            Object v = row.getField(name);
            org.apache.avro.Schema fs = unwrapNullable(f.schema());

            if (v == null) {
   
                rec.put(name, null);
                continue;
            }

            String logical = fs.getProp("logicalType");
            if ("timestamp-micros".equals(logical)) {
   
                rec.put(name, toEpochMicros(v));
                continue;
            } else if ("timestamp-millis".equals(logical)) {
   
                rec.put(name, toEpochMillis(v));
                continue;
            }

            switch (fs.getType()) {
   
                case STRING:
                    rec.put(name, v.toString());
                    break;
                case BOOLEAN:
                    rec.put(name, (Boolean) v);
                    break;
                case INT:
                    rec.put(name, ((Number) v).intValue());
                    break;
                case LONG:
                    rec.put(name, ((Number) v).longValue());
                    break;
                case FLOAT:
                    rec.put(name, ((Number) v).floatValue());
                    break;
                case DOUBLE:
                    rec.put(name, ((Number) v).doubleValue());
                    break;
                case BYTES:
                    if (v instanceof byte[])
                        rec.put(name, java.nio.ByteBuffer.wrap((byte[]) v));
                    else
                        rec.put(name, v);
                    break;
                case ENUM:
                    rec.put(name, new GenericData.EnumSymbol(fs, v.toString()));
                    break;
                default:
                    rec.put(name, v);
            }
        }
        return rec;
    }

    private static long toEpochMillis(Object v) {
   
        if (v instanceof java.time.LocalDateTime) {
   
            return ((java.time.LocalDateTime) v).toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
        } else if (v instanceof java.time.Instant) {
   
            return ((java.time.Instant) v).toEpochMilli();
        } else if (v instanceof Number) {
   
            return ((Number) v).longValue();
        }
        throw new IllegalArgumentException("Unsupported timestamp-millis value: " + v);
    }

    private static long toEpochMicros(Object v) {
   
        if (v instanceof java.time.LocalDateTime) {
   
            long millis = ((java.time.LocalDateTime) v).toInstant(java.time.ZoneOffset.UTC).toEpochMilli();
            return millis * 1_000L;
        } else if (v instanceof java.time.Instant) {
   
            long millis = ((java.time.Instant) v).toEpochMilli();
            return millis * 1_000L;
        } else if (v instanceof Number) {
   
            return ((Number) v).longValue();
        }
        throw new IllegalArgumentException("Unsupported timestamp-micros value: " + v);
    }

    private static org.apache.avro.Schema unwrapNullable(org.apache.avro.Schema s) {
   
        if (s.getType() == org.apache.avro.Schema.Type.UNION) {
   
            for (org.apache.avro.Schema t : s.getTypes()) {
   
                if (t.getType() != org.apache.avro.Schema.Type.NULL)
                    return t;
            }
        }
        return s;
    }
}

创建一个 Kafka 接收器,将记录作为 Avro 编码消息返回到 Kafka。

package com.example.flink.sink;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

public class KafkaSinkFactory {
   
    public static KafkaSink<GenericRecord> createKafkaSink(
            String bootstrapServers,
            String topic,
            String subject,
            org.apache.avro.Schema schema,
            String schemaRegistryUrl) {
   
        return KafkaSink.<GenericRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(
                            ConfluentRegistryAvroSerializationSchema.forGeneric(
                                subject, schema, schemaRegistryUrl
                            )
                        )
                        .build()
                )
                .build();
    }
}

预定义的映射函数可在主应用程序中如下使用:

// 将Table转换为Row流
DataStream<Row> rows = tableEnv.toDataStream(result);

// 使用上面的最小映射器将行映射到GenericRecord
String avroSchemaString = /* your Avro schema JSON */;
DataStream<GenericRecord> records = rows.map(new AvroRowMapper(avroSchemaString))
    .returns(new GenericRecordAvroTypeInfo(outputSchema));

// 构建一个将GenericRecord与Confluent Avro一起写入的Kafka接收器
KafkaSink<GenericRecord> sink = KafkaSinkFactory(bootstrapServers,topic,subject,outputSchema,registryUrl)

// 发送它
records.sinkTo(sink);
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
消息中间件 存储 Kafka
流、表与“二元性”的幻象
本文探讨流与表的“二元性”本质,指出实现该特性需具备主键、变更日志语义和物化能力。强调Kafka与Iceberg因缺乏更新语义和主键支持,无法真正实现二元性,唯有统一系统如Flink、Paimon或Fluss才能无缝融合流与表。
224 7
流、表与“二元性”的幻象
|
23天前
|
人工智能 数据处理 Apache
Forrester发布流式数据平台报告:Flink 创始团队跻身领导者行列,实时AI能力获权威认可
Ververica,由Apache Flink创始团队创立、阿里云旗下企业,首次入选Forrester 2025流式数据平台领导者象限,凭借在实时AI与流处理领域的技术创新及全场景部署能力获高度认可,成为全球企业构建实时数据基础设施的核心选择。
Forrester发布流式数据平台报告:Flink 创始团队跻身领导者行列,实时AI能力获权威认可
|
3月前
|
存储 分布式计算 运维
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
零跑科技基于Flink构建一体化实时计算平台,应对智能网联汽车海量数据挑战。从车机信号实时分析到故障诊断,实现分钟级向秒级跃迁,提升性能3-5倍,降低存储成本。通过Flink+Hologres+MaxCompute技术栈,打造高效、稳定、可扩展的实时数仓,支撑100万台量产车背后的数据驱动决策,并迈向流批一体与AI融合的未来架构。
262 2
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
|
5月前
|
存储 消息中间件 人工智能
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
本文整理自 Lazada Group EVP 及供应链技术负责人陈立群在 Flink Forward Asia 2025 新加坡实时分析专场的分享。作为东南亚领先的电商平台,Lazada 面临在六国管理数十亿商品 SKU 的挑战。为实现毫秒级数据驱动决策,Lazada 基于阿里云实时计算 Flink 和 Hologres 打造端到端实时商品选品平台,支撑日常运营与大促期间分钟级响应。本文深入解析该平台如何通过流式处理与实时分析技术重构电商数据架构,实现从“事后分析”到“事中调控”的跃迁。
495 55
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
|
3月前
|
人工智能 运维 监控
Flink 智能调优:从人工运维到自动化的实践之路
本文由阿里云Flink产品专家黄睿撰写,基于平台实践经验,深入解析流计算作业资源调优难题。针对人工调优效率低、业务波动影响大等挑战,介绍Flink自动调优架构设计,涵盖监控、定时、智能三种模式,并融合混合计费实现成本优化。展望未来AI化方向,推动运维智能化升级。
636 7
Flink 智能调优:从人工运维到自动化的实践之路
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
379 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
5月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
649 9
Apache Flink:从实时数据分析到实时AI