基于flink 的LakeHouse湖仓一体平台

简介: 基于flink 的LakeHouse湖仓一体平台

介绍

Flink CDC

CDC全称是Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景,本文主要介绍基于Flink CDC在数据实时同步场景下的应用。

通过以上分析,基于Flink SQL CDC的数据同步有如下优点:

  • 业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。
  • 性能消耗:业务数据库性能消耗小,数据同步延迟低。
  • 同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。
  • 数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。

HUDI

Apache Hudi (发音为 “Hoodie”)在 DFS 的数据集上提供以下流原语:

  • 插入更新 (如何改变数据集?)
  • 增量拉取 (如何获取变更的数据?)

Hudi 维护在数据集上执行的所有操作的时间轴 (timeline),以提供数据集的即时视图。Hudi 将数据集组织到与 Hive 表非常相似的基本路径下的目录结构中。数据集分为多个分区,文件夹包含该分区的文件。每个分区均由相对于基本路径的分区路径唯一标识。

分区记录会被分配到多个文件。每个文件都有一个唯一的文件 ID 和生成该文件的提交 (commit)。如果有更新,则多个文件共享相同的文件 ID,但写入时的提交 (commit) 不同。

Hudi 解决了以下限制:

  • HDFS 的可伸缩性限制;
  • 需要在 Hadoop 中更快地呈现数据;
  • 没有直接支持对现有数据的更新和删除;
  • 快速的 ETL 和建模;
  • 要检索所有更新的记录,无论这些更新是添加到最近日期分区的新记录还是对旧数据的更新,Hudi 都允许用户使用最后一个检查点时间戳。此过程不用执行扫描整个源表的查询。

Hudi的优势:

  • HDFS 中的可伸缩性限制;
  • Hadoop 中数据的快速呈现;
  • 支持对于现有数据的更新和删除;
  • 快速的 ETL 和建模。

Pulsar

Pulsar 是一个用于服务器到服务器的流原生消息系统,具有多租户、高性能等优势。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。

Pulsar 的关键特性如下:

  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  • 极低的发布延迟和端到端延迟。
  • 可无缝扩展到超过一百万个 topic。
  • 简单的客户端 API,支持 Java、Go、Python 和 C++。
  • 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
  • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

此处使用pulsar的原因:

1: 支持永久存储

2: 异域复制,存储计算分离

3: 能直接集成presto

4:更灵活的订阅模式。

5:多租户

60a6bcefe26f4b118e50f46e4d0afd1d.png

设计思想

MySQL 数据通过 Flink CDC 进入到 Pulsar。之所以数据先入 Pulsar 而不是直接入 Hudi,是为了实现多个实时任务复用 MySQL 过来的数据,避免多个任务通过 Flink CDC 接 MySQL 表以及 Binlog,对 MySQL 库的性能造成影响。



Kafka,HDFS数据同步到Pulsar.这一层作为实时数仓的ODS,用于记录最原始的数据,包含了所有业务的变更过程,以及比较细粒度的数据。此层包含了所有的原始业务数据,日志数据,规则数据等....


到pulsar的数据会同时按照实时数据仓库的链路,从 ODS->DWD->DWS->OLAP 数据库,最后供报表等数据服务使用。实时数仓的每一层结果数据会准实时的落一份到离线数仓,通过这种方式做到程序一次开发、指标口径统一,数据统一。而每一层的转化可以通过Flink/pulsar Functions或者集成presto来实现。



而存储在 Pulsar 的数据虽然可以永久存储,但是pulsar 针对upsert,delete,增量更新拉取等一些场景,仍旧不太完善。再者,如果把大量的历史数据再一次推到 pulsar,走实时计算的链路来修正历史数据,可能会影响当天的实时作业。所以针对重跑历史数据,会通过数据修正这一步来处理。而hudi本身依赖于hdfs,同步至hive非常容易。
总体上说,这个架构属于 Lambda 和 Kappa 混搭的架构。流批一体数据仓库的各个数据链路有数据质量校验的流程。第二天对前一天的数据进行对账,如果前一天实时计算的数据无异常,则不需要修正数据,Kappa 架构已经足够。

使用案例

    Flink CDC 2.0 也已经正式发布,此次的核心改进和提升包括:
  • 提供 MySQL CDC 2.0,核心 feature 包括
  • 并发读取,全量数据的读取性能可以水平扩展;
  • 全程无锁,不对线上业务产生锁的风险;
  • 断点续传,支持全量阶段的 checkpoint。
  • 版本搭配:
   flink
      hudi
    pulsar
   flink cdc 
    1.12.2      0.9.0     2.7.2
1.4
    1.13.1     0.10.0     2.8.1
2.0

准备前提:

Mysql 相关:

1:开启binlog

修改/etc/my.cnf文件

[myself@hadoop202 module]$ sudo vim /etc/my.cnf
server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_flink_DW
注意:binlog-do-db根据自己的情况进行修改,指定具体要同步的数据库名字

2:授权

创建mysql用户

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

授予用户所需要的权限

mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意:scan.incremental.snapshot.enabled启用后不再需要 RELOAD 权限(默认启用)。
最终确定的用户权限

FLUSH PRIVILEGES;

3: 为每个Reader 分配server ID

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为服务器 id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 id,可能会导致从错误的 binlog 位置读取。因此,建议通过SQL Hints为每个阅读器设置不同的服务器 id ,例如假设源并行度为 4,那么我们可以使用为 4 个源阅读器中的每一个分配唯一的服务器 id。
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;

maven 相关依赖:

<!--cdc 连接器-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
            <scope>provided</scope>
        </dependency>
<!--pulsar 连接器--> 
        <!-- https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-flink-connector -->
        <dependency>
            <groupId>io.streamnative.connectors</groupId>
            <artifactId>pulsar-flink-connector_2.11</artifactId>
            <version>1.13.1.2</version>
            <!--            <scope>provided</scope>-->
        </dependency>
 <!--hudi相关--> 
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.Hudi</groupId>
            <artifactId>Hudi-Flink-bundle_2.11</artifactId>
            <version>${Hudi.version}</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/libs/Hudi-Flink-bundle_2.11-0.10.0-SNAPSHOT.jar</systemPath>
        </dependency>       

自定义cdc 反序列化器:

package csc;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.LocalDateTime;
import java.time.ZoneId;
/**
 * @description: 自定义cdc反序列化器,解析binlog转换成Maxwell形式的json字符串
 * @author: mdz
 * @date: 2021/8/17
 **/
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    ZoneId serverTimeZone;
    //反序列化方法
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //获取库名和表名
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        String database = split[1];
        String table = split[2];
        //获取操作类op,insert,update,delete
        String operation = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        //获取数据
        Struct value = (Struct) sourceRecord.value();
        Struct after = value.getStruct("after");
        Struct before = value.getStruct("before");
        //使用fastjson创建JSONObject存放数据
        JSONObject beforeJson = new JSONObject();
        JSONObject afterJson = new JSONObject();
        //如果操作是删除的话,after为空,需要获取before,用于后续同步删除操作
        outer:
        if (operation.equals("delete")) {
            Schema schema = before.schema();
            for (Field field : schema.fields()) {
                //遍历schema,取出各个字段数据
                beforeJson.put(field.name(),before.get(field.name()));
            }
        }else if (operation.equals("update")){
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                //遍历schema,取出各个字段数据
                afterJson.put(field.name(),after.get(field.name()));
            }
            Schema beforeSchema = before.schema();
            for (Field field : beforeSchema.fields()) {
                //遍历schema,取出各个字段数据
                beforeJson.put(field.name(),before.get(field.name()));
            }
        } else{//如果是插入没有before数据
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                //遍历schema,取出各个字段数据
                afterJson.put(field.name(),after.get(field.name()));
            }
        }
        //获取server_id
        String server_id = value.getStruct("source").get("server_id").toString();
        //获取主键
        JSONObject keyJson = null;
        if (sourceRecord.key() != null) {
            keyJson = new JSONObject();
            Struct key = (Struct) sourceRecord.key();
            Schema keySchema = key.schema();
            for (Field field : keySchema.fields()) {
                //遍历schema,取出各个字段数据
                if (operation.equals("delete")) {
                    keyJson.put(field.name(),before.get(field.name()));
                }else{
                    keyJson.put(field.name(),after.get(field.name()));
                }
            }
        }
        //获取时间戳
        String ts_ms = value.get("ts_ms").toString();
        //创建JSON用于存放最终结果
        JSONObject result = new JSONObject();
        result.put("database", database);
        result.put("table", table);
        result.put("type", operation);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("server_id", server_id);
        result.put("ts_ms", ts_ms);
        result.put("key", keyJson);
        collector.collect(result.toJSONString());
    }
    //定义类型方法
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
    // 解决cdc 相差八小时时区问题
    private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            switch (schema.name()) {
                case Timestamp.SCHEMA_NAME:
                    return TimestampData.fromEpochMillis((Long) dbzObj);
                case MicroTimestamp.SCHEMA_NAME:
                    long micro = (long) dbzObj;
                    return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                case NanoTimestamp.SCHEMA_NAME:
                    long nano = (long) dbzObj;
                    return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
            }
        }
        LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
        return TimestampData.fromLocalDateTime(localDateTime);
    }
}

mysql 数据cdc 到pulsar:

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import java.util.Optional;
import java.util.Properties;
/**
 * Created with IntelliJ IDEA.
 *
 * @Author: mdz
 * @Param: $
 * @Date: 2021/10/16/14:08
 * @Description:
 * @version:
 **/
public class MysqlCDC2Pulsar {
    public static void main(String[] args) throws Exception {
        //读取配置文件
        ParameterTool parameters = null;
        ParameterTool parameters_tool = ParameterTool.fromArgs(args);
        //指定参数名:local_path
        String local_path = parameters_tool.get("configfile", null);
        if (StringUtils.isBlank(local_path)) {
            parameters = parameters_tool;
        } else {
            parameters = ParameterTool.fromPropertiesFile(local_path);
        }
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
//        debeziumProperties.put("scan.incremental.snapshot.enabled", "true");
//        debeziumProperties.put("scan.incremental.snapshot.chunk.size", 8096);
//        debeziumProperties.put("scan.snapshot.fetch.size", 1024);
        /**
         *  datastream API MySqlSource 只支持单并发,连接就一个,binaryLog Client也是一个。
         */
        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostName")
                .port(port)
                .databaseList("yourDatabase") // set captured database
                .tableList("yourDatabase.table1,yourDatabase.table2...") // set captured table
                .username("userName")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .debeziumProperties(debeziumProperties)
                .startupOptions(StartupOptions.initial())
                .serverId(100035)
                .serverTimeZone("UTC")
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置任务关闭时候保留最后一次checkpoint 的数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 指定ck 的自动重启策略
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
        // 设置hdfs 的访问用户名
        System.setProperty("HADOOP_USER_NAME","hdfs");
        DataStreamSource<String> BinlogDS = env.addSource(mySqlSource);
        // 定义pulsar sink的一些参数
        String topic = parameters.get("pulsardcTopic");
        String adminUrl = parameters.get("pulsarAdminUrl");
        Properties props = new Properties();
        props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, topic);
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false");
        props.setProperty(PulsarOptions.ENABLE_KEY_HASH_RANGE_KEY, "true");
        props.setProperty("pulsar.producer.blockIfQueueFull", "true");
        props.setProperty("partition.discovery.interval-millis", "5000");     // 分区发现,定期检查topic分区情况
        props.setProperty("pulsar.reader.receiverQueueSize", "100000");
        props.setProperty("format", "debezium-json");
        //pulsar的连接及认证
        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setServiceUrl(parameters.get("pulsarServiceUrl"));
        conf.setAuthPluginClassName("org.apache.pulsar.client.impl.auth.AuthenticationToken");
        conf.setAuthParams(parameters.get("pulsarToken"));
//        创建pulsar sink
        FlinkPulsarSink<String> pulsarSink = new FlinkPulsarSink<String>(
                adminUrl,
                Optional.of(topic),
                conf,
                props,
                new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema()).useAtomicMode(DataTypes.STRING()).build()
                , PulsarSinkSemantic.EXACTLY_ONCE
        );
        String jobName = parameters.get("jobName");
        //sink 到pulsar
        BinlogDS.addSink(pulsarSink).name("ORGA_BI_ORGBASEINFO job");
        env.execute(jobName);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

pulsar2Hudi

....

常见错误及总结

snapshot.mode的各种参数,以下是测试效果
properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector
properties.setProperty("snapshot.mode", "initial");每次重启都会读全量
properties.setProperty("snapshot.mode", "initial_only");//读不到数据
properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似
properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到
properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot

MySQL CDC源等待超时

在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:

60a6bcefe26f4b118e50f46e4d0afd1d.png

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 214748364

Flink Connector mysql CDC sql-client模式下全库扫描,直到capturing到目标表,如果实例下库表比较多,会影响任务启动时效

可以通过缩小数据库权限避免扫描全库
-- 授予REPLICATION Slave权限
GRANT RELOAD,REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'username';
-- 授予目标库的访问权限
GRANT SELECT ON `your_single_database`.* TO 'username'@'%';
-- 刷新权限
FLUSH PRIVILEGES;

扫描全表阶段慢,在 Web UI 出现如下现象:

640.jpg

  • 原因:扫描全表阶段慢不一定是 cdc source 的问题,可能是下游节点处理太慢反压了。


configuration:
  table.exec.mini-batch.enabled: true
  table.exec.mini-batch.allow-latency: 2s
  table.exec.mini-batch.size: 5000
  table.optimizer.distinct-agg.split.enabled: true

    pulsar:

    Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes

    在bookkeeper.conf中配置:
    nettyMaxFrameSizeBytes=10M Netty 传输过程中,单条消息的最大size。  默认5242880
    broker.conf中配置
    maxMessageSize=10M 单条消息的最大size。默认5242880

    pulsar Producer send queue is full

    程序中配置:
    pulsar.producer.blockIfQueueFull=true 

      Hudi

      ClassNotFoundException

      flink lib包下添加:
      ./lib/hadoop-mapreduce-client-core-2.7.3.jar
      hive auxlib下添加:
      Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar





      相关实践学习
      阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
      云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
      相关文章
      |
      4月前
      |
      存储 消息中间件 人工智能
      Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
      本文整理自 Lazada Group EVP 及供应链技术负责人陈立群在 Flink Forward Asia 2025 新加坡实时分析专场的分享。作为东南亚领先的电商平台,Lazada 面临在六国管理数十亿商品 SKU 的挑战。为实现毫秒级数据驱动决策,Lazada 基于阿里云实时计算 Flink 和 Hologres 打造端到端实时商品选品平台,支撑日常运营与大促期间分钟级响应。本文深入解析该平台如何通过流式处理与实时分析技术重构电商数据架构,实现从“事后分析”到“事中调控”的跃迁。
      475 55
      Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
      |
      2月前
      |
      数据可视化 关系型数据库 MySQL
      基于python大数据的的海洋气象数据可视化平台
      针对海洋气象数据量大、维度多的挑战,设计基于ECharts的可视化平台,结合Python、Django与MySQL,实现数据高效展示与交互分析,提升科研与决策效率。
      |
      2月前
      |
      存储 消息中间件 人工智能
      云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
      本文根据 2025 云栖大会演讲整理而成,演讲信息如下 演讲人:黄鹏程 阿里云智能集团计算平台事业部实时计算Flink版产品负责人
      259 1
      云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
      |
      7月前
      |
      存储 消息中间件 OLAP
      基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
      本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
      1499 3
      基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
      |
      5月前
      |
      数据采集 人工智能 大数据
      10倍处理效率提升!阿里云大数据AI平台发布智能驾驶数据预处理解决方案
      阿里云大数据AI平台推出智能驾驶数据预处理解决方案,助力车企构建高效稳定的数据处理流程。相比自建方案,数据包处理效率提升10倍以上,推理任务提速超1倍,产能翻番,显著提高自动驾驶模型产出效率。该方案已服务80%以上中国车企,支持多模态数据处理与百万级任务调度,全面赋能智驾技术落地。
      637 0
      |
      7月前
      |
      消息中间件 运维 Kafka
      直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
      直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
      235 12
      |
      3月前
      |
      存储 JSON 数据处理
      Flink基于Paimon的实时湖仓解决方案的演进
      本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
      441 1
      Flink基于Paimon的实时湖仓解决方案的演进
      |
      2月前
      |
      传感器 人工智能 监控
      拔俗多模态跨尺度大数据AI分析平台:让复杂数据“开口说话”的智能引擎
      在数字化时代,多模态跨尺度大数据AI分析平台应运而生,打破数据孤岛,融合图像、文本、视频等多源信息,贯通微观与宏观尺度,实现智能诊断、预测与决策,广泛应用于医疗、制造、金融等领域,推动AI从“看懂”到“会思考”的跃迁。
      |
      3月前
      |
      存储 人工智能 监控
      淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
      本文整理自淘宝闪购(饿了么)大数据架构师王沛斌在 Flink Forward Asia 2025 上海站的分享,深度解析其基于 Apache Flink 与 Paimon 的 Lakehouse 架构演进与落地实践,涵盖实时数仓发展、技术选型、平台建设及未来展望。
      913 0
      淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路

      热门文章

      最新文章