Messaging2(三)|学习笔记

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 快速学习 Messaging2(三)

开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Messaging2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/75/detail/15829


Messaging2(三)

 

内容介绍:

一、前言

二、消息机制解耦性

三、Jboss 启动

四、代码例子

五、Kafka 启动

 

五、Kafka 启动

在真正使用时必须要有一台 jboss,jboss 实际上不是一个专门的消息中间件,是一个应用服务器,本节的代码例子都是普通 java 类,基于这种考虑,如果四个普通 java 类还需要启用一个 jboss 运行比较浪费。所以使用一个专门只提供消息服务的服务器,其他功能都不需要。

像 jboss 中还支持数据库连接等。实际上如果只进行消息通信则不需要,所以需要像 Kafka 这种专门的消息中间件。Kafka 实现原理与前面类似,所以也是 PUBLISH&SUBSCRIBE 发布预定,里面也是一些 topic,可以到 topic 中进行订阅相应内容。例如订阅新闻,就要通过属性进行筛选订阅自己感兴趣的新闻。任何一个消息在 JMS 中也存在。例如

Durable subscription 持久化订阅 ,例如早上9:30开始,在12:00结束,但是在10:00到11:40在上课,在系统这段时间内一直有很多消息在发送,但是有一段时间人不在,但也不希望消息丢失,就可以做一个持久化订阅。

String subName = "MySub" ;

MSConsumer consumer =

context.createDurableConsumer(myTopic, subName) ; //持久化消费者。在订阅时传第二个参数,参数就叫持久化订阅的名字。现在要上课就将数据都停止,等到下课再用同样名字进行订阅,服务器就会发现在使用同一个名字在订阅,就会将当时离线之后没有转发成功到这次上线中间的消息依次性进行转发

consumer.close();

context.unsubscribe( subName );

JMSConsumer consumer =

context.createSharedDurableConsumer(topic,"MakeItLast");

image.png

如图起初对 topic 接收消息,消费了 M1、M2,然后因为上课关闭,此时中间会有 M3、M4发送过来没有收到,然后在下课后再次创建消费者,用同样名字进行创建,此时 M5、M6就可以收到,M3和 M4也可以收到。即在消息中间件中,消息被持久化到了硬盘中存储,至于在 DB 中还是 File 中,不同的消息中间件实现不同。持久化是指知道来订阅,但是消息转发时不成功,不成功就存储在此处不断转发,当下一次创建时发现订阅者又出现,就会将中间没有转发成功的消息进行转发。如果一直不出现,所有的service还存在一个timeout,超过一定的时间不出现就会进行删除,就不会占用硬盘空间。

Kafka 实现原理相同,所以也可以进行持久化到 DB 等,Kafka 可以创建集群,所以在 Kafka 启动时要求运行一个zookeeper(集群的管理器)。建集群表示一台 Kafka 无法服务大量用户,所以需要创建很多 Kafka。

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of records to one or more Kafka topics.   //生产者

- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.   //消费者

- The Streams APl allows an application to act as a stream processor,consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.   //生产消息如何通过 streams API 扔到 toipc 中

- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems.For example, a connector to a relational database might capture every change to a table.   //连接器

下图是 Kafka 的内部结构,进行分区后,写消息时这些消息都是一次性写入不会进行修改。

image.png

即给 Kafka 或其它的消息中间件发送消息时不存在消息发送过来后需要进行修改的功能,所有的消息都是一次性写入。Kafka 建集群就会有多个机器,在多个机器上一个 topic 在逻辑上是一个,但是在 Kafka 集群上,Kafka 为了负载均衡会将 topic  放入集群的多个节点上进行存储,于是将 topic 分为若干个 partition。每一台机器上可以存储一个partition,partition 中所有的数据像日志一样是只读的,可以不断堆叠,所以在写入时可以按照一定逻辑例如Partiton0服务器能力较强,那么有50%的消息写入此处。Partiton 1较弱,只放入20%。Partiton 2一般,放入30%。按照这种概率将用户发消息的请求在这三台机器上做负载均衡。

image.png

所谓的发送消息就是在消费者发入的消息,Kafka 将其写入到图中,然后消费者就可以消费其中的内容。Kafka不管消费日志有没有被消费掉都会进行持久化,持久化后约定消息有一个保存周期,超过该周期就会将内容删除,否则不会进行删除。图中的消息0不是之前的讲解的读走了消息0,消息0就会从 topic 中删除。而是0还在,但是会记录一个offset 记录当前已经读到哪个消息,未来在收消息时不会收到重复消息,例如0已经被读取,不会再读0,offset会往前移一格,保证消息不会被重复读取,但是所有消息可以一直保留。是从安全性进行考虑,例如需要群收5个消息,第一个读走了就会删除,第二个失败了就会恢复出0,所以为了处理简便,使用 offset 标记用户已经读到的地方。然后将所有消息持久化,在 timeout 后删除。

以下是从 Kafka 官网上截取的如何运行:

Quickstart

- https://kafka.apache.org/quickstart

Step 1: Download the code

Download the 2.4.0 release and un-tar it.

1 > tar -xzf kafka_2.12-2.4.0.tgz

2 > cd kafka_2.12-2.4.0

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.

1 > bin/zookeeper-server-start.sh config/zookeeper.properties

2[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.ser

3...

Now start the Kafka server:

1 > bin/kafka-server-start.sh config/server.properties

2 [2013-04-22 15:01:47,028] INF0 Verifying properties (kafka.utils.VerifiableProperties)

3[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576(kafka.utils.Verifiabl

4. ..

其实只需要下载 Kafka 即可,以上自带了一个 Zookeeper 脚本和单节点的 zookeeper 实例,所以只需要运行相应操作即可。运行起来后创建 topic,发消息等。

实际上 java 中也可以使用客户端进行访问,与发送者接收者代码相同,要写很多属性,然后创建消费者,将属性写入。收到一个消息,消息有 offset、key、value

代码运行结果如图

image.png

有一个在接收消息,有一个在发送消息。右侧为 zookeeper。

所以 Kafka 是一个很好的选择,如果不想要启动 jboss,就可以使用 kafka 接收和发送消息。

相关文章
|
5月前
|
数据处理 开发者 监控
揭秘实时Web应用开发:WebSocket与Akka Streams如何让Play Framework如虎添翼?
【8月更文挑战第31天】实时Web应用需求日益增长,覆盖了从即时通讯到在线游戏等多个领域。Play Framework结合WebSocket与Akka Streams,简化了高效实时应用的开发。WebSocket提供全双工通信,使服务器能主动向客户端推送消息;Akka Streams支持声明式数据流处理,有效避免系统因数据处理不及时而崩溃。本文通过示例代码展示了如何利用这些技术构建实时股票报价系统,展现了其在实时数据处理方面的强大能力。掌握这一技术组合,将大幅提升你在实时Web应用开发中的效率与稳定性。
58 0
|
消息中间件 缓存 前端开发
Messaging2(二)|学习笔记
快速学习 Messaging2(二)
Messaging2(二)|学习笔记
|
消息中间件 SQL Java
Messaging1(三)|学习笔记
快速学习 Messaging1(三)
125 0
Messaging1(三)|学习笔记
|
消息中间件 Java Kafka
Messaging2(一)|学习笔记
快速学习 Messaging2(一)
105 0
Messaging2(一)|学习笔记
|
消息中间件 传感器 算法
Messaging1(一)|学习笔记
快速学习 Messaging1(一)
125 0
Messaging1(一)|学习笔记
|
安全 Java 编译器
Security1 1(二)|学习笔记
快速学习 Security1 1(二)
Security1 1(二)|学习笔记
|
安全 Java 数据安全/隐私保护
Security1 1(三)|学习笔记
快速学习 Security1 1(三)
103 0
Security1 1(三)|学习笔记
|
安全 小程序 Java
Security1 1(一)|学习笔记
快速学习 Security1 1(一)
Security1 1(一)|学习笔记
|
SQL 机器学习/深度学习 Java
PyFlink 快速上手(一)|学习笔记
快速学习 PyFlink 快速上手
836 0
PyFlink 快速上手(一)|学习笔记
|
SQL Java API
PyFlink 快速上手(二)|学习笔记
快速学习 PyFlink 快速上手(二)
453 0
PyFlink 快速上手(二)|学习笔记