[企业云-实时计算]SLS 全新 Connector 实现来看 Flink Connector 的细节(FLIP-27/FLIP-191)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 背景根据之前的企业云的实时计算架构可以知道,我们的选型中,SLS 承担的重要的角色:打通弹外向弹内实时数据回流的链路作为接口 Interface,向工程和算法同学提供宽表和服务,满足业务的自定义需求。但是 SLS 的 connector 怎么说呢,几个字:严重不满足需求。目前网上搜索,大体上有以下几个实现aliyun-log-flink-connector:貌似是SLS 官方团队的实现,只提供了 

背景

根据之前的企业云的实时计算架构可以知道,我们的选型中,SLS 承担的重要的角色:

  1. 打通弹外向弹内实时数据回流的链路
  2. 作为接口 Interface,向工程和算法同学提供宽表和服务,满足业务的自定义需求。

但是 SLS 的 connector 怎么说呢,几个字:严重不满足需求。

目前网上搜索,大体上有以下几个实现

  1. aliyun-log-flink-connector 貌似是SLS 官方团队的实现,只提供了 Datastream 接口。实现也比较老了。
  2. alibaba-flink-connectors : 阿里云官方的版本,也没有 SQL 接口,和上面貌似一样。最近更新是 21 年 5 月
  3. 日志服务SLS源表:VVR 的官方版本,终于有了 SQL 版本。但是文档很少,一些关键功能没有提供。后面会提到,貌似实现的版本时基于 BLINK 版本,所以很多 Flink 功能不支持(例如 1.15 中的)。

所以本篇文章聊一聊当前的 Flink Source / Sink API,以及业务场景下,为什么及如何重构 SLS connector。

业务需求

目前业务上的需求(现有 VVR 版本 SLS Connector 不满足的),主要有以下几个点:

1. 不支持 Watermark Pushdown

Sls Source 不支持 Watermark Pushdown,在做流关联等需要 watermark 的场景,补数据的时候很容易丢数据。

例如我们业务日志的采集通过 logtail 进行,所有日志都在一个 logstore 中,工程通过 logType 字段来区分;由于 logtail 没有配置负载均衡策略(hash key),所以需要的 logType 有可能在各个 shard 是倾斜严重的。

这时候如果进行消费,会出现若干 shard 消费线程很快;某些 shard 消费很慢,watermark 被 push 的很快,流关联很容易丢数据。

同时由于 Logstore 的 shard 数很大(高峰期 60+,加上 Readonly 实例 80+),flink 资源有限,很难开 80+的并发度(一个并发消费一个 shard)。

解决方案一般采用 Flink-27 的接口实现 Source 解决,下面会分析。

2.  不支持动态分区自适应

如果流量徒增,SLS shard 进行分裂,Flink 任务会重启才能消费新的Shard布局(!!!!);

有时候业务高峰,shard 不断分裂,带来三个问题:

  1. 数据来不及补:每次重启都从 checkpoint 读取,分裂频繁情况下延迟越来越高
  2. watermark 倾斜:由于上述 shard 倾斜问题,导致 watermark 容易增长过快,导致流关联等丢数据
  3. EXACTLY ONCE:由于 sls 本身不支持幂等(支持,下面会说,但是比较隐蔽),所以每次重启,都可能导致重复的数据写入,下游统计会出问题

所以如何能够再不重启任务的情况下,动态自适应的消费 sls 的 shards 变化,是一个比较重要的业务需求。

3.  原版数据类型支持很差

原版 VVR 的 SLS 有个很有意思的事情,我举个例子:

CREATE TEMPORERY TABLE tbl (
  intField INT,
  strField STRING,
  timeField TIMESTAMP(3),
  dateFIeld DATE
)
WITH (
  'connector' = 'sls'
);

DDL 中有 4 列,分别是 INT、STRING、TIMESTAMP(3)、DATE 类型。

写入时没有什么问题。但是如果希望通过元数据管理(Catalog)复用 DDL,那么就会遇到以下问题:

  1. 如果非 STRING 类型的字段,存在空值,写入时 connector 会处理成空字符串;读取时会报 NumberFormatError,无法将空字符串解析成 INT 等数值类型
  2. sls connector 不支持 DATE 和 TIMESTAMP 类型的数据解析,而且还不报错,静默给了 null。

这两个问题卡了我们好久找不到原因(读取时没数据),最后翻源码才解决掉。

这意味着,想要声明一次 DDL,读取、写入复用是没戏的,必须准备两个 DDL 分别应付读和写场景。

目前已经给 VVR 官方提了 PR,但是不知道什么时候能合并,同时下一次发版什么时候目前也不知道。

4. 无法 shard hash 负载均衡 

其实不是不支持,只是文档里没写。翻阅代码会发现有个 partitionField 的选项,在 options 里声明,会根据这个字段 MD5 并拿到 hash key。

但是官方文档的 metadata 主要通过 options 中进行配置,估计是实现比较老(从 Blink 流传下来的?)。Flink 官方通过 Flip-107 ,通过 TableFactory 实现 SupportsReadingMetadata 和 SupportsWriteMetadata 来支持。

VVR SLS Connector 的 Source 实现中实现了SupportsReadingMetadata;但是 Sink 未实现 SupportsWriteMetadata。

其实这里更合理的是通过 PARTITION BY 确定 hash key,或者主键来确定 Hash key。

也看到 Aone 有外部客户提过这个需求,但是貌似没有进展。

5. source 消费时 Shard 和 Parallism 分配不合理

当前 VVR 版本的 SLS Connector 在 shard 和 reader 分配时,采用的是 ModAssign 的模式(ShardId  mod ReaderNum)。

目前暂未发现 Sls ShardId 的规律,每次分裂、合并后均为生成一个新的(貌似是递增)的 ShardId。

在不指定 SLS Hash Key 的情况下,默认 SLS 采用的是负载均衡模式;此模式下随机投递到具体的 Shard 上,那么 ModAssign 模式没有什么问题;但是如果采用 Hash Key 方式的时候,SLS 采用的是 MD5 Hash Key 前缀进行分配 Shard 的方式,会导致分配到 Flink Reader 的任务负载是不一致的。

所以在采用 Hash Key 的情况下,使用 Consistent Hash 的方式,将分裂后或合并后的 Shard 分配给相同的 Reader,保证 Reader 间负载均衡。

 

6. 不支持 Consumer Group Resume

其实原有 Flink Connector 已经支持了 SLS Consumer Group,但是只支持更新,不支持从 SLS Consumer Group Resume。

这意味着其实 SLS Consumer Group 只是用来观察、报警,没有太大用途。

也导致一个问题,就是如果 SQL 逻辑发生较大的变化,Flink State 无法恢复时,是无法从 Consumer Group 断点续传来实现 Exactly Once 语义的。

所以支持从 Consumer Group Checkpoint 断点启动任务的需求是迫切的。

新的 Connector 实现

这里主要介绍下,为了满足业务需求,在新的实现中,一些关键点和特性。

Source 实现

FLIP-27: 新的 Flink Source 接口 

在 1.12 版本中,Flink 提供了新的 Source 接口:

  1. 提供了更好的抽象,实现简单(理解需要花些时间)
  2. 可以支持 Watermark Pushdown 到 source 侧,同时提供了 reader 间的等待 pause,防止某个 shard 过度消费。
  3. 流批一体:一个实现,同时支持流处理和批处理。
  4. Datastream 和 Table API 统一:同一个实现,简单开发即可支持 Datastream 和 Table API(SQL)

后续也有一些 FLIP 基于此 Proposal 进行,包括不限于 Watermark Alignment、Split Balance 等。 

所以基于 FLIP-27 实现新的 SLS connector 是最好的选择。

 

从图上可以看出,要实现 FLIP-27,主要需要实现以下几个类(按照我实现的先后顺序进行):

  1. SourceSplitReader:功能及其简单,只关注于如果从数据源(SLS)拉取数据,并生成 RecordsBySplitIds 即可
  2. SplitFetcher:几乎没有太多逻辑,参考 Kafka 的 connector 实现.
  3. RecordEmiiter:将 RecordsBySplitIds 处理成最后需要的 InputT 类型,一般 DeserializationSchema 在此进行。如果有数据源的 Watermark(例如 SLS 就有 __time__作为采集时间),可以在这里输出给 SourceOutput,方便 Flink 进行数据源对齐。
  4. SourceReaderBase:主要将 SplitEnumerator 传入的 Split 进行分配和分发;同时 checkpoint 触发、回滚时,需要将 split 进行分配。
  5. SplitEnumerator:split 的发现。在本场景中,就是 SLS shard 的分裂、合并等发现,并生成 Split 传给对应的 SourceReader。分配方式可以指定。

在 Fetcher 模型选型上,最终选择了一个 Shard 一个 SplitFetcher 线程。好处是一个 Fetcher 只关注一个具体的 shard;同时如果 shard 发生变化,那么该 fetcher 线程关闭即可。

新的 ShardAssigner 抽象

实现过程中,抽象了ShardAssigner 的 interface 出来,用于在 SplitEnumerator 中合理的将Shard 分配至若干 Reader 中。

public interface ShardAssigner extends Serializable {

  static ShardAssigner consistent() {
    return new ConsistentHashShardAssigner();
  }

  static ShardAssigner roundRobin() {
    return new RoundRobinShardAssigner();
  }

  int assignToReader(LogStoreShard shard, int numReaders);
}

例如一致性 Hash 分配的实现:

public class ConsistentHashShardAssigner implements ShardAssigner {
  private static final Logger LOG = LoggerFactory.getLogger(ConsistentHashShardAssigner.class);

  @Override
  public int assignToReader(LogStoreShard shard, int numReaders) {
    Integer beginKey = Integer.parseInt(shard.getHashKeyBegin().substring(0, 4), 16);
    Integer endKey = Integer.parseInt(shard.getHashKeyEnd().substring(0, 4), 16);
    int middleKey = (endKey + beginKey) / 2;
    assert middleKey >= 0;
    int keysPerShard = 65536 / numReaders;
    LOG.debug(
        "Assign Shard(id={}) with middle key [0x{}@{}] to {}, (keysPerShard={}, numReaders={})",
        shard.getShardId(),
        Integer.toHexString(middleKey),
        middleKey,
        middleKey / keysPerShard,
        keysPerShard,
        numReaders);
    return middleKey / keysPerShard;
  }

原理是根据 SLS Shard 的 HashKeyStart/HashKeyEnd 信息,取前 4 位(16^4=65536)进行计算中心,判断其属于哪个 Reader。

这样在 Shard 合并、分裂后,我们能够根据新的 Hash Key Range 将新的 Shard 分配到原来 Shard 所在的 Reader,从而实现Reader 的负载均衡。

CursorInitializer 抽象

为了支持不同的初始化读取方式,参考 Kafka Connector 的实现,我们抽象了 CursorInitializer 接口,通过实现该接口,可以满足不同启动初始化方式。

public interface CursorInitializer extends Serializable {
  int LATEST_TIME_FOR_CURSOR = Integer.MAX_VALUE;
  int EARLIEST_TIME_FOR_CURSOR = 0;
  
  
  Map<LogStoreShard, String> getShardCursor(
      Collection<LogStoreShard> shards, ShardCursorRetriever retriever);

  CursorResetStrategy getAutoCursorResetStrategy();
  
  static CursorInitializer timestamp(int timestamp) {
    return new TimestampCursorInitializer(timestamp);
  }

  static CursorInitializer earliest() {
    return new TimestampCursorInitializer(EARLIEST_TIME_FOR_CURSOR);
  }

  static CursorInitializer latest() {
    return new TimestampCursorInitializer(LATEST_TIME_FOR_CURSOR);
  }

  static CursorInitializer end() {
    return new StoppingCursorInitializer();
  }

  static CursorInitializer checkpoint() {
    return new ConsumerCheckpointCursorsInitializer();
  }
}

目前已经实现了几种初始化方式,满足的绝大部分的业务需求,同时支持了通过 Consumer Group Checkpoint 恢复的能力:

  1. earliest:从 shard 最早的位置开始消费
  2. latest:从 shard  最新的位置的下一个点开始消费
  3. end:主要针对  READ-ONLY 节点,获取Shard 最后的数据位置
  4. checkpoint:通过 Consumer Group 消费组来恢复上一次读取的节点
  5. timestamp:指定时间进行读取

DeserializationSchema 的实现

SLS 的 SDK 读取的数据格式是一串 Protobuf 格式的二进制数组,结构由外至内分别为:

  1. LogGroup 的数组:一次拉取可能包含若干个 LogGroup
  2. LogGroup:SLS 提交和拉取的单位,包含若干 Log;同时 Tags/Source/Topic等 meta 信息存在这一层级
  3. Log:一条日志。包含若干字段组(LogContent),同时含有 Time 的 meta 信息
  4. LogContent:KV 字段组

从层级上来看,对于 Flink 实现不太友好,尤其是 metadata 信息存在于多级(LogGroup 和 Log),在 DeserializationSchema 中无法抽象出来。

SLS SDK 中提供了两个类实现,例如真多 LogGroup:

  1. LogGroup:使用 Protobuf 格式进行解析;可以转换为 FastLogGroup。
  2. FastLogGroup:直接字符遍历;相比较 LogGroup 的特点是,ZeroCopy,不会对 byte[] 数组进行拷贝。

其实 VVR SLS Connector 中单独使用了一套解析方法,没仔细看,估计是为了控制内存占用。

但是两个实现对于 Flink 的友好程度都不够,在 Sink 端尤其明显(后续会提到)。

所以这里封装了LogGroup 和 LogRecord 两个类,内部使用 FastLog,后续可以考虑自己 parse。

其中 LogRecord 包含了从 LogGroup 继承的所有 Metadata(只是增加了引用),所以资源占用还好。

综上 DeserializationSchema 抽象了了两个类出来:

  1. SlsLogGroupDeserializationSchema :对 LogGroup 进行解析,可以自行操作所有的 Metadata 信息。
  2. SlsLogDeserializationSchema :对 LogRecord 进行解析。绝大多数的人需要扩展时,只需要继承这个类就可以了。

同时为了方便使用 2 进行扩展的同学,也提供了 Wrapper 类将SlsLogDeserializationSchema转换成SlsLogGroupDeserializationSchema的供程序进行调用。

public class SlsLogOnlyDeserializationSchemaWrapper<T>
    implements SlsLogGroupDeserializationSchema<T> {
  private final SlsLogDeserializationSchema<T> deserializationSchema;

  private final SourceOutputCollector<T> outputCollector;

  protected SlsLogOnlyDeserializationSchemaWrapper(
      SlsLogDeserializationSchema<T> deserializationSchema) {
    this(deserializationSchema, new SourceOutputCollector<>());
  }

  protected SlsLogOnlyDeserializationSchemaWrapper(
      SlsLogDeserializationSchema<T> deserializationSchema, SourceOutputCollector<T> collector) {
    this.deserializationSchema = deserializationSchema;
    this.outputCollector = collector;
  }

  public static <V> SlsLogOnlyDeserializationSchemaWrapper<V> of(
      SlsLogDeserializationSchema<V> schema) {
    return new SlsLogOnlyDeserializationSchemaWrapper<>(schema);
  }

  /**
   * Deserializes the byte message.
   *
   * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
   * produced records should be relatively small. Depending on the source implementation records can
   * be buffered in memory or collecting records might delay emitting checkpoint barrier.
   *
   * @param record The ConsumerRecord to deserialize.
   * @param out The collector to put the resulting messages.
   */
  @Override
  public void deserialize(LogGroup record, Collector<T> out) throws IOException {
    for (LogRecord log : record.getLogRecords()) {
      this.outputCollector.setTimestamp(log.getTime() * 1000L);
      this.deserializationSchema.deserialize(log, out);
    }
  }

  /**
   * Deserializes the byte message.
   *
   * <p>Can output multiple records through the {@link Collector}. Note that number and size of the
   * produced records should be relatively small. Depending on the source implementation records can
   * be buffered in memory or collecting records might delay emitting checkpoint barrier.
   *
   * @param record The ConsumerRecord to deserialize.
   * @param out The collector to put the resulting messages.
   */
  @Override
  public void deserialize(LogGroup record, SourceOutput<T> out) throws IOException {
    this.outputCollector.setSourceOutput(out);
    this.deserialize(record, this.outputCollector);
  }

  /**
   * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
   *
   * @return The data type produced by this function or input format.
   */
  @Override
  public TypeInformation<T> getProducedType() {
    return this.deserializationSchema.getProducedType();
  }
}

SupportsReadingMetadata 及 SupportsProjectionPushDown

什么是 SupportsReadingMetadata

就是 DDL 中的 METADATA 声明的列。声明了 SupportsReadingMetadata 的 TableSource,可以处理这一类元数据的产出。

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  offset INT METADATA,                 -- access Kafka 'offset' metadata
  headers MAP<STRING, BYTES> METADATA  -- access Kafka 'headers' metadata
) WITH (
  'connector' = 'kafka', 
  'topic' = 'test-topic', 

  'format' = 'avro'
)

SLS 中也涉及到了元数据,例如当前有用的主要是以下元数据

public enum SlsReadableMetadata {
  /** source of sls log */
  SOURCE(
      "__source__",
      DataTypes.STRING(),
      new MetadataConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(LogRecord logRecord) {
          return StringData.fromString(logRecord.getSource());
        }
      }),
  /** topic of sls log */
  TOPIC(
      "__topic__",
      DataTypes.STRING(),
      new MetadataConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(LogRecord logRecord) {
          return StringData.fromString(logRecord.getTopic());
        }
      }),
  /** timestamp of sls log */
  TIMESTAMP(
      "__timestamp__",
      DataTypes.INT().notNull(),
      new MetadataConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(LogRecord logRecord) {
          return logRecord.getTime();
        }
      }),
  /** tag of sls log */
  TAG(
      "__tag__",
      DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
      new MetadataConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(LogRecord logRecord) {
          return new GenericMapData(logRecord.getTags().toMap());
        }
      }),

  MACHINE_UUID(
      "__machine_uuid__",
      DataTypes.STRING(),
      new MetadataConverter() {
        private static final long serialVersionUID = 1L;

        @Override
        public Object read(LogRecord logRecord) {
          return StringData.fromString(logRecord.getMachineUuid());
        }
      });

  private final String key;

  private final DataType dataType;

  private final MetadataConverter converter;

  SlsReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
    this.key = key;
    this.dataType = dataType;
    this.converter = converter;
  }

  public String getKey() {
    return key;
  }

  public DataType getDataType() {
    return dataType;
  }

  public MetadataConverter getConverter() {
    return converter;
  }
}

什么是SupportsProjectionPushDown

这个 Feature 是一个比较有用的 Feature,但是目前有部分 connector 没实现。

简单来说,就是 DDL 中声明的列,如果在后续的程序逻辑中没用被用到,那么在这里可以被优化掉,不用解析,从而减少源数据解析的开销成本。

这个对于 SLS 和我们的业务场景非常有用,例如上述业务分析中提到,我们采用的是一个 LogStore 中若干类型的日志格式都有。

一般情况下,我们会声明一个大而全的 DDL 保存在 Catalog 元数据管理中。

在解析的时候和使用的时候,可能某个 logType 只用的很少数的几列,那么其实没必要所有的字段都进行解析,中间涉及到大量的 byte[]拷贝和解码,如果能够跳过,能够大幅减少资源占用。

为什么两个功能放在一起聊

这两个功能其实在功能上会有重叠,实现时会遇到不小的坑,记录一下。

原理上在使用SupportsReadingMetadata 时,如果 DDL 中有声明的 metadta 列,planner 会调用 applyReadableMetadata 函数。我们看下接口的说明:

		/**
		 * Provides a list of metadata keys that the produced {@link RowData} must contain as appended
     * metadata columns.
     *
     * <p>Implementations of this method must be idempotent. The planner might call this method
     * multiple times.
     *
     * <p>Note: Use the passed data type instead of {@link ResolvedSchema#toPhysicalRowDataType()}
     * for describing the final output data type when creating {@link TypeInformation}. If the
     * source implements {@link SupportsProjectionPushDown}, the projection is already considered in
     * the given output data type, use the {@code producedDataType} provided by this method instead
     * of the {@code producedDataType} provided by {@link
     * SupportsProjectionPushDown#applyProjection(int[][], DataType)}.
     *
     * @param metadataKeys a subset of the keys returned by {@link #listReadableMetadata()}, ordered
     *     by the iteration order of returned map
     * @param producedDataType the final output type of the source, it is intended to be only
     *     forwarded and the planner will decide on the field names to avoid collisions
     * @see DecodingFormat#applyReadableMetadata(List)
     */
		 void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);

该函数提供了两个入参,分别是用到的元数据列表,和最后要产生的数据类型 producedDataType。

源数据列表好理解,producedDataType 应该怎么处理?

我们看到备注里有描述,producedDataType 是DeserializationSchema 等要产生的最终 TypeInformation 要用到的,其中包含了物理列和元数据列的所有信息;但是我们 DeserializationSchema 中,一般情况下都是处理物理列的。

另外一个关键点是,这里提到了SupportsProjectionPushDown和SupportsReadingMetadata同时支持时,如果处理的建议;最开始实现时没注意,吃了大亏。

简单来说,TableSource 中需要处理两个数据类型,我们分别命名为:

  1. PhysicalDataType:物理列的类型。SupportsProjectionPushDown 会将用到的列给出来,没用到的列忽略掉;我们需要根据这个类型,构造 DeserializationSchema,只 parse 用到的列,从而减少 parse 的 overhead;
  2. ProducedDataType:最后 TableSource 产出的列的类型,包含了物理列和元数据列。一般情况下是物理列在前,源数据列在后。实现的时候可以通过JoinedRowData类合并物理列和元数据列。其中 DeserializationSchema 中 getProducedType函数需要返回ProducedDataType的 TypeInformation。

所以这里要认真处理,否则会出现 source 结果读取不正确空值,甚至报错的场景。

一个 Flink VVR 的坑

在执行时,如果使用了同时支持SupportsReadingMetadata和 SupportsProjectionPushDown 的 Connector(例如 Kafka 和本 conector),并且使用了 FROM METADATA 的用法,例如下述:

CREATE TEMPORARY TABLE tbl_src
(
	phycial_col STRING,
  topic STRING METADATA __topic__
) WITH (
	'connector' = 'sls-ng'
);

SELECT * FROM tbl_src;

则会报错 java.lang.ArrayIndexOutOfBoundsException: -1 。开源 1.15.3 的 Planner 没问题。这个已经提交 BUG 给 VVR 团队了。

Sink 实现

Flink 在 1.15 实现了 Flip-191 ,新的统一的 unified SinkV2 interface。

提供了若干扩展能力,可以支持 PreWrite/PreCommit/PostCommit 的拓扑逻辑处理,功能较为强大。

但是我们主要聊的不是这个 FLIP,SLS 的写入用不到这么复杂的逻辑。

我们主要用到的是 AWS Contribute 的 FLIP-171: Async Sink,值得一提的是,首要实现是 AWS 的 Kinesis 产品,对标的和 SLS 比较像,sign....

在 FLIP-171 中,提供了全新的 AsyncSink 的实现封装,用户只需要简单的实现 AsyncSinkWriter (主要写入逻辑)和 AsyncSink (只需要进行构造),Flink 将为你提供以下的能力:

  1. throttle 限速支持:支持队列、分批提交,以及根据 byte 大小、或者请求数进行限速,当 sink 端能力受限时,及时背压上游,防止打爆 sink 端
  2. 写入的异常处理:如果写入异常,会重新返回队列进行重试,防止丢数据
  3. checkpoint 支持:不需要关注 checkpoint;AsyncWriter 提供了封装能力。我们只需要关注  RequestEntry类的序列化就可以,大幅减少开发成本。

值得一提的是,SLS 也提供了 Producer SDK,提供了几乎相同的功能。唯一的问题就是异常处理(可恢复异常的 case 如何处理)和 checkpoint 的适配。

ShardHashKeyGenerator 抽象

为了支持从输入中提取 Hash Key 的能力,我们提供了 ShardHashKeyGenerator 的抽象函数接口:

@PublicEvolving
@FunctionalInterface
public interface ShardHashKeyGenerator<InputT> extends Function<InputT, String>, Serializable {
  static <V> ShardHashKeyGenerator<V> nonPartition() {
    return new NonShardHashKeyGenerator<>();
  }

  static <V> ShardHashKeyGenerator<V> adjust(ShardHashKeyGenerator<V> generator) {
    return new AdjustShardHashKeyWrapper<>(generator);
  }
}

其中提供了两个内置实现,NonShardHashKeyGenerator 和AdjustShardHashKeyWrapper:

  1. NonShardHashKeyGenerator:返回空值,进行 SLS 负载均衡语义
  2. AdjustShardHashKeyWrapper:只取 hash 值的前 3 位 (16^3 = 256)

为什么要有 AdjustShardHashKeyWrapper 的存在?

这里要从 SLS 的 SDK 聊起,我们看下 putLogs 函数

    /**
     * Construct a put log request
     *
     * @param project  project name
     * @param logStore log store name of the project
     * @param topic    topic name of the log store
     * @param source   source of the log
     * @param logItems log data
     * @param hashKey  hashKey
     */
    public PutLogsRequest(String project, String logStore, String topic,
                          String source, List<LogItem> logItems, String hashKey) {
        super(project);
        mLogStore = logStore;
        mTopic = topic;
        mSource = source;
        mlogItems = new ArrayList<LogItem>(logItems);
        mHashKey = hashKey;
    }

可以发现,一组 List<LogItem>,需要提供相同的 topic / source / hashKey。

那么意味着,如果有 1k 条数据,如果 hashKey 不同,会导致一个问题:需要调用 putLogs 函数 1k 次....sign...

所以如果不是用到 SLS 的 EXACTLY ONCE语义(隐藏功能,后续提到),一般需要 ADJUST 的方式将相同前缀的 LogItem 放到一个 Batch 中进行提交,大幅提高吞吐和调用次数。

官方的 LogProducer 中也有相关的逻辑:

public class ShardHashAdjuster {

  private static final int HEX_LENGTH = 32;

  private static final int BINARY_LENGTH = 128;

  private int reservedBits;

  public ShardHashAdjuster(int buckets) {
    if (!isPowerOfTwo(buckets)) {
      throw new IllegalArgumentException("buckets must be a power of 2, got " + buckets);
    }
    reservedBits = Integer.bitCount(buckets - 1);
  }

  public String adjust(String shardHash) {
    HashCode hashCode = Hashing.md5().hashBytes(shardHash.getBytes());
    String binary =
        Strings.padStart(new BigInteger(1, hashCode.asBytes()).toString(2), BINARY_LENGTH, '0');
    String adjustedBinary = Strings.padEnd(binary.substring(0, reservedBits), BINARY_LENGTH, '0');
    return Strings.padStart(new BigInteger(adjustedBinary, 2).toString(16), HEX_LENGTH, '0');
  }

  public static boolean isPowerOfTwo(int number) {
    return number > 0 && ((number & (number - 1)) == 0);
  }
}

另外在 TableApi 中,我们也提供了 KeyedShardHashKeyGenerator 实现,支持通过 PARTITIONED BY(key1, key2) 的 SQL 声明方式,指明 HashKey 的生成方式。优先级如下:

  1. 如果DDL 中有 PARTITION KEY,则使用 PARTITION KEY 进行 Hash
  2. 如果 DDL 没有 Partition Key,但是有主键,则使用 Priamry Key 主键进行 Hash
  3. 如果均没有,则采用负载均衡方式进行投递

WithPreWriteTopology

在 FLIP 191 中,提供了这个预写入拓扑的编辑能力。在我们的开发中,有个场景会涉及到这个:

上游处理了 case 要写入 sls,那么如果将数据分配给 Writer?

我们总是希望 Shard Hash Key 的数据能够负载均衡到同一个 Writer 上,这样能够更好的组 Batch 进行提交;同时能够充分的负载均衡 Writer。

所以我们继承 SlsSink 类,并实现 WithPreWriteTopology,在有ShardHashKeyGenerator 时,可以通过 keyBy ShardHashKey的方式,将相同的 ShardHashKey 的数据分流到相同的 Writer 进程中进行提交。

public class SlsDistributionSink extends SlsSink<RowData> implements WithPreWriteTopology<RowData> {
  private final ShardHashKeyGenerator<RowData> keyGenerator;

  /**
   * Adds an arbitrary topology before the writer. The topology may be used to repartition the data.
   *
   * @param inputDataStream the stream of input records.
   * @return the custom topology before {@link SinkWriter}.
   */
  @Override
  public DataStream<RowData> addPreWriteTopology(DataStream<RowData> inputDataStream) {
    return inputDataStream.keyBy(new RowDataDistributionKeySelector(keyGenerator));
  }

  public static class RowDataDistributionKeySelector implements KeySelector<RowData, String> {
    private static final long serialVersionUID = 7111223332619098084L;

    private final ShardHashKeyGenerator<RowData> generator;

    public RowDataDistributionKeySelector(ShardHashKeyGenerator<RowData> generator) {
      this.generator = generator;
    }

    @Override
    public String getKey(RowData row) throws Exception {
      return generator.apply(row);
    }
  }
}

总结

在这个过程中,我们基于全新的 Flink FLIP Interface,实现了 SLS 的新版 Connector。相比于 VVR 官方 SLS Connector,有如下改进:

  • Source
    • 支持 FLIP-27,从而支持了 Watermark Pushdown 及 Watermark Alignment
    • 支持 ShardAssigner 扩展,并提供了一致性 hash 的 Shard-Reader 分配方式,更加合理和负载均衡
    • 支持 Consumer Group 订阅,同时支持从 SLS Consumer Group 中恢复读取进度
    • 支持自适应 Shard 分裂/合并,无需重启服务;分裂合并后,Reader 的负载能够自适应
    • 支持几乎所有类型的 parse 和 Cast,包括 DATE 和 TIMESTAMP
    • 提供 CursorInitializer 抽象,并提供了若干任务启动初始化方式
  • Sink
    • 支持 WriteMetadata,通过__topic__, __source__, __time__提交;
    • 支持 SupportPartitioning,通过制定 Partition Key 的方式使用 SLS Hash Key
    • 通过 WithPreWriteTopology,优化 write task 负载,使得相同的 Shard Hash Key 数据会被分配到相同的 writer task 中
    • 支持 Throttle,支持配置最大请求次数、最大 batch-size、最大 buffer 大小、和 Flush 间隔的方式,控制写入端的速率。

Future Works

业务稳定使用后提 PR 给 VVR 同学

未来在内部使用稳定后,会提交给 VVR,让更多的人能够用上

支持 SLS Exactly Once 写入

类似 Kafka Upsert 的用法,通过 Hash Key 和 Sequence ID 的方式,保证相同主键的数据只保留一条。

目前相关文档极少,接口也没透出。等后续有业务需求后考虑实现。

移除 LogProducer

LogProducer 和 AsyncWriter 的功能说实话有重复了(组 Batch、queue、throttling),而且比较重。

后续可以直接使用 SLS SDK 的 putLogs 即可。

Reference

目录
相关文章
|
1天前
|
Oracle Java 关系型数据库
实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 6
|
1天前
|
Java 数据库连接 数据库
实时计算 Flink版操作报错合集之flink jdbc写入数据时,长时间没写入后报错,是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 9
|
1天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之一直无法正常运行,并且网络状况良好,是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 8
|
1天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之任务启动后加动态表读binlog报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
19 6
|
1天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之写Oracle时出现的缺失等号错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
22 9
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
8 1
|
1天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 3
|
4天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1809 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
4天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1795 2
官宣|Apache Flink 1.19 发布公告