开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Messaging1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/75/detail/15828
Messaging1(三)
内容介绍:
一、回顾上节内容
二、参考文件
三、异步通信
四、JMS
五、编程类型
五、编程模型
像 database,database 中要找到一个数据源,然后在它上创建连接,再执行 SQL。在讲解事务时,事务不仅仅发生在数据库中,实际上消息中间件也可以有事务对象,例如刚才所说一组对象要一起接收或一起发送。它与数据库访问类似,也需要获取到消息中间件的连接即消息服务器的连接,再执行相应的操作。
具体的编程模型:
1、A point-to-point (PTP) product or application is built on the concept of message queues, senders, and receivers.
消息在发送时认为消息发送到一个目的地,queue部分是在消息服务器中,相当于邮箱,这封信过去后应该只有一个人收,例如 A 发给 B 一封信,只有 B 一个人收到。还是希望发送广告一样,B、C、D 都可收到。如果是一对一那么有一个特殊名词叫 Queue。在消息服务器中就有很多的 Queue,类似邮局中有很多邮箱,约定好往哪个 Queue 中发送消息。对方监听队列消息就会收到。这种模式为点对点,消息目的地为 Queue。
2、ln a publish/subscribe (pub/sub] product or application, clients address messages to a topic, which functions somewhat like a bulletin board.Publishers and subscribers can dynamically publish or subscribe to the topic.
还有一种为消息有很多人收到,就像对方订阅了消息一样,订阅一个频道,该频道有消息所有订阅的人都会有通知,该模型为 publish/subscribe 模式。
往消息目的地中放入一个消息,但凡订阅都可以得到一个副本。这种模式目的地叫 topic。Publishes 为消息的发布者,Subscribes 为消息的订阅者。
以上是两者的区别,最本质的区别为同样一个消息,是一对一消费掉,还是有多个副本被所有订阅的人收到。点对点只是一种特殊的订阅,区别为到底被多少人接收到。默认情况下我们使用订阅发布的模式让多个用户都可以收到消息。虽然两个例子都讲解了,但是只需要使用订阅模式。
真正的编程模型与 DB 类似,所以需要先到消息的服务器的连接工厂,与 datasource 相同。datasource 实际上是一个到数据源的连接工厂,在连接工厂上得到创建一个连接,在连接中再创建很多 session。注意都是一对多的,一个连接工厂上可以创建多个连接,类似 datasource 上可以创建多个数据库连接。与前面讲解的 spring 相同,在于数据库交互时总是创建一个 session,在 session 中通过 tranction 再执行操作,执行完后提交再关闭掉 session。Session 依附在 Connection 上。获取了 session 后在 session 中真正操作与数据库相同。要写数据库或是读数据库在此处就是创建消息发送者然后创建一个消息发走,还是创建一个消息接收者让它针对某一个 Destination 进行 Queue 或是 Topic接收消息。所以编程的模型与数据库非常类似。
再来考虑 DataSource,DataSource 在 JNDI 树上,在 context.xml 中配置的就是 JNDI 树上的名字。所以第一步是在JNDI树上寻找 DataSource,没有写代码,系统会帮你找,用注入方式注入到代码中。我们写了 autowired,然后spring 到 context.xml 文件中找到 JNDI 上的 DataSource 后会自动注入进去。
如果写 JMS 的应用,类似先到 JNDI 树上找连接工厂,找到后创建一个连接,在连接上创建一个 session,然后在session 上指定对哪个 Queue 或 Topic 处理。无论是 Queue 还是 Topic 都在 JNDI 树上寻找。所以到 JNDI 树上寻找消息的 Destination 即 Queue 或 Topic,然后对 Queue 或 Topic 生成一个消息的生成者或是消费者,然后就可以发送消息或是接收消息了。
代码例子:
import javax.naming.*;
import javax.jms.*;
QueueConnectionFactory queueConnectionFactory;
Context messaging = new InitialContext( );
queueConnectionFactory = (QueueConnectionFactory)
messaging.lookup( “QueueConnectionFactory”); //第一步
InitialContext获取消息连接工厂
or
@Resource( lookup = "java:comp/DefaultMSConnectionFactory")private static ConnectionFactory connectionFactory; //或者是像spring中使用resource标注该对象的名字是什么
得到后第二步:
Queue queue;
queue = (Queue )messaging.lookup( “theQueue" );
Topic topic;
topic = (Topic)messaging.lookup( "theTopic" ); //再在下面找Queue或者Topic
or
@Resource( lookup = "jms /MyQueue" )private static Queue queue; //或者如果都使用annotation,那么将Queue或Topic也写成annotation,使用resource进行注释
@Resource( lookup = "jms/MyTopic")private static Topic topic;
第三步:
JMSContext context = connectionFactory.createContext( ); //创建一个上下文
try (JMSContext context = connectionFactory.createContext();)
{
JMSProducer producer = context.createProducer(); //创建消费者或是生产者
...
context.createProducer( ).send(dest, message); //然后可以send一个消息
Or
try (JMSContext context = connectionFactory.createContext();)
{
JMSConsumer consumer = context.createConsumer(dest);
...
Message m = consumer.receive(); //或者receive消息。当然现在的receive是同步的,我们不希望同步接收,而是异步接收。所以在接收者处需要写入一个监听器
Message m = consumer.receive(1000) ;
完成后第四步进行创建消息:
TextMessage message = context.createTextMessage(); //创建一个文本消息
message.setText(msg_text); //文本消息中使用文本,用setText将msg_text字符串写入
// msg_text is a string
context.createProducer( ).send( message); //然后直接send
Message m = consumer.receive(); //receive接收消息
if (m instanceof TextMessage) { //判断是否为TextMessage
string message = m.getBody ( String.class); //因为是文本消息,所以文本消息体中是一个字符串。取出后剩下就是需要做的事情
system.out.println( "Reading message: " + message);}else {
// Handle error or process another message type
}
以上就是我们看到的一个编程模型,如果是异步消息:
Listener myListener = new Listener( ); //设置一个监听器
consumer.setMessageListener( myListener); //将监听器注册到消息接受者上,这样代码就不需要调用 receive一直等待
Listener:
void onMessage(Message inMessage) //一旦有消息,onMessage 方法就会被激活,就会收到消息,然后就是对消息进行处理。
补充:属性可以设置举例:
String data;
TextMessage message;
message = session.createTextMessage();
message.setText(data);
message.setStringProperty ( "selector", "Technology" );
//名字随便命名,值可以随意设置
例如第二种模型 publish/subscribe 模式中,新浪网站发布新闻,所有news都放在 Topic 中,A 只对体育感兴趣,B只对经济感兴趣,那么虽然两人都订阅了新闻 Topic,但是接收到的消息不同。在发送消息时就可以在消息中添加一个属性,名字随便命名,值可以随意设置。例如写 categories,sports,另外一种也有 categories 属性,但是它的种类为 economic。
String selector;
selector = new String(“(Selector = ‘Technology’) ");
MSConsumer consumer = context.createConsumer(dest,selector);
在接受消息时,不同的人会写入不同的 categories,例如一个为 categories=sports,在下面创建消息目的地的同时指定 categories=sports 字符串作为第二参数写入。即便是在 Topic 中的消息只有符合条件即有一个属性条件=sports才会进行接收,其它的不接收。
以上就是大体上看到的编程模型。
此外如果需要运行历程,此处提供了两个,一个为 JMS,在 component 中没有应用服务器,需要安装一个 wildfly。第二个历程为 kafka,安装后按照 ppt 中所讲过程进行运行。Kafka 在启动时需要一个 zookeeper,然后可以看到接收到的消息。
最后讲解一个问题:用户与服务网站之间为对称密钥。最终访问的不是 seSqoop,而是下面的内容。下面的系统有很多,与每一个都应该有独立的加密进行操作。仍旧是一次登录都可以使用,只是在下面不同 service 时给予一个专用的密钥使用。