DataHub Flink Connector

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们在阿里云上使用DataHub作为Flink程序输入输出的消息队列,使用成本比较低,但由于是阿里云的云产品,周边生态做的不是很好,Flink Stream的Connector并没有开源出来。因此本人参照RocketMQ Flink Connector写了DataHub的Flink Connector。

DataHub Flink Connector

概述

GitHub:https://github.com/CharleyWuCL/datahub-flink-connector

Author: Charley Wu

E-Mail: charleywu@aliyun.com

Blog: www.charleywu.com

简介

我们在阿里云上使用DataHub作为Flink程序输入输出的消息队列,使用成本比较低,但由于是阿里云的云产品,周边生态做的不是很好,Flink Stream的Connector并没有开源出来。因此本人参照RocketMQ Flink Connector写了DataHub的Flink Connector。

DataHub主要提供两个SDK,aliyun-sdk-datahubdatahub-client-library;前者是较为基础的SDK,提供了丰富的DataHub操作接口,使用较为复杂,需要对DataHub有较为深入的了解;后者在前者的基础上进行了封装,提供Consumer和Producer进行协同消费及生产。本SDK使用的是后者。

依赖介绍

  • Flink:org.apache.flink:flink-streaming-java_2.12:1.13.1
  • datahub-client-library: com.aliyun.datahub:datahub-client-library:1.2.0-public
  • aliyun-sdk-datahub: com.aliyun.datahub:aliyun-sdk-datahub:2.21.6-public
 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>

    <lombok.version>1.18.22</lombok.version>

    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>

    <datahub-sdk.version>2.21.6-public</datahub-sdk.version>
    <datahub-library.version>1.2.0-public</datahub-library.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>${lombok.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>datahub-client-library</artifactId>
      <version>${datahub-library.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.aliyun.datahub</groupId>
          <artifactId>aliyun-sdk-datahub</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.aliyun.datahub</groupId>
      <artifactId>aliyun-sdk-datahub</artifactId>
      <version>${datahub-sdk.version}</version>
    </dependency>
  </dependencies>

Connector使用

依赖

目前还没有上传中央仓库,需使用者自行下载代码进行编译。

<dependency>
    <groupId>charley.wu</groupId>
    <artifactId>datahub-flink-connector</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

Source

  /**
   * DataHub Source
   *
   * @return new DataHubSource.
   */
  public DataHubSource<Input> createDataHubSource() {
    // Validate common params.
    Validate.isTrue(props.containsKey(DataHubConfig.ENDPOINT), "DataHub endpoint can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_ID), "DataHub accessId can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_KEY), "DataHub accessKey can not be null");

    // Validate source params.
    Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_PROJECT), "DataHub project can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_TOPIC), "DataHub topic can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_SUBID), "DataHub subId can not be null");

    // Return new DataHub Source.
    return new DataHubSource<>(props, new DataHubSourceDeserializer());
  }

Sink

public DataHubSink<Output> createDataHubSink() {
    Validate.isTrue(props.containsKey(DataHubConfig.ENDPOINT), "DataHub endpoint can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_ID), "DataHub accessId can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_KEY), "DataHub accessKey can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.SINK_PROJECT), "DataHub project can not be null");
    Validate.isTrue(props.containsKey(DataHubConfig.SINK_TOPIC), "DataHub topic can not be null");

    DataHubSink<Output> sink = new DataHubSink<>(props, new DataHubSinkSerializer(), new DataHubShardSelector());
    boolean batchEnable = ConfigUtil.getBoolean(props, DataHubConfig.BATCH_ENABLE, DataHubConfig.DEFAULT_BATCH_ENABLE);
    sink.setBatchFlushOnCheckpoint(batchEnable);
    return sink;
  }

Job

/**
 * Test Job
 *
 * @author Charley Wu
 * @since 2021/11/05
 */
public class TestFlinkJob {

  public static void main(String[] args) throws Exception {
    Properties customParams = ConfigUtil.getPropertiesParams();
    ConnectorManager connector = new ConnectorManager(customParams);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint, default 180s.
    env.enableCheckpointing(180000);

    // Source data stream
    env.addSource(connector.createDataHubSource())
        .map(new Transfer())
        .addSink(connector.createDataHubSink());

    // Execute job.
    env.execute("job");
  }
}

案例链接

https://github.com/CharleyWuCL/datahub-flink-connector/tree/master/src/test/java/charley/wu/flink/datahub/core

DataHub工具

使用DataHub过程中,发现在控制台进行Topic创建是一个非常痛苦的过程,因此写了一个小工具,帮助创建Tpoic。

注解

将DataHub Topic进行实体化,提供两个注解定义Topic及Field。

@DataHubTopic

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DataHubTopic {

  // Topic名称
  String name();

  // Topic分片数量
  int shardNum();

  // Topic描述
  String comment();
}

@DataHubField

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface DataHubField {

  // 别名,对应下划线分割的字段名
  String alias();

  // 类型,DataHub字段类型
  FieldType type();

  // 字段描述
  String comment() default "";
}

TopicCreator

// 1、构造函数
// 主要传入访问DataHub的阿里云AccessID和AccessKey
public TopicCreator(String accessId, String accessKey) 

// 2、创建方法
/**
   *  Topic 创建方法
   *
   * @param project                    Topic所在DataHub项目名称
   * @param topicClass             Topic对应实体类Class
   * @param needEventTime       是否增加event_time,Timestamp类型,微秒um
   * @param <T>                         实体类泛型     
   * @throws Exception             异常
   */
public <T> void createTopic(String project, Class<T> topicClass, boolean needEventTime)

使用

定义实体类

@Data
@DataHubTopic(name = "tp_test", shardNum = 4, comment = "DataHub测试Topic")
public class TestTopic implements Serializable {

  @DataHubField(alias = "name", type = FieldType.STRING)
  private String name;

  @DataHubField(alias = "age", type = FieldType.BIGINT)
  private Integer age;

  @DataHubField(alias = "money", type = FieldType.DOUBLE)
  private Double money;

  @DataHubField(alias = "create_time", type = FieldType.STRING)
  private String createTime;
}

创建方法

public class TopicCreatorTest{

  public static void main(String[] args) {
    try {
      TopicCreator creator = new TopicCreator("阿里云AccessId", "阿里云AccessKey");
      creator.createTopic("test_project", TestMessage.class, true);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
目录
相关文章
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
监控 NoSQL 数据处理
实时计算 Flink版产品使用问题之要如何将Flink整库同步到DataHub中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 网络协议 网络安全
实时计算 Flink版产品使用问题之如何将Flink整库同步到DataHub中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 SQL 大数据
实时计算 Flink版产品使用问题之Flink+DataHub+Hologres相比于Flink+Hologres加入了DataHub组件,有什么优势
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
505 0
|
8月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版产品使用合集之如果想自定义connector和pipeline要如何入手
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 存储 NoSQL
Flink SQL 自定义 redis connector
一般情况下,我们不需要创建新的 connector,因为 Flink SQL 已经内置了丰富的 connector 供我们使用,但是在实际生产环境中我们的存储是多种多样的,所以原生的 connector 并不能满足所有用户的需求,这个时候就需要我们自定义 connector,这篇文章的重点就是介绍一下如何实现自定义 Flink SQL connector ? 先来看一下官网的一张 connector 架构图:
Flink SQL 自定义 redis connector