Flink SQL 中的 State TTL 可以在创建表时指定。State TTL 是指定状态的时效性,即指定状态可以存活的最长时间,超过这个时间,状态将被自动删除。一般的,状态存储时间的设定要结合业务需求及数据量大小等因素进行设置,以避免状态无限制的占用系统资源。
下面是一个在 Flink SQL 中创建带有 State TTL 的表的例子:
CREATE TABLE myTable (
id BIGINT,
name STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector.type' = 'kafka',
'connector.topic' = 'my-topic',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json',
'json.timestamp-format.standard' = 'ISO-8601',
'json.timestamp-format.string' = 'yyyy-MM-dd HH:mm:ss',
'kafka.partition-discovery.interval-millis' = '1000',
'state.ttl' = '1 HOUR' --在这里设置 State TTL 的值,单位为小时
);
在上述代码中,在 WITH
子句中使用了 state.ttl
参数来设置 State TTL 的值,该值被设置为一个小时。
在 Flink SQL 中,使用 TUMBLE
、HOP
或 SESSION
窗口函数来实现窗口聚合操作。这些窗口函数都支持定义 WINDOW_TYPE
和 WINDOW_SIZE
等参数,可以用来设置窗口长度、滑动步长、时间戳字段等信息。
针对状态过期(State TTL),Flink 提供了一些配置项来控制状态的过期和自动清理。在 Flink SQL 中,可以通过在 CREATE TABLE
语句中使用 WITH
子句来配置状态过期。如下所示:
CREATE TABLE my_table (
name STRING,
score INT,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.topic' = 'input',
'connector.version' = 'universal',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'update-mode' = 'append',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
) WITH (
'state.ttl.ms' = '3600000' -- 设置状态过期时间为 1 小时
);
在上述示例中,我们通过在 WITH
子句中使用 state.ttl.ms
参数来设置状态过期时间为 1 小时(即 3600000 毫秒)。一旦某个键值对状态超过了该时间,Flink 就会自动清理它们。
除了 state.ttl.ms
参数之外,还可以使用其他配置项来控制状态过期行为。例如:
state.ttl.enabled
: 是否启用状态 TTL,默认为 false。state.ttl.cleanup-strategy
: 状态清理策略,可以是 JOB
(当整个作业终止时清理状态)或 KEEP_LATEST
(只保留最近的状态),默认为 KEEP_LATEST
。state.ttl.max-timestamp-diff
: 允许的水印和处理时间之间的最大时间差,默认为 0 毫秒。state.ttl.write-timestamps-to-chaos
: 是否将状态到期事件写入混沌日志,以便进行可用性测试。更多关于 Flink SQL 中状态过期的配置信息可以参考 Flink 官方文档中的相关部分:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/config/#state-ttl-config。
Flink State是有状态计算中,用来保存中间结果或者缓存数据,并提供Exactly-Once语义。在大部分场景下,基于自适应调参功能,GeminiStateBackend会自动调整参数而不需要您手动配置。 在希望协调内存资源和性能时,请使用内存配置,详情请参见内存配置。 在遇到本地盘空间不足的问题时,请使用存算分离配置,详情请参见存算分离配置。 在遇到Join算子有性能瓶颈时,请使用KV分离配置,详情请参见KV分离配置。 基本配置如下图
在 Flink SQL 中,可以使用 TUMBLE、HOP 和 SESSION 等窗口函数来定义窗口的大小和滑动区间,从而控制 State TTL。可以使用以下语法来设置 State TTL:
CREATE TABLE myTable (
...
) WITH (
'connector' = '...',
...
'state.ttl' = '1h' -- 设置状态 TTL 时间
)
其中,state.ttl 参数的值可以是以秒、分钟、小时、天等为单位的时间长度。例如,'state.ttl' = '3600s' 表示状态 TTL 时间为 1 小时。
在FlinkSQL中,可以使用WITH子句来设置state.ttl参数,以控制状态数据的过期时间。state.ttl参数用于设置状态TTL(Time-To-Live)的过期时间,以避免状态数据过多导致内存溢出。
举一个简单例子:
CREATE TABLE my_table ( id INT, name STRING, age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'my_topic', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'my_group', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'state.ttl' = '1h' -- 设置状态 TTL 为 1 小时 );
在这个示例中,我们使用WITH子句设置了Kafka连接器的相关配置,以及state.ttl参数。其中,state.ttl参数用于设置状态TTL的过期时间,可以使用各种时间单位,例如1h表示 1 小时,30m表示30分钟。
Flink SQL是Apache Flink的一个模块,Statettl是其中一个用于实现基于时间的事件窗口处理和聚合操作的函数。在Flink SQL中,可以在CREATE TABLE语句中通过使用TUMBLE、HOP和SESSION等时间窗口函数来使用Statettl。
设置Statettl的TTL(time-to-live)可以通过在CREATE TABLE语句中使用WITH选项来配置。例如,在使用TUMBLE函数的情况下,可以使用以下语法来设置TTL:
CREATE TABLE myTable ( id BIGINT, eventTime TIMESTAMP(3), ... ) WITH ( ... 'connector.timestamp-extractor.timestamp-field' = 'eventTime', 'connector.window-assigner.type' = 'tumble', 'connector.window-assigner.size' = '10 min', 'connector.window-assigner.slide' = '5 min', 'connector.window-assigner.offset' = '0 min', 'connector.ttl' = '1 h' --设置窗口数据的TTL为1小时 ); 在上述示例中,'connector.ttl'选项设置了窗口数据的TTL为1小时。
需要注意的是,当使用Statettl时,需要使用带有时间属性的数据源,并且可以通过在CREATE TABLE语句中指定时间属性名称来指定时间属性。例如,在上述示例中,'connector.timestamp-extractor.timestamp-field'选项设置了事件时间属性为'eventTime'。
在 Flink SQL 中,可以使用 TUMBLE 和 HOP 两种窗口来对数据流进行处理。这两种窗口都包含了状态存储,而在窗口滑动时,为了避免存储过大,需要对状态进行清理,这就是 state.ttl 的作用。
在 Flink SQL 1.12 及之后的版本中,可以通过以下方式来设置 state.ttl:
sql
CREATE TABLE myTable ( name STRING, age INT, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 定义 watermark,即数据的最大延迟时间为 5s ) WITH ( ... 'state.ttl' = '10h', -- 设置 state 的生存时间为 10 小时 'state.ttl.cleanup.interval' = '15min' -- 设置清理过期数据的时间间隔为 15 分钟 );
在上述示例中,通过在 WITH 子句中添加 'state.ttl' 和 'state.ttl.cleanup.interval' 参数来设置 state.ttl。其中,'state.ttl' 表示状态的生存时间,可以使用 ISO-8601 时间格式或者带有数字和单位的字符串形式(如 '10h' 表示 10 小时)。'state.ttl.cleanup.interval' 则表示清理过期数据的时间间隔,可以使用与 'state.ttl' 相同的时间格式。
需要注意的是,'state.ttl' 只能应用于被声明为 append-only 或者 upsert 模式的表。对于 retract 模式的表,需要使用 state.retention.time 参数来设置状态的保留时间。
楼主你好,你可以通过tumble、hop等窗口的操作符来定义设置窗口,也可以使用 WITH TUMBLE_TIME_INTERVAL 或 WITH SESSION_GAP 语句来定义设置窗口的时间间隔和延迟触发时间。
在 Flink SQL 中,可以使用 StateTtlConfig 来设置状态的生存时间。
StateTtlConfig 是一个 Flink 的状态 TTL 配置类,可以设置状态的 TTL 时间以及在过期时是否可以保留过期状态的值。
在 Flink SQL 中,可以通过以下 SQL 语句来设置状态的 TTL 配置:
SET execution.checkpointing.ttl.ms = 60000;
上述语句将状态的 TTL 设置为 60 秒。在 Flink SQL 中,可以使用 StateTtlConfig 将状态的 TTL 配置传递给 SinkUpsertMaterializer。例如,可以在创建 SinkUpsertMaterializer 时传递一个 StateTtlConfig 对象来设置状态的 TTL 配置。以下是一个示例:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerFactory;
import org.apache.flink.table.types.logical.RowType;
import java.time.Duration;
public class MySinkUpsertMaterializerFactory implements SinkUpsertMaterializerFactory {
private final StateTtlConfig ttlConfig;
public MySinkUpsertMaterializerFactory(StateTtlConfig ttlConfig) {
this.ttlConfig = ttlConfig;
}
@Override
public SinkFunction<RowData> createSinkFunction(RowType rowType) {
// create SinkUpsertMaterializer with the given TTL config
SinkUpsertMaterializer.Builder builder = SinkUpsertMaterializer.builder();
if (ttlConfig != null) {
builder.withStateTtlConfig(ttlConfig);
}
return builder.build();
}
}
在上述示例中,创建了一个 MySinkUpsertMaterializerFactory 类,该类实现了 SinkUpsertMaterializerFactory 接口。在该类的构造函数中,传入了一个 StateTtlConfig 对象,该对象将用于设置状态的 TTL 配置。在 createSinkFunction 方法中,创建了一个 SinkUpsertMaterializer.Builder 对象,并通过 withStateTtlConfig 方法将 StateTtlConfig 对象传递给 SinkUpsertMaterializer。最后,调用 build 方法创建 SinkUpsertMaterializer。 在 Flink SQL 中,可以使用以下 SQL 语句将自定义的 SinkUpsertMaterializerFactory 应用到查询中:
CREATE TABLE mytable (
...
) WITH (
'connector' = '...',
'sink-upsert-materializer-factory' = 'com.example.MySinkUpsertMaterializerFactory',
'sink-upsert-materializer-factory.ttl' = '60000'
)
在上述示例中,使用了自定义的 SinkUpsertMaterializerFactory,并通过 sink-upsert-materializer-factory.ttl 参数将状态的 TTL 设置为 60 秒。
在 Flink SQL 中,设置状态 TTL 有两种方式:
使用 CREATE TABLE 语句设置: sql Copy code CREATE TABLE my_table ( -- 列定义 ) WITH ( 'connector' = 'kafka', 'state.ttl.ms' = '600000' -- 这里设置 TTL 的时间,单位是毫秒 ); 使用 Flink 配置文件(flink-conf.yaml)设置: makefile Copy code state.ttl.ms: 600000 这里设置的 TTL 时间同样是单位是毫秒。
在 Flink SQL 中,可以通过 TUMBLE、HOP 或 SESSION 等窗口操作符来定义窗口,同时可以使用 WITH TUMBLE_TIME_INTERVAL、WITH HOP_TIME_INTERVAL 或 WITH SESSION_GAP 语句来定义窗口的时间间隔和延迟触发时间。在这些语句中,也可以设置窗口状态的 TTL 时间。
例如,以下是一个使用 TUMBLE 和 WITH TUMBLE_TIME_INTERVAL 定义窗口,同时设置了窗口状态的 TTL 时间为 10 秒的示例:
CREATE TABLE MyTable ( id BIGINT, name STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND -- 指定水印 ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'my_topic', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'json' );
CREATE TABLE MyOutputTable ( product_id BIGINT, order_cnt BIGINT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/mydb?useSSL=false', 'connector.table' = 'result_table', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.write.flush.max-rows' = '1' );
INSERT INTO MyOutputTable SELECT id, COUNT(*) as order_cnt FROM MyTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE) WITH ( 'window.max-parallelism' = '8', 'window.state-ttl' = '10000' -- 窗口状态的 TTL 时间为 10 秒 ); 在上述示例中,我们使用 TUMBLE 和 WITH TUMBLE_TIME_INTERVAL 来定义窗口,同时在 WITH 子句中设置了窗口状态的 TTL 时间为 10 秒。这意味着,如果某个窗口在 10 秒内没有接收到新的数据,则会被删除。
需要注意的是,Flink 的窗口和状态管理机制非常灵活和强大,您可以根据具体场景来设置窗口和状态的参数,以达到最佳的性能和可靠性。建议先参考 Flink 官方文档,深入理解窗口和状态的概念和使用方法。
在创建 Table 时,对 Table Schema 中的某一列进行设置,例如:
CREATE TABLE MyTable ( id INT, name STRING, login_time TIMESTAMP(3), EXPIRES_PROCTIME AS PROCTIME() + INTERVAL '10' MINUTE, EXPIRES_ROWTIME TIMESTAMP(3) ROWTIME AS login_time + INTERVAL '10' MINUTE ) WITH ( 'connector.type' = 'kafka', 'connector.topic' = 'user_behavior', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 设置 State TTL 'state.ttl' = '1h' ); 在上述代码中,我们通过在 Table Schema 中添加 EXPIRES_ROWTIME 列以及 state.ttl 参数,实现了对该列的 State TTL 设置。也可以通过 EXPIRES_PROCTIME 来设置处理时间为主键的 State TTL。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。