Spring Boot and ActiveMQ

  This article briefly introduces ActiveMQ and shows how to use it in a Spring Boot project.

Introduction to ActiveMQ

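  ActiveMQ is a widely used, full-featured open-source message broker from Apache.
  ActiveMQ supports two messaging domains:
    1. Point-to-point (queues):
      Each message is consumed by exactly one consumer.
    2. Publish/subscribe (topics):
      Each message can be delivered to multiple consumers.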
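
The difference between the two domains can be illustrated without a broker. The following is a minimal plain-Java sketch, not ActiveMQ code: the class and method names (DeliveryDemo, sendToQueue, publishToTopic) are hypothetical, and round-robin distribution among queue consumers is a simplifying assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of the two messaging domains; no broker is involved. */
final class DeliveryDemo {

    /** Point-to-point: each message goes to exactly one consumer (round-robin is assumed here). */
    static Map<String, List<String>> sendToQueue(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    /** Publish/subscribe: every subscriber receives its own copy of each message. */
    static Map<String, List<String>> publishToTopic(List<String> subscribers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (String subscriber : subscribers) {
                received.get(subscriber).add(message);
            }
        }
        return received;
    }

    /** Creates one empty inbox per name, keeping insertion order. */
    private static Map<String, List<String>> emptyInboxes(List<String> names) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String name : names) {
            inboxes.put(name, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("A", "B");
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println("queue: " + sendToQueue(consumers, messages));    // queue: {A=[m1], B=[m2]}
        System.out.println("topic: " + publishToTopic(consumers, messages)); // topic: {A=[m1, m2], B=[m1, m2]}
    }
}
```

With two consumers and two messages, the queue splits the messages between the consumers, while the topic delivers both messages to each subscriber.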

Using ActiveMQ in Spring Boot

Configuring the pom file

<!-- activemq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

Application configuration

spring:
  activemq:
    broker-url: tcp://localhost:61616 # URL of the ActiveMQ broker. Auto-generated by default, for instance `tcp://localhost:61616`.
    in-memory: false # Whether the default broker URL should be in memory. Ignored if an explicit broker has been specified.
    password: x # Login password of the broker.
    user: xx # Login user of the broker.
    packages:
      trust-all: false # Whether to trust all packages.
      trusted: com.hfy.activemq # Comma-separated list of specific packages to trust (used when trust-all is false).
    pool:
      enabled: false # Whether to use a pooled connection factory.

Configuring the listener container factories

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
@EnableJms
public class JmsConfiguration {

    // Listener container factory for queue (point-to-point) mode
    @Bean(name = "jmsListenerContainerQueue")
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // By default, DefaultMessageListenerContainer tries to refresh a failed JMS connection
        // every 5000 ms (5 s) and logs the connection error each time.
        // Here the recovery interval is set to 10 s.
        bean.setRecoveryInterval(10000L);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    // Listener container factory for topic (publish/subscribe) mode
    @Bean(name = "jmsListenerContainerTopic")
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // Same 10 s recovery interval as the queue factory (the default is 5 s).
        bean.setRecoveryInterval(10000L);
        // Listen on topics instead of queues
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
}
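
The recovery interval configured above controls how long the listener container waits between reconnection attempts. The idea of retrying at a fixed interval can be sketched in plain Java as follows. This is only an illustration of the technique, not Spring's implementation: the names FixedIntervalRetry, Connector and connectWithRetry are hypothetical, and the sketch caps the number of attempts so that the demo terminates.

```java
import java.net.ConnectException;

/** Plain-Java sketch of fixed-interval reconnection; not Spring's actual implementation. */
final class FixedIntervalRetry {

    /** Hypothetical stand-in for "open a connection to the broker". */
    interface Connector {
        void connect() throws Exception;
    }

    /**
     * Calls the connector until it succeeds, sleeping intervalMillis after each failure.
     * Returns the number of attempts used; gives up with an exception after maxAttempts.
     */
    static int connectWithRetry(Connector connector, long intervalMillis, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                connector.connect();
                return attempt;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw new IllegalStateException("giving up after " + attempt + " attempts", e);
                }
                System.out.println("attempt " + attempt + " failed: " + e.getMessage()
                        + ", retrying in " + intervalMillis + " ms");
                sleep(intervalMillis);
            }
        }
    }

    /** Sleeps for the given time, preserving the interrupt flag if interrupted. */
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that refuses the first two connection attempts
        int attempts = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new ConnectException("Connection refused");
            }
        }, 10L, 5);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```

A longer interval produces fewer log entries and less load while the broker is down, at the cost of a slower reconnect once it is available again.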

Sending messages

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class SendMsgServiceImpl implements SendMsgService {

    private static final Logger logger = LoggerFactory.getLogger(SendMsgServiceImpl.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // Converts the text to a JMS message and sends it to the queue named by QueueList.QUEUE_SEND
    @Override
    public void sendQueueMessageText(String message) {
        logger.info("Sending ActiveMQ message: {}", message);
        jmsTemplate.convertAndSend(QueueList.QUEUE_SEND, message);
    }
}

Receiving messages

  This example receives messages in topic (publish/subscribe) mode.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

@Service
public class ReceiveMsgServiceImpl implements ReceiveMsgService {

    private static final Logger logger = LoggerFactory.getLogger(ReceiveMsgServiceImpl.class);

    // Subscribes to the topic through the topic-mode container factory defined in JmsConfiguration
    @JmsListener(destination = QueueList.TOPIC_RECEIVE, containerFactory = "jmsListenerContainerTopic")
    @Override
    public void getQueueMessageText(String message) {
        try {
            logger.info("Received ActiveMQ message: {}", message);
            // ...
        } catch (Exception e) {
            // Handle failures raised by the processing code above
            // ...
        }
    }
}

Queue and topic definitions

import org.springframework.stereotype.Component;

/**
 * Names of the queues and topics used by the application.
 */
@Component
public class QueueList {

    public static final String QUEUE_SEND = "queue_send";
    public static final String TOPIC_RECEIVE = "topic_receive";
}

Errors

Could not connect to broker URL

Problem:
  Could not refresh JMS Connection for destination 'queue_toc_tob' - retrying using FixedBackOff{interval=10000, currentAttempts=2013, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused
Analysis:
  ActiveMQ was up and running, and there was no problem at low message volumes. Running netstat -nalp | grep 61616 showed a large number of open connections.
Solution:
  Use a connection pool.
1. In pom.xml (Spring Boot 1.5.6.RELEASE is used here):

<!-- activemq support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

2. In application.yml:

spring:
  activemq:
    # ...
    pool:
      enabled: true
      max-connections: 2
      expiry-timeout: 10000
      idle-timeout: 30000
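
Why a pool reduces the number of open connections can be shown with a small plain-Java sketch. It is not ActiveMQ's pool: SimplePool and its methods are hypothetical names, the "connections" are placeholder objects, and throwing when the pool is exhausted is a simplification made for the illustration. The limit of 2 mirrors max-connections above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java sketch of connection reuse; it does not model ActiveMQ's real pool. */
final class SimplePool {

    private final int maxConnections;
    private final Deque<Object> idle = new ArrayDeque<>();
    private int created = 0;

    SimplePool(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Returns an idle connection if one exists, otherwise opens a new one up to the limit. */
    synchronized Object borrow() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (created >= maxConnections) {
            // Simplification: this sketch fails fast instead of waiting for a free connection
            throw new IllegalStateException("pool exhausted");
        }
        created++;
        return new Object(); // placeholder for a real broker connection
    }

    /** Puts a connection back so that later senders can reuse it. */
    synchronized void release(Object connection) {
        idle.push(connection);
    }

    /** Number of connections opened so far. */
    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        SimplePool pool = new SimplePool(2);
        for (int i = 0; i < 1000; i++) {
            Object connection = pool.borrow();
            try {
                // send message i over the borrowed connection
            } finally {
                pool.release(connection);
            }
        }
        // 1000 sequential sends reuse a single connection
        System.out.println("connections opened with pool: " + pool.createdCount()); // prints 1
    }
}
```

Without reuse, every send would open its own connection, which would be consistent with the many connections seen on port 61616 in the analysis above.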

References

http://blog.csdn.net/q_zx1bydcom/article/details/53078870
