有人有flip-191 sink的Demo吗。或者哪里可以看具体怎么自定义
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
FLIP-191 是 Apache Flink 中关于使用新版 TableSink API 的一个 Feature,它提供了一种新的 TableSink 编写方法,以便更好地支持批处理和增量处理的组合使用。如果您想了解如何自定义 FLIP-191 TableSink,请参考下面的示例:
public abstract class MyTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    /**
     * 构造函数
     */
    protected MyTableSink() {}
    /**
     * 从 TableSink 获取基本类型信息
     *
     * @return 基本数据类型信息
     */
    @Override
    public TypeInformation<Row> getOutputType() {
        return Types.ROW(Types.STRING, Types.INT, Types.LONG);
    }
    /**
     * 构建追加流处理器
     *
     * @return 追加流处理器
     */
    protected abstract AppendStreamTableSink<Row> createAppendStreamTableSink();
    /**
     * 构建批处理器
     *
     * @param rowDataSet 批处理数据集
     */
    protected abstract void consumeDataSet(DataSet<Row> rowDataSet);
    /**
     * 获取追加流 TableSink
     *
     * @param strings 字段名称
     * @param types 字段类型
     * @return 追加流 TableSink
     */
    @Override
    public AppendStreamTableSink<Row> configure(String[] strings, TypeInformation<?>[] types) {
        return createAppendStreamTableSink();
    }
    /**
     * 处理追加流数据
     *
     * @param dataStream 数据流
     * @param tableSchema 表模式
     * @return TableSink 转化结果
     */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream, TableSchema tableSchema) {
        return createAppendStreamTableSink().consumeDataStream(dataStream, tableSchema);
    }
    /**
     * 处理批处理数据
     *
     * @param dataSet 批处理数据集
     * @param tableSchema 表模式
     */
    @Override
    public void consumeDataSet(DataSet<Row> dataSet, TableSchema tableSchema) {
        consumeDataSet(dataSet);
    }
    /**
     * 获取批处理器的返回类型集合
     *
     * @return 批处理器的返回类型集合
     */
    @Override
    public TypeInformation<Row> getOutputType(TableSchema tableSchema) {
        return Types.ROW(Types.STRING, Types.INT, Types.LONG);
    }
    /**
     * 获取输出格式器的名称
     *
     * @return 输出格式器的名称
     */
    @Override
    public String getOutputFormatName() {
        return getClass().getName();
    }
}
在这个示例中,我们定义了一个基于 AppendStreamTableSink 和 BatchTableSink 接口的抽象 TableSink 类 MyTableSink。该类中实现了 getOutputType()、getOutputFormatName(),configure() 和不同 TableSink 操作需要的 createAppendStreamTableSink() 和 consumeDataSet()。您可以根据各种任务,重写这些操作。
接下来,您可以创建一个类(例如 MySimpleTableSink 类),来继承 MyTableSink 类,并实现它中定义的各种操作。例如,下面是一个简单的 MySimpleTableSink 示例:
public class MySimpleTableSink extends MyTableSink {
    private String tableName;
    public MySimpleTableSink(String tableName) {
        this.tableName = tableName;
    }
    /**
     * 构建追加流处理器
     *
     * @return 追加流处理器
     */
    @Override
    protected AppendStreamTableSink<Row> createAppendStreamTableSink() {
        return new AppendStreamTableSink<Row>() {
            @Override
            public TableSchema getTableSchema() {
                return TableSchema.builder().fields(
                        new String[]{"name", "age", "timestamp"},
                        new TypeInformation[]{Types.STRING, Types.INT, Types.LONG}).build();
            }
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                // 输入数据流的处理
                return dataStream.addSink(
                        new FlinkKafkaProducer<>(topicName, new SimpleStringSchema(), properties));
            }
        };
    }
    /**
     * 构建批处理器
在 Flink 1.9.1 版本中,提供了一个新的 Elasticsearch Sink,可以将数据写入 Elasticsearch 中。您可以通过以下方式使用 Elasticsearch Sink:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
# Elasticsearch 集群地址,多个地址用逗号分隔
rest.addresses=127.0.0.1:9200
# Elasticsearch 索引名称
rest.index=example_index
# Elasticsearch 类型名称
rest.document-type=example_doc_type
# 每批次发送的最大记录数
bulk.flush.max-size=100
# 每个批次的最长等待时间
bulk.flush.max-actions=10
# 是否开启失败重试,默认开启
bulk.flush.backoff.enable=true
# 重试等待时间
bulk.flush.backoff.delay=3000
# 重试次数
bulk.flush.backoff.max-attempts=5
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkBuilder;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.HashMap;
import java.util.Map;
public class ElasticsearchSinkExample {
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 数据源
    DataStream<Map<String, Object>> events = env.fromElements(
        exampleMap("A", 1), exampleMap("B", 2), exampleMap("A", 3), exampleMap("C", 4), exampleMap("B", 5)
    );
    // Elasticsearch Sink
    ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
      RestClientFactoryUtil.getRestClientFactory(restConfig), // 自定义 RestClientFactory
      new CustomIndexRequestBuilder() // 自定义 IndexRequestBuilder
    );
    events.addSink(esSinkBuilder.build());
    env.execute("Elasticsearch Sink Example");
  }
  public static Map<String, Object> exampleMap(String name, int age) {
    Map<String, Object> map = new HashMap<>();
    map.put("name", name);
    map.put("age", age);
    return map;
  }
  public static class CustomIndexRequestBuilder implements ElasticsearchSinkFunction<Map<String, Object>> {
    @Override
    public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
      IndexRequest indexRequest = Requests.indexRequest()
          .index("example_index")
          .type("example_doc_type")
          .source(element);
      indexer.add(indexRequest);
    }
  }
}
在这个示例中,我们使用 StreamExecutionEnvironment 来执行数据流应用程序。然后,我们定义一个包含数据的 Map 并将其添加到 DataStream 中。最后,我们使用 ElasticsearchSinkBuilder 构建 ElasticsearchSink 并使用 addSink 方法将其添加到 DataStream 中。在自定义 IndexRequestBuilder 中,我们创建 IndexRequest 并将其添加到 RequestIndexer 中,ElasticsearchSink 将 IndexRequest 批量等待并提交到 Elasticsearch 集群中。
FLIP-191 引入了一种新的 Sink API,包括两个可扩展接口:DynamicTableSink 和 DynamicTableSinkFactory。您可以根据需要自定义实现这两个接口。下面是具体的步骤:
1、实现 DynamicTableSinkFactory 接口,这是一个工厂类接口,在 Flink 中使用此接口来动态地创建 DynamicTableSink。
public class MySinkFactory implements DynamicTableSinkFactory {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(MySinkFactory.class);
 
    @Override
    public String factoryIdentifier() {
        return "my-sink";
    }
 
    // ...
}
在此示例中,factoryIdentifier() 方法返回的是 Sink 工厂的唯一标识符。
2、实现 DynamicTableSink 接口,创建自定义的 Sink。
public class MySink implements DynamicTableSink {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);
 
    private final String[] fieldNames;
    private final DataType[] fieldTypes;
    private final String tableName;
    private final String dbName;
    private final String endpoint;
 
    public MySink(String[] fieldNames, DataType[] fieldTypes, String tableName, String dbName, String endpoint) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.tableName = tableName;
        this.dbName = dbName;
        this.endpoint = endpoint;
    }
 
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }
 
    // ...
}
在此示例中,我们定义了一个名为 MySink 的自定义 Sink。构造函数中,我们传递了表名、数据库名、终端点和字段名及其数据类型列表。然后,我们实现了 getChangelogMode 方法,返回 ChangelogMode.insertOnly(),表示此 Sink 仅支持插入模式。
3、实现 fromOptions() 方法,在其中解析所需的配置选项,以创建 DynamicTableSink 对象。
public DynamicTableSink createDynamicTableSink(Context context) {
    FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ConfigOption<String> tableOpt = helper
            .optional("table")
            .withDescription("Name of the table to write to.")
            .withDefaultFactory(() -> "default_table")
            .create();
    ConfigOption<String> databaseOpt = helper
            .optional("database")
            .withDescription("Name of the database to write to.")
            .withDefaultFactory(() -> "default_database")
            .create();
    ConfigOption<String> endpointOpt = helper
            .required("endpoint")
            .withDescription("Endpoint of the target system.")
            .create();
    String tableName = helper.getOptions().get(tableOpt);
    String dbName = helper.getOptions().get(databaseOpt);
    String endpoint = helper.getOptions().get(endpointOpt);
    return new MySink(context.getCatalogTable().getSchema().getFieldNames(),
        context.getCatalogTable().getSchema().getFieldDataTypes(),
        tableName,
        dbName,
        endpoint);
}
在此示例中,我们使用了 FactoryUtil.TableFactoryHelper,这是一个辅助类,用于帮助解析配置选项和创建 DynamicTableSink 对象。然后,我们定义了三个配置选项:表名、数据库名和终端点。我们从上下文中获取表模式,并将其用作 MySink 构造函数中的参数,以定义 DynamicTableSink 实例。
4、注册 Sink。
MySinkFactory sinkFactory = new MySinkFactory();
TableEnvironment tableEnv = ...
tableEnv.getConfig().addConfiguration(MyConfigUtil.getSinkConfig());
tableEnv.registerTableSink("custom-sink", sinkFactory);
在此示例中,我们创建一个名为 sinkFactory 的自定义 Sink 工厂,并使用 Flink 的 TableEnvironment 将其注册为 custom-sink。我们通过 MyConfigUtil.getSinkConfig() 方法获取的配置对象将配置选项添加到 TableEnvironment 的配置中,以便在之后的查询中使用 Sink。
5、处理并将数据写入自定义 Sink。
java
tableEnv.executeSql("INSERT INTO custom-sink SELECT * FROM myTable
FLIP-191是Flink社区提出的一个提案,旨在为Flink提供一种通用的Sink API,使得用户可以更加方便地定义和使用自定义的Sink。在FLIP-191中,定义了一套通用的Sink API,包括以下几个核心接口:SinkFunction、TwoPhaseCommitSinkFunction、Sink、TwoPhaseCommitSink。
在FLIP-191中,用户可以通过实现上述接口,来定义自己的Sink函数或Sink。
如果你想使用FLIP-191提供的通用Sink API,可以参考Flink社区提供的相关文档和代码实现。在Flink 1.12及以上版本中,flip-191已经被正式引入,并支持自定义Sink。

Flip-191是Flink 1.11版本中引入的新特性,它支持创建自定义的Sink。Flip-191提供了一种简单的方式来开发,部署和管理自定义Sink。在Flip-191中,Sink提供了可配置的属性,使用户可以轻松地定义其Sink的行为。
要自定义Flip-191 Sink,您需要:
创建一个Flip-191 Sink类,该类需要继承自AbstractSink类
实现Sink的核心功能,在open()和invoke()函数中编写代码
提供Sink的配置选项,使用户可以为其Sink设置属性,比如Sink的名称,数据写入速率等
打包JAR文件,并在Flink中注册该Sink
有关如何自定义Flip-191 Sink的更多详细信息,请参阅Flink官方文档。
关于 Flink 中的 Flip-191,它是一个实验性功能,目前还没有被正式发布。其主要用途是将 Flink 的数据流输出到 Elasticsearch、InfluxDB 等第三方系统中。
由于 Flip-191 还处于实验阶段,所以并没有官方的 Demo 示例代码。但是你可以参考 Flink 官方文档中的示例代码和相关说明来自定义 Flip-191 sink。
具体来说,自定义 Flip-191 sink 需要实现 ElasticsearchSinkFunction 或者 InfluxDBSinkFunction 接口,并使用相应的构造器初始化一个 ElasticsearchSink 或者 InfluxDBSink 对象。然后,将该对象传递给 Flink 的 addSink 方法即可。
FLIP-191 是 Flink 中关于自定义 Sink 的一个 FLIP(Flink Improvement Proposal),该 FLIP 引入了一个新的 Sink API,使得用户可以更方便地自定义 Sink。下面是一个简单的自定义 Sink 的示例,您可以参考一下:
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
import java.util.Properties;
public class MyKafkaSink<T> extends TwoPhaseCommitSinkFunction<T, FlinkKafkaProducer<T>> {
    private FlinkKafkaProducer<T> producer;
    public MyKafkaSink(String topic, Properties producerConfig, SerializationSchema<T> serializationSchema) {
        super(new FlinkKafkaProducer<>(topic, serializationSchema, producerConfig));
        this.producer = getTransactionalResource();
    }
    @Override
    protected void invoke(FlinkKafkaProducer<T> producer, T value, Context context) throws Exception {
        producer.produce(value);
    }
    @Nullable
    @Override
    protected FlinkKafkaProducer<T> beginTransaction() throws Exception {
        return new FlinkKafkaProducer<>(producer.getTopic(), producer.getSerializationSchema(), producer.getProducersProperties());
    }
    @Override
    protected void preCommit(FlinkKafkaProducer<T> kafkaProducer) throws Exception {
        // nothing to do
    }
    @Override
    protected void commit(FlinkKafkaProducer<T> kafkaProducer) {
        // nothing to do
    }
    @Override
    protected void abort(FlinkKafkaProducer<T> kafkaProducer) {
        try {
            kafkaProducer.close();
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }
}
在上述示例中,我们创建了一个 MyKafkaSink,继承自 TwoPhaseCommitSinkFunction,用于将数据写入 Kafka。MyKafkaSink 的构造函数接受三个参数:Kafka 主题名称、Kafka 生产者配置和序列化模式。在 MyKafkaSink 中,我们重写了 invoke 方法,并在其中调用了 FlinkKafkaProducer 的 produce 方法将数据写入 Kafka。我们还重写了 beginTransaction 方法,该方法返回一个新的 FlinkKafkaProducer 对象,用于执行事务的第一阶段。我们还重写了 preCommit、commit 和 abort 方法,这些方法用于在事务的第二阶段执行预提交、提交和中止操作。 要使用 MyKafkaSink,您可以按照以下步骤进行:
将上述代码复制到您的项目中,并根据需要进行更改。
创建一个 Kafka 生产者配置对象,例如:
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");
SerializationSchema<String> serializationSchema = new SimpleStringSchema();
CREATE TABLE mytable (
  ...
) WITH (
  'connector' = '...',
  'sink' = 'MyKafkaSink',
  'topic' = 'mytopic',
  'producerConfig' = '{"bootstrap.servers": "localhost:9092"}',
  'serializationSchema' = 'org.apache.flink.api.common.serialization.SimpleStringSchema'
)
在上述示例中,我们使用了自定义的 Sink MyKafkaSink,并通过 topic、producerConfig 和 serializationSchema 参数进行配置。在 Flink SQL 中,您需要将 Sink 的类名作为 'sink' 参数的值,并将 Sink 的构造函数参数作为其他参数的值。在本例中,'topic' 参数对应 Kafka 主题名称,'producerConfig' 参数对应 Kafka 生产者配置,'serializationSchema' 参数对应序列化模式。
您好,Flip-191是一个Flink社区提出的关于Sink API的改进计划,目的是为了更好地支持用户自定义Sink。具体的内容可以参考这个JIRA issue:https://issues.apache.org/jira/browse/FLINK-191。
在官方文档中,也提供了有关如何使用自定义Sink的一些说明:https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/stream/sink.html#using-user-defined-sinks。
如果您需要具体的示例代码,可以在Github上搜索Flip-191的相关项目,或者参考社区中其他开发者的代码实现。
在 Flink 1.12 及以上版本中,flip-191 已经被正式引入,并支持自定义 Sink。您可以参考以下步骤实现自定义 Sink:
实现 SinkFunction 接口 首先,您需要实现 SinkFunction 接口,该接口是所有 Sink 的基础接口。例如,以下代码展示了如何实现一个简单的自定义 Sink:
public class MySink implements SinkFunction {
@Override
public void invoke(String value, Context context) throws Exception {
    System.out.println("Received: " + value);
}
} 上述 MySink 类实现了 SinkFunction 接口,并在 invoke() 方法中输出接收到的数据。
配置自定义 Sink 接下来,您需要在 Flink 程序中配置自定义 Sink。使用 OutputBuilder 对象来构建自定义 Sink。例如,以下代码演示了如何通过 OutputBuilder 对象来创建一个自定义 Sink:
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row;
import java.util.ArrayList; import java.util.List; import java.util.Map;
public class MySinkFactory implements StreamTableSinkFactory {
@Override
public Map<String, String> requiredContext() {
    return null;
}
@Override
public List<String> supportedProperties() {
    List<String> properties = new ArrayList<>();
    properties.add("path");
    properties.add("format");
    return properties;
}
@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
    String path = properties.get("path");
    String format = properties.get("format");
    StreamingFileSink<Row> sink = StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Row>())
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();
    return new MyStreamTableSink(sink);
}
@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> map, DataType dataType) {
    return createStreamTableSink(map);
}
} 在上述代码中,我们实现了 StreamTableSinkFactory 接口,并覆盖了 createStreamTableSink() 方法。该方法通过 OutputBuilder 对象来创建一个自定义 Sink。
需要注意的是,为了支持 Flink 的新版 Sink API,您需要将项目的依赖版本升级至 Flink 1.12 及以上版本。在 Flink 1.12 中,新版 Sink API 已经被引入,可以更方便地实现自定义 Sink。
建议通过阅读 Flink 官方文档来深入理解自定义 Sink 和适配器的使用方法和注意事项。
如果您想了解如何自定义 Flink 的 Sink,可以参考 Flink 官方文档中的 “Custom Sinks” 章节。另外,FLIP-191 介绍了 Flink 新增的 Datastream API 扩展,包括了自定义 Sink 这一功能。
在 FLIP-191 中,Flink 将 DataStream API 扩展为可以实现任意类型的数据写出。其中,开发者需要实现一个 SinkFunctionV2 接口,该接口提供了三个方法:open(),invoke() 和 close(),分别对应着开启资源、写入数据和关闭资源的操作。示例代码如下所示:
public class CustomSink implements SinkFunctionV2<Tuple2<Integer, String>> {
    private String outputPath;
    private FileSystem fileSystem;
    private FSDataOutputStream fsDataOutputStream;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 初始化输出路径
        outputPath = parameters.get("output.path");
        // 初始化文件系统和输出流
        fileSystem = FileSystem.get(new URI(outputPath), new Configuration());
        fsDataOutputStream = fileSystem.create(new Path(outputPath),
                FileSystem.WriteMode.OVERWRITE);
    }
    @Override
    public void invoke(Tuple2<Integer, String> value, SinkFunction.Context context) throws Exception {
        // 写出数据到文件中
        String message = value.f0 + "," + value.f1 + "\n";
        fsDataOutputStream.write(message.getBytes());
    }
    @Override
    public void close() throws Exception {
        super.close();
        // 关闭输出流和文件系统
        if (fsDataOutputStream != null)
            fsDataOutputStream.close();
        if (fileSystem != null)
            fileSystem.close();
    }
}
在自定义的 CustomSink 中,我们继承了 SinkFunctionV2 接口,并实现了 open(),invoke() 和 close() 三个方法。其中,open() 方法用于初始化输出路径、文件系统和输出流等资源,invoke() 方法用于实际的数据写出操作,close() 方法用于关闭资源。
需要注意的是,为了在 Flink 中成功使用自定义的 Sink,您需要在执行程序时将自定义 Sink 与 DataStream 通过 addSink() 方法进行绑定,例如:
DataStream<Tuple2<Integer, String>> stream = env.fromElements(
        Tuple2.of(1, "Hello"),
        Tuple2.of(2, "World"),
        Tuple2.of(3, "Flink")
);
stream.addSink(new CustomSink())
        .setParallelism(1);
env.execute("Custom Sink demo");
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。