部署ELK+Kafka+Filebeat日志收集分析系统

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: ELK+Kafka+Filebeat日志系统文章目录ELK+Kafka+Filebeat日志系统1.环境规划2.部署elasticsearch集群2.1.配置es-1节点2.2.配置es-2节点2.3.配置es-3节点2.4.使用es-head插件查看集群状态3.部署kibana4.部署zookeeper4.1.配置zookeeper-1节点4.2.配置zookeeper-2节点4.3.配置zookeeper-3节点4.4.启动所有节点5.部署kafka5.1.配置kafka-1节点5.2.配置kafka-2节点5.3.配置kafka-3节点

1.环境规划

image.png

日志系统架构图

nginx—>filebeat—>kafka—>logstash—>elasticsearch—>kibana

2.部署elasticsearch集群

2.1.配置es-1节点

1.下载elasticsearch7.6
[root@elk-1 ~]# wget https://mirrors.huaweicloud.com/elasticsearch/7.6.0/elasticsearch-7.6.0-x86_64.rpm
[root@elk-1 ~/soft]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm 
2.编辑配置文件,配置集群模式
[root@elk-1 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.210,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"
3.创建数据目录
[root@elk-1 ~]# mkdir /data/elasticsearch/ -p
[root@elk-1 ~]# chown -R elasticsearch.elasticsearch /data/elasticsearch/
4.配置内存锁定
[root@elk-1 ~]# mkdir /etc/systemd/system/elasticsearch.service.d/
[root@elk-1 ~]# vim /etc/systemd/system/elasticsearch.service.d/override.conf
[Service]
LimitMEMLOCK=infinity
5.启动elasticsearch
[root@elk-1 ~]# systemctl daemon-reload 
[root@elk-1 ~]# systemctl start elasticsearch
[root@elk-1 ~]# systemctl enable elasticsearch

2.2.配置es-2节点

只是配置文件中node.name和network.host不同,其他操作方式一致

[root@elk-2 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-2
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.220,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

2.3.配置es-3节点

只是配置文件中node.name和network.host不同,其他操作方式一致

[root@elk-2 ~]#  vim /etc/elasticsearch/elasticsearch.yml
cluster.name: elk-application
node.name: elk-3
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.81.230,127.0.0.1
http.port: 9200
cluster.initial_master_nodes: ["elk-1"]
discovery.zen.ping.unicast.hosts: ["192.168.81.210","192.168.81.220","192.168.81.230"]
discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s
http.cors.enabled: true
http.cors.allow-origin: "*"

2.4.使用es-head插件查看集群状态

3.部署kibana

1.下载kibana rpm包
[root@elk-1 ~]#  rpm -ivh kibana-7.6.0-x86_64.rpm 
2.配置kibana
[root@elk-1 ~]# vim /etc/kibana/kibana.yml
server.port: 5601                 
server.host: "192.168.81.210"               
server.name: "elk-application"                        
elasticsearch.hosts: ["http://192.168.81.210:9200"]       
i18n.locale: "zh-CN"
[root@elk-1 ~]# systemctl restart kibana
[root@elk-1 ~]#  systemctl enable elasticsearch

kibana部署成功

4.部署zookeeper

4.1.配置zookeeper-1节点

1.下载软件
[root@elk-1 ~]# wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
2.解压并移动zookeeper
[root@elk-1 ~]# tar xf soft/zookeeper-3.4.13.tar.gz -C /data/
[root@elk-1 ~]# mv /data/zookeeper-3.4.13/ /data/zookeeper
3.创建数据目录和日志目录
[root@elk-1 ~]# mkdir /data/zookeeper/{data,logs}
4.准备配置文件
[root@elk-1 ~]# cd /data/zookeeper/conf
[root@elk-1 /data/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
[root@elk-1 /data/zookeeper/conf]# vim zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888
5.生成节点id文件
#节点id只能保护数字
[root@elk-1 /data/zookeeper]# echo 1 > /data/zookeeper/data/myid

4.2.配置zookeeper-2节点

与zookeeper-1节点只有配置文件和节点id文件有点不同,其余全一样

[root@elk-2 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888
[root@elk-2 /data/zookeeper/conf]# echo 2 > /data/zookeeper/data/myid

4.3.配置zookeeper-3节点

[root@elk-3 /data/zookeeper/conf]# cat zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.81.210:2888:3888
server.2=192.168.81.220:2888:3888
server.3=192.168.81.230:2888:3888
[root@elk-3 /data/zookeeper/conf]# echo 3 > /data/zookeeper/data/myid

4.4.启动所有节点

zookeeper集群必须保证有两个节点存活,也就是说必须同时要启动两个节点,否则集群将启动不成功,因此要都修改好配置文件后,再统一启动

[root@elk-1 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@elk-2 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@elk-3 /data/zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: leader

5.部署kafka

注意:不要使用kafka2.11版本,有严重的bug,filebeat无法写入数据到kafka集群,写入的协议版本不同,存在问题

5.1.配置kafka-1节点

1.下载二进制包
[root@elk-1 ~]# wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
2.安装kafka
[root@elk-1 ~/soft]# tar xf kafka_2.13-2.4.0.tgz -C /data/
[root@elk-1 ~]# mv /data/kafka_2.13-2.4.0 /data/kafka
3.修改配置文件
[root@elk-1 ~]# cd /data/kafka
[root@elk-1 /data/kafka]# vim config/server.properties 
broker.id=1
listeners=PLAINTEXT://192.168.81.210:9092
host.name=192.168.81.210
advertised.listeners=PLAINTEXT://192.168.81.210:9092
advertised.host.name=192.168.81.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
4.创建数据目录
[root@elk-3 ~]# mkdir /data/kafka/data

5.2.配置kafka-2节点

只是配置文件不同,其余与kafka-1节点操作一致

配置文件需要改的地方:broker.id改成2,表示第二个节点 listeners host.name advertised.listeners advertised.host.name改成本机ip地址

[root@elk-2 /data/kafka]# cat config/server.properties 
broker.id=2
listeners=PLAINTEXT://192.168.81.220:9092
host.name=192.168.81.220
advertised.listeners=PLAINTEXT://192.168.81.220:9092
advertised.host.name=192.168.81.220
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

5.3.配置kafka-3节点

只是配置文件不同,其余与kafka-1节点操作一致

配置文件需要改的地方:broker.id改成3,表示第三个节点 listeners host.name advertised.listeners advertised.host.name改成本机ip地址

[root@elk-3 /data/kafka]# cat config/server.properties 
broker.id=3
listeners=PLAINTEXT://192.168.81.230:9092
host.name=192.168.81.230
advertised.listeners=PLAINTEXT://192.168.81.230:9092
advertised.host.name=192.168.81.230
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true 
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=15000
zookeeper.connect=192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

5.4.启动kafka

[root@elk-1 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties
[root@elk-2 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties
[root@elk-3 ~]# /data/kafka/bin/kafka-server-start -daemon /data/kafka/config/server.properties

6.测试kafka与zookeeper连接

kafka能够产生数据并消费,整个集群就可以使用了

1.创建一个topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --create --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --replication-factor 1 --partitions 1 --topic testpic
Created topic "testpic".
2.查看topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
testpic
3.查看topic的描述信息
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181 --topic testpic
4.使用kafka-console-producer控制台生产数据
[root@elk-1 /data/kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic
>test1
>test2
>test3
>test4
>test5
>test6
>test7
>test8
>test9
>test10
5.使用kafka-console-consumer控制台消费数据
[root@elk-1 /data/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092 --topic testpic --from-beginning
test1
test2
test3
test4
test5
test6
test7
test8
test9
test10
#删除一个topic
[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --delete --zookeeper 192.168.81.210:2181  --topic testpic

7.配置filebeat收集nginx、tomcat日志并存储到kafka中

7.1.安装并配置nginx服务

1.安装nginx
[root@elk-3 ~]# yum -y install nginx
2.配置nginx日志格式
[root@elk-3 ~]# vim /etc/nginx/nginx.conf
http {
··············
    log_format  main '{"时间":"$time_iso8601",'
                       '"客户端外网地址":"$http_x_forwarded_for",'
                       '"客户端内网地址":"$remote_addr",'
                       '"状态码":$status,'
                       '"传输流量":$body_bytes_sent,'
                       '"跳转来源":"$http_referer",'
                       '"URL":"$request",'
                       '"浏览器":"$http_user_agent",'
                       '"请求响应时间":$request_time,'
                       '"后端地址":"$upstream_addr"}';
    access_log  /var/log/nginx/access.log  main;
··············
}
2.启动nginx
[root@elk-3 ~]# systemctl start nginx
[root@elk-3 ~]# systemctl enable nginx
4.访问产生日志查看效果
[root@elk-3 ~]# curl 127.0.0.1
[root@elk-3 ~]# tail /var/log/nginx/access.log 
{"时间":"2021-07-12T11:29:33+08:00","客户端外网地址":"-","客户端内网地址":"127.0.0.1","状态码":200,"传输流量":4833,"跳转来源":"-","URL":"GET / HTTP/1.1","浏览器":"curl/7.29.0","请求响应时间":0.000,"后端地址":"-"}

7.2.安装tomcat服务

[root@elk-3 ~]# tar xf apache-tomcat-8.5.12.tar.gz -C /data/
[root@elk-3 ~]# mv /data/apache-tomcat-8.5.12/ /data/tomcat
[root@elk-3 ~]# /data/tomcat/bin/startup.sh 
Using CATALINA_BASE:   /data/tomcat
Using CATALINA_HOME:   /data/tomcat
Using CATALINA_TMPDIR: /data/tomcat/temp
Using JRE_HOME:        /usr
Using CLASSPATH:       /data/tomcat/bin/bootstrap.jar:/data/tomcat/bin/tomcat-juli.jar
Tomcat started.

7.3.安装filebeat服务

[root@elk-3 ~]# rpm -ivh filebeat-7.6.0-x86_64.rpm ```

7.4.配置filebeat收集应用日志并存储到kafka

1.配置filebeat
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log                                      #类型为log
  enabled: true
  paths:                                        #指定日志所在的路径
    - /var/log/nginx/access.log
  json.keys_under_root: true                    #支持json格式的日志输出
  json.overwriite_keys: true
  fields:                                       #在日志中增加一个字段,字段为log_topic,值为nginx_access,logstash根据带有这个字段的日志存储到指定的es索引库
    log_topic: nginx-access
  tail_files: true                              #开启日志监控,从日志的最后一行开始收集
- type: log
  enabled: true
  paths:
    - /data/tomcat/logs/catalina.out
  multiline.pattern: '^20'                      #收集tomcat错误日志,从第一个20到下一个20之间的日志整合在一行中显示
  multiline.negate: true
  multiline.match: after
  fields:
    log_topic: tomcat-cata
  tail_files: true
output.kafka:                                   #输出到kafka系统
  enabled: true
  hosts: ["192.168.81.210:9092","192.168.81.220:9092","192.168.81.230:9092"]                           #kafka的地址
  topic: '%{[fields][log_topic]}'               #指定将日志存储到kafka集群的哪个topic中,这里的topic值是引用在inputs中定义的fields,通过这种方式可以将不同路径的日志分别存储到不同的topic中
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
2.启动filebeat
[root@elk-3 ~]# systemctl start filebeat
[root@elk-3 ~]# systemctl enable filebeat

7.5.产生程序日志数据观察数据是否存储kafka

1.产生程序日志

1.产生nginx日志
[root@elk-3 ~]# ab -n 1000 -c 100 http://127.0.0.1/index.html
2.产生tomcat日志
[root@elk-3 ~]# /data/tomcat/bin/shutdown.sh
[root@elk-3 ~]# /data/tomcat/bin/startup.sh 

2.观察kafka中是否创建对应的topic

[root@elk-1 /data/kafka]# ./bin/kafka-topics.sh --list --zookeeper 192.168.81.210:2181,192.168.81.220:2181,192.168.81.230:2181
__consumer_offsets
nginx-access
testpic
tomcat-cata
#nginx-access以及tomcat-cata的topic已经创建成功

3.观察kafka日志的输出

[root@elk-1 /data/kafka]# tail -f logs/kafkaServer.out

8.配置logstash从kafka中读取数据并存储到es集群

部署logstash,配置logstash从kafka中读取topic数据并存储到es集群

8.1.部署logstash服务

1.安装logstash
[root@elk-3 ~]# rpm -ivh logstash-7.6.0.rpm

8.2.配置logstash从kafka读取数据存储到es集群

[root@elk-3 ~]# cat /etc/logstash/conf.d/in_kafka_to_es.conf 
#从kafka中读取日志数据
input {       #数据源端
  kafka {       #类型为kafka
    bootstrap_servers => ["192.168.81.210:9092,192.168.81.220:9092,192.168.81.230:9092"]      #kafka集群地址
    topics => ["nginx-access","tomcat-cata"]      #要读取那些kafka topics
    codec => "json"                   #处理json格式的数据
    auto_offset_reset => "latest"           #只消费最新的kafka数据
  }
}
#处理数据,去掉没用的字段
filter {
  if[fields][log_topic] == "nginx-access" {     #如果log_topic字段为nginx-access则进行以下数据处理
      json {          #json格式数据处理
           source => "message"      #source等于message的
           remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"]     #删除指定的字段
      }
      mutate {      #修改数据
           remove_field => ["_index","_id","_type","_version","_score","referer","agent"]     #删除没用的字段
      }
  }
  if[fields][log_topic] == "tomcat-cata" {    #如果log_topic字段为tomcat-cata
      grok {        #解析格式
     match => {
          "message" => "(?<时间>20[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) \[(?<线程名称>[^\s]{0,})\] (?<日志等级>\w+) (?<类名称>[^\s]{0,}) (?<日志详情>[\W\w]+)"      #将message的值增加上一些格式
     }
      }
      mutate {      #修改数据
                 remove_field => ["_index","_id","_type","_version","_score","referer","agent"]     #删除没用的字段
            }  
  }
}
#数据处理后存储es集群
output {        #目标端
  if[fields][log_topic] == "nginx-access" {     #如果log_topic的字段值为nginx-access就存到下面的es集群里
      elasticsearch {           
    action => "index"     #类型为索引
    hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]    #es集群地址
    index => "nginx-access-%{+YYYY.MM.dd}"      #存储到es集群的哪个索引里
    codec => "json"           #处理json格式的解析
      } 
  }
  if[fields][log_topic] == "tomcat-cata" {        #如果log_topic的字段值为tomcat-cata就存到下面的es集群里
      elasticsearch {
    action => "index"       #类型为索引
    hosts => ["192.168.81.210:9200","192.168.81.220:9200","192.168.81.230:9200"]      #es集群地址
    index => "tomcat-cata-%{+YYYY.MM.dd}"     #存储到es集群的哪个索引里
    codec => "json"           #处理json格式的解析
      } 
  }     
}

8.3.启动logstash并观察日志

[root@elk-3 ~]# nphup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf &

观察日志的输出,已经从nginx-access、tomcat-cata topic中读取了数据并存到了es集群中

8.4.查看elasticsearch集群是否增加了对应的索引库

es集群已经生成了tomcat-cata以及nginx-access索引库

到此为止logstash已经成功从kafka集群读取到日志数据,然后传入到elasticsearch集群不同的索引库

9.在kibana上关联elasticsearch索引库浏览日志数据

9.1.在kibana上添加nginx-access索引模式

1)点击创建索引

2)填写索引名

采用通配符的方式,填写完点击下一步完成创建即可

3)添加一个时间筛选字段

4)创建成功

9.2.同样方法添加tomcat-cata索引模式

9.3.查询nginx-access索引日志数据

9.4.查看tomcat-cata索引日志数据

10.报错合集

10.1.es启动时报错无法指定被请求的地址

报错内容如下

解决方法:仔细检查配置文件,肯定是某个地址配置错了,我的就是监听地址的ip写错了

10.2.filebeat写入数据到kafka api版本报错

报错如下:

分析解决思路:初步判定为kafka2.11版本问题导致的,换成2.13问题解决

目录
相关文章
|
7天前
|
存储 运维 监控
Linux--深入理与解linux文件系统与日志文件分析
深入理解 Linux 文件系统和日志文件分析,对于系统管理员和运维工程师来说至关重要。文件系统管理涉及到文件的组织、存储和检索,而日志文件则记录了系统和应用的运行状态,是排查故障和维护系统的重要依据。通过掌握文件系统和日志文件的管理和分析技能,可以有效提升系统的稳定性和安全性。
26 7
|
10天前
|
监控 安全 Linux
启用Linux防火墙日志记录和分析功能
为iptables启用日志记录对于监控进出流量至关重要
|
1月前
|
监控 应用服务中间件 定位技术
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
要统计Nginx的客户端IP,可以通过分析Nginx的访问日志文件来实现
|
2月前
|
存储 SQL 监控
|
2月前
|
运维 监控 安全
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
111 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
63 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
364 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
86 3
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
188 0