文章目录
1、什么是MQ
2、MQ的多种产品
3、MQ的工作原理
4、ActiveMQ 的配置
5、ActiveMQ 的数据存储方式
6、ActiveMQ的主从服务
7、ActiveMQ的集群负载均衡
什么是MQ?
Message Queue, 就是消息队列,MQ 经常会作为多系统当中的网络消息传输。是一种应用程序对应用程序的通信方式。也是WEB服务器的一种重要的第三方软件。
应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
MQ的多种产品
MQ产品名称 | 简单介绍 |
---|---|
ActiveMQ | 是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等,SOA因为用的是ActiveMQ,后面我们着重学习这个产品。 |
ZeroMQ | 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ作为数据流的传输。 |
RabbitMQ | 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持 |
Redis | 是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis |
Jafka/Kafka | Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。 其他一些队列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。 |
MQ的几个基本概念
1 队列管理器(queue mr)
队理管理器是MQ系统当中最上层的一个管理器,由它为我们提供基于队列的消息服务
2 消息(message)
MQ当中,我们把应用程序交由MQ传输的数据定义为消息(比喻:用户的各种类型的数据文件)某个应用向应用程序发出的请求处理等都可以称做消息。
消息是由两部份组成:
1,消息描述符(message header,消息头),如: 消息的优先级,生命周期,消息ID
2,消息体(message body),就是用户的数据部份,(又为分永久性的,和非永久性消息)
3 队列(queue)
队列是消息的安全存放在地,队列存储的消息直到被应用程序消费之后再结束消息的生命周期
队列的工作方式:
a) 程序A形成对消息的创建, 消息队列检测到有新的消息进入,消息准备好投向程序B;
b) 消息队列管理器确认消息是发往程序B(根据队列名找到), 进入B系统所对应的队列,再由接收的消息队列驻留到程序B系统当中。
c) 适当时间后(消费后)程序B从队列当中读到此消息转变之后的数据,到这里也就意味着消息的生命周期结束。
由于采用了先进的程序设计思想以及内部工作机制,MQ能够在各种网络条件下保证消息的可靠传递,可以克服网络线路质量差或不稳定的现状,在传输过程中,如果通信线路出现故障或远端的主机发生故障,本地的应用程序都不会受到影响,可以继续发送数据,而无需等待网络故障恢复或远端主机正常后再重新运行。
在MQ中,队列分为很多种类型,其中包括:本地队列、远程队列、模板队列、动态队列、别名队列等。
本地队列又分为普通本地队列和传输队列,普通本地队列是应用程序通过API对其进行读写操作的队列;传输队列可以理解为存储-转发队列,比如:我们将某个消息交给MQ系统发送到远程主机,而此时网络发生故障,MQ将把消息放在传输队列中暂存,当网络恢复时,再发往远端目的地。
远程队列是目的队列在本地的定义,它类似一个地址指针,指向远程主机上的某个目的队列,它仅仅是个定义,不真正占用磁盘存储空间。
模板队列和动态队列是MQ的一个特色,它的一个典型用途是用作系统的可扩展性考虑。我们可以创建一个模板队列,当今后需要新增队列时,每打开一个模板队列,MQ便会自动生成一个动态队列,我们还可以指定该动态队列为临时队列或者是永久队列,若为临时队列我们可以在关闭它的同时将它删除,相反,若为永久队列,我们可以将它永久保留,为我所用。
4 通道
通道是MQ系统中队列管理器之间传递消息的管道,它是建立在物理的网络连接之上的一个逻辑概念,也是MQ产品的精华部分。
在MQ当中有三大类通道 ,消息通道 ,MQI通道(分为两种,一种接收receive通道 ,一种发送sender通道) ,Cluster通道(集群通道) 。
工作流程
按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
\1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
\2. 利用factory构造JMS connection
\3. 启动connection
\4. 通过connection创建JMS session.
\5. 指定JMS destination.
\6. 创建JMS producer或者创建JMS message并提供destination.
\7. 创建JMS consumer或注册JMS message listener.
\8. 发送和接收JMS message.
\9. 关闭所有JMS资源,包括connection, session, producer, consumer等。
MQ的通信方式
1 publish-subscrib(PS模式)
发布订阅模式:(一点对多模式,通俗的讲又叫广播模式),一方发送消息,多方接收(群发)
一对多的方式:相当于我们在同一个topic里注册了,只要是谁注册了就都能收到消息
P2P模式
也叫点对点模式,就好比打电话,这两个人是独享一条通信链接, 一方发送消息,一方接收。
Request-Response 模式
请求回复模式。(扩展模式,不是MQ默认提供的,需要开发者使用代码强化)
ActivMQ
下载软件如下:
https://activemq.apache.org/
直接解压软件
1, 启动服务
进入bin目录,双击activemq.bat 启动服务就完成了
2, 输入http://127.0.0.1:8161/
启动时需要用户名和密码
怎么查看用户名和密码?
1, 打开conf下的jetty-realm.properties查看用户名密码
查询mysql的版本号: SHOW VARIABLES LIKE "%version%";
kafka
1.简介
Kafka是一种高吞吐量的分布式发布订阅消息系统。详细介绍可查阅官网:kafka官网
2.环境搭建
2.1 安装JDK
下载地址:http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html
有关jdk的安装不再赘述。
2.2 安装Zookeeper
下载地址:https://zookeeper.apache.org/releases.html
下载后,解压放在目录D:\bigdata(本文所用的目录)下,关于zookeeper以及kafka的目录,路径中最好不要出现空格,比如D:\Program Files,尽量别用,运行脚本时会有问题。
①进入zookeeper的相关设置所在的文件目录,例如本文的:D:\bigdata\zookeeper-3.4.10\conf
②将"zoo_sample.cfg"重命名为"zoo.cfg"
③打开zoo.cfg(至于使用什么编辑器,根据自己喜好选即可),找到并编辑:
dataDir=/tmp/zookeeper to D:/bigdata/zookeeper-3.4.10/data或 D:\bigdata\zookeeper-3.4.10\data(路径仅为示例,具体可根据需要配置)
这里注意,路径要么是"/"分割,要么是转义字符"\",这样会生成正确的路径(层级,子目录)。
④与配置jre类似,在系统环境变量中添加:
a.系统变量中添加ZOOKEEPER_HOME=D:\bigdata\zookeeper-3.4.10
b.编辑系统变量中的path变量,增加%ZOOKEEPER_HOME%\bin
⑤在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
这是本文最终的zoo.cfg文件的内容:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=D:/bigdata/zookeeper-3.4.10/data
#dataDir=D:\\bigdata\\zookeeper-3.4.10\\data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
⑥打开cmd窗口,输入zkserver,运行Zookeeper,运行结果如下:
恭喜,Zookeeper已经安装完成,已在2181端口运行。
2.3 安装kafka
下载地址:http://kafka.apache.org/downloads
要下载Binary downloads这个类型,不要下载源文件,这种方便使用。下载后,解压放在D:\bigdata目录下。
①进入kafka配置文件所在目录,D:\bigdata\kafka_2.11-0.9.0.1\config
②编辑文件"server.properties",找到并编辑:
log.dirs=/tmp/kafka-logs to log.dirs=D:/bigdata/kafka_2.11-0.9.0.1/kafka-logs 或者 D:\bigdata\kafka_2.11-0.9.0.1\kafka-logs
同样注意:路径要么是"/"分割,要么是转义字符"\",这样会生成正确的路径(层级,子目录)。错误路径情况可自行尝试,文件夹名为这种形式:bigdatakafka_2.11-0.9.0.1kafka-logs
③在server.properties文件中,zookeeper.connect=localhost:2181代表kafka所连接的zookeeper所在的服务器IP以及端口,可根据需要更改。本文在同一台机器上使用,故不用修改。
④kafka会按照默认配置,在9092端口上运行,并连接zookeeper的默认端口2181。
2.4 运行kafka
提示:请确保启动kafka服务器前,Zookeeper实例已经在运行,因为kafka的运行是需要zookeeper这种分布式应用程序协调服务。
①进入kafka安装目录D:\bigdata\kafka_2.11-0.9.0.1
②按下shift+鼠标右键,选择"在此处打开命令窗口",打开命令行。
③在命令行中输入:.\bin\windows\kafka-server-start.bat .\config\server.properties 回车。
④正确运行的情况为:
到目前为止,zookeeper以及kafka都已正确运行。保持运行状态,不要关闭。
重要(操作日志的处理):
kafka启动后,如果你去查看kafka所在的根目录,或者是kafka本身的目录,会发现已经默认生成一堆操作日志(这样看起来真心很乱):
而且会不断生成不同时间戳的操作日志。刚开始不知所措,一番研究后,看了启动的脚本内容,发现启动的时候是会默认使用到这个log4j.properties文件中的配置,而在zoo.cfg是不会看到本身的启动会调用到这个,还以为只有那一个日志路径:
在这里配置一下就可以了,找到config下的log4j.properties:
将路径更改下即可,这样就可以归档在一个文件夹下边了,路径根据自己喜好定义:
另外如何消除不断生成日志的问题,就是同一天的不同时间会不停生成。
修改这里,还是在log4j.properties中:
本身都为trace,字面理解为会生成一堆跟踪日志,将其改为INFO即可。