消息队列:第六章:ObjectMessage与MapMessage

简介: 消息队列:第六章:ObjectMessage与MapMessage

javax.jms.jmsexception:无法从内容生成正文。可序列化类不可用于代理原因:java.lang.ClassNotFoundException:

禁止类com.javaliao.portal.model.TbLogVisit不信任将此类序列化为objectMessage负载。

有关如何配置受信任类的详细信息,请查看http://activemq.apache.org/objectmessage.html

控制台打印:
javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. 
Reason: java.lang.ClassNotFoundException: Forbidden class com.javaliao.portal.model.TbLogVisit! 
This class is not trusted to be serialized as ObjectMessage payload. Please take a look at 
http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
    at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
    at com.javaliao.portal.listener.LogVisitListener.consumePaymentResult(LogVisitListener.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)

出现这个问题是因为:


尽管通常不鼓励使用ObjectMessage,因为它在生产者和消费者之间引入了类路径的耦合,但ActiveMQ支持它们作为JMS规范的一部分。

安防

objectMessage对象依赖于marshal/unmarshal对象负载的Java序列化。此进程通常被认为是不安全的,因为恶意负载可以利用主机系统进行攻击。这就是为什么从版本5.12.2和5.13.0开始,ActiveMQ强制用户显式地列出可以使用ObjectMessages交换的包的白名单。

如果需要交换对象消息,则需要添加应用程序正在使用的包。通过使用org.apache.activemq.serializable_packages系统属性(由代理和activemq客户机库解释)可以做到这一点。您可以将此系统属性添加到${activemq_home}/bin/env脚本中的activemq_opts变量。

例如

-dorg.apache.activemq.serializable_packages=java.lang,javax.security,java.util,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper,com.mycompany.myapp

将com.mycompany.myapp包添加到受信任包列表中注意,这里列出的其他包在默认情况下是启用的,因为它们是常规代理工作所必需的。如果您想简化此机制,可以使用*通配符来信任所有包,如

-dorg.apache.activemq.serializable_包=*

客户

在客户机端,您需要使用与在objectMessage.getObject()调用上反序列化恶意代码相同的机制,从而破坏应用程序的环境。您可以在代理上使用相同的配置机制,并使用系统属性配置受信任的类但是,这在客户端应用程序中通常不方便,因此在5.12.2和5.13.1中,我们引入了使用activemqconnectionfactory的附加配置机制。定义了两种附加方法:

setTrustedPackages()方法允许您设置要取消序列化的受信任包的列表,如

activemqconnectionfactory=new activemqconnectionfactory(“tcp://localhost:61616”);

factory.setTrustedPackages(新的ArrayList(Arrays.asList(“org.apache.activemq.test,org.apache.camel.test.split(“,”)));

setTrustAllPackages()允许关闭安全检查并信任所有类。它对测试很有用。

activemqconnectionfactory=new activemqconnectionfactory(“tcp://localhost:61616”);

factory.setTrustAllPackages(真);

您可以在camel上下文中设置相同的属性,如:```

org.apache.activemq.test org.apache.camel.test测试

```如果设置了系统属性,则此配置将覆盖这些属性。



我的代码:


发消息代码:

    @Autowired
    ActiveMQUtil activeMQUtil;
    @Override
    public void insertLogVisit(TbLogVisit tbLogVisit) {
        //使用信息队列发信息异步执行保存到数据库中
        try {
            // 连接消息服务器
            Connection connection = activeMQUtil.getConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // 发送消息
            Queue testqueue = session.createQueue("LOG_VISIT_QUEUE");
            MessageProducer producer = session.createProducer(testqueue);
            ObjectMessage objectMessage = session.createObjectMessage();
            /*AMQ_SCHEDULED_DELAY   long  延迟投递的时间
            AMQ_SCHEDULED_PERIOD  long  重复投递的时间间隔
            AMQ_SCHEDULED_REPEAT  int   重复投递次数
            AMQ_SCHEDULED_CRON  String  Cron表达式*/
            //设置时间为30s
            objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,1000*30);
            objectMessage.setObject(tbLogVisit);
            // 设置持久化传输
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(objectMessage);
            session.commit();// 事务型消息,必须提交后才生效
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

连接工具类:

package com.javaliao.portal.util;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.Arrays;
public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory=null;
    public  void init(String brokerUrl){
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(brokerUrl);
        //设置白名单包
        activeMQConnectionFactory.setTrustedPackages(
                new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
        activeMQConnectionFactory.setTrustAllPackages(true);
        pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        pooledConnectionFactory.setExpiryTimeout(2000);
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(10);
        pooledConnectionFactory.setMaxConnections(30);
        pooledConnectionFactory.setReconnectOnException(true);
    }
    public Connection getConnection(){
        Connection connection = null;
        try {
            connection = pooledConnectionFactory.createConnection();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

配置类:

package com.javaliao.portal.config;
import com.javaliao.portal.util.ActiveMQUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import javax.jms.JMSException;
import javax.jms.Session;
@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.broker-url:disabled}")
    String brokerURL ;
    @Value("${activemq.listener.enable:disabled}")
    String listenerEnable;
    @Bean
    public ActiveMQUtil getActiveMQUtil() throws JMSException {
        if(brokerURL.equals("disabled")){
            return null;
        }
        ActiveMQUtil activeMQUtil=new ActiveMQUtil();
        activeMQUtil.init(brokerURL);
        return  activeMQUtil;
    }
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/*        if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
            url=brokerURL;
        }*/
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory(  brokerURL);
        return activeMQConnectionFactory;
    }
    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        if(!listenerEnable.equals("true")){
            return null;
    }
        factory.setConnectionFactory(activeMQConnectionFactory);
    //设置并发数
        factory.setConcurrency("5");
    //重连间隔时间
       factory.setRecoveryInterval(5000L);
       factory.setSessionTransacted(false);
       factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
}

实体类:

import java.io.Serializable;
import java.util.Date;
public class TbLogVisit implements Serializable {
    private static final long serialVersionUID = 2659644940173539668L;
    private Long id;
    private String visitIpAddress;
    private String visitHostName;
    private String visitChannel;
    private String visitDescription;
    private String visitApi;
    private String visitParams;
    private String visitUrl;
    private String visitTimeConsuming;
    private String visitResult;
    private Long visitNum;
    private String visitThrowingErro;
    private Date visitStartTime;
    private Date visitEndTime;
    private Date createTime;
    private Date updateTime;

接收消息:

package com.javaliao.portal.listener;
import com.javaliao.portal.log4j.BaseLogger;
import com.javaliao.portal.model.TbLogVisit;
import com.javaliao.portal.service.ActionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
@Component
public class LogVisitListener {
    @Autowired
    ActionService actionService;
    /**
     * 为了体现差距,专门做了个很大的class,使用json转换后大概35MB左右
     * 使用json传输的情况,单位均为ms:
     * 总时间:17366
     * 传输时间:220
     * 发送者把object转为json的时间:6271
     * 发送总共时间:10000
     * 接收者把message转换为textMessage时间:0
     * 接收者把json转换为object时间:7146。
     * ****************************************
     * 使用ObjectMessage进行传输的情况:
     * 总时间:6742
     * 传输时间:173
     * 发送总时间:4836
     * 接收者把message转换为ObjectMessage时间:1733
     * ******************************************
     * 结论
     *   虽然没做压力测试,虽然只测了一次,虽然测试环境仅仅是我的笔记本,但我想我已经可以做出结论了。
     *   在server之间的异步通信时,object优于json。
     *   优势主要集中于java序列化和对象之间转换的效率远高于json盒对象转换的效率,
     * 另外序列化后对象的大小比json的小也是有利传输的原因。
     */
    @JmsListener(containerFactory = "jmsQueueListener" ,destination = "LOG_VISIT_QUEUE")
    public void consumePaymentResult(Message mapMessage){
        try {
            ObjectMessage tbLogVisitObject = (ObjectMessage) mapMessage;
            TbLogVisit object = (TbLogVisit) tbLogVisitObject.getObject();
            int count = actionService.insertLog(object);
            if(count < 1){
                BaseLogger.info("日志更新失败");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

application.preperties:

#activemq信息队列
spring.activemq.broker-url=tcp://192.168.134.100:61616
activemq.listener.enable=true

 


配置那边:


我的是配置在linux系统下


 

然并卵,没什么用,然后强迫我使用String类型,改后代码:

    @Override
    public void insertLogVisit(TbLogVisit tbLogVisit) {
        try {
            // 连接消息服务器
            Connection connection = activeMQUtil.getConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // 发送消息
            Queue testqueue = session.createQueue("LOG_VISIT_QUEUE");
            MessageProducer producer = session.createProducer(testqueue);
            MapMessage mapMessage=new ActiveMQMapMessage();
            String toString = JSONObject.fromObject(tbLogVisit).toString();
            mapMessage.setString("tbLogVisit",toString);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(mapMessage);
            session.commit();// 事务型消息,必须提交后才生效
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

接收消息:

    @JmsListener(containerFactory = "jmsQueueListener" ,destination = "LOG_VISIT_QUEUE")
    public void consumeLogResult(MapMessage mapMessage){
        try {
            String object = mapMessage.getString("tbLogVisit");
            JSONObject jsonObject = new JSONObject().fromObject(object);
            TbLogVisit logVisit = (TbLogVisit) JSONObject.toBean(jsonObject, TbLogVisit.class);
            int count = actionService.insertLog(logVisit);
            if(count < 1){
                BaseLogger.info("日志更新失败");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

然后控制台:

相关文章
|
Java API Maven
一文搞懂Java日志级别,重复记录、丢日志问题(下)
一文搞懂Java日志级别,重复记录、丢日志问题
1574 0
一文搞懂Java日志级别,重复记录、丢日志问题(下)
|
小程序 前端开发 iOS开发
微信小程序:多行文本溢出出现多余的文字-webkit-line-clamp
微信小程序:多行文本溢出出现多余的文字-webkit-line-clamp
739 0
微信小程序:多行文本溢出出现多余的文字-webkit-line-clamp
|
11月前
|
存储 弹性计算 NoSQL
os-copilot安装与多项功能测评
本文介绍了os-copilot的安装及多项功能测评。首先,通过xShell连接服务器并使用`rpm -q os-copilot`检查是否已安装,若未安装则用`yum install`命令安装。接着,配置ACCESS_KEY信息以连接阿里云服务。深入测试部分展示了-t参数用于环境健康检查、-f参数处理复杂任务、|参数解释代码等功能,还演示了编写shell脚本创建和启动Redis的便捷性。这些功能对基础运维和开发人员有较大帮助。
320 14
|
JSON JavaScript Linux
【MCP教程系列】如何自己打包MCP服务并部署到阿里云百炼上
本文章以阿里云百炼的工作流为例,介绍如何将其封装为MCP服务并部署到平台。主要步骤包括:1)使用Node.js和TypeScript搭建MCP服务;2)将项目打包并发布至npm官方平台;3)在阿里云百炼平台创建自定义MCP服务;4)将服务添加到智能体中进行测试。通过这些步骤,您可以轻松实现工作流的MCP化,并在智能体中调用自定义服务。
3755 0
|
监控 负载均衡 Kubernetes
深入探索微服务架构中的服务治理
深入探索微服务架构中的服务治理
397 0
|
消息中间件 Java Spring
Spring Boot与JMS消息中间件的集成
Spring Boot与JMS消息中间件的集成
|
Java
SpringBoot全局异常@RestControllerAdvice全局异常
SpringBoot全局异常@RestControllerAdvice全局异常
150 0
|
存储 算法 前端开发
一文带你学会国产加密算法SM4的java实现方案
今天给大家带来一个国产SM4加密解密算法的java后端解决方案,代码完整,可以直接使用,希望给大家带来帮助,尤其是做政府系统的开发人员,可以直接应用到项目中进行加密解密。
4043 1
@RequiredArgsConstructor(onConstructor_ = @Autowired)报错
@RequiredArgsConstructor(onConstructor_ = @Autowired)报错
|
监控 Java Spring
Spring Boot中一般如何使用线程池?
在Spring Boot应用程序中,合理地使用线程池可以有效地提高系统的性能和并发处理能力。本文将深入探讨Spring Boot中如何一般性地使用线程池,包括线程池的配置、使用方式以及一些最佳实践。
2103 0