Current situation:
Business logs are collected through a filebeat → logstash → elasticsearch pipeline; the ELK stack version is 6.3.2.
The production systems use Log4j as their logging framework.
Filebeat is deployed on multiple nodes and collects the business logs written by Log4j. On each node a single log file, test.log, is capped at 50 MB. The log volume is enormous: the 50 MB fills up within tens of seconds, sometimes within just a few seconds, and the file is then rotated by renaming it to test.log.1 ... test.log.N. Because Filebeat has not yet shipped everything in test.log by the time it is renamed (the registry file in Filebeat's data directory still holds state pointing at the renamed file), the handle on test.log.1 is not released. So when the log volume is this high, Filebeat cannot keep up and ends up holding a large number of unreleased handles on test.log.N files, and those unreleased handles in turn consume large amounts of memory, until the operating system's memory is exhausted.
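For reference, the Filebeat 6.x log input exposes a handful of options that control how long a harvester keeps the handle of a rotated file open and how long its state stays in the registry. The snippet below is only a sketch with placeholder paths and values, not the configuration we ran; tightening these settings reduces handle and memory usage at the risk of dropping lines that were never shipped, which is why we focused on fixing downstream throughput instead.

filebeat.inputs:
  - type: log
    paths:
      - /app/logs/test.log*        # placeholder path; point it at the real Log4j output directory
    close_renamed: true            # release the handle once test.log is renamed to test.log.N
    close_removed: true            # release the handle once a rotated file is deleted
    close_timeout: 5m              # force-close a harvester after 5 minutes even if lines remain unshipped
    clean_removed: true            # drop registry entries for files that no longer exist on disk
    harvester_limit: 10            # cap the number of files read in parallel by this input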
Analysis:
The business log volume is enormous and the downstream Logstash cannot deliver the required throughput, so it applies backpressure to the upstream Filebeat. The effect of that backpressure is a sharp drop in Filebeat's collection performance, and the registry file accumulates a large backlog of renamed log files.
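Two quick checks make this backlog visible on a Filebeat node. This is only a sketch: it assumes the single JSON registry file of Filebeat 6.x at the default deb/rpm location (for a tar.gz install it lives under the data/ directory) and the test.log.N naming described above.

# Number of file states Filebeat still tracks in its registry
python -c "import json; print(len(json.load(open('/var/lib/filebeat/registry'))))"

# Number of handles the Filebeat process still holds on rotated log files
lsof -p "$(pidof filebeat)" | grep -c 'test\.log\.'

Both numbers climb quickly while Logstash is applying backpressure.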
Solution approach:
1. Add Kafka as a log buffer
2. Scale out the Logstash nodes horizontally
Deploying Kafka: we took five machines (16 CPU cores, 16 GB RAM, 77 GB disk each) and built a Kafka cluster on them (deployment guides are easy to find online), then pointed Filebeat's output at Kafka; for the details see the official documentation: https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
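A minimal sketch of the Filebeat Kafka output, using the option names from the page linked above; the broker list and topic name below are placeholders, not our real values. Note that Filebeat allows only one output at a time, so the previous output.logstash section has to be removed or commented out.

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]   # placeholder broker list
  topic: "business-log"                                  # placeholder topic name
  partition.round_robin:
    reachable_only: false        # spread events across all partitions
  required_acks: 1               # wait for the partition leader to acknowledge each batch
  compression: gzip              # compress batches to cut network traffic
  max_message_bytes: 1000000     # drop single events larger than roughly 1 MB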
With the cluster in place, we created the log topic with partitions set to 30 and restarted Filebeat. After it had run for a while, the backlog in the registry file was gone, but watching the consumption of the topic in Kafka we saw the consumer lag growing rapidly; the initial diagnosis was that Logstash throughput was insufficient.
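A sketch of the corresponding Kafka commands, with placeholder hosts, topic and consumer-group names (Kafka releases of that era still managed topics through the --zookeeper flag):

# Create the log topic with 30 partitions (the replication factor here is an assumption)
bin/kafka-topics.sh --create --zookeeper zk1:2181 --topic business-log --partitions 30 --replication-factor 2

# Watch consumer lag for the Logstash consumer group; the LAG column kept growing
bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group logstash-consumer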
Scaling out Logstash: we changed the Logstash input to Kafka; for the details see the official documentation: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
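A sketch of the pipeline configuration used on each Logstash node; the broker list, topic, consumer group, Elasticsearch hosts and index pattern are placeholders, the json codec is an assumption about how Filebeat ships events, and the filter section is omitted.

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"   # placeholder broker list
    topics            => ["business-log"]                        # placeholder topic
    group_id          => "logstash-consumer"                     # all nodes share one consumer group
    consumer_threads  => 10                                      # threads per node; started at 10, raised later (see below)
    codec             => "json"                                  # assumption: events arrive as JSON
  }
}

output {
  elasticsearch {
    hosts => ["es-node1:9200", "es-node2:9200"]   # placeholder Elasticsearch nodes
    index => "business-log-%{+YYYY.MM.dd}"        # daily index, placeholder name
  }
}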
We quickly scaled out to three Logstash nodes, each running 10 threads consuming the log topic from Kafka. After restarting Logstash, the lag in Kafka did not ease. Suspecting that Kafka had too few partitions to provide enough consumer parallelism, we raised the partition count from 30 to 300 and the thread count on each Logstash node to 100 (sketched below), yet after watching for a while the lag still had not improved. At one point we even suspected that Logstash itself was simply too slow, and during this period the Elasticsearch nodes crashed with OOM several times; by now we were close to despair.
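The partition expansion itself is a single command on the Kafka side plus a consumer_threads change in each Logstash pipeline; a sketch with the same placeholder names as above (Kafka partition counts can only ever be increased, never reduced):

# Grow the log topic from 30 to 300 partitions
bin/kafka-topics.sh --alter --zookeeper zk1:2181 --topic business-log --partitions 300

# In each Logstash kafka input block, raise the per-node parallelism
#   consumer_threads => 100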
We then began to notice that Logstash was printing large numbers of INFO messages indicating that the requests it sent to Elasticsearch were being rejected. Looking at Elasticsearch itself, its logs were full of errors like this:
2018-11-29T16:39:25,152[o.e.a.b.TransportBulkAction] [data-node3] failed to execute pipeline for a bulk request
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2018-11-29T16:39:25,161[o.e.x.m.MonitoringService] [data-node3] monitoring execution failed
org.elasticsearch.xpack.monitoring.exporter.ExportException: Exception when closing export bulk
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1$1.<init>(ExportBulk.java:95) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$1.onFailure(ExportBulk.java:93) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:206) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound$1.onResponse(ExportBulk.java:200) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.onResponse(IteratingActionListener.java:96) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:164) ~[?:?]
at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.lambda$doFlush$1(LocalBulk.java:115) ~[?:?]
at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:68) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.ContextPreservingActionListener.onFailure(ContextPreservingActionListener.java:50) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$1.onFailure(TransportAction.java:91) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.lambda$processBulkIndexIngestRequest$4(TransportBulkAction.java:502) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService$1.onFailure(PipelineExecutionService.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.onRejection(AbstractRunnable.java:63) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.onRejection(ThreadContext.java:715) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:104) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:128) ~[?:?]
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.xpack.monitoring.MonitoringService$MonitoringExecution$1.doRun(MonitoringService.java:258) [x-pack-monitoring-6.3.2.jar:6.3.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:626) [elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulks
at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$0(ExportBulk.java:156) ~[?:?]
... 41 more
Caused by: org.elasticsearch.xpack.monitoring.exporter.ExportException: failed to flush export bulk [default_local]
... 40 more
Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@1e5ccc24 on EsThreadPoolExecutor[name = data-node3/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@c58bbfc[Running, pool size = 80, active threads = 80, queued tasks = 200, completed tasks = 18742]]
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.2.jar:6.3.2]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_121]
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.2.jar:6.3.2]
... 31 more
This, too, shows Elasticsearch rejecting requests wholesale. Only now did we realize that the bottleneck was actually on the Elasticsearch side: as the EsRejectedExecutionException above shows, the write thread pool on the data node was saturated (all 80 threads busy and the queue of 200 full), so Elasticsearch could no longer absorb the incoming bulk requests and was rejecting the ones sent by Logstash.
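The rejections can also be confirmed from the thread pool stats instead of digging through the logs; a sketch using the _cat API, with a placeholder host name:

curl 'http://es-node1:9200/_cat/thread_pool/write?v&h=node_name,name,active,queue,rejected,completed'

The rejected column keeps climbing for as long as Logstash submits more bulk requests than the write queue (200 entries by default in 6.x) can hold.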
After further, sustained tuning of Elasticsearch, the consumer lag that had piled up in Kafka finally eased. By that point the whole episode had taken a week.
Exactly how Elasticsearch was tuned to relieve the problem will be recorded in the next post.