Flink这个问题怎么解决?--**--
-- Author: chenshengpeng
-- Created Time: 2024-03-11 16:04:01
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
-- Author: chenshengpeng
-- Created Time: 2024-02-29 18:21:32
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--创建一个print_table临时表。
CREATE TEMPORARY TABLE print(
user_id bigint ,
member_id bigint,
user_name STRING,
phone STRING,
reg_time TIMESTAMP,
reg_type int,
client STRING,
user_type int,
user_level int,
idcard_no STRING,
idcard_name STRING,
idcard_address STRING,
vip_expire_time TIMESTAMP,
sm_device_id STRING,
system_version STRING,
register_ip STRING,
bind_phone STRING,
last_login_ip STRING,
last_login_time TIMESTAMP,
PRIMARY KEY (user_id
) NOT ENFORCED
) WITH (
'connector' = 'selectdb',
'load-url' = 'selectdb-cn-jia3ma3b003.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'selectdb-cn-jia3ma3b003.selectdbfe.rds.aliyuncs.com:9030',
'cluster-name' = 'new_cluster',
'table.identifier' = 'center_risk_service.data_model_user',
'username' = '',
'password' = '',
'sink.parallelism' = '3'
);
BEGIN STATEMENT SET;
-- INSERT INTO select-db
.center_risk_service.data_model_user
INSERT INTO print
(
user_id
,member_id
,user_name
,phone
,reg_time
,reg_type
,client
,user_type
,user_level
,idcard_no
,idcard_name
,idcard_address
,vip_expire_time
,sm_device_id
,system_version
,register_ip
,bind_phone
,last_login_ip
,last_login_time
)
select
CAST(ui.user_id as bigint) user_id
,CAST(ui.member_id as bigint) member_id
,ui.user_name
,ui.phone
,ui.create_time as reg_time
,ui.reg_type
,ui.client
,CAST(ui.user_type as int) user_type
,ugc.level as user_level
,pua.idcard_no
,pua.idcard_name
,pua.idcard_address
,vi.expire_time vip_expire_time
,aall.device_id sm_device_id
,aall.system_version
,ma.register_ip
,ma.phone bind_phone
,ma.last_login_ip
,aall.login_time as last_login_time
from msyql.t_user_service.user_info ui
对于最后给出的 Flink SQL 脚本内容,这是一个将 MySQL 数据写入到名为 print
的临时表的例子,然后通过 INSERT INTO
将查询结果插入到该临时表中。需要注意的是,这里使用的 selectdb
连接器可能并不是 Flink 内置的连接器,可能是某个自定义或第三方的 JDBC 连接器。如果要实现实时同步数据到其他存储,应当确保连接器支持实时流处理,并正确配置相应参数。同时,表路由的问题在此脚本中并没有体现,如需路由至不同表,需要根据实际业务逻辑调整 INSERT INTO
语句中的表名。
对于您自定义的selectDB数据库结果表设置,目前看仅支持临时表方式来设置并发,元数据方式不行。不过您可以对任务资源配置成专家模式,然后设置参数table.exec.split-slot-sharing-group-per-vertex为ture,重启任务后对sink端的并发和资源进行调大。然后在重新运行任务,避免sink端的busy繁忙而导致上游反压,影响数据写入速度。此回答整理自钉群“实时计算Flink产品交流群”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。