Flink部署问题之编译失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:FLINK SQL 消费kafka消息乱序问题

kafka消息里有一个随时间递增的“成交额”字段,写了一个UDAF统计累加当前成交额与上一条数据的成交额的差值,发现差值有出现负数的情况 用工具看topic里的消息是有序的,分区数为1。flink版本1.11.2

*来自志愿者整理的flink邮件归档



参考答案:

检查了一下上游,发现在source端把并行度改成1就不乱序了 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370034?spm=a2c6h.13066369.question.71.33bf585fMmQxos



问题二:flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从sav

源表test: CREATE TABLE test ( id INT, name VARCHAR(255), time TIMESTAMP(3), status INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'test' ) 源表status CREATE TABLE status ( id INT, name VARCHAR(255), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'status' );

输出表 CREATE TABLE test_status ( id INT, name VARCHAR(255), time TIMESTAMP(3), status INT, status_name VARCHAR(255) PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'xxx', 'index' = 'xxx', 'username' = 'xxx', 'password' = 'xxx', 'sink.bulk-flush.backoff.max-retries' = '100000', 'sink.bulk-flush.backoff.strategy' = 'CONSTANT', 'sink.bulk-flush.max-actions' = '5000', 'sink.bulk-flush.max-size' = '10mb', 'sink.bulk-flush.interval' = '1s' );

输出语句: INSERT into test_status SELECT t.*, s.name FROM test AS t LEFT JOIN status AS s ON t.status = s.id;

mysql表中已经有数据 test: 0, name0, 2020-07-06 00:00:00 , 0 1, name1, 2020-07-06 00:00:00 , 1 2, name2, 2020-07-06 00:00:00 , 1 .....

status 0, status0 1, status1 2, status2 .....

操作顺序与复现: 1、启动任务,设置并行度为40, 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink savepoint保存,然后web ui上取消任务。 ==> test_status中的数据正常: 0, name0, 2020-07-06 00:00:00 , 0, status0 1, name1, 2020-07-06 00:00:00 , 1, status1 2, name2, 2020-07-06 00:00:00 , 1, status1

2、操作mysql, 将status中id=1数据变更为 status1_modify

3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。 /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1 job 下, ==> test_status中的数据正常: 0, name0, 2020-07-06 00:00:00 , 0, status0 1, name1, 2020-07-06 00:00:00 , 1, status1_modify 2, name2, 2020-07-06 00:00:00 , 1, status1_modify /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40 job 下 ==> test_status中的数据不正常, id = 1,2的两条数据缺失: 0, name0, 2020-07-06 00:00:00 , 0, status0

怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!

这里是不是bug?还是从save point里恢复的时候,算子的状态有问题? 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??

*来自志愿者整理的flink邮件归档



参考答案:

看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423

这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。

这个bug 会在即将发布的 1.11.3 中修复。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370032?spm=a2c6h.13066369.question.72.33bf585flrGPJ5



问题三:flink1.11编译失败是什么原因

hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~

import org.apache.flink.sql.parser.impl.ParseException;

import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

Error:(39, 87) java: 找不到符号

符号: 类 ParseException

位置: 类

org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn

*来自志愿者整理的flink邮件归档



参考答案:

这两个类是 codegen 生成的,所以源码里没有,你编译下flink-sql-parser模块就会自动生成这几个类。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370030?spm=a2c6h.13066369.question.73.33bf585fRbLJ0F



问题四:canal-json 分库分表场景应用有什么思路吗?

你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic 然后用canal-json解析,这边想获取到json里面的table 字段,

然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表,

然后发现 canal-json 是获取不到表名的,然后这边去修改canal-json的format。

添加 createJsonRowType方法的DataTypes.FIELD("table", DataTypes.STRING())

然后在deserialize方法里面把 table字段set到 data里面去。但是发现这种好像是不成功的 ,请问下 有什么具体点的思路提供下吗,

*来自志愿者整理的flink邮件归档



参考答案:

目前还不支持,读取 table 元信息。

在 1.12 中,debezium-json 支持了这种功能,文档[1], 代码[2]。

canal-json的话需要按照类似的方式支持下元信息读取。

Best,

Jark

[1]:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#available-metadata

[2]:

https://github.com/apache/flink/blob/0a7c23cac26af49bce7c1f79fbf993c0a7f87835/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java#L136

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370029?spm=a2c6h.13066369.question.76.33bf585fdLxyOW



问题五:flink on native k8s

flink on native k8s 按照官网配置,可以看到jobmanager ui 但是没有tm为0 slot为0 提交任务 就会一直卡主

角色配置: [root@node20 rbac]# cat rbac-role.yaml kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: namespace: di-flink-dev name: flink-admin rules: - apiGroups: [""] resources: ["pods"] verbs: ["create","delete","get", "watch", "list"]

角色绑定: [root@node20 rbac]# cat rbac-serviceaccount.yaml apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: flink-admin-bind namespace: di-flink-dev subjects: - kind: Group name: system:serviceaccounts:di-flink-dev apiGroup: rbac.authorization.k8s.io roleRef: kind: Role name: flink-admin apiGroup: rbac.authorization.k8s.io

session启动命令: ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-cluster-test01 -Dkubernetes.namespace=di-flink-dev -Dkubernetes.rest-service.exposed.type=NodePort -Dtaskmanager.memory.process.size=1028m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=2 -Dresourcemanager.taskmanager-timeout=3600000

任务提交命令: ./bin/flink run -d -t kubernetes-session -Dkubernetes.cluster-id=my-cluster-test01 -Dkubernetes.namespace=di-flink-dev examples/streaming/WindowJoin.jar

*来自志愿者整理的flink邮件归档



参考答案:

你的提交命令是没有问题的,需要确认一下Flink client和JM的rest endpoint的联通情况

可以curl一下flink run命令打出来的JM rest地址,看看网络通不通

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370028?spm=a2c6h.13066369.question.77.33bf585fwLr80g

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
1月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
92 0
|
1月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
74 0
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
3月前
|
SQL Kubernetes 数据处理
实时计算 Flink版产品使用问题之如何把集群通过kubernetes进行部署
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
3月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
3月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
3月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作

相关产品

  • 实时计算 Flink版