Flink自定义Connector

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Flink自定义Connector

01 引言

因为最近需要使用自定义Connectors,所以参照了Flink官网的教程,整理了在Flink上如何实现自定义SourceSink

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sourcessinks/

首先我们需要知道动态表(dynamic tables)只是一个逻辑概念,Flink本身并不拥有数据。相反,动态表的内容存储在外部系统(如数据库、键值存储、消息队列)或文件中。

动态源(sources)和动态接收器(sink)可用于从外部系统读取数据和向外部系统写入数据,一般源(sources)和接收器(sink)通常用术语连接器connector)下进行总结

02 Connector概述

在许多情况下,开发人员不需要从头开始创建新的连接器(connector),只要稍微修改现有的连接器或钩子,就可以加载连接器到现有的堆栈。

Flink 实现Connector的流程如下:

从上图可以看到,Flink实现Connector连接器主要分为Metadata(元数据)、Planning(规划),Runtime(运行时提供者)三个部分的内容。

2.1 Metadata - 元数据模块

Table API 和 SQL 都是声明式 API,注意是表的声明。因此,如上图所示,在执行CREATE TABLE语句会导致目标目录Catalog中的元数据更新。

对于大多数目标目录Catalog实现,外部系统中的物理数据不会针对此类操作进行修改,因此Connector连接器的依赖项不必存在于类路径中,SQL语句中声明的选项WITH既不被验证也不被解释。

动态表的元数据(即通过 DDL 创建或由目录提供)表示为CatalogTable, 必要时,将在CatalogTable内部解析表名。

2.2 Planning - 规划模块

如上图所示,目标目录CatalogTable下一步在对表程序进行规划和优化时,CatalogTable需要将a解析为两种动态表类型,分别是:

  • DynamicTableSource:用于在查询中读取SELECT
  • DynamicTableSink:用于在语句中写入INSERT INTO

DynamicTableSourceFactoryDynamicTableSinkFactory提供特定于连接器的逻辑,用于将元数据转换为DynamicTableSourceDynamicTableSink实例。在大多数情况下,工厂的目的是验证选项(例如:DynamicTableSourceDynamicTableSink'port' = '5022'),配置编码/解码格式(如果需要),并创建表连接器的参数化实例。

默认情况下,使用 Java 的服务提供者接口 (SPI)实现DynamicTableSourceFactoryDynamicTableSinkFactory。连接器选项(例如 'connector' = 'custom')必须对应于有效的工厂标识符。

虽然它在类命名中可能并不明显,但DynamicTableSourceDynamicTableSink 可以看作是有状态的工厂,最终产生具体的运行时实现来读取/写入实际数据。

规划器planner使用源source和接收器sink实例来执行特定于连接器connector的双向通信,直到找到最佳逻辑规划。

根据可选声明的能力接口(例如 SupportsProjectionPushDownSupportsOverwrite),规划器planner可能会将更改应用于实例,从而改变生成的运行时实现。

2.3 Runtime - 运行时模块

一旦逻辑规划完成,规划器planner将从表连接器connector获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,例如InputFormatSourceFunction

这些接口按另一个抽象级别分组为ScanRuntimeProviderLookupRuntimeProvider和的子类SinkRuntimeProvider

例如:

  • OutputFormatProvider:providing org.apache.flink.api.common.io.OutputFormat
  • SinkFunctionProvider:providing org.apache.flink.streaming.api.functions.sink.SinkFunction

都是SinkRuntimeProvider 计划者Planner可以处理的具体实例。

03 Connector相关API

3.1 Dynamic Table Factories - 动态表工厂

动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器

  • org.apache.flink.table.factories.DynamicTableSourceFactory :可以实现构造一个DynamicTableSource.
  • org.apache.flink.table.factories.DynamicTableSinkFactory:可以实现构造一个DynamicTableSink.

默认情况下,使用connector选项的值作为工厂标识符和Java的服务提供者接口(SPI)来发现工厂。

在 JAR 文件中,可以将对新实现的引用添加到服务文件中:

  • META-INF/services/org.apache.flink.table.factories.Factory

该框架由工厂标识符和请求的基类唯一标识的单个匹配工厂检查,例如:DynamicTableSourceFactory

如有必要,目录实现可以绕过工厂发现过程。为此,目录需要返回一个实例,该实例在org.apache.flink.table.catalog.Catalog#getFactory.

3.2 Dynamic Table Source - 动态表源

根据定义,动态表可以随时间变化。

在读取动态表时,内容可以被认为是:

  • 一个更改日志(有限或无限),所有更改都会持续使用,直到更改日志用完,这由ScanTableSource接口表示。
  • 一个不断变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值 ,这由LookupTableSource 接口表示。

一个类可以同时实现这两个接口,规划器根据指定的查询决定它们的使用。

3.2.1 Scan Table Source - 扫描表源

ScanTableSource(扫描表源)在运行时扫描来自外部存储系统的所有行。

扫描的行不一定只包含插入,还可以包含更新和删除。因此,表源可用于读取(有限或无限)变更日志。返回的更改日志模式指示计划程序在运行时可以预期的一组更改。

  • 对于常规的批处理场景,源可以发出有限的仅插入行流。
  • 对于常规流式处理方案,源可以发出无限制的仅插入行流。
  • 对于变更数据捕获 (CDC) 方案,源可以发出带有插入、更新和删除行的有界或无界流。

Table Source可以实现进一步的能力接口,SupportsProjectionPushDown在规划期间可能会改变实例。所有能力都可以在org.apache.flink.table.connector.source.abilities 包中找到并列在源能力表中。

运行时实现的ScanTableSource必须产生内部数据结构,因此,记录必须以org.apache.flink.table.data.RowData. 该框架提供了运行时转换器,因此源仍然可以处理常见的数据结构并在最后执行转换。

3.2.2 Lookup Table Source - 查找表源

Lookup TableSource在运行时通过一个或多个键查找外部存储系统的行。

  • 与ScanTableSource相比,源不必读取整个表,并且可以在必要时从(可能不断变化的)外部表中懒惰地获取单个值。
  • 与ScanTableSource相比, LookupTableSource当前仅支持发出仅插入更改。

LookupTableSource 的运行时由 TableFunction或AsyncTableFunction实现,该函数将在运行时使用给定查找键的值调用。

接口 描述
SupportsFilterPushDown 启用将过滤器下推到DynamicTableSource. 为了提高效率,源可以将过滤器进一步向下推,以接近实际数据生成。
SupportsLimitPushDown 允许将限制(预期的最大生成记录数)下推到DynamicTableSource.
SupportsPartitionPushDown 允许将可用分区传递给规划器并将分区下推到DynamicTableSource. 在运行时,源将只从传递的分区列表中读取数据以提高效率。
SupportsProjectionPushDown 能够将(可能是嵌套的)投影下推到DynamicTableSource. 为了提高效率,源可以将投影进一步向下推,以接近实际的数据生成。如果源也实现SupportsReadingMetadata了,源也将只读取所需的元数据。
SupportsReadingMetadata 允许从DynamicTableSource. 源负责在生成的行末尾添加所需的元数据。这包括可能从包含的格式转发元数据列。
SupportsWatermarkPushDown 能够将水印策略下推到DynamicTableSource. 水印策略是时间戳提取和水印生成的构建器/工厂。在运行时,水印生成器位于源内部,并且能够生成每个分区的水印。
SupportsSourceWatermark 可以完全依赖ScanTableSource 自身提供的水印策略。因此,CREATE TABLEDDL 能够使用SOURCE_WATERMARK()内置标记功能,计划器将检测到该功能,并在可用时将其转换为对该接口的调用。

3.3 Dynamic Table Sink - 动态表接收器

根据定义,动态表可以随时间变化。在编写动态表时,可以始终将内容视为更改日志(有限或无限),其中所有更改都被连续写出,直到更改日志用完为止。返回的更改日志模式 指示接收器在运行时接受的更改集。

  • 对于常规批处理场景,接收器可以仅接受仅插入行并写出有界流。
  • 对于常规的流式处理方案,接收器只能接受仅插入行,并且可以写出无界流。
  • 对于变更数据捕获 (CDC) 场景,接收器可以使用插入、更新和删除行写出有界或无界流。

表接收器可以实现进一步的能力接口,所有能力都可以在org.apache.flink.table.connector.sink.abilities 包装中找到,并列在水槽能力表中。

运行时实现DynamicTableSink必须使用内部数据结构。因此,记录必须被接受为org.apache.flink.table.data.RowData, 该框架提供了运行时转换器,因此接收器仍然可以在通用数据结构上工作并在开始时执行转换。

接口 描述
SupportsOverwrite 允许覆盖DynamicTableSink. 默认情况下,如果未实现此接口,则无法使用例如 SQL INSERT OVERWRITE子句覆盖现有表或分区。
SupportsPartitioning 允许将分区数据写入DynamicTableSink.
SupportsWritingMetadata 允许将元数据列写入DynamicTableSource. 表接收器负责在消费行的末尾接受请求的元数据列并将它们持久化。这包括可能将元数据列转发到包含的格式。

3.4 Encoding / Decoding Formats - 编码/解码格式

一些表连接器接受对键和/或值进行编码和解码的不同格式。

格式的工作方式类似于模式DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider,工厂负责翻译选项,源负责创建运行时逻辑。

因为格式可能位于不同的模块中,所以使用类似于表工厂的 Java 服务提供者接口来发现它们。为了发现格式工厂,动态表工厂搜索与工厂标识符和特定于连接器的基类相对应的工厂。

例如,Kafka 表源需要一个DeserializationSchema作为运行时接口的解码格式。因此,Kafka 表源工厂使用该value.format选项的值来发现一个DeserializationFormatFactory.

当前支持以下格式工厂:

org.apache.flink.table.factories.DeserializationFormatFactory
org.apache.flink.table.factories.SerializationFormatFactory
• 1
• 2

格式工厂将选项转换为EncodingFormatDecodingFormat。这些接口是另一种为给定数据类型生成专用格式运行时逻辑的工厂。

例如:对于 Kafka 表源工厂,DeserializationFormatFactory将返回一个EncodingFormat<DeserializationSchema> 可以传递给 Kafka 表源的值。

04 案例演示

需求表源使用简单的单线程SourceFunction打开一个监听传入字节的socket,原始字节通过可插入格式解码成行,该格式需要一个更改日志标志作为第一列

下面讲解所有组件是如何协同工作的,比如:

  • 创建解析和验证选项的工厂
  • 实现表连接器
  • 实现和发现自定义格式,
  • 使用提供的工具类,例如:数据结构转换器和FactoryUtil.

我们将使用上面提到的大部分接口来启用以下 DDL

CREATE TABLE UserScores (name STRING, score INT)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
);

由于该格式支持变更日志语义,我们能够在运行时摄取更新并创建一个可以持续评估变化数据的更新视图:

SELECT name, SUM(score) FROM UserScores GROUP BY name;

使用以下命令在终端中抓取数据:

> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18

4.1 工厂实现

这里说明如何将来自目录的元数据(Metadata)转换为具体的连接器实例。

注意:两个工厂都已添加到META-INF/services目录中

4.1.1 SocketDynamicTableFactory

这里将SocketDynamicTableFactory目录表转换为表源(因为表源需要解码格式,所以FactoryUtil为了方便起见,我们正在使用提供的格式来发现格式)。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
  // define all options statically
  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
    .stringType()
    .noDefaultValue();
  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
    .intType()
    .noDefaultValue();
  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // corresponds to '\n'
  @Override
  public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
  }
  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
    return options;
  }
  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    return options;
  }
  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
      DeserializationFormatFactory.class,
      FactoryUtil.FORMAT);
    // validate all options
    helper.validate();
    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType =
            context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
}

4.1.2 ChangelogCsvFormatFactory

ChangelogCsvFormatFactory的功能主要是转换为特定的格式。

FactoryUtilinSocketDynamicTableFactory 负责相应地调整选项键(option keys)并处理前缀,如changelog-csv.column-delimiter.

因为这个工厂实现DeserializationFormatFactory了 ,它也可以用于支持反序列化格式的其他连接器,例如 Kafka 连接器。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
  // define all options statically
  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
    .stringType()
    .defaultValue("|");
  @Override
  public String factoryIdentifier() {
    return "changelog-csv";
  }
  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }
  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(COLUMN_DELIMITER);
    return options;
  }
  @Override
  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
      DynamicTableFactory.Context context,
      ReadableConfig formatOptions) {
    // either implement your custom validation logic here ...
    // or use the provided helper method
    FactoryUtil.validateFactoryOptions(this, formatOptions);
    // get the validated options
    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
    // create and return the format
    return new ChangelogCsvFormat(columnDelimiter);
  }
}

4.2 表源和解码格式

这里讲解如何从计划层的实例转换为交付到集群的运行时实例。

4.2.1 SocketDynamicTableSource

SocketDynamicTableSource在计划期间使用。在我们的示例中,我们没有实现任何可用的能力接口。

因此,主要逻辑(getScanRuntimeProvider(...)) 可以在我们实例化所需SourceFunction及其DeserializationSchema运行时的地方找到。两个实例都被参数化以返回内部数据结构(即RowData)。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
public class SocketDynamicTableSource implements ScanTableSource {
  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
  private final DataType producedDataType;
  public SocketDynamicTableSource(
      String hostname,
      int port,
      byte byteDelimiter,
      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
      DataType producedDataType) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.decodingFormat = decodingFormat;
    this.producedDataType = producedDataType;
  }
  @Override
  public ChangelogMode getChangelogMode() {
    // in our example the format decides about the changelog mode
    // but it could also be the source itself
    return decodingFormat.getChangelogMode();
  }
  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    // create runtime classes that are shipped to the cluster
    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
      runtimeProviderContext,
      producedDataType);
    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
      hostname,
      port,
      byteDelimiter,
      deserializer);
    return SourceFunctionProvider.of(sourceFunction, false);
  }
  @Override
  public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
  @Override
  public String asSummaryString() {
    return "Socket Table Source";
  }
}

4.2.2 ChangelogCsvFormat

ChangelogCsvFormatDeserializationSchema在运行时使用的解码格式。它支持INSERTDELETE更改。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
  private final String columnDelimiter;
  public ChangelogCsvFormat(String columnDelimiter) {
    this.columnDelimiter = columnDelimiter;
  }
  @Override
  @SuppressWarnings("unchecked")
  public DeserializationSchema<RowData> createRuntimeDecoder(
      DynamicTableSource.Context context,
      DataType producedDataType) {
    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
      producedDataType);
    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
    // use logical types during runtime for parsing
    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
    // create runtime class
    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
  }
  @Override
  public ChangelogMode getChangelogMode() {
    // define that this format can produce INSERT and DELETE rows
    return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.DELETE)
      .build();
  }
}

4.3 Runtime运行时

接下来将说明SourceFunctionDeserializationSchema的运行时逻辑。

4.3.1 ChangelogCsvDeserializer

ChangelogCsvDeserializer包含一个简单的解析逻辑,用于将字节转换Integer 类型Row行为String,最后的转换步骤将它们转换为内部数据结构。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
  private final List<LogicalType> parsingTypes;
  private final DataStructureConverter converter;
  private final TypeInformation<RowData> producedTypeInfo;
  private final String columnDelimiter;
  public ChangelogCsvDeserializer(
      List<LogicalType> parsingTypes,
      DataStructureConverter converter,
      TypeInformation<RowData> producedTypeInfo,
      String columnDelimiter) {
    this.parsingTypes = parsingTypes;
    this.converter = converter;
    this.producedTypeInfo = producedTypeInfo;
    this.columnDelimiter = columnDelimiter;
  }
  @Override
  public TypeInformation<RowData> getProducedType() {
    // return the type information required by Flink's core interfaces
    return producedTypeInfo;
  }
  @Override
  public void open(InitializationContext context) {
    // converters must be open
    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
  }
  @Override
  public RowData deserialize(byte[] message) {
    // parse the columns including a changelog flag
    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
    final RowKind kind = RowKind.valueOf(columns[0]);
    final Row row = new Row(kind, parsingTypes.size());
    for (int i = 0; i < parsingTypes.size(); i++) {
      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
    }
    // convert to internal data structure
    return (RowData) converter.toInternal(row);
  }
  private static Object parse(LogicalTypeRoot root, String value) {
    switch (root) {
      case INTEGER:
        return Integer.parseInt(value);
      case VARCHAR:
        return value;
      default:
        throw new IllegalArgumentException();
    }
  }
  @Override
  public boolean isEndOfStream(RowData nextElement) {
    return false;
  }
}

4.3.2 SocketSourceFunction

SocketSourceFunction打开一个套接字并消费字节。它通过给定的字节分隔符(\n默认情况下)拆分记录,并将解码委托给可插入的DeserializationSchema,源函数只能在并行度为 1 的情况下工作。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DeserializationSchema<RowData> deserializer;
  private volatile boolean isRunning = true;
  private Socket currentSocket;
  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.deserializer = deserializer;
  }
  @Override
  public TypeInformation<RowData> getProducedType() {
    return deserializer.getProducedType();
  }
  @Override
  public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup());
  }
  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }
  @Override
  public void cancel() {
    isRunning = false;
    try {
      currentSocket.close();
    } catch (Throwable t) {
      // ignore
    }
  }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
62 0
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
69 0
|
8月前
|
Oracle NoSQL 关系型数据库
实时计算 Flink版产品使用合集之MongoDB CDC connector的全量快照功能可以并发读取吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
132 2
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在自定义RichSinkFunction中,如何获取source的schema
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
NoSQL 关系型数据库 Java
实时计算 Flink版产品使用问题之如何使用Flink MongoDB Connector连接MongoDB
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
416 0