1.JMS+AMQP核心知识
1.1.什么是MQ中间件
- 全称MessageQueue,主要是用于程序和程序之间的通信,异步+解耦
1.2.使用场景
- 核心应用:
- 解耦:订单系统->物流系统
- 异步:用户注册->发送右键,初始化信息
- 削峰:秒杀、日志处理
- 跨平台、多语言
- 分布式事务、最终一致性
- RPC调用上下游对接,数据源变动->通知下属
1.3.JMS消息服务和和常见核心概念
(1)什么是JMS
- java消息服务(java message service) ,Java平台中关于面向消息中间件的接口
(2)特性
- 面向java平台的标准消息传递API
- 在Java或JVM语言比如Scala、Groovy中具有互用性
- 无需担心底层协议
- 有queues和topics两种消息传递模型
- 支持事务、能够定义消息格式(消息头、属性和内容)
- (3)常见概念
- JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
- JMS生产者:生产消息的服务
- JMS消费者:消费消息的服务
2.分布式流处平台Kafka核心概念
2.1.Kafka核心概念
Broker
kafka的服务端程序,可以认为一个mq节点就是一个broker,broker存储topic的数据
Producer生产者
创建消息Message,然后发布到MQ中,该角色将消息发布到Kafka的topic中
Consumer消费者
消费队列里的消息
ConsumerGroup消费者组
同个topic,广播发送给不同的group,一个group中只能又一个consumer可以消费此消息
Topic主题
每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
Partition分区
kafka数据存储的基本单元,topic中的数据分割称一个或者多个partition,每个topic至少又一个partition,有序的 一个Topic的多个partitions,被分布在kafka集群中的多个server上 消费者数量<=partition数量
Replication副本
同个partition会有多个副本replication,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务 默认每个topic的副本都是1(默认没有副本的)。可以在创建topic的时候指定 如果当前kafka集群只有3个broker节点,则replication-factor的最大数就是3.
ReplicationLeader
Partition有多个副本,但只有一个replicationleader负载该partition和生产者消费者交互
ReplicationFollower
ReplicationFollower只做备份,从ReplicationLeader进行同步
ReplicationManager
负责Broker所有分区副本信息,Replication副本状态切换
offset
每个consumer示例需要为他消费的partition维护一个记录自己消费道哪里的便宜offset kafka把offset保存在消费端的消费者组里
2.2.特点总结
- 多订阅者
- 一个topic可以有一个或者多个订阅者
- 每个订阅者都要有一个partition,所以订阅者数量要少于等于partition
- 高吞吐量、低延迟:每秒可以处理几十万条消息
- 高并发:几千个客户端同时读写
- 容错性:多副本、多分区允许集群中节点失败。
- 扩展性强:支持热扩展
2.3.基于消费者组可以实现
- 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
- 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样kafka消息就能广播到所有消费者实例上
3.Linux环境下Zookeeper和Kafka安装
注意:提前安装好jdk1.8
#编辑群居配置文件 vim /etc/profile #在最下面,按i进入insert模式,添加一下内容 JAVA_HOME=jdk路径 export JAVA_HOME CLASSPATH=.:$JAVA_HOME/lib export CLASSPATH PATH=$PATH:$JAVA_HOME/bin:$CLASSPATH export PATH #重新加载配置 source /etc/profile
3.1.安装启动Zookeeper
(1)解压zookeeper,重命名
tar -xvf apache-zookeeper-3.7.0-bin.tar.gz mv apache-zookeeper-3.7.0-bin zookeeper
(2)复制默认的配置文件zoo.cfg
(3)启动zookeeper,默认2181端口
/usr/local/software/zookeeper/bin/zkServer.sh start
3.2.安装启动Kafka
(1)解压Kafka,重命名
tar -xvf kafka_2.13-2.8.0.tgz mv kafka_2.13-2.8.0 kafka
(2)修改配置文件config目录下的server.properties
#标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同 broker.id=0 #修改listeners(内网ip)和advertised.listeners(公网ip)配置,不能一样的ip否则启动报错 listeners=PLAINTEXT://(内网ip):端口号 advertised.listeners=PLAINTEXT://(公网ip):端口号 #修改zk地址,默认为localhost,注意:zk在那个机器上配置那个机器的公网ip地址 zookeeper.connection=localhost:2181
(3)bin目录启动,默认9092端口
#启动 ./kafka-server-start.sh ../config/server.properties & #停止 kafka-server-stop.sh
(4)创建topic
./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic xd888-topic
(5)查看topic
./kafka-topics.sh --list --zookeeper 8.140.116.67:2181
(6)kafka如果直接启动信息会打印在控制台,如果关闭窗口,kafka随之关闭,用守护线程的方式启动
./kafka-server-start.sh -daemon ../config/server.properties &
4.生产者发送消息和消费者消费消息
4.1.创建topic
./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic topic-test 参数: kafka-topics.sh脚本启动 --zookeeper ip:端口 #zookeeper在那台机器就写那个IP端口 --replication-factor 1 #指定副本的数量为1 --partitions 2 #指定分区数为2 --topic t1 #指定topic的名称
4.2.查看topic
./kafka-topics.sh --list --zookeeper localhost:2181 参数: kafka-topics.sh脚本启动 --list #查询列表 --zookeeper ip:端口 #zookeeper在那台机器就写那个IP端口
4.3.生产者发送消息
./kafka-console-producer.sh --broker-list 8.140.116.67:9092 --topic topic-test 参数: kafka-console-producer.sh脚本启动 --broker-list ip:端口号 #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip) --topic t1 #指定是那个topic
4.4.消费者消费消息
./kafka-console-consumer.sh --bootstrap-server 8.140.116.67:9092 --from-beginning --topic topic-test 参数: --bootstrap-server ip:端口号 #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip) --from-begnning #指当客户端退出时,重新连接消费者,那么他会将之前的消息重新消费一遍 --topic t1 #指定是那个topic
4.5.删除topic
./kafka-topics.sh --zookeeper 8.140.116.67:2181 --delete --topic topic-test
4.6.查看broker节点topic状态信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xdclass-topic
注意:添加、删除、查看topic都用的./kafka-topics.sh脚本
5.Kafka点对点模型和发布订阅模型
5.1.JMS规范支持两种消息模型
点对点(point to point)
- 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
- 消息被消费之后,queue中不在存储,所以消息消费者不能消费到已经被消费的消息。Queue支持存在多个消费者,但对于一个消息而言,只会又一个消费者可以消费
发布订阅(publish/subscribe)
- 消息生产者(发布)将消息发布到topic中,同事有多个消息消费者(订阅)消费该消息
- 和点对点方式不同,发布到topic的消息会被所有订阅者消费
5.2.消费者点对点消费模型
- 编辑消费者配置(确保同个名称group.id一样)config/consumer.properties
- 创建topic,1个分区
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 1 --topic t1
- 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties 参数: --consumer.config #指定配置文件启动
- 现象
只有一个消费者可以消费到数据,一个分区只能被同个消费者组的某个消费者进行消费
5.3.消费者发布订阅消息模型
- 编辑消费者配置,使其groud.id不一样
config/consumer-1.properties config/consumer-2.properties
- 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties ./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties
- 现象
两个消费者同时能够消费到消息