Flink 实时写入数据到 ElasticSearch 性能调优

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 线上业务反应使用 Flink 消费上游 kafka topic 里的轨迹数据出现 backpressure,数据积压严重。单次 bulk 的写入量为:3000/50mb/30s,并行度为 48。针对该问题,为了避免影响线上业务申请了一个与线上集群配置相同的 ES 集群。

作者:张刘毅

背景说明

线上业务反应使用 Flink 消费上游 kafka topic 里的轨迹数据出现 backpressure,数据积压严重。单次 bulk 的写入量为:3000/50mb/30s,并行度为 48。针对该问题,为了避免影响线上业务申请了一个与线上集群配置相同的 ES 集群。本着复现问题进行优化就能解决的思路进行调优测试。

测试环境

  • Elasticsearch 2.3.3
  • Flink 1.6.3
  • flink-connector-elasticsearch 2_2.11
  • 八台 SSD,56 核 :3 主 5 从

Rally 分布式压测 ES 集群

1.jpg

从压测结果来看,集群层面的平均写入性能大概在每秒 10 w+ 的 doc。

Flink 写入测试

配置文件

config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));

执行代码片段

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 initEnv(env);
 Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
 //从kafka中获取轨迹数据
 FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
     new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
 //从checkpoint最新处消费
 flinkKafkaConsumer010.setStartFromLatest();
 DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
10//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
    .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");

运行时配置

3.jpg

任务容器数为 24 个 container,一共 48 个并发。savepoint 为 15 分钟:

4.jpg

  • 运行现象

(1)source 和 Map 算子均出现较高的反压

5.jpg

(2)ES 集群层面,目标索引写入速度写入陡降

平均 QPS 为:12 k 左右。

(3)对比取消 sink 算子后的 QPS

streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");

66.jpg

平均QPS为:116 k 左右。

有无sink参照实验的结论:

取消 sink 2 ES 的操作后,QPS 达到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 集群写性能导致的上游反压

优化方向

索引字段类型调整

77.jpg

bulk 失败的原因是由于集群 dynamic mapping 自动监测,部分字段格式被识别为日期格式而遇到空字符串无法解析报错。

解决方案:关闭索引自动检测。

6.jpg

效果: ES 集群写入性能明显提高但 Flink operator 依然存在反压:

7.jpg

降低副本数

curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'

提高 refresh_interval

针对这种 ToB、日志型、实时性要求不高的场景,我们不需要查询的实时性,通过加大甚至关闭 refresh_interval 的参数提高写入性能。

curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": {  "index": {"refresh_interval" : -1   }   }  }'

检查集群各个节点 CPU 核数

在 Flink 执行时,通过 Grafana 观测各个节点 CPU 使用率以及通过 Linux 命令查看各个节点 CPU 核数。发现 CPU 使用率高的节点 CPU 核数比其余节点少。为了排除这个短板效应,我们将在这个节点中的索引 shard 移动到 CPU 核数多的节点。

curl -XPOST {集群地址}/_cluster/reroute  -d'{"commands":[{"move":{"index":"{索引名称}","shard":5,"from_node":"源node名称","to_node":"目标node名称"}}]}' -H "Content-Type:application/json"

以上优化的效果:

8.jpg

经过以上的优化,我们发现写入性能提升有限。因此,需要深入查看写入的瓶颈点。

在 CPU 使用率高的节点使用 Arthas 观察线程

9.jpg

打印阻塞的线程堆栈

"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
   at sun.misc.Unsafe.park(Native Method)
     -  waiting on java.util.concurrent.LinkedTransferQueue@369223fa
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
    at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
    at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
    at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

从上面的线程堆栈我们可以看出线程处于等待状态。

关于这个问题的讨论详情查看 https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,这个 issue 讨论大致意思是:节点数不够,需要增加节点。于是我们又增加节点并通过设置索引级别的 total_shards_per_node 参数将索引 shard 的写入平均到各个节点上。

线程队列优化

ES 是将不同种类的操作(index、search…)交由不同的线程池执行,主要的线程池有三:index、search 和 bulk thread_pool。线程池队列长度配置按照官网默认值,我觉得增加队列长度而集群本身没有很高的处理能力线程还是会 await(事实上实验结果也是如此在此不必赘述),因为实验节点机器是 56 核,对照官网:

10.jpg

因此修改 size 数值为 56。

11.jpg

经过以上的优化,我们发现在 kafka 中的 topic 积压有明显变少的趋势:

12.jpg

index buffer size 的优化

参照官网:

13.jpg

indices.memory.index_buffer_size : 10%

translog 优化:

索引写入 ES 的基本流程是:

  • 数据写入 buffer 缓冲和 translog;
  • 每秒 buffer 的数据生成 segment 并进入内存,此时 segment 被打开并供 search 使用查询;
  • buffer 清空并重复上述步骤 ;
  • buffer 不断添加、清空 translog 不断累加,当达到某些条件触发 commit 操作,刷到磁盘;

ES 默认的刷盘操作为 Request 但容易部分操作比较耗时,在日志型集群、允许数据在刷盘过程中少量丢失可以改成异步 async。

另外一次 commit 操作是在 translog 达到某个阈值执行的,可以把大小(flush_threshold_size )调大,刷新间隔调大。

index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s

效果:

  • Flink 反压从打满 100% 降到 40%(output buffer usage):

14.jpg

kafka 消费组里的积压明显减少:

15.jpg

总结

当 ES 写入性能遇到瓶颈时,我总结的思路应该是这样:

  • 看日志,是否有字段类型不匹配,是否有脏数据。
  • 看 CPU 使用情况,集群是否异构
  • 客户端是怎样的配置?使用的 bulk 还是单条插入
  • 查看线程堆栈,查看耗时最久的方法调用
  • 确定集群类型:ToB 还是 ToC,是否允许有少量数据丢失?
  • 针对 ToB 等实时性不高的集群减少副本增加刷新时间
  • index buffer 优化 translog 优化,滚动重启集群

▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 大会议程重磅发布,参与问卷调研就有机会免费获取门票!

https://developer.aliyun.com/special/ffa2019

相关文章
|
1月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
1月前
|
存储 缓存 监控
深入解析:Elasticsearch集群性能调优策略与最佳实践
【10月更文挑战第8天】Elasticsearch 是一个分布式的、基于 RESTful 风格的搜索和数据分析引擎,它能够快速地存储、搜索和分析大量数据。随着企业对实时数据处理需求的增长,Elasticsearch 被广泛应用于日志分析、全文搜索、安全信息和事件管理(SIEM)等领域。然而,为了确保 Elasticsearch 集群能够高效运行并满足业务需求,需要进行一系列的性能调优工作。
85 3
|
1月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
177 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
57 1
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
1月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
41 0
|
1月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
46 0
|
1月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
46 0
|
11天前
|
存储 安全 数据管理
如何在 Rocky Linux 8 上安装和配置 Elasticsearch
本文详细介绍了在 Rocky Linux 8 上安装和配置 Elasticsearch 的步骤,包括添加仓库、安装 Elasticsearch、配置文件修改、设置内存和文件描述符、启动和验证 Elasticsearch,以及常见问题的解决方法。通过这些步骤,你可以快速搭建起这个强大的分布式搜索和分析引擎。
29 5
|
1月前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
125 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面