1 Writing Data from Kafka into ES
- Kafka -> logstash -> elasticsearch -> kibana (simple: only a single agent process needs to be started)
Given the above, the project decided to go with option one and use logstash to store the data from Kafka in elasticsearch.
The project topology is as follows, with the overall message flow being:
logs/messages => Flume => kafka => logstash => elasticsearch => kibana
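Before building out the full environment in the next section, the logstash leg of this flow can be smoke-tested with a minimal pipeline that consumes from Kafka and prints each decoded event to the console. This is only a sketch, assuming the same broker address and topic used in the full configuration later in this article:

```
# Minimal smoke test: read JSON events from Kafka and print them.
# Broker address and topic are assumed to match the full config below.
input {
  kafka {
    bootstrap_servers => "192.168.1.252:9092"
    topics            => "252nginx-accesslog"
    codec             => "json"      # decode each Kafka message as JSON
  }
}

output {
  stdout { codec => rubydebug }      # pretty-print each event to the console
}
```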
2 Environment Setup
Logstash consumes the data from Kafka and writes it to Elasticsearch
```
input {
  kafka {
    bootstrap_servers => "192.168.1.252:9092"    # Kafka broker address
    topics            => "252nginx-accesslog"
    batch_size        => 5
    codec             => "json"                  # use the json codec, since logstash converts collected events to JSON
    group_id          => "252nginx-access-log"
    consumer_threads  => 1
    decorate_events   => true
    type              => "252nginx-accesslog"    # sets the type used by the output conditionals (only applied if the event has no type yet)
  }
  kafka {
    bootstrap_servers => "192.168.1.252:9092"
    topics            => "system-log-252"
    consumer_threads  => 1
    decorate_events   => true
    codec             => "json"
    type              => "system-log-252"
  }
}

output {
  if [type] == "252nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.1.252:9200"]
      index => "252nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "system-log-252" {
    elasticsearch {
      hosts => ["192.168.1.252:9200"]
      index => "system-log-1512-%{+YYYY.MM.dd}"
    }
  }
}
```
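Because `decorate_events => true` is enabled on both inputs, each event also carries its source topic under `[@metadata][kafka][topic]`, so routing on that metadata is an alternative to setting a `type` on every input. A sketch of that variant, assuming the same broker, hosts, and index names as above:

```
# Alternative output: route on the Kafka topic recorded by decorate_events
# instead of a per-input type field (same hosts and indices as above).
output {
  if [@metadata][kafka][topic] == "252nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.1.252:9200"]
      index => "252nginx-accesslog-%{+YYYY.MM.dd}"
    }
  } else if [@metadata][kafka][topic] == "system-log-252" {
    elasticsearch {
      hosts => ["192.168.1.252:9200"]
      index => "system-log-1512-%{+YYYY.MM.dd}"
    }
  }
}
```

On Logstash 5 and later, the configuration can be syntax-checked before the pipeline is started with `bin/logstash -f <path-to-config> --config.test_and_exit`.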