【1】pom文件
添加ActiveMQ依赖:
<!--整合ActiveMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
【2】yml配置
yml配置文件如下:
spring: activemq: user: root password: 123456 broker-url: tcp://127.0.0.1:61616 pool: enabled: true max-connections: 50 packages: trust-all: true
【3】生产者消费者队列
主程序类配置如下:
@SpringBootApplication @EnableJms //ActiveMQ public class HhProvinceApplication { public static void main(String[] args) { SpringApplication.run(HhProvinceApplication.class, args); } }
MyActiveMQConfig如下:
@Configuration public class MyActiveMQConfig { @Bean public Queue logQueue() { return new ActiveMQQueue("app.log"); } }
生产者示例:
@Service public class SysVisitLogServiceImpl implements ISysVisitLogService { private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue logQueue; /* 日志插入 */ public void insertVisitLog(SysVisitLog sysVisitLog) { log.debug("insertVisitLog :收到请求,开始调用队列插入访问日志--"+sysVisitLog); jmsMessagingTemplate.convertAndSend(logQueue, sysVisitLog); } }
消费者示例:
@Service public class ConsumerListener { private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); @Autowired SysVisitLogMapper visitLogMapper; @JmsListener(destination="app.log") @Transactional(rollbackFor={Exception.class}) public void insertVisitLog(SysVisitLog sysVisitLog){ int i = visitLogMapper.insertSelective(sysVisitLog); log.info("消费者插入日志成功 i:"+i+"--sysVisitLog : "+sysVisitLog); } }
【4】以前SSM下使用ActiveMQ
以前在SSM(SpringMVC Spring MyBatis)下主要使用xml对ActiveMQ进行配置,代码中生产者和消费者同样使用注解。
ActiveMQ xml配置如下:
<!--这个是队列目的地,点对点的--> <bean id="InsertVisitLogQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>InsertVisitLogQueue</value> </constructor-arg> </bean> <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="userName" value="root" /> <property name="password" value="123456" /> <property name="useAsyncSend" value="true" /> <property name="trustAllPackages" value="true"/> </bean> </property> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="receiveTimeout" value="2000" /> </bean> <!-- 支持@JmsListener自动启动监听器 --> <jms:annotation-driven/> <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> </bean>
对比SpringBoot2.0,可以发现简化了很多配置。
生产者示例:
@Service public class SysVisitLogServiceImpl implements ISysVisitLogService { private static final Logger log = LoggerFactory.getLogger(SysVisitLogServiceImpl.class); @Autowired private JmsTemplate jmsTemplate; @Qualifier("InsertVisitLogQueue") @Autowired private Destination destinationInsertVisitLogQueue; /* 插入tb_sys_visit_log */ @Override public void insertVisitLog(SysVisitModel sysVisitModel) { log.debug("insertVisitLog :收到请求,开始调用队列插入访问日志--"+sysVisitModel); new Thread(new Runnable(){ @Override public void run() { jmsTemplate.send(destinationInsertVisitLogQueue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage(); msg.setObject(sysVisitModel); /* 一分钟后插入访问日志 */ long delay = 60 * 1000; msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); return msg; } }); }}).start(); } }
消费者示例:
@Service public class ConsumerListener { @JmsListener(destination="InsertVisitLogQueue",concurrency="10-20") @Transactional(rollbackFor={Exception.class}) public void insertVisitLog(SysVisitModel sysVisitModel){ log.info("消费者获取到的sysVisitModel : "+sysVisitModel+Thread.currentThread().getName()); //... } }
pom依赖:
<!-- activeMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
ActiveMQ安装:https://blog.csdn.net/j080624/article/category/7358806