大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(正在更新…)

章节内容

上节我们完成了如下的内容:


Kudu Java API

增删改查 编写案例测试

实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:


环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。

创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。

数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。

写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。

执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.17.0</version>
        </dependency>

    </dependencies>
</project>

数据源

new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24),

自定义下沉器

package icu.wzk.kudu;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;


public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {

    private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");

    private KuduClient kuduClient;
    private KuduTable kuduTable;

    private String kuduMasterAddr;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;

    public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
        this.kuduMasterAddr = kuduMasterAddr;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
        kuduTable = kuduClient.openTable(tableName);
        schema = kuduTable.getSchema();
        kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Map<String, Object> map, Context context) throws Exception {
        if (null == map) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();
            for (int i = 0; i < columnCount; i ++) {
                Object value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
                OperationResponse response = kuduSession.apply(insert);
                if (null != response) {
                    logger.error(response.getRowError().toString());
                }
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            kuduClient.close();
            os.close();
            out.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    private void insertData(PartialRow row, Type type, String columnName, Object value) {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value.toString());
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value.toString()));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value.toString()));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value.toString()));
                    return;
                case BOOL:
                    row.addBoolean(columnName, Boolean.valueOf(value.toString()));
                    return;
                case BINARY:
                    os.writeObject(value);
                    row.addBinary(columnName, out.toByteArray());
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(value.toString()));
                default:
                    throw new UnsupportedOperationException("Unknown Type: " + type);
            }

        } catch (Exception e) {
            logger.error("插入数据异常: " + e);
        }
    }
}

编写实体

package icu.wzk.kudu;

public class UserInfo {

    private String id;

    private String name;

    private Integer age;

    public UserInfo(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

执行建表

package icu.wzk.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.ArrayList;
import java.util.List;

public class KuduCreateTable {

    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
        KuduClient kuduClient = kuduClientBuilder.build();

        String tableName = "user";
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        ColumnSchema id = new ColumnSchema
                .ColumnSchemaBuilder("id", Type.INT32)
                .key(true)
                .build();
        columnSchemas.add(id);
        ColumnSchema name = new ColumnSchema
                .ColumnSchemaBuilder("name", Type.STRING)
                .key(false)
                .build();
        columnSchemas.add(name);
        ColumnSchema age = new ColumnSchema
                .ColumnSchemaBuilder("age", Type.INT32)
                .key(false)
                .build();
        columnSchemas.add(age);

        Schema schema = new Schema(columnSchemas);
        CreateTableOptions options = new CreateTableOptions();
        // 副本数量为1
        options.setNumReplicas(1);
        List<String> colrule = new ArrayList<>();
        colrule.add("id");
        options.addHashPartitions(colrule, 3);

        kuduClient.createTable(tableName, schema, options);
        kuduClient.close();
    }

}

主逻辑代码

package icu.wzk.kudu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class SinkToKuduTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserInfo> dataSource = env.fromElements(
                new UserInfo("001", "Jack", 18),
                new UserInfo("002", "Rose", 20),
                new UserInfo("003", "Cris", 22),
                new UserInfo("004", "Lily", 19),
                new UserInfo("005", "Lucy", 21),
                new UserInfo("006", "Json", 24)
        );
        SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
                .map(new MapFunction<UserInfo, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(UserInfo value) throws Exception {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", value.getId());
                        map.put("name", value.getName());
                        map.put("age", value.getAge());
                        return map;
                    }
                });

        String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
        String tableInfo = "user";
        mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));

        env.execute("SinkToKuduTest");
    }

}

解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。


数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。


数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。


Kudu 配置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:定义 Kudu 的主节点地址和目标表的信息。


数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。


执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处理流程。


测试运行

先运行建表

再运行主逻辑

我们建表之后,确认user表存在。然后我们运行Flink程序,将数据写入Kudu。

确认有表后,执行 Flink 程序:

注意事项

并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。

批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。

故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。

监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
打赏
0
0
1
0
103
分享
相关文章
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
483 33
The Past, Present and Future of Apache Flink
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
177 1
Apache Flink 2.0.0: 实时数据处理的新纪元
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
341 2
ClickHouse与大数据生态集成:Spark & Flink 实战
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
332 43
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
432 32
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
800 13
Apache Flink 2.0:Streaming into the Future
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
397 61

推荐镜像

更多