开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Messaging2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/75/detail/15829
Messaging2(三)
内容介绍:
一、前言
二、消息机制解耦性
三、Jboss 启动
四、代码例子
五、Kafka 启动
四、代码例子
创建生产者的例子(所有操作都与访问数据库类似):
JMSProducer.java
public class JMSProducer {
private static final Logger log = Logger.getLogger(JMSProducer.class.getName());
private static final String DEFAULT_MESSAGE="这是 JMS 信息...…";
private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory"; //刚才图中可以看到 Java:jboss/exported 下有 jms下有 topic 或 Queue,所以此处定义连接工厂。连接工厂本身在 Java:jboss/exported下的jms下的RemoteConnectionFactory
private static final String DEFAULT_DESTINATION = "/jms/queue/HelloWorldMDBQueue"; //对 Queue 操作,queue 对应发送者接收者。Topic 对应发布者和预定者。现在要做一个生产者的例子。
private static final String DEFAULT_MESSAGE_COUNT= "10";
private static final String DEFAULT_USERNAME = "root"; //定义用户名
private static final String DEFAULT_PASSWORD = "reins2011!"; //定义密码
private static final String INITIAL_CONTEXT_FACTORY = "org.wildfly.naming.client.WildFlylnitialContextFactory";
private static final String PROVIDER_URL = "http-remoting://localhost:8080"; //整个服务器启动起来后在此处
public static void main(String[] args) throws Exception {
Context context=null;
Connection connection=null;
try {
final Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,INITIAL_CONTEXT_FACTORY);//该 KEY 的值为初始化 Context 的工厂类,JNDI 驱动的类名
env.put(Context.PROVIDER_URL,PROVIDER_URL);
//该 KEY 的值为 Context 服务提供者的 URL.命名服务提供者的 URL
env.put(Context.SECURITY_PRINCIPAL,DEFAULT_USERNAME);
env.put(Context.SECURITY_CREDENTIALS, DEFAULT_PASSWORD);//应用用户的登录名,密码.
//获取到 InitialContext 对象.
context = new InitialContext(env); //创建 InitialContext,找 JNDI 的名字。创建时可以使用一个 Properties 集合类,其中放入属性,或者是写为一个 jndi.properties 文件
Destination destination = (Destination) context.lookup(DEFAULT_DESTINATION); //创建好后寻找DESTINATION,往哪里发送
//创建 JMS 连接、会话、生产者和消费者
connection = connectionFactory.createConnection(DEFAUIT_USERNAME,DEFAULT_PASSWORD); //对着连接工厂使用用户名和密码创建连接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建 session
MessageProducer producer = session.createProducer(destination); //使用 session 对 destination 创建了一个发送者
connection.start(); //启动连接
int count = Integer.parselnt(DEFAULT_MESSAGE_COUNT);//发送特定数目的消息
TextMessage message = null; //创建消息
for (int i = 0; i
message = session.createTextMessage(DEFAULT_MESSAGE); //消息全为文本消息
producer.send(message); //全部发送
}
//等待30秒退出
CountDownLatch latch = new CountDownLatch(1);
latch.await(30,TimeUnit.SECONDS);
}catch (Exception e) { log.severe(e.getMessage()); throw e;
}finally {
if (context != null) { context.close(); } //发送完成后与数据库需要关闭连接相同,需要关闭所有连接
if (connection != null) { connection.close(); }
}
}
}
以上就是简单的发送者例子,最重要的为创建了消息后需要往消息中发什么内容。此处写的是 DEFAULT_MESSAG,定义了"这是 JMS 信息...…"。
接下来创建消费者例子:(前面操作相同,创建 destination,然后寻找 connectionFactory。都是 lookup 出来,创建 InitialContext。全部相同,包括用户名密码等)
JMSConsumer.java
public class JMSConsumer {
private static final Logger log =Logger.getLogger(JMSConsumer.class.getName());
private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory";
private static final String DEFAULT_DESTINATION ="/jms/queue/HelloWorldMDBQueue";
private static final String DEFAULT_USERNAME = "root";
private static final String DEFAULT_PASSWORD = "reins2011!";
private static final String INITIAL_CONTEXT_FACTORY= “org.wildfly.naming.client.WildFlyInitialContextFactory";
private static final String PROVIDER_URL = "http-remoting://localhost:8080";
private static final int WAIT_COUNT= 5;
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = null;
Connection connection = null;
MessageConsumer consumer = null;
Destination destination = null;
TextMessage message = null;
Context context = null;
try {
final Properties env = new Properties();
env.put(Context./NITIAL_CONTEXT_FACTORY,INITIAL_CONTEXT_FACTORY);
env.put(Context.PROVIDER_ URL,PROVIDER_URL);
env.put(Context.SECURITY_PRINCIPAL,DEFAULT_USERNAME);
env.put(Context.SECURITY_CREDENTIALS,DEFAULT_PASSWORD);
context = new InitialContext(env);
connectionFactory =(ConnectionFactory) context.lookup(DEFAULT_CONNECTION_FACTORY);
destination =(Destination) context.lookup(DEFAULT_DESTINATION);
connection = connectionFactory.createConnection(DEFAULT_USERNAME, DEFAULT_PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(destination); //创建消费者
connection.start();
//等待30秒退出
CountDownLatch latch = new CountDownLatch(1);log.info("开始从 JBOSs 端接收信息-----");
int i = 0;
for ( ; i< WAIT_COUNT; i++)
{if (message l= null) {
log.info("接收到的消息的内容:" +message.getText());
i = 0;
}
log.info("开始从 JBOSS 端接收信息-----");
message = (TextMessage) consumer.receive(5000); //接收是同步的,每隔5s 查找一次,悠就发送出来
latch.await(1,TimeUnit.SECONDS);
}
}catch (Exception e) { log.severe(e.getMessage()); throw e;}
finally {
if (context != null) { context.close();}
if (connection != null) { connection.close(); }
}
}
}
从代码中可以看出是一个最简单的消息通信代码,首先没有实现任何接口或者继承所有类,所以是一个很简单的普通的类。该编程模型比从一个远程的对象操作更复杂,是因为该编程模型更像一个操作数据库,不像操作一个rml对象简单。而且调用时 rmi 对象是一旦获取到就可以引用,直接像一个本地对象一样,所以可以调用 a.m()方法。目前所有的操作是没有一个远程对象的,所有操作都是发消息实现。所以每次都需要创建消息或者接收消息。接收到后剩下就是自己的逻辑。所以编程模型就比同步的编程模型更复杂。上述为同步的接收,异步的接收是在 consumer 上注册一个监听器,监听器中只有一个方法 onmessage,参数为 message。如果这么编写,那么 consumer 若监听到消息就会触发监听器上的 onmessage 执行。
JMSProducer 运行结果如图:
可以看到图上在发送消息:这是 JMS 消息...
JMSConsumer 运行结果如图:
可以看到在接收消息。
以上都是点对点的处理,如果发布预定模型,唯一的差异是在消息通信时消息的目的地变为 Topic,在消息发布时如果是一个新闻 topic,那么在新闻 Topic 中,有人关心体育新闻,有人关心军事新闻等,所以可以进行选择。
发布者例子部分代码:
JMSPub.java
System.out.print("输入要发送的消息:(数字0退出)");
line = msgStream.readLine(); //一段文本
if (line != null && line.trim().length() != 0){
TextMessage textMessage = session.createTextMessage(); //创建文本消息
textMessage.setText(line); //设置 Text 文本
textMessage.setStringProperty("selector" , "Funny");
//增加选择项。基础上增加了一个属性。一个消息分为三部分:不能扩充的 header(已经有了一些预定义字段),第二个为 properties,一部分已经写好了预定义的字段,另一部分没有预定义,允许自己扩展,允许自己添加属性。第三个为消息体。此处需要增加一个属性 Selector,属性叫Funny
producer.send(textMessage);
quitNow = line.equalslgnoreCase("0");
}
消息接收者例子:
JMSSub.java
String selector;
selector = new String("(Selector = 'Funny')");
consumer = session.createConsumer(topic, selector); //队长t opic 生成 consumer 时可以增加一个属性 selector,发送者写了 selector 属性,接收者规定selector=?,如果不为 Funny 就不能接收到。只有上述写的为 Funny,此处也写的为 Funny,消息才能够被接收。
consumer.setMessageListener(new javax.jms.MessageListener() {
//收到后不想同步接收,所以在 consumer上设置一个监听器,监听器定义了一个匿名类,该类没有名字。
public void onMessage(Message message){ //只有一个方法 onMessage,接收的参数为 message
TextMessage tm = (TextMessage) message; //拿到 message 后剩下的事情是自己做的,例如水费120waterfee:120,需要从中解析出然后写数据库或者做别的操作。剩下的逻辑就是自己写
system.out.println("接收到的消息内容: " +tm.getText().toString());
System.out.println("JMS 目的地: " + tm.getJMSDestination());
System.out.println("JMS 回复: " + tm.getJMSReplyTo());
System.out.println("JMS 消息 ID 号: " + tm.getJMSMessagelD());
System.out.println("是否重新接收: " + tm.getJMSRedelivered());}catch (JMSException e1){
e1.printStackTrace();
}
}
从以上代码例子中可以看出两者之间确实实现了解耦,发布者在发送消息时没有规定一定会被谁接收到,而接收端没有规定必须按照某一个 API 进行编程,而是直接发送文本就可以处理。所以实现了解耦。接收者实现了异步,使用监听器方式在接收,即实际上什么时候触发 onmessage 方法需要看具体情况例如系统是否忙碌。但绝对不是发布者一发送消息,接收端就可以立刻接收。表面上 Pub 与 Sub 是在同时运行,但是实际上是异步通信。在代码consumer.setMessageListener 后代码就会一直运行下去,不需要等待。
不像调用一个 a.m()方法,除非 m 方法返回,否则代码只能堵塞在此处,不能继续往下执行。现在使用异步通信方式,代码在此处注册了监听器就可以直接执行下去。以后有了方法也是触发 try 内代码进行执行,主线程不会阻塞。
JMSPub 运行结果如图:
显示在此处发送消息
JMSSub 运行结果如图:
显示在此处接收消息。两个程序之间互相进行交互。
在运行代码时,需要消息中间件,所以需要先运行 jboss。Jboss 跑起来后如图是控制台:
点击 Adminstration Console,进入后输入用户名密码(先通过脚本增加用户、赋权限等)进入后可以看到有些配置:上方有菜单,Runtime 表示当前启动起来的服务器:
如图左侧是实例,点击 JNDI,点击右侧介绍的 view,就可以看到整个树,如图就是通过配置文件增加进入的Topic与Queue:
Topic 和 Queue 能否通过编码的方式实现创建?基本上不提供该功能,如果有人可以通过编码的方式在应用服务器上增加一个 Queue 和 Topic,是一件很危险的事情。所以都是需要使用管理员的权限在其中更改配置文件来实现。以上生产者和消费者运行起来的过程。接着运行发布者与接收者,运行起来后在输入消息中写入要发送的内容:Today is Thursday,发送出去后可以看到 JMSSub 接收到
从运行过程中可以看到,这是两个不同的进程,确实是在通过 HelloWorldMDBTopic 进行通信。代码为上述例子代码。可以看到发送者与订阅者之间确实实现了解耦,但是两者可以通信。
所以希望防止双十一这种订单特别多的情况,如果都调用机器上某一个下订单的服务,调用该服务时需要写数据库,写数据库需要在上面加锁,一旦加锁,大量的线程同事过来写数据库,都要阻塞在此处进行等待。而此处是同步的,用户下订单没有结束,代码不能往下执行,代码大量请求发送过来时,多线程在进行处理,那么线程之间在访问数据库时,数据库为了做到事务的控制就需要加锁,所以就带来了等待,所以就会带来问题:大量用户则色在此处。页面无法刷新,也不能做其他操作。
所以改为前端发送过来下订单请求时,全部都放入一个 Queue 中,等于服务器中需要有这样一个类似的接收者。客户端在web应用中是 service,接收到用户请求后不是真正在调后面的 DAO 然后写数据库,而是产生一个消息放入Queue中,然后会产生一个 Sub 后台应用,会不断读取 Queue 内容。处理完后将处理结果再放入另一个 Queue中,然后service 监听返回结果的 queue,就可以知道是否成功,一旦成功再将消息退回给用户。所以用户下订单后会显示:已接收请等待,然后将消息扔到队列中,处理完后拿到返回消息后主动推给用户。以上就是消息的发送及接收,写一个异步通信接受订单的功能就是此处理。上节讲解过实际上存在一个前提,如图直线是能够处理的并发用户数,有时候请求超过有时候低于,是将超过的请求没有处理,将其缓存下来留在闲暇时间进行处理较快。
如果系统平均处理并发数固定,但是负载一直超出
显然即使使用异步通信也不能完成。因为即使在最闲时并发数也比能够处理的多,此时就没有办法进行处理。所以通过异步通信的方法能够增加响应速度,不是将处理结果立刻处理完成,是先告知收到,其次能够提高响应速度的前提是负载必须有时低于系统的最大处理能力,有时高于才可以。如果负载最小时都超出系统的处理能力,显然无论如何处理都不能处理完。此时就需要加集群,再加一台机器进行处理。实际上从总量上来讲,资源足够,但是某些时刻不足够,即负载不均,但是总的负载与总的处理能力匹配。这种情况下才能够使用异步通信解决问题。