Flink CPU问题之CPU利用率低如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:kafka-connect json格式适配问题?

kafka->Flink->kafka->mysql Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 使用kafka-connect是方便数据同时导出到其他存储

Flink定义输出表结构:

CREATE TABLE print_table \

(total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')

输出的数据格式示例:

{"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}

但是kafka-connect-jdbc的json格式需要schema和payload,示例:

{

"schema": {

"type": "struct",

"fields": [

{

"type": "int64",

"optional": false,

"field": "id"

},

{

"type": "string",

"optional": true,

"field": "name"

}

],

"optional": true,

"name": "user"

},

"payload": {

"id": 1,

"name": "admin"

}

}

请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?

当前Flink处理sql:

INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' MINUTE)*来自志愿者整理的flink邮件归档



参考答案:

你需要在 DDL 和 query 上都补上 schema 和 payload:

CREATE TABLE print_table \

(schema STRING, payload ROW<total_count BIGINT, username STRING,

update_time TIMESTAMP(6)>) \

WITH (\

'connector' = 'kafka', \

'topic' = 'test_out', \

'properties.bootstrap.servers' = '127.0.0.1:9092', \

'sink.partitioner' = 'round-robin', \

'format' = 'json')

-- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload

INSERT INTO output

SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,

update_time) as payload

FROM ...

Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到

mysql 不是很方便么?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371520?spm=a2c6h.12873639.article-detail.17.29d04378ApxdqJ



问题二:Blink的Batch模式的并行度问题

Flink 目前的blink table planner batch mode

(读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource,

但是StreamTableSource的并行度默认应该是1 , 底层是ContinuousFileMonitoringFunction ,

那么如何能扩大并行度来优化性能呢?*来自志愿者整理的flink邮件归档



参考答案:

可以配置 table.exec.resource.default-parallelism 为需要的并发。详见文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371518?spm=a2c6h.12873639.article-detail.18.29d04378ApxdqJ



问题三:Could not find any factory for identifier 'kafka'

Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 编译的jar包是jar-with-dependencies的

代码片段: public String ddlSql = String.format("CREATE TABLE %s (\n" + " number BIGINT,\n" + " msg STRING,\n" + " username STRING,\n" + " update_time TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'format' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", tableName, topic, servers, group);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);

报错信息: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ... 33 more

参考了这个 http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错

附上pom依赖: org.apache.flink flink-java ${flink.version} org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} org.apache.flink flink-table-api-java ${flink.version} org.apache.flink flink-connector-kafka_2.12 ${flink.version} org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} org.apache.flink flink-json ${flink.version} *来自志愿者整理的flink邮件归档



参考答案:

可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?

如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1] [1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371517?spm=a2c6h.12873639.article-detail.19.29d04378ApxdqJ



问题四:flink 聚合 job 重启问题

请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。 但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.*来自志愿者整理的flink邮件归档



参考答案:

伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371515?spm=a2c6h.12873639.article-detail.20.29d04378ApxdqJ



问题五:Flink CPU利用率低

想问下大佬们 Flink的cpu利用率这么低吗 0.012?*来自志愿者整理的flink邮件归档



参考答案:

Flink CPU利用率的高低主要还是取决于你的任务中的业务逻辑,框架本身的CPU占用是很低的试想一下,如果你的任务是计算非常简单(或则就是sleep),那整个TM的CPU利用率就很低了,约等于框架占用的如果是一个计算很密集的(或者就是死循环),那TM CPU利用率就是取决于你的slot数量了,2个slot就是200%的CPU利用率 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371512?spm=a2c6h.12873639.article-detail.21.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
存储 监控 Java
实时计算 Flink版产品使用问题之随着时间增加,作业的CPU繁忙度增加,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Prometheus 监控 Cloud Native
grafana展示的CPU利用率与实际不符的问题探究
观察到`mpstat`命令显示单核CPU的`%usr`和`%sys`分别持续在70%和20%,而Grafana监控数据显示较低。问题源于Grafana表达式计算的是CPU时间增量而非利用率。`mpstat`通过`/proc/stat`获取数据并计算CPU利用率,而`node-exporter`直接导出原始数据。调整Grafana表达式以匹配`mpstat`的计算方式后,两者结果一致。解决方案是修正Grafana查询以准确反映CPU占用率。
257 1
grafana展示的CPU利用率与实际不符的问题探究
|
5月前
|
Python
python3获取内存和cpu利用率记录日志文件psutil
python3获取内存和cpu利用率记录日志文件psutil
69 1
|
6月前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错之整内存和cpu分配之后启动报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 数据处理 API
实时计算 Flink版产品使用问题之holo的io以及cpu使用较为稳定,sink端busy一直在20%左右,有时候50%,该如何优化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
监控 Shell
Shell脚本监控CPU、内存和硬盘利用率
Shell脚本监控CPU、内存和硬盘利用率
|
6月前
|
SQL Java Apache
Flink CPU问题之CPU较低如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
9天前
|
弹性计算 Kubernetes Perl
k8s 设置pod 的cpu 和内存
在 Kubernetes (k8s) 中,设置 Pod 的 CPU 和内存资源限制和请求是非常重要的,因为这有助于确保集群资源的合理分配和有效利用。你可以通过定义 Pod 的 `resources` 字段来设置这些限制。 以下是一个示例 YAML 文件,展示了如何为一个 Pod 设置 CPU 和内存资源请求(requests)和限制(limits): ```yaml apiVersion: v1 kind: Pod metadata: name: example-pod spec: containers: - name: example-container image:
|
18天前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
192 2
|
2月前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
130 5

相关产品

  • 实时计算 Flink版