带你读《Elastic Stack 实战手册》之79:——4.2.5.在Docker上使用Elastic Stack和 Kafka(上)

本文涉及的产品
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 带你读《Elastic Stack 实战手册》之79:——4.2.5.在Docker上使用Elastic Stack和 Kafka(上)

4.2.5.在Docker上使用Elastic Stack和 Kafka

创作人:刘晓国

 

你是否考虑分析和可视化地理数据? 为什么不尝试 Elastic Stack? 也就是所谓的  ELK

(Elasticsearch + Logstash + Kibana)或 Elatic Stack 不仅是 NoSQL数据库。 它是一个整体系统,可以实时存储,搜索,分析和可视化来自任何来源的数据。 在这种情况下,我们将使用有关华沙公共交通位置的开放数据。

 

本文中,我将介绍如何使用 Elastic Stack 和 Kafka 来监控公共交通的车辆。我们将使用

Docker 来部署所有需要的组件。下面是整个系统的框架图:

image.png

整个应用的框架如上:

 

· 汽车或公交的数据上传到一个数据平台。它提供 REST API 接口来被调用。

· Python 应用定时从 data portal 进行抓取数据,并同时发送到 Kafka

· Kafaka 的数据发送到 Logstash 进行加工,并导入到 Elasticsearch 中

· 在 Kibana 中对数据进行呈现

 安装 

Python

我们有一个应用是用 python 语言写的。你需要安装 python3 来运行该应用。


API key

为了测试这个应用,我们必须得到相应的华沙公共交通信息的 API key。

 

我们可以在地址https://api.um.warszawa.pl/# 进行申请。  

image.png

点击上面的 “登录” 链接,并进行脑力测试:

 

image.png

最终得到如上所示的 API key:86882ed9-4533-4630-b03b-47b3d68ae5e5。这个 key 将在一下的 python 应用中使用。

 

Elastic Stack 及 Kafka

 

你需要安装 Docker 来实现 Elastic Stack 及 Kafka 的安装。

 

本展示的所有的源码可以在地址https://github.com/liu-xiao-guo/wiadro-danych-kafka-to-es-ztm 进行下载。

 

Docker-compose 包含 Elasticsearch,Kibana,Zookeeper,Kafka,Logstash 和应用程序 Kafka Streams (由于一些原因,在本展示中将不被采用)。

 

docker-compose.yml


version: '3.3'
services:
    elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
        restart: unless-stopped
        environment:
        - discovery.type=single-node
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
        - esdata:/usr/share/elasticsearch/data
        ports:
        - 9200:9200
    kibana:
        image: docker.elastic.co/kibana/kibana:7.7.0
        restart: unless-stopped
        depends_on:
            - elasticsearch
        ports:
            - 5601:5601
        volumes:
            - kibanadata:/usr/share/kibana/data
             zookeeper:
        image: 'bitnami/zookeeper:3'
        ports:
            - '2181:2181'
        volumes:
            - 'zookeeper_data:/bitnami'
        environment:
            - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
        image: 'bitnami/kafka:2'
        ports:
            - '9092:9092'
            - '29092:29092'
        volumes:
            - 'kafka_data:/bitnami'
        environment:
            - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
            - ALLOW_PLAINTEXT_LISTENER=yes
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
            - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        depends_on:
            - zookeeper
    ztm_kafka_streams:
        image: "maciejszymczyk/ztm_stream:1.0"
        environment:
          - APPLICATION_ID_CONFIG=awesome_overrided_ztm_stream_app_id
          - BOOTSTRAP_SERVERS_CONFIG=kafka:9092
        depends_on:
          - kafka
    logstash:
        image: docker.elastic.co/logstash/logstash:7.7.0
        volumes:
             - "./pipeline:/usr/share/logstash/pipeline"
        environment:
            LS_JAVA_OPTS: "-Xmx256m -Xms256m"
        depends_on:
            - elasticsearch
            - kafka
            volumes:
    esdata:
        driver: local
    kibanadata:
        driver: local
    zookeeper_data:
        driver: local
    kafka_data:
        driver: local

 

我们在自己电脑的 console 中打入如下的命令:

docker-compose up

我们可以看到如下的画面:


image.png

从上面我们可以看出来 Logstash 已经被成功地启动。

 

我们在浏览器的地址栏中输入地址http://localhost:5601

 

image.png


我们可以看到 Kibana 已经成功启动,这也意味着 Elasticsearch 被成功地运行起来了。

 

配置及运行

 

Logstash

 

我们使用如下的 pipeline 来实现对数据的处理:

 

pipeline/kafka_to_es.conf


input {
   kafka {
        topics => "ztm-input"
        bootstrap_servers => "kafka:9092"
             codec => "json"
   }
}
filter {
    mutate {
        convert => {"Lat" => "float"}
        convert => {"Lon" => "float"}
        add_field => ["location", "%{Lat},%{Lon}"]
        remove_field => ["Lat", "Lon"]
    }
    }
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "ztm"
   }
}

它从 Kafaka 的 "ztm-input" topic 获取数据,并把相应的 Lat 及 Lon 字段合并成为一个

location 字段。在 output 的部分,我们把数据导入到 Elasticsearch 之中。

 

Elasticsearch

 

我们使用了索引生命周期管理机制, 而不是将记录放入诸如 ztm-2020.05.24 之类的索引中。它使你可以自动执行索引的寿命。 它会自动进行汇总,并根据你配置策略的方式更改索引属性(热-热-冷架构)。假设我希望在索引达到1GB或30天过去后进行 rollover,我们在 Kibana


中执行如下的命令:


PUT _ilm/policy/ztm_policy
{
  "policy": {
    "phases": {
      "hot":{
        "actions": {
          "rollover": {
            "max_size": "1gb",
            "max_age": "30d"
          }
        }
      }
    }
  }
}

你还需要一个模板,该模板具有 ztm_policy 将连接到的适当 mapping。 如果没有 mapping,Elasticsearch 将不会猜测到 location 字段为 geo_point 的数据类型,并且时间字段将是纯文本。

 

PUT _template/ztm_template
{
  "index_patterns": ["ztm-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name":"ztm_policy",
    "index.lifecycle.rollover_alias": "ztm"
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "bearing": {
        "type": "float"
      },
      "brigade": {
        "type": "keyword"
      },
      "distance": {
        "type": "float"
      },
      "lines": {
        "type": "keyword"
      },
      "location": {
        "type": "geo_point"
      },
      "speed": {
        "type": "float"
      },
      "time": {
        "type": "date",
            },
      "vehicleNumber": {
        "type": "keyword"
      }
    }
  }
}

现在该使用适当的别名创建第一个索引了。

PUT ztm-000001
{
  "aliases": {
    "ztm": {
      "is_write_index":true
    }
  }
}

我们在 Kibana 中运行上面的三个命令。

 

Python 脚本

 

首先,我们必须获得所需要的 API key。这个在上面我们已经讲述了。

 

ztm.py


import requests 
import json
import time
from kafka import KafkaProducer
token = '86882ed9-4533-4630-b03b-47b3d68ae5e5'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
sleep_time = 15
bus_params = {
    'apikey':token,
    'type':1,
    'resource_id': resource_id
    }
tram_params = {
    'apikey':token,
     'type':2,
    'resource_id': resource_id
    }
while True:
    try:
        r = requests.get(url = url, params = bus_params)
        data = r.json() 
        producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                                key_serializer=lambda x: x
                                )
        print('Sending records...')
        for record in data['result']:
            print(record)
            future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
            result = future.get(timeout=60)
except:
  print("¯\_(ツ)_/¯")
    time.sleep(sleep_time)

上面的代码其实是蛮简单的。它定时从 API portal 获取公交系统的位置信息,并转发到Kafka。

 

我们使用如下的命令来运行上面的应用:

python3 ztm.py

image.png

这个时候,我们可以在屏幕上看到所获得很多的关于公交系统车辆的信息。

 

我们可以转到运行 docker-compopse up 命令的那个 console,我们可以看到如下的信息:

 

image.png

它表明我们的 Logstash 是在正常工作。

 


 《Elastic Stack 实战手册》——四、应用实践——4.2 可观测性应用场景 ——4.2.5.在Docker上使用Elastic Stack和 Kafka(下) https://developer.aliyun.com/article/1225718

 

相关文章
|
5月前
|
消息中间件 数据可视化 Kafka
docker arm架构部署kafka要点
本内容介绍了基于 Docker 的容器化解决方案,包含以下部分: 1. **Docker 容器管理**:通过 Portainer 可视化管理工具实现对主节点和代理节点的统一管理。 2. **Kafka 可视化工具**:部署 Kafka-UI 以图形化方式监控和管理 Kafka 集群,支持动态配置功能, 3. **Kafka 安装与配置**:基于 Bitnami Kafka 镜像,提供完整的 Kafka 集群配置示例,涵盖 KRaft 模式、性能调优参数及数据持久化设置,适用于高可用生产环境。 以上方案适合 ARM64 架构,为用户提供了一站式的容器化管理和消息队列解决方案。
381 10
|
8月前
|
消息中间件 Kafka 流计算
docker环境安装kafka/Flink/clickhouse镜像
通过上述步骤和示例,您可以系统地了解如何使用Docker Compose安装和配置Kafka、Flink和ClickHouse,并进行基本的验证操作。希望这些内容对您的学习和工作有所帮助。
760 28
|
8月前
|
消息中间件 Kafka Docker
docker compose 安装 kafka
通过本文的步骤,您可以快速在本地使用 Docker Compose 安装并配置 Kafka 和 Zookeeper。Docker Compose 简化了多容器应用的管理,方便快速搭建和测试分布式系统。
1024 2
|
11月前
|
应用服务中间件 nginx Docker
Docker Swarm、Docker Stack和Portainer的使用
Docker Swarm、Docker Stack 和 Portainer 各有其独特的功能和优势。Docker Swarm 适用于分布式服务的管理和编排,Docker Stack 便于多容器应用的定义和部署,而 Portainer 提供了直观的 UI,简化了 Docker 环境的管理。结合使用这些工具,可以大大提高容器化应用的部署和管理效率。希望本文对您理解和应用这些工具有所帮助。
504 5
|
12月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
261 6
|
12月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
176 5
|
12月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
262 4
|
消息中间件 存储 缓存
高性能、高可靠性!Kafka的技术优势与应用场景全解析
**Kafka** 是一款高吞吐、高性能的消息系统,擅长日志收集、消息传递和用户活动跟踪。其优点包括:零拷贝技术提高传输效率,顺序读写优化磁盘性能,持久化保障数据安全,分布式架构支持扩展,以及客户端状态维护确保可靠性。在实际应用中,Kafka常用于日志聚合、解耦生产者与消费者,以及实时用户行为分析。
465 3
|
消息中间件 Java Kafka
Docker 安装 kafka
Docker 安装 kafka
201 0
|
应用服务中间件 nginx Docker
docker service 与 docker stack
docker service 与 docker stack
285 0