Flink CPU问题之CPU利用率低如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:kafka-connect json格式适配问题?

kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储

Flink定义输出表结构:

CREATE TABLE print_table \

(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')

输出的数据格式示例:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}

但是kafka-connect-jdbc的json格式需要schema和payload,示例:

{

"schema": {

"type": "struct",

"fields": [

{

"type": "int64",

"optional": false,

"field": "id"

},

{

"type": "string",

"optional": true,

"field": "name"

}

],

"optional": true,

"name": "user"

},

"payload": {

"id": 1,

"name": "admin"

}

}

请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?

当前Flink处理sql:

INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' MINUTE)*来自志愿者整理的flink邮件归档



参考答案:

你需要在 DDL 和 query 上都补上 schema 和 payload:

CREATE TABLE print_table \

(schema STRING, payload ROW<total_count BIGINT, username STRING,

update_time TIMESTAMP(6)>) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')

-- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload

INSERT INTO output

SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,

update_time) as payload

FROM ...

Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到

mysql 不是很方便么?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371520?spm=a2c6h.12873639.article-detail.17.29d04378ApxdqJ



问题二:Blink的Batch模式的并行度问题

Flink 目前的blink table planner batch mode

(读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,

但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,

那么如何能扩大并行度来优化性能呢?*来自志愿者整理的flink邮件归档



参考答案:

可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371518?spm=a2c6h.12873639.article-detail.18.29d04378ApxdqJ



问题三:Could not find any factory for identifier 'kafka'

Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 编译的jar包是jar-with-dependencies的

代码片段: public String ddlSql = String.format("CREATE TABLE %s (\n" + " number BIGINT,\n" + " msg STRING,\n" + " username STRING,\n" + " update_time TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'format' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", tableName, topic, servers, group);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);

报错信息: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ... 33 more

参考了这个 http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错

附上pom依赖: org.apache.flink flink-java ${flink.version} org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} org.apache.flink flink-table-api-java ${flink.version} org.apache.flink flink-connector-kafka_2.12 ${flink.version} org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} org.apache.flink flink-json ${flink.version} *来自志愿者整理的flink邮件归档



参考答案:

可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?

如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1] [1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371517?spm=a2c6h.12873639.article-detail.19.29d04378ApxdqJ



问题四:flink 聚合 job 重启问题

请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。 但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.*来自志愿者整理的flink邮件归档



参考答案:

伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371515?spm=a2c6h.12873639.article-detail.20.29d04378ApxdqJ



问题五:Flink CPU利用率低

想问下大佬们 Flink的cpu利用率这么低吗 0.012?*来自志愿者整理的flink邮件归档



参考答案:

Flink CPU利用率的高低主要还是取决于你的任务中的业务逻辑,框架本身的CPU占用是很低的试想一下,如果你的任务是计算非常简单(或则就是sleep),那整个TM的CPU利用率就很低了,约等于框架占用的如果是一个计算很密集的(或者就是死循环),那TM CPU利用率就是取决于你的slot数量了,2个slot就是200%的CPU利用率 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371512?spm=a2c6h.12873639.article-detail.21.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6天前
|
监控 Shell
Shell脚本监控CPU、内存和硬盘利用率
Shell脚本监控CPU、内存和硬盘利用率
|
6天前
|
SQL Java Apache
Flink CPU问题之CPU较低如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
10月前
|
监控 Shell Perl
监控CPU、内存和硬盘利用率
监控CPU、内存和硬盘利用率
114 1
|
11月前
CPU利用率高又看不到占用率高的进程?
CPU利用率高又看不到占用率高的进程?
143 0
|
存储 弹性计算 运维
CPU 利用率从 10% 提升至 60%:中型企业云原生成本优化实战指南
在互联网早期迅速发展时,相关领域企业更多注重于扩展业务,为了迅速占领市场,往往会投入较高的成本。然而,随着互联网人口红利逐渐消退,以及近几年的疫情影响,越来越多企业开始重视成本管理,从“粗放式经营”转变为“精细化运营”模式,成本优化成为企业重点关注事项。
463 0
CPU 利用率从 10% 提升至 60%:中型企业云原生成本优化实战指南
|
Java
[最佳实践] Java线程栈分析 - CPU利用率持续升高
使用应用诊断分析平台ATP的Java线程栈分析功能,诊断CPU利用率持续升高问题
301 0
[最佳实践] Java线程栈分析 - CPU利用率持续升高
|
Cloud Native Linux 应用服务中间件
助力Koordinator云原生单机混部,龙蜥混部技术提升CPU利用率达60%|龙蜥技术
龙蜥社区的三大原生技术为 Koordinator 社区提供了强大的 CPU 混部底层技术支持。
助力Koordinator云原生单机混部,龙蜥混部技术提升CPU利用率达60%|龙蜥技术
|
Linux
linux下用top命令查看,cpu利用率超过100%时怎么回事
linux下用top命令查看,cpu利用率超过100%时怎么回事
399 0
|
算法 调度
2.2.2操作系统(CPU利用率 系统吞吐量 周转时间 调度算法 FCFS SJF HRRN)
调度算法的评价指标 ​1.CPU利用率 2.系统吞吐量 3.周转时间 4.等待时间 5.响应时间 调度算法 1.先来先服务(FCFS, First Come First Serve) 2.短作业优先(SJF, Shortest Job First) 非抢占式 抢占式 ​注意几个小细节: 对FCFS和SJF两种算法的思考… 3.高响应比优先(HRRN, Highest Response Ratio Next) 知识回顾与重要考点
2.2.2操作系统(CPU利用率 系统吞吐量 周转时间 调度算法 FCFS SJF HRRN)
|
Dubbo Java Linux
Sentinel在docker中获取CPU利用率的一个BUG
微服务治理中限流、熔断、降级是一块非常重要的内容。目前市面上开源的组件也不是很多,简单场景可以使用Guava,复杂场景可以选用Hystrix、Sentinel。今天要说的就是Sentinel,Sentinel是一款阿里开源的产品,
375 0
Sentinel在docker中获取CPU利用率的一个BUG

热门文章

最新文章

  • 1
    实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
    15
  • 2
    实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
    23
  • 3
    实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
    13
  • 4
    实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
    17
  • 5
    实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
    16
  • 6
    实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
    12
  • 7
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    17
  • 8
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    21
  • 9
    实时计算 Flink版操作报错合集之查询sqlserver ,全量阶段出现报错如何解决
    16
  • 10
    实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
    38
  • 相关产品

  • 实时计算 Flink版