SpringBoot使用RabbitMQ发送消息队列

准备

因为RabbitMQ使用erlang语言写的,所以要先安装erlang语言,貌似国内访问很慢,这里放一个下载好的,版本erlang:10.2,rabbitMq:3.7.9

下载:https://pan.baidu.com/s/1k0w23XJYTp_0gTaMb5bXRg
提取码:aeiz

启动(两种方式):

  1. 直接双击rabbitmq_server-3.7.9/sbin/rabbitmq-service.bat
  2. 在cmd中启动

建议在cmd中启动,直接双击的话关闭只能到任务管理器中关

1. 添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 添加配置

1
2
3
4
5
6
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest

3. direct模式

这种是点对点模式,一个发送一个接收

3.1 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author pibigstar
* @create 2018-12-17 10:57
* @desc direct模式,点对点,一个发送一个接收
**/
@Configuration
public class QueueRabbitConfig {
public static final String QUEUE_NAME = "pibigstar";
/**
* 定义一个名为:pibigstar 的队列
*/
@Bean
public Queue pibigstarQueue() {
return new Queue(QUEUE_NAME);
}
}

3.2 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.pibgstar.demo.amqp.config.QueueRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;

/**
* @author pibigstar
* @create 2018-12-17 11:00
* @desc direct模式消费者
**/
@Component
// 监听pibigstar队列
@RabbitListener(queues = QueueRabbitConfig.QUEUE_NAME)
public class QueueConsumer {
private static Logger logger = Logger.getLogger("QueueConsumer");
@RabbitHandler
public void process(String message) {
logger.info("接受到的消息:"+message);
}
}

3.3 发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author pibigstar
* @create 2018-12-17 11:04
* @desc 消息发送者
**/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;

/**
* 点对点发送
*/
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(QueueRabbitConfig.QUEUE_NAME,message);
}
}

4. Topic模式

这种是订阅/发布模式,一个发送多个接收

4.1 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author pibigstar
* @create 2018-12-17 11:18
* @desc topic模式,订阅/发布模式,一个发送多个接收
**/
@Configuration
public class TopicRabbitConfig {
// 队列1
public final static String ONE = "topic.one";
// 队列2
public final static String TWO = "topic.two";
// 所有队列
public final static String ALL = "topic.#";
// 转发器
public final static String EXCHANGE = "topic.exchange";

@Bean
public Queue queueOne() {
return new Queue(TopicRabbitConfig.ONE);
}

@Bean
public Queue queueTwo() {
return new Queue(TopicRabbitConfig.TWO);
}

@Bean
TopicExchange exchange() {
return new TopicExchange(EXCHANGE);
}

/**
* 绑定队列one到转发器上
*/
@Bean
Binding bindingExchangeMessage(Queue queueOne, TopicExchange exchange) {
return BindingBuilder.bind(queueOne).to(exchange).with(ONE);
}
/**
* 绑定所有队列(one和two)到转发器上
*/
@Bean
Binding bindingExchangeMessages(Queue queueTwo, TopicExchange exchange) {
return BindingBuilder.bind(queueTwo).to(exchange).with(ALL);
}
}

4.2 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.pibgstar.demo.amqp.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;

/**
* @author pibigstar
* @create 2018-12-17 11:29
* @desc topic模式消费者
**/
@Component
public class TopicConsumer {
private static Logger logger = Logger.getLogger("TopicConsumer");

@RabbitListener(queues = TopicRabbitConfig.ONE)
public void receiveOne(String message){
logger.info("one接受到的消息:"+message);
}

@RabbitListener(queues = TopicRabbitConfig.TWO)
public void receiveTwo(String message){
logger.info("two接受到的消息:"+message);
}
}

4.3 发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.pibgstar.demo.amqp.config.QueueRabbitConfig;
import com.pibgstar.demo.amqp.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author pibigstar
* @create 2018-12-17 11:04
* @desc 消息发送者
**/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 一对多发送
*/
public void sendTopicMessage(String message) {
// 往队列one中发送信息, receiverOne和receiverTwo都可以收到
rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE,TopicRabbitConfig.ONE,message);
// 往队列two中发送信息,只有receiverTwo收到
rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE,TopicRabbitConfig.TWO,message);
}
}
-------------本文结束感谢您的阅读-------------