【消息队列】一文搞定大数据消息队列Kafka1

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【消息队列】一文搞定大数据消息队列Kafka

1.JMS+AMQP核心知识

1.1.什么是MQ中间件

  • 全称MessageQueue,主要是用于程序和程序之间的通信,异步+解耦

1.2.使用场景

  • 核心应用:
  • 解耦:订单系统->物流系统
  • 异步:用户注册->发送右键,初始化信息
  • 削峰:秒杀、日志处理
  • 跨平台、多语言
  • 分布式事务、最终一致性
  • RPC调用上下游对接,数据源变动->通知下属

1.3.JMS消息服务和和常见核心概念

(1)什么是JMS

  • java消息服务(java message service) ,Java平台中关于面向消息中间件的接口

(2)特性

  • 面向java平台的标准消息传递API
  • 在Java或JVM语言比如Scala、Groovy中具有互用性
  • 无需担心底层协议
  • 有queues和topics两种消息传递模型
  • 支持事务、能够定义消息格式(消息头、属性和内容)
  • (3)常见概念
  • JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
  • JMS生产者:生产消息的服务
  • JMS消费者:消费消息的服务

2.分布式流处平台Kafka核心概念

2.1.Kafka核心概念

Broker

kafka的服务端程序,可以认为一个mq节点就是一个broker,broker存储topic的数据

Producer生产者

创建消息Message,然后发布到MQ中,该角色将消息发布到Kafka的topic中

Consumer消费者

消费队列里的消息

ConsumerGroup消费者组

同个topic,广播发送给不同的group,一个group中只能又一个consumer可以消费此消息

Topic主题

每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思

Partition分区

kafka数据存储的基本单元,topic中的数据分割称一个或者多个partition,每个topic至少又一个partition,有序的
一个Topic的多个partitions,被分布在kafka集群中的多个server上
消费者数量<=partition数量

Replication副本

同个partition会有多个副本replication,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
默认每个topic的副本都是1(默认没有副本的)。可以在创建topic的时候指定
如果当前kafka集群只有3个broker节点,则replication-factor的最大数就是3.

ReplicationLeader

Partition有多个副本,但只有一个replicationleader负载该partition和生产者消费者交互

ReplicationFollower

ReplicationFollower只做备份,从ReplicationLeader进行同步

ReplicationManager

负责Broker所有分区副本信息,Replication副本状态切换

offset

每个consumer示例需要为他消费的partition维护一个记录自己消费道哪里的便宜offset
kafka把offset保存在消费端的消费者组里


8615eb8aadf3400d94bae2ec4448bd48.jpg

2.2.特点总结

  • 多订阅者
  • 一个topic可以有一个或者多个订阅者
  • 每个订阅者都要有一个partition,所以订阅者数量要少于等于partition
  • 高吞吐量、低延迟:每秒可以处理几十万条消息
  • 高并发:几千个客户端同时读写
  • 容错性:多副本、多分区允许集群中节点失败。
  • 扩展性强:支持热扩展

2.3.基于消费者组可以实现

  • 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
  • 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样kafka消息就能广播到所有消费者实例上

3.Linux环境下Zookeeper和Kafka安装

注意:提前安装好jdk1.8

#编辑群居配置文件
vim /etc/profile
#在最下面,按i进入insert模式,添加一下内容
JAVA_HOME=jdk路径
export JAVA_HOME
CLASSPATH=.:$JAVA_HOME/lib
export CLASSPATH
PATH=$PATH:$JAVA_HOME/bin:$CLASSPATH
export PATH
#重新加载配置
source /etc/profile

3.1.安装启动Zookeeper

(1)解压zookeeper,重命名

tar -xvf apache-zookeeper-3.7.0-bin.tar.gz
mv apache-zookeeper-3.7.0-bin zookeeper

(2)复制默认的配置文件zoo.cfg

3bb2125971d14fdd87982a1973c1fa33.jpg

(3)启动zookeeper,默认2181端口

/usr/local/software/zookeeper/bin/zkServer.sh start

3.2.安装启动Kafka

(1)解压Kafka,重命名

tar -xvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka

(2)修改配置文件config目录下的server.properties

#标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同
broker.id=0
#修改listeners(内网ip)和advertised.listeners(公网ip)配置,不能一样的ip否则启动报错
listeners=PLAINTEXT://(内网ip):端口号 
advertised.listeners=PLAINTEXT://(公网ip):端口号 
#修改zk地址,默认为localhost,注意:zk在那个机器上配置那个机器的公网ip地址
zookeeper.connection=localhost:2181

90fd3304c649414bafec834ed4637e0b.jpg

a11b1bba62ee4378a0f434cf39931a1c.jpg(3)bin目录启动,默认9092端口

#启动
./kafka-server-start.sh ../config/server.properties &
#停止
kafka-server-stop.sh

(4)创建topic

./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic xd888-topic

f750f4a1b6bb4806991f5acb746acf4a.jpg

3350e3d445764c66b50cec0a282ac608.jpg(5)查看topic

./kafka-topics.sh --list --zookeeper 8.140.116.67:2181


94b46c9f04f44d37ad6dee8af6bd2903.jpg(6)kafka如果直接启动信息会打印在控制台,如果关闭窗口,kafka随之关闭,用守护线程的方式启动

./kafka-server-start.sh -daemon ../config/server.properties &

4.生产者发送消息和消费者消费消息

4.1.创建topic

./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic topic-test
参数:
kafka-topics.sh脚本启动
--zookeeper ip:端口     #zookeeper在那台机器就写那个IP端口
--replication-factor 1   #指定副本的数量为1 
--partitions 2       #指定分区数为2
--topic t1         #指定topic的名称

67027a8c9be84cac986480c963d27506.jpg

4.2.查看topic

./kafka-topics.sh --list --zookeeper localhost:2181
参数:
kafka-topics.sh脚本启动
--list  #查询列表
--zookeeper  ip:端口  #zookeeper在那台机器就写那个IP端口


09f668c969824e6f95a2ee7e0a8254bc.jpg

4.3.生产者发送消息

./kafka-console-producer.sh --broker-list 8.140.116.67:9092 --topic topic-test 
参数:
kafka-console-producer.sh脚本启动
--broker-list ip:端口号  #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip)
--topic t1          #指定是那个topic

4.4.消费者消费消息

./kafka-console-consumer.sh --bootstrap-server 8.140.116.67:9092 --from-beginning --topic topic-test
参数:
--bootstrap-server ip:端口号 #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip)
--from-begnning         #指当客户端退出时,重新连接消费者,那么他会将之前的消息重新消费一遍
--topic t1            #指定是那个topic

f7eeb2afc9494befba0478978bf853e4.jpg

4.5.删除topic

./kafka-topics.sh --zookeeper 8.140.116.67:2181 --delete --topic topic-test


35f096dcc26f4a6ca3e5709c18d3c839.jpg

4.6.查看broker节点topic状态信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xdclass-topic


1510a4b076c541378018c40e715e1bc6.jpg

注意:添加、删除、查看topic都用的./kafka-topics.sh脚本

5.Kafka点对点模型和发布订阅模型

5.1.JMS规范支持两种消息模型

点对点(point to point)

  • 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
  • 消息被消费之后,queue中不在存储,所以消息消费者不能消费到已经被消费的消息。Queue支持存在多个消费者,但对于一个消息而言,只会又一个消费者可以消费

发布订阅(publish/subscribe)

  • 消息生产者(发布)将消息发布到topic中,同事有多个消息消费者(订阅)消费该消息
  • 和点对点方式不同,发布到topic的消息会被所有订阅者消费

5.2.消费者点对点消费模型

  • 编辑消费者配置(确保同个名称group.id一样)config/consumer.properties

159350f1500445e7858d55093219f58c.jpg

  • 创建topic,1个分区
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 1 --topic t1
  • 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties
参数:
--consumer.config #指定配置文件启动
  • 现象

只有一个消费者可以消费到数据,一个分区只能被同个消费者组的某个消费者进行消费

5.3.消费者发布订阅消息模型

  • 编辑消费者配置,使其groud.id不一样
config/consumer-1.properties
config/consumer-2.properties

f75b697cff154c129ae5d0d0db68bf0f.jpg

  • 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties
  • 现象

两个消费者同时能够消费到消息


相关文章
|
1天前
|
消息中间件 运维 大数据
道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展
阿里云云消息队列 Kafka 版 Serverless 系列凭借其卓越的弹性能力,为道旅科技提供了灵活高效的数据流处理解决方案。无论是应对突发流量还是规划长期资源需求,该方案均能帮助企业实现资源动态调整和成本优化,同时保障业务的高可用性和连续性。
|
7天前
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
44 1
|
1月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
75 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
52 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
42 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
62 1
|
2月前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。