Question 1: In Flink 1.11, how can KafkaDynamicTableSource groupBy results be sent to Kafka?
INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)
from kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming
update changes which is produced by node
GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS
EXPR$1])
In Flink 1.10 you could modify KafkaTableSinkBase to implement RetractStreamTableSink.
I see that Flink 1.11 now uses KafkaDynamicSource and KafkaDynamicSink; what changes are needed so that
the GroupBy result can also be sent to Kafka?
*Compiled by volunteers from the Flink mailing-list archive
Reference answer:
DynamicTableSink has a method called getChangelogMode, which you can use to declare what kinds of changes the sink is able to consume.
*Compiled by volunteers from the Flink mailing-list archive
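For reference, a minimal sketch of such an override in Flink 1.11, assuming you maintain your own patched copy of KafkaDynamicSink (getChangelogMode and the ChangelogMode builder are public table connector interfaces):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Inside the patched KafkaDynamicSink (a DynamicTableSink implementation):
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Accept update and delete changes in addition to inserts, so the
    // retract stream produced by GroupAggregate can be consumed.
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_BEFORE)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .build();
}

Note that widening the changelog mode only removes the planner error; the sink's runtime writer still has to decide how to encode update/delete rows in Kafka. In later releases (1.12+) the built-in upsert-kafka connector handles this case out of the box.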
More answers to this question can be found here:
https://developer.aliyun.com/ask/370230?spm=a2c6h.12873639.article-detail.63.6f9243783Lv0fl
Question 2: Job-name problem after submitting a Flink SQL job
The code is roughly as follows: one Kafka source table and one Elasticsearch sink table, executed via tableEnv.executeSql("insert into esSinkTable select ... from kafkaSourceTable").
After submission the job name is "insert-into_xxxCatalog_xxxDatabase.xxxTable".
That is quite unfriendly; can I specify the job name myself?
*Compiled by volunteers from the Flink mailing-list archive
Reference answer:
In Zeppelin you can specify the job name of an insert statement, as shown below (if you are interested in Zeppelin, you can join DingTalk group 32803524):
%flink.ssql(jobName="my job")
insert into sink_kafka select status, direction, cast(event_ts/1000000000
as timestamp(3)) from source_kafka where status <> 'foo'
*Compiled by volunteers from the Flink mailing-list archive
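Outside Zeppelin, a hedged sketch of an alternative, assuming a Flink version whose table planner honors the pipeline.name option for statements submitted via executeSql (later releases do; verify this against 1.11 before relying on it):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// Hypothetical illustration: replace the generated "insert-into_..." name.
tableEnv.getConfig().getConfiguration().setString("pipeline.name", "my job");

// The INSERT itself is unchanged (esSinkTable / kafkaSourceTable are the
// asker's own tables and must be created first).
tableEnv.executeSql("insert into esSinkTable select * from kafkaSourceTable");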
More answers to this question can be found here:
https://developer.aliyun.com/ask/370228?spm=a2c6h.12873639.article-detail.64.6f9243783Lv0fl
Question 3: State cannot be restored from a checkpoint
State cannot be restored from the checkpoint. The configuration code:
env.enableCheckpointing(1000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// do not restart after the job fails
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
The code that uses the state:
private transient ListState<String> counts;
@Override
public void open(Configuration parameters) throws Exception {
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
    ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
    lastUserLogin.enableTimeToLive(ttlConfig);
    counts = getRuntimeContext().getListState(lastUserLogin);
}
After I restarted the task managers, I found that all the data in counts was gone.
*Compiled by volunteers from the Flink mailing-list archive
Reference answer:
1. Could you describe the loss of the counts data in more detail? What did you expect, and what did you actually observe?
2. Could you also paste the rest of your code that touches counts?
- Did your job actually restore from a checkpoint? You can check the JM log to verify this.
- If you are sure data was lost, you could use the State Processor API [1] to check whether the problem is on the write (serialization) side or on the restore side; see the sketch below.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
*Compiled by volunteers from the Flink mailing-list archive
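As a concrete starting point, a minimal sketch of reading the ListState back out of a retained checkpoint with the State Processor API from [1]; the checkpoint path suffix, the operator uid, and the class name are placeholders, and the flink-state-processor-api dependency is assumed to be on the classpath:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectCheckpoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path: point at one concrete retained checkpoint directory.
        ExistingSavepoint checkpoint = Savepoint.load(
                batchEnv,
                "file:///opt/flink/flink-1.7.2/checkpoints/<job-id>/chk-<n>",
                new RocksDBStateBackend("file:///tmp/rocksdb-scratch"));

        // "my-operator-uid" is a placeholder for the uid of the stateful operator.
        DataSet<String> restored = checkpoint.readListState(
                "my-operator-uid", "lastUserLogin", Types.STRING);
        restored.print();
    }
}

Also keep in mind that restarting task managers alone does not resume a job from a retained checkpoint: with the noRestart strategy the job simply fails, and it must be resubmitted with the checkpoint path (flink run -s <checkpoint-path>) for the state to be restored, which is exactly what the JM log check above would reveal.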
More answers to this question can be found here:
https://developer.aliyun.com/ask/370227?spm=a2c6h.12873639.article-detail.65.6f9243783Lv0fl
Question 4: Type-conversion error when parsing a nested JSON array in SQL
Hi all, I have a nested JSON array here and I get a type-conversion error (the failure is at ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3))). Is it not allowed to write it this way?
create table hiido_push_sdk_mq (
  datas ARRAY<ROW<from string, hdid string, event string, hiido_time bigint,
    ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),
    WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>
) with (
'connector' = 'kafka',
'topic' = 'hiido_pushsdk_event',
'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',
'properties.group.id' = 'push_click_sql_version_consumer',
'scan.startup.mode' = 'latest-offset',
'format.type' = 'json');
The error is as follows:
[ERROR] 2020-07-17 20:17:50,640(562284338) --> [http-nio-8080-exec-10] com.yy.push.flink.sql.gateway.sql.parse.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:77): parseBySqlParser, parse: com.yy.push.flink.sql.gateway.context.JobContext$1@5d5f32d1, stmt: create table hiido_push_sdk_mq ( datas ARRAY<ROW<from
string,hdid string,event string,hiido_time bigint,ts AS CAST(FROM_UNIXTIME(hiido_time) AS TIMESTAMP(3)),WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE>>) with ('connector' = 'kafka','topic' = 'hiido_pushsdk_event','properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103','properties.group.id' = 'push_click_sql_version_consumer','scan.startup.mode' = 'latest-offset','format.type' = 'json'), error info: SQL parse failed. Encountered "AS" at line 1, column 115.
Was expecting one of:
"ROW" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
"STRING" ...
"BYTES" ...
"ARRAY" ...
"MULTISET" ...
"RAW" ...
"BOOLEAN" ...
"INTEGER" ...
"INT" ...
"TINYINT" ...
"SMALLINT" ...
"BIGINT" ...
"REAL" ...
"DOUBLE" ...
"FLOAT" ...
"BINARY" ...
"VARBINARY" ...
"DECIMAL" ...
"DEC" ...
"NUMERIC" ...
"ANY" ...
"CHARACTER" ...
"CHAR" ...
"VARCHAR" ...
"DATE" ...
"TIME" ...
"TIMESTAMP" ...
*Compiled by volunteers from the Flink mailing-list archive
Reference answer:
Computed columns can only be declared at the outermost level of the schema; you cannot define a computed column inside a nested type.
*Compiled by volunteers from the Flink mailing-list archive
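For illustration, a sketch of one workable layout (not from the original thread): declare only physical fields inside the nested ROW, quote the reserved word from with backticks as a precaution, and derive the timestamp after unnesting the array. Note also that the new 'connector' = 'kafka' factory in 1.11 expects 'format' = 'json' rather than the legacy 'format.type':

create table hiido_push_sdk_mq (
  datas ARRAY<ROW<`from` STRING, hdid STRING, event STRING, hiido_time BIGINT>>
) with (
  'connector' = 'kafka',
  'topic' = 'hiido_pushsdk_event',
  'properties.bootstrap.servers' = 'kafkafs002-core001.yy.com:8103,kafkafs002-core002.yy.com:8103,kafkafs002-core003.yy.com:8103',
  'properties.group.id' = 'push_click_sql_version_consumer',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json');

-- Flatten the array, then compute the timestamp at the top level.
SELECT t.`from`, t.hdid, t.event,
       CAST(FROM_UNIXTIME(t.hiido_time) AS TIMESTAMP(3)) AS ts
FROM hiido_push_sdk_mq
CROSS JOIN UNNEST(datas) AS t (`from`, hdid, event, hiido_time);

A watermark, like a computed column, can only be declared on a top-level column of the table, so with this layout an event-time attribute would have to come from a field outside the array.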
More answers to this question can be found here:
https://developer.aliyun.com/ask/370226?spm=a2c6h.12873639.article-detail.66.6f9243783Lv0fl
Question 5: A Flink UDF with no parameters always returns the result of its first call
I have a UDF with no parameters that returns the current system time as a string, but every call returns the result of the UDF's very first invocation, so all the timestamps I get are identical.
The UDF implementation is as follows:
public class GetTimeFunc extends ScalarFunction {
    public String eval() {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}
Does Flink apply some internal optimization to UDFs without input parameters that makes every call return the same result?
*Compiled by volunteers from the Flink mailing-list archive
Reference answer:
Yes, such a call is treated as a constant expression and folded away during planning.
You can override the ScalarFunction#isDeterministic method to declare that your function is non-deterministic; then it will not be optimized away.
*Compiled by volunteers from the Flink mailing-list archive
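A minimal sketch of that fix applied to the function above:

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.table.functions.ScalarFunction;

public class GetTimeFunc extends ScalarFunction {

    public String eval() {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }

    @Override
    public boolean isDeterministic() {
        // Declare the function non-deterministic so the planner does not
        // constant-fold the parameterless call into a single literal.
        return false;
    }
}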
More answers to this question can be found here:
https://developer.aliyun.com/ask/370224?spm=a2c6h.12873639.article-detail.67.6f9243783Lv0fl