Messaging2(三)|学习笔记

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: 快速学习 Messaging2(三)

开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Messaging2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/75/detail/15829


Messaging2(三)

 

内容介绍:

一、前言

二、消息机制解耦性

三、Jboss 启动

四、代码例子

五、Kafka 启动

 

五、Kafka 启动

在真正使用时必须要有一台 jboss,jboss 实际上不是一个专门的消息中间件,是一个应用服务器,本节的代码例子都是普通 java 类,基于这种考虑,如果四个普通 java 类还需要启用一个 jboss 运行比较浪费。所以使用一个专门只提供消息服务的服务器,其他功能都不需要。

像 jboss 中还支持数据库连接等。实际上如果只进行消息通信则不需要,所以需要像 Kafka 这种专门的消息中间件。Kafka 实现原理与前面类似,所以也是 PUBLISH&SUBSCRIBE 发布预定,里面也是一些 topic,可以到 topic 中进行订阅相应内容。例如订阅新闻,就要通过属性进行筛选订阅自己感兴趣的新闻。任何一个消息在 JMS 中也存在。例如

Durable subscription 持久化订阅 ,例如早上9:30开始,在12:00结束,但是在10:00到11:40在上课,在系统这段时间内一直有很多消息在发送,但是有一段时间人不在,但也不希望消息丢失,就可以做一个持久化订阅。

String subName = "MySub" ;

MSConsumer consumer =

context.createDurableConsumer(myTopic, subName) ; //持久化消费者。在订阅时传第二个参数,参数就叫持久化订阅的名字。现在要上课就将数据都停止,等到下课再用同样名字进行订阅,服务器就会发现在使用同一个名字在订阅,就会将当时离线之后没有转发成功到这次上线中间的消息依次性进行转发

consumer.close();

context.unsubscribe( subName );

JMSConsumer consumer =

context.createSharedDurableConsumer(topic,"MakeItLast");

image.png

如图起初对 topic 接收消息,消费了 M1、M2,然后因为上课关闭,此时中间会有 M3、M4发送过来没有收到,然后在下课后再次创建消费者,用同样名字进行创建,此时 M5、M6就可以收到,M3和 M4也可以收到。即在消息中间件中,消息被持久化到了硬盘中存储,至于在 DB 中还是 File 中,不同的消息中间件实现不同。持久化是指知道来订阅,但是消息转发时不成功,不成功就存储在此处不断转发,当下一次创建时发现订阅者又出现,就会将中间没有转发成功的消息进行转发。如果一直不出现,所有的service还存在一个timeout,超过一定的时间不出现就会进行删除,就不会占用硬盘空间。

Kafka 实现原理相同,所以也可以进行持久化到 DB 等,Kafka 可以创建集群,所以在 Kafka 启动时要求运行一个zookeeper(集群的管理器)。建集群表示一台 Kafka 无法服务大量用户,所以需要创建很多 Kafka。

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of records to one or more Kafka topics.   //生产者

- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.   //消费者

- The Streams APl allows an application to act as a stream processor,consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.   //生产消息如何通过 streams API 扔到 toipc 中

- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems.For example, a connector to a relational database might capture every change to a table.   //连接器

下图是 Kafka 的内部结构,进行分区后,写消息时这些消息都是一次性写入不会进行修改。

image.png

即给 Kafka 或其它的消息中间件发送消息时不存在消息发送过来后需要进行修改的功能,所有的消息都是一次性写入。Kafka 建集群就会有多个机器,在多个机器上一个 topic 在逻辑上是一个,但是在 Kafka 集群上,Kafka 为了负载均衡会将 topic  放入集群的多个节点上进行存储,于是将 topic 分为若干个 partition。每一台机器上可以存储一个partition,partition 中所有的数据像日志一样是只读的,可以不断堆叠,所以在写入时可以按照一定逻辑例如Partiton0服务器能力较强,那么有50%的消息写入此处。Partiton 1较弱,只放入20%。Partiton 2一般,放入30%。按照这种概率将用户发消息的请求在这三台机器上做负载均衡。

image.png

所谓的发送消息就是在消费者发入的消息,Kafka 将其写入到图中,然后消费者就可以消费其中的内容。Kafka不管消费日志有没有被消费掉都会进行持久化,持久化后约定消息有一个保存周期,超过该周期就会将内容删除,否则不会进行删除。图中的消息0不是之前的讲解的读走了消息0,消息0就会从 topic 中删除。而是0还在,但是会记录一个offset 记录当前已经读到哪个消息,未来在收消息时不会收到重复消息,例如0已经被读取,不会再读0,offset会往前移一格,保证消息不会被重复读取,但是所有消息可以一直保留。是从安全性进行考虑,例如需要群收5个消息,第一个读走了就会删除,第二个失败了就会恢复出0,所以为了处理简便,使用 offset 标记用户已经读到的地方。然后将所有消息持久化,在 timeout 后删除。

以下是从 Kafka 官网上截取的如何运行:

Quickstart

- https://kafka.apache.org/quickstart

Step 1: Download the code

Download the 2.4.0 release and un-tar it.

1 > tar -xzf kafka_2.12-2.4.0.tgz

2 > cd kafka_2.12-2.4.0

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.

1 > bin/zookeeper-server-start.sh config/zookeeper.properties

2[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.ser

3...

Now start the Kafka server:

1 > bin/kafka-server-start.sh config/server.properties

2 [2013-04-22 15:01:47,028] INF0 Verifying properties (kafka.utils.VerifiableProperties)

3[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576(kafka.utils.Verifiabl

4. ..

其实只需要下载 Kafka 即可,以上自带了一个 Zookeeper 脚本和单节点的 zookeeper 实例,所以只需要运行相应操作即可。运行起来后创建 topic,发消息等。

实际上 java 中也可以使用客户端进行访问,与发送者接收者代码相同,要写很多属性,然后创建消费者,将属性写入。收到一个消息,消息有 offset、key、value

代码运行结果如图

image.png

有一个在接收消息,有一个在发送消息。右侧为 zookeeper。

所以 Kafka 是一个很好的选择,如果不想要启动 jboss,就可以使用 kafka 接收和发送消息。

相关文章
|
机器学习/深度学习 数据挖掘 数据处理
alteryx哪里开发的,如何收费
【6月更文挑战第23天】alteryx哪里开发的,如何收费
328 5
|
存储 小程序 安全
【微信小程序】实现授权登入---超详细讲解
【微信小程序】实现授权登入---超详细讲解
1998 1
|
机器学习/深度学习 人工智能 自然语言处理
【每日算法Day 100】字节跳动 AI Lab 面试编程题(三道)
我当时太紧张了,真是脑抽了,还想着弄个优先队列,划分最大的,然后丢进去,再划分最大的,但是是错的。 正确解法小姐姐走了我才想起来,二分答案 m ,然后扫描一遍判断将每一段划分成小于等于 m 的一共需要多少次。如果次数大于 k ,说明 m 太短了,否则说明 m 太长了。
561 0
【每日算法Day 100】字节跳动 AI Lab 面试编程题(三道)
|
域名解析 弹性计算 Linux
PHP SDK 的安装与调用
Alibaba Cloud SDK for PHP是支持PHP开发者快速访问阿里云服务的开发包,由Alibaba Cloud Client for PHP提供底层支持。Alibaba Cloud SDK for PHP让您不用复杂编程即可访问云服务器、云数据库RDS和云监控等多个阿里云服务。本文将为大家介绍PHP SDK的安装与调用过程。
2552 0
PHP SDK 的安装与调用
|
Linux
Linux 中RPM软件包管理
Linux 中RPM软件包管理
198 2
|
网络协议 安全 Go
子域名收集神器:Subfinder 保姆级教程(附链接)
子域名收集神器:Subfinder 保姆级教程(附链接)
|
Ubuntu Linux
如何关闭或重新启动Ubuntu Linux?这四个命令希望你能熟练使用!
如何关闭或重新启动Ubuntu Linux?这四个命令希望你能熟练使用!
1947 0
如何关闭或重新启动Ubuntu Linux?这四个命令希望你能熟练使用!
|
Java C# 图形学
【3D捏脸功能实现】
【3D捏脸功能实现】
849 0
|
JSON 自然语言处理 对象存储
通义千问开源模型在PAI灵骏的最佳实践
本文将展示如何基于阿里云PAI灵骏智算服务,在通义千问开源模型之上进行高效分布式继续预训练、指令微调、模型离线推理验证以及在线服务部署。
|
开发框架 .NET
46【软件基础技术】托管调试助手 “ContextSwitchDeadlock“错误处理
VS2019处理一个数据量较大的程序时报这个错误:
904 0