开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

有人有flip-191 sink的Demo吗。或者哪里可以看具体怎么自定义

有人有flip-191 sink的Demo吗。或者哪里可以看具体怎么自定义

展开
收起
游客6vdkhpqtie2h2 2022-09-23 10:32:16 521 0
10 条回答
写回答
取消 提交回答
  • FLIP-191 是 Apache Flink 中关于使用新版 TableSink API 的一个 Feature,它提供了一种新的 TableSink 编写方法,以便更好地支持批处理和增量处理的组合使用。如果您想了解如何自定义 FLIP-191 TableSink,请参考下面的示例:

    public abstract class MyTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    
        /**
         * 构造函数
         */
        protected MyTableSink() {}
    
        /**
         * 从 TableSink 获取基本类型信息
         *
         * @return 基本数据类型信息
         */
        @Override
        public TypeInformation<Row> getOutputType() {
            return Types.ROW(Types.STRING, Types.INT, Types.LONG);
        }
    
        /**
         * 构建追加流处理器
         *
         * @return 追加流处理器
         */
        protected abstract AppendStreamTableSink<Row> createAppendStreamTableSink();
    
        /**
         * 构建批处理器
         *
         * @param rowDataSet 批处理数据集
         */
        protected abstract void consumeDataSet(DataSet<Row> rowDataSet);
    
        /**
         * 获取追加流 TableSink
         *
         * @param strings 字段名称
         * @param types 字段类型
         * @return 追加流 TableSink
         */
        @Override
        public AppendStreamTableSink<Row> configure(String[] strings, TypeInformation<?>[] types) {
            return createAppendStreamTableSink();
        }
    
        /**
         * 处理追加流数据
         *
         * @param dataStream 数据流
         * @param tableSchema 表模式
         * @return TableSink 转化结果
         */
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream, TableSchema tableSchema) {
            return createAppendStreamTableSink().consumeDataStream(dataStream, tableSchema);
        }
    
        /**
         * 处理批处理数据
         *
         * @param dataSet 批处理数据集
         * @param tableSchema 表模式
         */
        @Override
        public void consumeDataSet(DataSet<Row> dataSet, TableSchema tableSchema) {
            consumeDataSet(dataSet);
        }
    
        /**
         * 获取批处理器的返回类型集合
         *
         * @return 批处理器的返回类型集合
         */
        @Override
        public TypeInformation<Row> getOutputType(TableSchema tableSchema) {
            return Types.ROW(Types.STRING, Types.INT, Types.LONG);
        }
    
        /**
         * 获取输出格式器的名称
         *
         * @return 输出格式器的名称
         */
        @Override
        public String getOutputFormatName() {
            return getClass().getName();
        }
    }
    

    在这个示例中,我们定义了一个基于 AppendStreamTableSinkBatchTableSink 接口的抽象 TableSink 类 MyTableSink。该类中实现了 getOutputType()getOutputFormatName()configure() 和不同 TableSink 操作需要的 createAppendStreamTableSink()consumeDataSet()。您可以根据各种任务,重写这些操作。

    接下来,您可以创建一个类(例如 MySimpleTableSink 类),来继承 MyTableSink 类,并实现它中定义的各种操作。例如,下面是一个简单的 MySimpleTableSink 示例:

    public class MySimpleTableSink extends MyTableSink {
    
        private String tableName;
    
        public MySimpleTableSink(String tableName) {
            this.tableName = tableName;
        }
    
        /**
         * 构建追加流处理器
         *
         * @return 追加流处理器
         */
        @Override
        protected AppendStreamTableSink<Row> createAppendStreamTableSink() {
            return new AppendStreamTableSink<Row>() {
                @Override
                public TableSchema getTableSchema() {
                    return TableSchema.builder().fields(
                            new String[]{"name", "age", "timestamp"},
                            new TypeInformation[]{Types.STRING, Types.INT, Types.LONG}).build();
                }
    
                @Override
                public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
                    // 输入数据流的处理
                    return dataStream.addSink(
                            new FlinkKafkaProducer<>(topicName, new SimpleStringSchema(), properties));
                }
            };
        }
    
        /**
         * 构建批处理器
    2023-05-06 10:13:26
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在 Flink 1.9.1 版本中,提供了一个新的 Elasticsearch Sink,可以将数据写入 Elasticsearch 中。您可以通过以下方式使用 Elasticsearch Sink:

    1. 添加依赖:请确保在 Flink 应用程序中添加以下依赖项:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    
    1. 配置连接:
    # Elasticsearch 集群地址,多个地址用逗号分隔
    rest.addresses=127.0.0.1:9200
    
    # Elasticsearch 索引名称
    rest.index=example_index
    
    # Elasticsearch 类型名称
    rest.document-type=example_doc_type
    
    # 每批次发送的最大记录数
    bulk.flush.max-size=100
    
    # 每个批次的最长等待时间
    bulk.flush.max-actions=10
    
    # 是否开启失败重试,默认开启
    bulk.flush.backoff.enable=true
    
    # 重试等待时间
    bulk.flush.backoff.delay=3000
    
    # 重试次数
    bulk.flush.backoff.max-attempts=5
    
    1. 创建 Elasticsearch Sink:使用如下方式创建 Elasticsearch Sink 实例:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkBuilder;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import java.util.HashMap;
    import java.util.Map;
    
    public class ElasticsearchSinkExample {
    
      public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // 数据源
        DataStream<Map<String, Object>> events = env.fromElements(
            exampleMap("A", 1), exampleMap("B", 2), exampleMap("A", 3), exampleMap("C", 4), exampleMap("B", 5)
        );
    
        // Elasticsearch Sink
        ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
          RestClientFactoryUtil.getRestClientFactory(restConfig), // 自定义 RestClientFactory
          new CustomIndexRequestBuilder() // 自定义 IndexRequestBuilder
        );
    
        events.addSink(esSinkBuilder.build());
    
        env.execute("Elasticsearch Sink Example");
      }
    
      public static Map<String, Object> exampleMap(String name, int age) {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("age", age);
        return map;
      }
    
      public static class CustomIndexRequestBuilder implements ElasticsearchSinkFunction<Map<String, Object>> {
    
        @Override
        public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
          IndexRequest indexRequest = Requests.indexRequest()
              .index("example_index")
              .type("example_doc_type")
              .source(element);
          indexer.add(indexRequest);
        }
      }
    }
    

    在这个示例中,我们使用 StreamExecutionEnvironment 来执行数据流应用程序。然后,我们定义一个包含数据的 Map 并将其添加到 DataStream 中。最后,我们使用 ElasticsearchSinkBuilder 构建 ElasticsearchSink 并使用 addSink 方法将其添加到 DataStream 中。在自定义 IndexRequestBuilder 中,我们创建 IndexRequest 并将其添加到 RequestIndexer 中,ElasticsearchSink 将 IndexRequest 批量等待并提交到 Elasticsearch 集群中。

    2023-05-05 20:33:27
    赞同 展开评论 打赏
  • FLIP-191 引入了一种新的 Sink API,包括两个可扩展接口:DynamicTableSink 和 DynamicTableSinkFactory。您可以根据需要自定义实现这两个接口。下面是具体的步骤:

    1、实现 DynamicTableSinkFactory 接口,这是一个工厂类接口,在 Flink 中使用此接口来动态地创建 DynamicTableSink。

    public class MySinkFactory implements DynamicTableSinkFactory {
     
        private static final Logger LOGGER = LoggerFactory.getLogger(MySinkFactory.class);
     
        @Override
        public String factoryIdentifier() {
            return "my-sink";
        }
     
        // ...
    
    }
    

    在此示例中,factoryIdentifier() 方法返回的是 Sink 工厂的唯一标识符。

    2、实现 DynamicTableSink 接口,创建自定义的 Sink。

    public class MySink implements DynamicTableSink {
     
        private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);
     
        private final String[] fieldNames;
        private final DataType[] fieldTypes;
        private final String tableName;
        private final String dbName;
        private final String endpoint;
     
        public MySink(String[] fieldNames, DataType[] fieldTypes, String tableName, String dbName, String endpoint) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
            this.tableName = tableName;
            this.dbName = dbName;
            this.endpoint = endpoint;
        }
     
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }
     
        // ...
    }
    
    

    在此示例中,我们定义了一个名为 MySink 的自定义 Sink。构造函数中,我们传递了表名、数据库名、终端点和字段名及其数据类型列表。然后,我们实现了 getChangelogMode 方法,返回 ChangelogMode.insertOnly(),表示此 Sink 仅支持插入模式。

    3、实现 fromOptions() 方法,在其中解析所需的配置选项,以创建 DynamicTableSink 对象。

    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        ConfigOption<String> tableOpt = helper
                .optional("table")
                .withDescription("Name of the table to write to.")
                .withDefaultFactory(() -> "default_table")
                .create();
        ConfigOption<String> databaseOpt = helper
                .optional("database")
                .withDescription("Name of the database to write to.")
                .withDefaultFactory(() -> "default_database")
                .create();
        ConfigOption<String> endpointOpt = helper
                .required("endpoint")
                .withDescription("Endpoint of the target system.")
                .create();
        String tableName = helper.getOptions().get(tableOpt);
        String dbName = helper.getOptions().get(databaseOpt);
        String endpoint = helper.getOptions().get(endpointOpt);
        return new MySink(context.getCatalogTable().getSchema().getFieldNames(),
            context.getCatalogTable().getSchema().getFieldDataTypes(),
            tableName,
            dbName,
            endpoint);
    }
    

    在此示例中,我们使用了 FactoryUtil.TableFactoryHelper,这是一个辅助类,用于帮助解析配置选项和创建 DynamicTableSink 对象。然后,我们定义了三个配置选项:表名、数据库名和终端点。我们从上下文中获取表模式,并将其用作 MySink 构造函数中的参数,以定义 DynamicTableSink 实例。

    4、注册 Sink。

    MySinkFactory sinkFactory = new MySinkFactory();
    TableEnvironment tableEnv = ...
    tableEnv.getConfig().addConfiguration(MyConfigUtil.getSinkConfig());
    tableEnv.registerTableSink("custom-sink", sinkFactory);
    
    

    在此示例中,我们创建一个名为 sinkFactory 的自定义 Sink 工厂,并使用 Flink 的 TableEnvironment 将其注册为 custom-sink。我们通过 MyConfigUtil.getSinkConfig() 方法获取的配置对象将配置选项添加到 TableEnvironment 的配置中,以便在之后的查询中使用 Sink。

    5、处理并将数据写入自定义 Sink。

    java
    tableEnv.executeSql("INSERT INTO custom-sink SELECT * FROM myTable
    
    2023-05-02 07:52:37
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    FLIP-191是Flink社区提出的一个提案,旨在为Flink提供一种通用的Sink API,使得用户可以更加方便地定义和使用自定义的Sink。在FLIP-191中,定义了一套通用的Sink API,包括以下几个核心接口:SinkFunction、TwoPhaseCommitSinkFunction、Sink、TwoPhaseCommitSink。

    在FLIP-191中,用户可以通过实现上述接口,来定义自己的Sink函数或Sink。

    如果你想使用FLIP-191提供的通用Sink API,可以参考Flink社区提供的相关文档和代码实现。在Flink 1.12及以上版本中,flip-191已经被正式引入,并支持自定义Sink。

    image.png

    2023-04-27 18:21:21
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    Flip-191是Flink 1.11版本中引入的新特性,它支持创建自定义的Sink。Flip-191提供了一种简单的方式来开发,部署和管理自定义Sink。在Flip-191中,Sink提供了可配置的属性,使用户可以轻松地定义其Sink的行为。

    要自定义Flip-191 Sink,您需要:

    创建一个Flip-191 Sink类,该类需要继承自AbstractSink类

    实现Sink的核心功能,在open()和invoke()函数中编写代码

    提供Sink的配置选项,使用户可以为其Sink设置属性,比如Sink的名称,数据写入速率等

    打包JAR文件,并在Flink中注册该Sink

    有关如何自定义Flip-191 Sink的更多详细信息,请参阅Flink官方文档。

    2023-04-26 12:35:51
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    关于 Flink 中的 Flip-191,它是一个实验性功能,目前还没有被正式发布。其主要用途是将 Flink 的数据流输出到 Elasticsearch、InfluxDB 等第三方系统中。

    由于 Flip-191 还处于实验阶段,所以并没有官方的 Demo 示例代码。但是你可以参考 Flink 官方文档中的示例代码和相关说明来自定义 Flip-191 sink。

    具体来说,自定义 Flip-191 sink 需要实现 ElasticsearchSinkFunction 或者 InfluxDBSinkFunction 接口,并使用相应的构造器初始化一个 ElasticsearchSink 或者 InfluxDBSink 对象。然后,将该对象传递给 Flink 的 addSink 方法即可。

    2023-04-26 09:25:41
    赞同 展开评论 打赏
  • FLIP-191 是 Flink 中关于自定义 Sink 的一个 FLIP(Flink Improvement Proposal),该 FLIP 引入了一个新的 Sink API,使得用户可以更方便地自定义 Sink。下面是一个简单的自定义 Sink 的示例,您可以参考一下:

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.util.ExceptionUtils;
    import javax.annotation.Nullable;
    import java.util.Properties;
    public class MyKafkaSink<T> extends TwoPhaseCommitSinkFunction<T, FlinkKafkaProducer<T>> {
        private FlinkKafkaProducer<T> producer;
        public MyKafkaSink(String topic, Properties producerConfig, SerializationSchema<T> serializationSchema) {
            super(new FlinkKafkaProducer<>(topic, serializationSchema, producerConfig));
            this.producer = getTransactionalResource();
        }
        @Override
        protected void invoke(FlinkKafkaProducer<T> producer, T value, Context context) throws Exception {
            producer.produce(value);
        }
        @Nullable
        @Override
        protected FlinkKafkaProducer<T> beginTransaction() throws Exception {
            return new FlinkKafkaProducer<>(producer.getTopic(), producer.getSerializationSchema(), producer.getProducersProperties());
        }
        @Override
        protected void preCommit(FlinkKafkaProducer<T> kafkaProducer) throws Exception {
            // nothing to do
        }
        @Override
        protected void commit(FlinkKafkaProducer<T> kafkaProducer) {
            // nothing to do
        }
        @Override
        protected void abort(FlinkKafkaProducer<T> kafkaProducer) {
            try {
                kafkaProducer.close();
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        }
    }
    

    在上述示例中,我们创建了一个 MyKafkaSink,继承自 TwoPhaseCommitSinkFunction,用于将数据写入 Kafka。MyKafkaSink 的构造函数接受三个参数:Kafka 主题名称、Kafka 生产者配置和序列化模式。在 MyKafkaSink 中,我们重写了 invoke 方法,并在其中调用了 FlinkKafkaProducer 的 produce 方法将数据写入 Kafka。我们还重写了 beginTransaction 方法,该方法返回一个新的 FlinkKafkaProducer 对象,用于执行事务的第一阶段。我们还重写了 preCommit、commit 和 abort 方法,这些方法用于在事务的第二阶段执行预提交、提交和中止操作。 要使用 MyKafkaSink,您可以按照以下步骤进行:

    1. 将上述代码复制到您的项目中,并根据需要进行更改。

    2. 创建一个 Kafka 生产者配置对象,例如:

    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", "localhost:9092");
    
    1. 创建一个 SerializationSchema 对象,例如:
    SerializationSchema<String> serializationSchema = new SimpleStringSchema();
    
    1. 在 Flink SQL 中,使用以下 SQL 语句创建一个表并将自定义的 Sink 应用到该表中:
    CREATE TABLE mytable (
      ...
    ) WITH (
      'connector' = '...',
      'sink' = 'MyKafkaSink',
      'topic' = 'mytopic',
      'producerConfig' = '{"bootstrap.servers": "localhost:9092"}',
      'serializationSchema' = 'org.apache.flink.api.common.serialization.SimpleStringSchema'
    )
    

    在上述示例中,我们使用了自定义的 Sink MyKafkaSink,并通过 topic、producerConfig 和 serializationSchema 参数进行配置。在 Flink SQL 中,您需要将 Sink 的类名作为 'sink' 参数的值,并将 Sink 的构造函数参数作为其他参数的值。在本例中,'topic' 参数对应 Kafka 主题名称,'producerConfig' 参数对应 Kafka 生产者配置,'serializationSchema' 参数对应序列化模式。

    2023-04-24 14:03:21
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    您好,Flip-191是一个Flink社区提出的关于Sink API的改进计划,目的是为了更好地支持用户自定义Sink。具体的内容可以参考这个JIRA issue:https://issues.apache.org/jira/browse/FLINK-191。

    在官方文档中,也提供了有关如何使用自定义Sink的一些说明:https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/stream/sink.html#using-user-defined-sinks。

    如果您需要具体的示例代码,可以在Github上搜索Flip-191的相关项目,或者参考社区中其他开发者的代码实现。

    2023-04-23 22:23:52
    赞同 展开评论 打赏
  • 热爱开发

    在 Flink 1.12 及以上版本中,flip-191 已经被正式引入,并支持自定义 Sink。您可以参考以下步骤实现自定义 Sink:

    实现 SinkFunction 接口 首先,您需要实现 SinkFunction 接口,该接口是所有 Sink 的基础接口。例如,以下代码展示了如何实现一个简单的自定义 Sink:

    public class MySink implements SinkFunction {

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("Received: " + value);
    }
    

    } 上述 MySink 类实现了 SinkFunction 接口,并在 invoke() 方法中输出接收到的数据。

    配置自定义 Sink 接下来,您需要在 Flink 程序中配置自定义 Sink。使用 OutputBuilder 对象来构建自定义 Sink。例如,以下代码演示了如何通过 OutputBuilder 对象来创建一个自定义 Sink:

    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row;

    import java.util.ArrayList; import java.util.List; import java.util.Map;

    public class MySinkFactory implements StreamTableSinkFactory {

    @Override
    public Map<String, String> requiredContext() {
        return null;
    }
    
    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("path");
        properties.add("format");
        return properties;
    }
    
    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        String path = properties.get("path");
        String format = properties.get("format");
    
        StreamingFileSink<Row> sink = StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Row>())
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();
    
        return new MyStreamTableSink(sink);
    }
    
    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> map, DataType dataType) {
        return createStreamTableSink(map);
    }
    

    } 在上述代码中,我们实现了 StreamTableSinkFactory 接口,并覆盖了 createStreamTableSink() 方法。该方法通过 OutputBuilder 对象来创建一个自定义 Sink。

    需要注意的是,为了支持 Flink 的新版 Sink API,您需要将项目的依赖版本升级至 Flink 1.12 及以上版本。在 Flink 1.12 中,新版 Sink API 已经被引入,可以更方便地实现自定义 Sink。

    建议通过阅读 Flink 官方文档来深入理解自定义 Sink 和适配器的使用方法和注意事项。

    2023-04-23 17:32:28
    赞同 展开评论 打赏
  • 如果您想了解如何自定义 Flink 的 Sink,可以参考 Flink 官方文档中的 “Custom Sinks” 章节。另外,FLIP-191 介绍了 Flink 新增的 Datastream API 扩展,包括了自定义 Sink 这一功能。

    在 FLIP-191 中,Flink 将 DataStream API 扩展为可以实现任意类型的数据写出。其中,开发者需要实现一个 SinkFunctionV2 接口,该接口提供了三个方法:open()invoke()close(),分别对应着开启资源、写入数据和关闭资源的操作。示例代码如下所示:

    public class CustomSink implements SinkFunctionV2<Tuple2<Integer, String>> {
    
        private String outputPath;
        private FileSystem fileSystem;
        private FSDataOutputStream fsDataOutputStream;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
    
            // 初始化输出路径
            outputPath = parameters.get("output.path");
    
            // 初始化文件系统和输出流
            fileSystem = FileSystem.get(new URI(outputPath), new Configuration());
            fsDataOutputStream = fileSystem.create(new Path(outputPath),
                    FileSystem.WriteMode.OVERWRITE);
        }
    
        @Override
        public void invoke(Tuple2<Integer, String> value, SinkFunction.Context context) throws Exception {
            // 写出数据到文件中
            String message = value.f0 + "," + value.f1 + "\n";
            fsDataOutputStream.write(message.getBytes());
        }
    
        @Override
        public void close() throws Exception {
            super.close();
    
            // 关闭输出流和文件系统
            if (fsDataOutputStream != null)
                fsDataOutputStream.close();
            if (fileSystem != null)
                fileSystem.close();
        }
    }
    

    在自定义的 CustomSink 中,我们继承了 SinkFunctionV2 接口,并实现了 open()invoke()close() 三个方法。其中,open() 方法用于初始化输出路径、文件系统和输出流等资源,invoke() 方法用于实际的数据写出操作,close() 方法用于关闭资源。

    需要注意的是,为了在 Flink 中成功使用自定义的 Sink,您需要在执行程序时将自定义 Sink 与 DataStream 通过 addSink() 方法进行绑定,例如:

    DataStream<Tuple2<Integer, String>> stream = env.fromElements(
            Tuple2.of(1, "Hello"),
            Tuple2.of(2, "World"),
            Tuple2.of(3, "Flink")
    );
    
    stream.addSink(new CustomSink())
            .setParallelism(1);
    
    env.execute("Custom Sink demo");
    
    2023-04-23 17:09:16
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
Data Flow&StreamingFundamental 立即下载
Sparksheet - Transforming Spreadsheets into Spark Data Frames 立即下载
BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPARK STREAMING 立即下载