开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink这个问题怎么解决 ?

Flink这个问题怎么解决?--**--
-- Author: chenshengpeng
-- Created Time: 2024-03-11 16:04:01
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration



-- Author: chenshengpeng
-- Created Time: 2024-02-29 18:21:32
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration


--创建一个print_table临时表。

CREATE TEMPORARY TABLE print(
user_id bigint ,
member_id bigint,
user_name STRING,
phone STRING,
reg_time TIMESTAMP,
reg_type int,
client STRING,
user_type int,
user_level int,
idcard_no STRING,
idcard_name STRING,
idcard_address STRING,
vip_expire_time TIMESTAMP,
sm_device_id STRING,
system_version STRING,
register_ip STRING,
bind_phone STRING,
last_login_ip STRING,
last_login_time TIMESTAMP,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'selectdb',
'load-url' = 'selectdb-cn-jia3ma3b003.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'selectdb-cn-jia3ma3b003.selectdbfe.rds.aliyuncs.com:9030',
'cluster-name' = 'new_cluster',
'table.identifier' = 'center_risk_service.data_model_user',
'username' = '',
'password' = '',
'sink.parallelism' = '3'
);

BEGIN STATEMENT SET;

-- INSERT INTO select-db.center_risk_service.data_model_user
INSERT INTO print
(
user_id
,member_id
,user_name
,phone
,reg_time
,reg_type
,client
,user_type
,user_level
,idcard_no
,idcard_name
,idcard_address
,vip_expire_time
,sm_device_id
,system_version
,register_ip
,bind_phone
,last_login_ip
,last_login_time
)
select
CAST(ui.user_id as bigint) user_id
,CAST(ui.member_id as bigint) member_id
,ui.user_name
,ui.phone
,ui.create_time as reg_time
,ui.reg_type
,ui.client
,CAST(ui.user_type as int) user_type
,ugc.level as user_level
,pua.idcard_no
,pua.idcard_name
,pua.idcard_address
,vi.expire_time vip_expire_time
,aall.device_id sm_device_id
,aall.system_version
,ma.register_ip
,ma.phone bind_phone
,ma.last_login_ip
,aall.login_time as last_login_time
from msyql.t_user_service.user_info ui

展开
收起
三分钟热度的鱼 2024-03-20 14:41:23 45 0
2 条回答
写回答
取消 提交回答
  • 对于最后给出的 Flink SQL 脚本内容,这是一个将 MySQL 数据写入到名为 print 的临时表的例子,然后通过 INSERT INTO 将查询结果插入到该临时表中。需要注意的是,这里使用的 selectdb 连接器可能并不是 Flink 内置的连接器,可能是某个自定义或第三方的 JDBC 连接器。如果要实现实时同步数据到其他存储,应当确保连接器支持实时流处理,并正确配置相应参数。同时,表路由的问题在此脚本中并没有体现,如需路由至不同表,需要根据实际业务逻辑调整 INSERT INTO 语句中的表名。

    2024-03-20 16:15:47
    赞同 展开评论 打赏
  • 对于您自定义的selectDB数据库结果表设置,目前看仅支持临时表方式来设置并发,元数据方式不行。不过您可以对任务资源配置成专家模式,然后设置参数table.exec.split-slot-sharing-group-per-vertex为ture,重启任务后对sink端的并发和资源进行调大。然后在重新运行任务,避免sink端的busy繁忙而导致上游反压,影响数据写入速度。此回答整理自钉群“实时计算Flink产品交流群”

    2024-03-20 14:52:28
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载