Flink问题之中文有乱码如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?


集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?


参考回答:

FixedDelaStrategy 默认是从最近一个ck 恢复,其他的策略可以看官网。如果你是想问怎么实现的,不建议在邮件列表里问实现原理的问题。可以google找相关文章、相关flip 或者 直接debug源码。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359846


问题二:关于savepoint恢复问题咨询?


hi, 社区  版本flink 1.7  我正在尝试从保存点(或检查点)还原flink作业,该作业的工作是从kafka读取->执行30分钟的窗口聚合(只是AggregationFunction,就像一个计数器)->下沉到kafka。  我使用rocksdb和启用检查点。  现在我尝试手动触发一个保存点。 每个汇总的期望值是30(1个数据/每分钟)。 但是,当我从保存点还原时(flink运行-d -s {savepoint的url}),聚合值不是30(小于30,取决于我取消flink作业并还原的时间)。 但是当作业正常运行时,它将达到30。  我不知道为什么有些数据似乎会丢失?  日志显示``No restore state for FlinkKafkaConsumer''    四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd Street, GaoXin District, Chengdu, Sichuan Province Mobile +86 15817382279 Email wangchunhao@navercorp.com

NCloud

-----Original Message----- From: "王春浩"wa...@navercorp.com To: us...@flink.apache.org; Cc: Sent: 2021/5/26周三 17:03 (GMT+08:00) Subject: inquire about restore from savepoint

Hi Community,  version flink 1.7 im trying to make a flink job restore from a savepoint(or checkpoint), what the job do is reading from kafka -> do a 30-minutes-window aggregation(just AggregationFunction, acts like a counter) -> sink to kafka. i use rocksdb and enabled checkpoint. now i try to trigger a savepoint manually. the expected value of each aggregated one is 30(1 data/per minute). but when i restore from a savepoint(flink run -d -s {savepoint's url}), the aggregated value is not 30(less than 30, depends on the time i cancel flink job and restore). but when the job run normally, it gets 30. i don't know why could some data seems to be lost? and a log shows "No restore state for FlinkKafkaConsumer"*来自志愿者整理的flink邮件归档


参考回答:

看下你的 flink 命令对不对,然后去 Flink Web UI Checkpoint 界面,看下是否从 Savepoint 恢复(下面有个

restore path).

之后再看下你的窗口时间类型用的是什么。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359849


问题三:有哪些flink状态查看工具可以使用?


我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。 查看checkpoint页显示状态有17MB,checkpoint耗时要2s。 想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?*来自志愿者整理的flink邮件归档


参考回答:

可以使用 State Processor [1]。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359851


问题四:flink sql写mysql中文有乱码问题怎么办?


我的flink sql作业如下

SELECT product_name, window_start, window_end, CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, CAST(COUNT(order_no)ASBIGINT) trans_cnt, -- LOCALTIMESTAMP AS insert_time, '微支付事业部'AS bus_name FROM(

mysql sink表的定义如下 CREATE TABLE XXX ( ) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;

运行起来后写入mysql表的数据带有中文乱码 ??????

查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? 2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task GroupAggregate(groupBy=[product_name, window_start, window_end], select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS f3,COUNTRETRACT(orderno)ASf3,COUNTRETRACT(orderno)ASf3, COUNT_RETRACT(order_no) AS f4]) -> Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST(f3)AStransamt,CAST(f3)AStransamt,CAST(f3) AS trans_amt, CAST(f4) AS trans_cnt, CAST(()) AS insert_time, _UTF-16LE'??????????????????':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS bus_name]) -> Sink: Sink(table=[default_catalog.default_database.all_trans_5m_new], fields=[product_name, window_start, window_end, trans_amt, trans_cnt, insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0. 2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date], select=[product_name, window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) (1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.*来自志愿者整理的flink邮件归档


参考回答:

你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8,像这样

CREATE TABLE jdbc_sink(

id INT COMMENT '订单id',

goods_name VARCHAR(128) COMMENT '商品名称',

price DECIMAL(32,2) COMMENT '商品价格',

user_name VARCHAR(64) COMMENT '用户名称'

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',

'username' = 'mysqluser',

'password' = 'mysqluser',

'table-name' = 'jdbc_sink'

)


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359853


问题五:flink sql支持Common Table Expression (CTE)吗?


flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as ( SELECT AVG(salary) as avgSalary FROM salaries WHERE gender = "Female" ) SELECT name , salary FROM People WHERE name in (SELECT DISTINCT FROM toronto_ppl) AND salary >= (SELECT avgSalary FROM avg_female_salary)*来自志愿者整理的flink邮件归档


参考回答:

支持。

如果只是在单个sql中复用expression,和temporary view基本一样,区别不大。

在某些优化路径上不同,一般没有实质影响。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359854

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2天前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
707 5
|
2天前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1772 2
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
2天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1764 2
官宣|Apache Flink 1.19 发布公告
|
2天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
339 3
|
2天前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
423 0
|
2天前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
243 0
|
2天前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
153 1
|
2天前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
69 0
|
2天前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
69 0
|
2天前
|
SQL Java Apache
超详细步骤!整合Apache Hudi + Flink + CDH
超详细步骤!整合Apache Hudi + Flink + CDH
130 0

热门文章

最新文章

相关产品

  • 实时计算 Flink版