开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请教下,finksql 这个statettl 哪里设置

image.png

展开
收起
游客6vdkhpqtie2h2 2022-09-23 10:19:39 1564 0
12 条回答
写回答
取消 提交回答
  • Flink SQL 中的 State TTL 可以在创建表时指定。State TTL 是指定状态的时效性,即指定状态可以存活的最长时间,超过这个时间,状态将被自动删除。一般的,状态存储时间的设定要结合业务需求及数据量大小等因素进行设置,以避免状态无限制的占用系统资源。

    下面是一个在 Flink SQL 中创建带有 State TTL 的表的例子:

    CREATE TABLE myTable (
        id BIGINT,
        name STRING,
        event_time TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector.type' = 'kafka',
        'connector.topic' = 'my-topic',
        'connector.properties.bootstrap.servers' = 'localhost:9092',
        'connector.startup-mode' = 'earliest-offset',
        'format.type' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601',
        'json.timestamp-format.string' = 'yyyy-MM-dd HH:mm:ss',
        'kafka.partition-discovery.interval-millis' = '1000',
        'state.ttl' = '1 HOUR'  --在这里设置 State TTL 的值,单位为小时
    );
    

    在上述代码中,在 WITH 子句中使用了 state.ttl 参数来设置 State TTL 的值,该值被设置为一个小时。

    2023-05-06 10:10:24
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在 Flink SQL 中,使用 TUMBLEHOPSESSION 窗口函数来实现窗口聚合操作。这些窗口函数都支持定义 WINDOW_TYPEWINDOW_SIZE 等参数,可以用来设置窗口长度、滑动步长、时间戳字段等信息。

    针对状态过期(State TTL),Flink 提供了一些配置项来控制状态的过期和自动清理。在 Flink SQL 中,可以通过在 CREATE TABLE 语句中使用 WITH 子句来配置状态过期。如下所示:

    CREATE TABLE my_table (
      name STRING,
      score INT,
      event_time TIMESTAMP(3),
      WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
      'connector.type' = 'kafka',
      'connector.topic' = 'input',
      'connector.version' = 'universal',
      'connector.properties.bootstrap.servers' = 'localhost:9092',
      'update-mode' = 'append',
      'connector.startup-mode' = 'latest-offset',
      'format.type' = 'json'
    ) WITH (
      'state.ttl.ms' = '3600000' -- 设置状态过期时间为 1 小时
    );
    

    在上述示例中,我们通过在 WITH 子句中使用 state.ttl.ms 参数来设置状态过期时间为 1 小时(即 3600000 毫秒)。一旦某个键值对状态超过了该时间,Flink 就会自动清理它们。

    除了 state.ttl.ms 参数之外,还可以使用其他配置项来控制状态过期行为。例如:

    • state.ttl.enabled: 是否启用状态 TTL,默认为 false。
    • state.ttl.cleanup-strategy: 状态清理策略,可以是 JOB(当整个作业终止时清理状态)或 KEEP_LATEST(只保留最近的状态),默认为 KEEP_LATEST
    • state.ttl.max-timestamp-diff: 允许的水印和处理时间之间的最大时间差,默认为 0 毫秒。
    • state.ttl.write-timestamps-to-chaos: 是否将状态到期事件写入混沌日志,以便进行可用性测试。

    更多关于 Flink SQL 中状态过期的配置信息可以参考 Flink 官方文档中的相关部分:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/config/#state-ttl-config。

    2023-05-05 20:33:27
    赞同 展开评论 打赏
  • 从事java行业9年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    Flink State是有状态计算中,用来保存中间结果或者缓存数据,并提供Exactly-Once语义。在大部分场景下,基于自适应调参功能,GeminiStateBackend会自动调整参数而不需要您手动配置。 在希望协调内存资源和性能时,请使用内存配置,详情请参见内存配置。 在遇到本地盘空间不足的问题时,请使用存算分离配置,详情请参见存算分离配置。 在遇到Join算子有性能瓶颈时,请使用KV分离配置,详情请参见KV分离配置。 基本配置如下图 image.png

    2023-05-04 16:03:02
    赞同 展开评论 打赏
  • 在 Flink SQL 中,可以使用 TUMBLE、HOP 和 SESSION 等窗口函数来定义窗口的大小和滑动区间,从而控制 State TTL。可以使用以下语法来设置 State TTL:

    CREATE TABLE myTable (
      ...
    ) WITH (
      'connector' = '...',
      ...
      'state.ttl' = '1h'  -- 设置状态 TTL 时间
    )
    

    其中,state.ttl 参数的值可以是以秒、分钟、小时、天等为单位的时间长度。例如,'state.ttl' = '3600s' 表示状态 TTL 时间为 1 小时。

    2023-05-02 07:52:36
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    在FlinkSQL中,可以使用WITH子句来设置state.ttl参数,以控制状态数据的过期时间。state.ttl参数用于设置状态TTL(Time-To-Live)的过期时间,以避免状态数据过多导致内存溢出。

    举一个简单例子:

    CREATE TABLE my_table ( id INT, name STRING, age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'my_topic', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'my_group', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'state.ttl' = '1h' -- 设置状态 TTL 为 1 小时 );

    在这个示例中,我们使用WITH子句设置了Kafka连接器的相关配置,以及state.ttl参数。其中,state.ttl参数用于设置状态TTL的过期时间,可以使用各种时间单位,例如1h表示 1 小时,30m表示30分钟。

    2023-04-27 17:50:39
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    Flink SQL是Apache Flink的一个模块,Statettl是其中一个用于实现基于时间的事件窗口处理和聚合操作的函数。在Flink SQL中,可以在CREATE TABLE语句中通过使用TUMBLE、HOP和SESSION等时间窗口函数来使用Statettl。

    设置Statettl的TTL(time-to-live)可以通过在CREATE TABLE语句中使用WITH选项来配置。例如,在使用TUMBLE函数的情况下,可以使用以下语法来设置TTL:

    CREATE TABLE myTable ( id BIGINT, eventTime TIMESTAMP(3), ... ) WITH ( ... 'connector.timestamp-extractor.timestamp-field' = 'eventTime', 'connector.window-assigner.type' = 'tumble', 'connector.window-assigner.size' = '10 min', 'connector.window-assigner.slide' = '5 min', 'connector.window-assigner.offset' = '0 min', 'connector.ttl' = '1 h' --设置窗口数据的TTL为1小时 ); 在上述示例中,'connector.ttl'选项设置了窗口数据的TTL为1小时。

    需要注意的是,当使用Statettl时,需要使用带有时间属性的数据源,并且可以通过在CREATE TABLE语句中指定时间属性名称来指定时间属性。例如,在上述示例中,'connector.timestamp-extractor.timestamp-field'选项设置了事件时间属性为'eventTime'。

    2023-04-26 12:35:49
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    在 Flink SQL 中,可以使用 TUMBLE 和 HOP 两种窗口来对数据流进行处理。这两种窗口都包含了状态存储,而在窗口滑动时,为了避免存储过大,需要对状态进行清理,这就是 state.ttl 的作用。

    在 Flink SQL 1.12 及之后的版本中,可以通过以下方式来设置 state.ttl:

    sql

    CREATE TABLE myTable ( name STRING, age INT, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 定义 watermark,即数据的最大延迟时间为 5s ) WITH ( ... 'state.ttl' = '10h', -- 设置 state 的生存时间为 10 小时 'state.ttl.cleanup.interval' = '15min' -- 设置清理过期数据的时间间隔为 15 分钟 );

    在上述示例中,通过在 WITH 子句中添加 'state.ttl' 和 'state.ttl.cleanup.interval' 参数来设置 state.ttl。其中,'state.ttl' 表示状态的生存时间,可以使用 ISO-8601 时间格式或者带有数字和单位的字符串形式(如 '10h' 表示 10 小时)。'state.ttl.cleanup.interval' 则表示清理过期数据的时间间隔,可以使用与 'state.ttl' 相同的时间格式。

    需要注意的是,'state.ttl' 只能应用于被声明为 append-only 或者 upsert 模式的表。对于 retract 模式的表,需要使用 state.retention.time 参数来设置状态的保留时间。

    2023-04-26 09:35:39
    赞同 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,你可以通过tumble、hop等窗口的操作符来定义设置窗口,也可以使用 WITH TUMBLE_TIME_INTERVAL 或 WITH SESSION_GAP 语句来定义设置窗口的时间间隔和延迟触发时间。

    2023-04-25 08:42:38
    赞同 展开评论 打赏
  • 在 Flink SQL 中,可以使用 StateTtlConfig 来设置状态的生存时间。

    StateTtlConfig 是一个 Flink 的状态 TTL 配置类,可以设置状态的 TTL 时间以及在过期时是否可以保留过期状态的值。

    在 Flink SQL 中,可以通过以下 SQL 语句来设置状态的 TTL 配置:

    SET execution.checkpointing.ttl.ms = 60000;
    

    上述语句将状态的 TTL 设置为 60 秒。在 Flink SQL 中,可以使用 StateTtlConfig 将状态的 TTL 配置传递给 SinkUpsertMaterializer。例如,可以在创建 SinkUpsertMaterializer 时传递一个 StateTtlConfig 对象来设置状态的 TTL 配置。以下是一个示例:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
    import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerFactory;
    import org.apache.flink.table.types.logical.RowType;
    import java.time.Duration;
    public class MySinkUpsertMaterializerFactory implements SinkUpsertMaterializerFactory {
        private final StateTtlConfig ttlConfig;
        public MySinkUpsertMaterializerFactory(StateTtlConfig ttlConfig) {
            this.ttlConfig = ttlConfig;
        }
        @Override
        public SinkFunction<RowData> createSinkFunction(RowType rowType) {
            // create SinkUpsertMaterializer with the given TTL config
            SinkUpsertMaterializer.Builder builder = SinkUpsertMaterializer.builder();
            if (ttlConfig != null) {
                builder.withStateTtlConfig(ttlConfig);
            }
            return builder.build();
        }
    }
    

    在上述示例中,创建了一个 MySinkUpsertMaterializerFactory 类,该类实现了 SinkUpsertMaterializerFactory 接口。在该类的构造函数中,传入了一个 StateTtlConfig 对象,该对象将用于设置状态的 TTL 配置。在 createSinkFunction 方法中,创建了一个 SinkUpsertMaterializer.Builder 对象,并通过 withStateTtlConfig 方法将 StateTtlConfig 对象传递给 SinkUpsertMaterializer。最后,调用 build 方法创建 SinkUpsertMaterializer。 在 Flink SQL 中,可以使用以下 SQL 语句将自定义的 SinkUpsertMaterializerFactory 应用到查询中:

    CREATE TABLE mytable (
      ...
    ) WITH (
      'connector' = '...',
      'sink-upsert-materializer-factory' = 'com.example.MySinkUpsertMaterializerFactory',
      'sink-upsert-materializer-factory.ttl' = '60000'
    )
    

    在上述示例中,使用了自定义的 SinkUpsertMaterializerFactory,并通过 sink-upsert-materializer-factory.ttl 参数将状态的 TTL 设置为 60 秒。

    2023-04-24 14:03:21
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    在 Flink SQL 中,设置状态 TTL 有两种方式:

    使用 CREATE TABLE 语句设置: sql Copy code CREATE TABLE my_table ( -- 列定义 ) WITH ( 'connector' = 'kafka', 'state.ttl.ms' = '600000' -- 这里设置 TTL 的时间,单位是毫秒 ); 使用 Flink 配置文件(flink-conf.yaml)设置: makefile Copy code state.ttl.ms: 600000 这里设置的 TTL 时间同样是单位是毫秒。

    2023-04-23 22:34:59
    赞同 展开评论 打赏
  • 热爱开发

    在 Flink SQL 中,可以通过 TUMBLE、HOP 或 SESSION 等窗口操作符来定义窗口,同时可以使用 WITH TUMBLE_TIME_INTERVAL、WITH HOP_TIME_INTERVAL 或 WITH SESSION_GAP 语句来定义窗口的时间间隔和延迟触发时间。在这些语句中,也可以设置窗口状态的 TTL 时间。

    例如,以下是一个使用 TUMBLE 和 WITH TUMBLE_TIME_INTERVAL 定义窗口,同时设置了窗口状态的 TTL 时间为 10 秒的示例:

    CREATE TABLE MyTable ( id BIGINT, name STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 指定水印 ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'my_topic', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'json' );

    CREATE TABLE MyOutputTable ( product_id BIGINT, order_cnt BIGINT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/mydb?useSSL=false', 'connector.table' = 'result_table', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.write.flush.max-rows' = '1' );

    INSERT INTO MyOutputTable SELECT id, COUNT(*) as order_cnt FROM MyTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE) WITH ( 'window.max-parallelism' = '8', 'window.state-ttl' = '10000' -- 窗口状态的 TTL 时间为 10 秒 ); 在上述示例中,我们使用 TUMBLE 和 WITH TUMBLE_TIME_INTERVAL 来定义窗口,同时在 WITH 子句中设置了窗口状态的 TTL 时间为 10 秒。这意味着,如果某个窗口在 10 秒内没有接收到新的数据,则会被删除。

    需要注意的是,Flink 的窗口和状态管理机制非常灵活和强大,您可以根据具体场景来设置窗口和状态的参数,以达到最佳的性能和可靠性。建议先参考 Flink 官方文档,深入理解窗口和状态的概念和使用方法。

    2023-04-23 17:33:34
    赞同 展开评论 打赏
  • 在创建 Table 时,对 Table Schema 中的某一列进行设置,例如:

    CREATE TABLE MyTable ( id INT, name STRING, login_time TIMESTAMP(3), EXPIRES_PROCTIME AS PROCTIME() + INTERVAL '10' MINUTE, EXPIRES_ROWTIME TIMESTAMP(3) ROWTIME AS login_time + INTERVAL '10' MINUTE ) WITH ( 'connector.type' = 'kafka', 'connector.topic' = 'user_behavior', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 设置 State TTL 'state.ttl' = '1h' ); 在上述代码中,我们通过在 Table Schema 中添加 EXPIRES_ROWTIME 列以及 state.ttl 参数,实现了对该列的 State TTL 设置。也可以通过 EXPIRES_PROCTIME 来设置处理时间为主键的 State TTL。

    2023-04-23 17:05:06
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载