问题一:能否通过配置参数在flinksql中获取这个状态码呢? 或者说 如果需要获取这些状态,需要如何实现?
能否通过配置参数在flinksql中获取这个状态码呢? 或者说 如果需要获取这些状态,需要如何实现?
参考答案:
在Flink SQL中,可以通过配置参数获取状态码。首先,需要在Flink SQL中使用CREATE TABLE
语句创建一个表,该表包含状态码字段。然后,可以使用INSERT INTO
语句将状态码插入到表中。最后,可以使用SELECT
语句查询状态码。
以下是一个简单的示例:
- 创建表:
CREATE TABLE status_codes ( id INT, status_code INT ) WITH ( 'connector' = '...', -- 这里填写具体的连接器类型,例如 'filesystem', 'jdbc', 'kafka' 等 'format' = '...', -- 这里填写具体的格式类型,例如 'csv', 'on', 'avro' 等 'path' = '...' -- 这里填写具体的数据源路径或者连接信息 );
- 插入状态码:
INSERT INTO status_codes (id, status_code) VALUES (1, 200); INSERT INTO status_codes (id, status_code) VAL(2, 404); INSERT INTO status_codes (id, status_code) VALUES (3, 500);
- 查询状态码:
SELECT * FROM status_codes;
这样,你就可以通过配置参数在Flink SQL中获取状态码了。如果需要获取其他状态,只需修改`CREATE TA
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/608103
问题二:Flink postgre cdc的相关配置里 是否有支持获取这个操作的标识符?
Flink postgre cdc的相关配置里 是否有支持获取(insert,update,delete)操作的标识符?目前已知的是只有op_ts这个元数据
参考答案:
这个要看连接器的支持情况,尝试加一列op看下,但是不是所有的连接器都支持
op STRING METADATA FROM 'value.operation' VIRTUAL。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/608102
问题三:对于买了阿里云flink服务,但是使用jar开发的用户,你们这边推荐我们用社区的依赖,还是什么?
对于买了阿里云flink服务,但是使用jar开发的用户,你们这边推荐我们用社区的依赖,还是vvr提供的?
参考答案:
有内置 Connector 的建议使用 ververica 的依赖,底层对 batchSize 等优化参数、输入输出埋点 metric 都实现好了。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/608100
问题四:Flink这块有什么好的方法去获取数据的状态吗?
用flink cdc去读取rds postgresql的日志 但是无法获取数据的op状态是update还是delete 只有一个op_ts为数据处理时间 Flink这块有什么好的方法去获取数据的状态吗?
参考答案:
我试了一下,是通的,你可以参考
CREATE TEMPORARY TABLE source_clicks(
username varchar,
click_url varchar,
eventtime varchar,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --为Rowtime定义Watermark。
) WITH (
'connector' = 'mysql',
'hostname' = 'rm-......s.com',
'port' = '3306',
'username' = '...',
'password' = '...',
'database-name' = 'mysql_test',
'table-name' = 'source_clicks',
'scan.incremental.snapshot.enabled' = 'false'
);
-- select * from source_clicks;
CREATE TEMPORARY TABLE sink_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) WITH (
'connector' = 'mysql',
'hostname' = 'rm-.....com',
'port' = '3306',
'username' = '...',
'password' = '.....',
'database-name' = 'mysql_test',
'table-name' = 'sink_output'
);
-- select * from sink_output;
INSERT INTO sink_output
SELECT
HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as HOP_START,
HOP_END(ts, INTERVAL '30' SECOND,INTERVAL '1' MINUTE) as HOP_END,
username,
COUNT(click_url)
FROM source_clicks
GROUP BY HOP(ts,INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;
CREATE TABLE source_clicks(
username VARCHAR(50) ,
click_url VARCHAR(50) ,
eventtime VARCHAR(50)
);
CREATE TABLE sink_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR(50),
clicks BIGINT
)
insert into source_clicks values
('Jark','http://taobao.com/xxx','2017-10-10 10:00:00.0'),
('Jark','http://taobao.com/xxx','2017-10-10 10:00:10.0'),
('Jark','http://taobao.com/xxx','2017-10-10 10:00:49.0'),
('Jark','http://taobao.com/xxx','2017-10-10 10:01:05.0'),
('Jark','http://taobao.com/xxx','2017-10-10 10:01:58.0'),
('Timo','http://taobao.com/xxx','2017-10-10 10:02:10.0');
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/608099
问题五:我现在在做flink cdc2doris的操作目前出现了以下问题,要怎么解决?
我现在在做flink cdc2doris的操作目前出现了以下问题:原先同步10张表,都是先全量再增量同步的(scan.startup.mode=initial),现在新加两张表,配置从最早的binlog开始同步(scan.startup.mode=earliest-offset),从savepoint启动后发现配置没生效,新加的两张表还是先全量再增量。
参考答案:
启动方式是固定的,不可以随着savepoint更改。
关于本问题的更多回答可点击进行查看: