# 基本概念

  • MQ全称 Message Queue(消息队列),是消息传输过程中保存消息的容器
  • 多用于分布式系统之间进行通信
  • 发送方称为生产者,接收方称为消费者

# MQ的优势

  • 应用解耦:解除系统之间的相互作用,提高系统容错性和可维护性
  • 异步提速:提升用户体验和系统吞吐量(单位时间内处理请求的数目)
  • 削峰填谷:限制消费消息的速度,提高系统稳定性

# MQ的劣势

  • 系统可用性降低:系统引入的外部依赖越多,稳定性越差。一旦MQ宕机,就会对业务造成影响
  • 系统复杂度提高:通过MQ进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
  • 一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

# 常见的MQ产品

常见的MQ产品

# AMQP

  • 即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准
  • 基于此协议的客户端与消息中间件可传递消息,不受客户端、中间件、开发语言等条件的限制
  • 2006年,AMQP 规范发布。类比HTTP

# JMS

  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是 Java 中关于面向消息中间件的API
  • JMS 是 JavaEE 规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,如ActiveMQ,官方没有提供RabbitMQ的JMS实现包,开源社区有

# RabbitMQ

2007年,Rabbit 技术公司基于 AMQP 协议开发的 RabbitMQ (opens new window) 1.0 发布。采用 Erlang 语言开发
Erlang 语言由 Ericson 设计,专门开发高并发和分布式系统的一种语言,在电信领域使用广泛

# RabbitMQ 安装配置

  • Windows 安装
//安装erlang并配置环境变量
//新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
//双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中
erl //验证erlang是否安装成功

//安装RabbitMQ
//安装RabbitMQ-Plugins
//RabbitMQ的sbin目录
E:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.4\sbin
//然后输入以下命令进行安装
rabbitmq-plugins enable rabbitmq_management
//验证rabbitmq是否安装成功
rabbitmqctl status
//打开浏览器,地址栏输入mq访问地址
http://127.0.0.1:15672  //用户名和密码,都为guest 

//问题解决:TCP connection succeeded but Erlang distribution failed
//是Erlang新版本的cookie位置换了
C:\Windows\System32\config\systemprofile
//这里有一个.erlang.cookie,复制这个文件到C:\Users\你的用户名下
  • Linux 安装
# 安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel 
    make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

# 安装Erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

# 安装RabbitMQ
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

# 启动rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq状态
systemctl status rabbitmq-server
# 停止rabbitmq
systemctl stop rabbitmq-server
# 开机启动
systemctl enable rabbitmq-server

# 开启管理界面及配置
# 先停止rabbitmq
systemctl stop rabbitmq-server
# 停止开启web管理端
rabbitmq-plugins enable rabbitmq_management
# 然后启动rabbitmq就可以访问管理页面
systemctl start rabbitmq-server

# 远程登录使用guest账户是登录不上的,因为guest只允许localhost访问
# 这个是查看rabbitmq下面有多少用户
rabbitmqctl list_users
# 增加账户admin并在设置admin的密码
rabbitmqctl add_user admin 密码
# 设置admin的角色为管理员
rabbitmqctl set_user_tags admin administrator
# 设置admin的权限
rabbitmqctl set_permissions -p "/" admin ".\*" ".\*" ".\*"

用户guest访问报错User can only log in via localhost解决方案

修改sbin/rabbitmq.app文件,把其中的一行直接修改成如下,再重启rabbitmq即可

# 如果没有rabbitmq.app文件,创建/etc/rabbitmq/rabbitmq.config文件并添加以下内容
[{rabbit, [{loopback_users, []}]}].

# RabbitMQ 基础架构

RabbitMQ

Producer:# 生产者
Consumer:# 消费者
Broker:# 接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:# 数据隔离的作用,默认为 /
              # 多个用户使用同一个 RabbitMQ server 时,可以划分出多个vhost
              # 每个用户在自己的 vhost 创建 exchange/queue

Exchange:# 交换机,匹配查询表中的 routing key,并根据分发规则将消息分发到queue中去。常用的类型有
          fanout (multicast) # 扇形,广播模式
          direct (point-to-point) # 点对点模式
          topic (publish-subscribe) # 通配符模式
  
Queue:# 消息最终被送到这里等待 consumer 取走
Binding:# exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key
Connection:# publisher/consumer 和 broker 之间的 TCP 连接
Channel:# 是Connection内部建立的连接,若应用程序支持多线程,则每个线程创建单独的channel进行通讯

# SpringAMQP

SpringAMQP (opens new window)是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:

spring-amqp # 是基础抽象
spring-rabbit # 是底层的默认实现

SpringAMQP提供了三个功能:

# 自动声明队列、交换机及其绑定关系
# 封装了RabbitTemplate工具,用于发送消息
# 基于注解的监听器模式,异步接收消息

使用流程,主要分三步:

  • 在父工程中引入spring-amqp的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
spring:
  rabbitmq:
    host: 127.0.0.1 # RabbitMQ的IP地址
    port: 5672 # RabbitMQ的通信端口
    username: lyf # RabbitMQ的用户名
    password: 123456 # RabbitMQ的密码
    virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看





 








 



@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void testSendMessage2SimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello,spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
  • 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
spring:
  rabbitmq:
    host: 127.0.0.1 # RabbitMQ的IP地址
    port: 5672 # RabbitMQ的通信端口
    username: lyf # RabbitMQ的用户名
    password: 123456 # RabbitMQ的密码
    virtual-host: / # RabbitMQ的虚拟主机,这个可以去RabbitMQ管理界面查看
 


 





@Component // 添加@Component注解
public class SpringRabbitListener {
 
    @RabbitListener(queues = "simple.queue")
    public void  listenerSimpleQueueMessage(String msg){
        System.out.println("接收到消息:【" + msg + "】");
    }
}

开启管理界面及配置

  • Docker安装
docker run 
       --privileged=true 
	   -d --name rabbitmq 
	   --restart=always 
	   -p 5672:5672 
	   -p 15672:15672  
	   -v /usr/docker/rabbitmq/data/:/var/rabbitmq/lib 
	   -e RABBITMQ_DEFAULT_USER=linhuo 
	   -e RABBITMQ_DEFAULT_PASS=Linhuo123@ 
	   rabbitmq:3.12.12-management # management 管理控制台

# 工作模式

工作模式介绍 (opens new window)

# 简单模式

# 一个生产者对一个消费者,不需要设置交换机(使用默认的交换机)

简单模式

# Work queues 工作队列模式

# 多个消费者绑定到一个队列,同一个消息只会被一个消费者处理,不需要设置交换机(使用默认的交换机)
# 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,解决消息堆积

工作队列模式

  • 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello,message____";
        for (int i = 1; i <= 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20); // 一秒发送50条消息
        }
    }
}
  • 在consumer服务中定义两个消息监听者,都监听simple.queue队列
@Component
public class SpringRabbitListener {
 
    @RabbitListener(queues = "simple.queue")
    public void  listenerWorkQueueMessage1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalDateTime.now());
        Thread.sleep(20); // 消费者1每秒处理50条消息
    }
 
    @RabbitListener(queues = "simple.queue")
    public void  listenerWorkQueueMessage2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息:【" + msg + "】" + LocalDateTime.now());
        Thread.sleep(100); // 消费者2每秒处理10条消息
    }
}
  • 消费预取限制:默认情况下,RabbitMQ会将消息依次投递给每个消费者,没有考虑消费者是否已经处理完,可能出现消息堆积
spring:
   rabbitmq:
      listener:
	     simple:
		    prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

# Pub/Sub 订阅模式

Queue:# 消息队列,接收消息、缓存消息
Exchange:# 交换机(X),接收生产者发送的消息,并根据分发规则将消息分发到queue中去。常用的类型有
  + Fanout:# 广播,将消息交给所有绑定到交换机的队列
  + Direct:# 定向,把消息交给符合指定routing key 的队列
  + Topic:# 通配符,把消息交给符合routing pattern(路由模式) 的队列

注意

  • 交换机需要与队列进行绑定,绑定之后;允许将同一消息发送给多个消费者
  • Exchange 只负责转发消息,不具备存储消息的能力,如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

订阅模式

  • 在publisher服务中的SpringAmqpTest类中添加测试方法
@Test
public void testSendFanoutExchange() {
	// 交换机名称
	String exchangeName = "itcast.fanout";
	// 消息
	String message = "hello,everyone!";
	// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
	rabbitTemplate.convertAndSend(exchangeName,"",message);
}
  • 在consumer服务中,创建一个类,声明队列、交换机,并将两者绑定
@Configuration
public class FanoutConfig {
    // 声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout"); // 创建名为itcast.fanout的交换机
    }
 
    // 声明队列-fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1"); // 创建名为fanout.queue1的队列
    }
 
    // 将交换机与队列1绑定
    @Bean
    public Binding bindingQueue1(FanoutExchange fanoutExchange,Queue fanoutQueue1){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
 
    // 声明队列-fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2"); // 创建名为fanout.queue2的队列
    }
 
    // 将交换机与队列2绑定
    @Bean
    public Binding bindingQueue2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

// 或者
@Configuration
public class FanoutConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME ="boot_queue";

    //1、交换机   durable:是否持久化
    @Bean("bootExchange")
    public Exchange bootExchange(){
       return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2、队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3、队列和交换机绑定  noargs:没有参数
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, 
	                                 @Qualifier("bootExchange") Exchange exchange){
		//如果需要设置多个routingkey则需要添加多个bindQueueExchange方法
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
	
	//或者
	@Bean
    public Binding bindQueueExchange(){
        return BindingBuilder.bind(bootQueue()).to(bootExchange()).with("boot.#").noargs();
    }
}
  • 在consumer服务中的SpringRabbitListener中添加两个方法,作为消费者
@Component
public class SpringRabbitListener {
 
	@RabbitListener(queues = "fanout.queue1")
	public void  listenerFanoutQueue1(String msg){
		System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
	}
 
	@RabbitListener(queues = "fanout.queue2")
	public void  listenerFanoutQueue2(String msg){
		System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
	}
}

# Routing 路由模式

# 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
# 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
# Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断
# 只有队列的RoutingKey与消息的RoutingKey完全一致才会收到消息

路由模式

  • 在publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testSendDirectExchange() {
	// 交换机名称
	String exchangeName = "itcast.direct";
	// 消息
	String message = "hello,blue!";
	// 发送消息,参数分别是:交互机名称、RoutingKey、消息
	rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
  • 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机
@Component
public class SpringRabbitListener {
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}))
    public void  listenerDirectQueue1(String msg){
        System.out.println("消费者1接收到direct.queue1消息:【" + msg + "】");
    }
	
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}))
    public void  listenerDirectQueue2(String msg){
        System.out.println("消费者2接收到direct.queue2消息:【" + msg + "】");
    }
}

注意

声明队列有两种方式,一种是在SpringRabbitListener基于注解声明,另一种是在FanoutCofig里面注入Bean声明。但是第一种方式消息一出来就被消费了

# Topic 通配符模式

# Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列
# 只不过 Topic 类型中的 Exchange 可以让队列在绑定 Routing key 的时候使用通配符

# Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 通配符规则:# 匹配一个或多个词,例如:item.# 能够匹配 item.insert.abc或者 item.insert
#            * 匹配一个词,例如:item.* 只能匹配 item.insert

通配符模式

  • publisher服务的SpringAmqpTest类中添加测试方法
@Test
public void testSendTopicExchange() {
	// 交换机名称
	String exchangeName = "itcast.topic";
	// 消息
	String message = "恭喜樊振东拿到巴黎奥运会乒乓球男子单打冠军!";
	// 发送消息,参数分别是:交互机名称、RoutingKey、消息
	rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}
  • 在consumer服务的SpringRabbitListener中添加方法
@Component
public class SpringRabbitListener {
 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void  listenerTopicQueue1(String msg){
        System.out.println("消费者1接收到topic.queue1消息:【" + msg + "】");
    }
	
     
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void  listenerTopicQueue2(String msg){
        System.out.println("消费者2接收到topic.queue2消息:【" + msg + "】");
    }
}  

# 消息转换器

如果发送的消息类型为对象类型时,获取到的消息是转换后的可读性比较差

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSend(){
	Map<String,Object> msg = new HashMap<>();
	msg.put("name","jack");
	msg.put("age",21);
	rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha",msg);
}

消息转换器

建议采用JSON序列化代替默认的JDK序列化,需要做两件事:

  • 在producer和consumer中都要引入jackjson依赖
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
</dependency>
  • 在producer和consumer中都要配置MessageConverter
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {

        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

# 消息的可靠性

消息从生产者到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

生产者发送的消息未送达exchange # 解决办法:生产者消息确认(发送者确认)
消息到达exchange后未能成功路由到queue # 解决办法:生产者消息确认(发送者回执)
RabbitMQ宕机,queue将消息丢失 # 解决办法:消息持久化
消息者接收到消息后还未来得及消费就宕机 # 解决办法:消息失败重试机制

# 生产者可靠性

# 生产者确认

RabbitMQ作为消息发送方为我们提供了两种方式用来控制消息的投递可靠性模式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因 生产者确认
spring:
  rabbitmq:
    # 开启确认模式 
	# none关闭确认模式、simple同步阻塞等待MQ回执消息、correlated异步调用返回回执消息
    publisher-confirm-type: correlated 
    publisher-returns: true # 开启退回模式

注意

生产者确认需要额外的网络和系统资源开销,尽量不要使用。
如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题。

  • 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
@Slf4j
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
	                                                                 throws BeansException {
        
		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
               log.debug("收到的return callback, exchange:{},key:{},msg:{},code:{},text:{}",
                       returnedMessage.getExchange(),
                       returnedMessage.getRoutingKey(),
                       returnedMessage.getMessage(),
                       returnedMessage.getReplyCode(),
                       returnedMessage.getReplyText());
            }
        });
    }
}
  • ConfirmCallback需要在每次发送消息时指定
@Test
public void testSend(){
	//创建消息的唯一id,根据具体的消息进行回调,以区分不同消息,避免ack冲突
	CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
	cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
		@Override
		public void onFailure(Throwable ex) {
			log.debug("消息回调失败:"+ ex.getMessage());
		}

		@Override
		public void onSuccess(CorrelationData.Confirm result) {
			if(result.isAck()){
				log.debug("消息回调成功,收到ACK");
			}else{
				log.debug("消息回调失败,收到NACK,原因:"+result.getReason());
			}
		}
	});
	Map<String,Object> msg = new HashMap<>();
	msg.put("name","jack");
	msg.put("age",21);
	
	//需要增加cd参数
	rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha", msg, cd);
}

# 生产者重连

有时候可能出现客户端连接MQ失败的情况,通过配置失败重连机制解决:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true #开启超时重连
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时常=initial-interval*multiplier
        max-attempts: 3 #最大重试次数

注意

Spring AMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,后续业务代码不会执行,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

# 消息的持久化

MQ 默认是内存存储,开启持久化可以确保缓存在 MQ 中的消息不丢失,实现数据持久化包括3个方面:

  • 交换机持久化(Durable属性,Spring默认设置为Durable)
  • 队列持久化(Durable属性,Spring默认设置为Durable)
  • 消息持久化(发送消息时设置delivery_mode=2persisent,Spring发送的消息默认是持久化的)
// 交换机持久化
@Bean
public DirectExchange simpleDirect(){
	// 三个参数:交换机名称, 是否持久化, 当没有queue 与其绑定时是否自动删除
	return new DirectExchange("simple.direct", true, false);
}

// 队列持久化
@Bean
public Queue simpleQueue(){
	// 使用QueueBuilder构建队列,durable就是持久化的
	return QueueBuilder.durable("simple.queue").build();
}

// 消息持久化
@Test
public void testDurableMessage(){
	// 1. 准备消息
	Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
			  .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
			  .build();
	// 2. 发送消息
	rabbitTemplate.convertAndSend("simple.queue", message);
}

注意

非持久化,会出现paged out,会阻塞IO,性能下降

# 消费者可靠性

保证消费者的可靠性主要有三种手段:消费者确认机制、消费失败处理、业务幂等性

# 消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知消息处理状态,回执有三种可选值:

ack # 成功处理消息,RabbitMQ从队列中删除该消息
nack # 消息处理失败,RabbitMQ需要再次投递消息
reject # 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般是消息的参数不正确)

Spring AMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • 开启消费者确认其机制
spring:
  rabbitmq:
    listener:
      direct: # 工作模式
		# none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
		# manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
		
		# auto:自动模式。Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强
		# 当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果
		# 如果是业务异常,会自动返回nack
        # 如果是消息处理或校验异常,自动返回reject
        acknowledge-mode: auto
  • 消费者业务模拟异常
@RabbitListener(queues = "simple.queue")
public void listSimpleQueue(String msg) {
	System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");
	throw new RuntimeException("测试异常");
}

结果

auto模式下,消息未被处理会保留,并会一直尝试重新投递给消费者

# 消费者失败重试

当消费者出现异常后,消息会不断重新入队,再重新发送给消费者,然后再次异常,无限循环,导致mg的消息处理飙升,带来不必要的压力

# 消费者配置文件
spring:
  rabbitmq:
    listener:
	  # 需要设置为simple
      simple:
        acknowledge-mode: auto #none:关闭ack;manual:手动ack;auto:自动ack
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000ms #初始的失败等待时长为1秒
          multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 #最大重试次数
          stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false

如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer # 重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer # 重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer # 重试耗尽后,将失败消息投递到指定的交换机

RepublishMessageRecoverer 模式的处理过程

  • 首先,定义接收失败消息的交换机,队列及其绑定关系:
@Bean
public DirectExchange errorMessageExchange(){
	return new DirectExchange("error.direct");
}

@Bean
public Queue errorQueue(){
	return new Queue("error.queue");
}

@Bean
public Binding errorMessageBinding(){
	return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
  • 定义 RepublishMessageRecoverer
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
{
	return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
}

消费者失败重试

完成上面的配置后,消费者处理消息异常后,会进行本地的重试。而不是直接 requeue 到队列中

当重试次数耗尽后,会把错误消息投递到 error.direct 交换机上,然后在 error.queue上进行转存。这样既能减轻 mq 的压力,也能在队列上找到处理异常的消息,进行人工介入处理。

# 业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的: f(x)= f(f(X)。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。

保证业务幂等性的方案:

  • 方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息
@Bean
public MessageConverter jacksonMessageConvertor() {
	//1.定义消息转换器
	Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
	//2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
	jjmc.setCreateMessageIds(true);
	return jjmc;
}
  • 方案二:结合业务逻辑,基于业务本身做判断
# 以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态
# 判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理
  • 方案三:使用Token令牌,生成一个token存储在redis中
# 请求的时候携带这个token一起请求,后端需要对这个Token作为 Key在redis中进行校验
# 如果 Key存在就执行删除命令,然后正常执行后面的业务逻辑
# 如果不存在对应的 Key 就返回重复执行的错误信息,这样来保证幂等操作

# 延迟消息设置

生产者发送消息后,消费者不会立刻收到消息,而是在指定时间之后才收到,延迟消息的实现有两种:

  • 死信交换机
  • 延迟消息插件

# 死信交换机

当一个队列中的消息满足下列情况之一时,可以称为死信 (dead letter):

# 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
# 消息是一个过期消息,超时无人消费
# 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个死信交换机(Dead Letter Exchange,简称DLX)中 死信交换机

注意死信交换机绑定的是队列,错误消息交换机绑定的是消费者

如何给队列绑定死信交换机?

  • 给队列设置 dead-letter-exchange 属性,指定一个交换机
  • 给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey

TTL也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则变为死信,ttl 超时分两种情况:

# 消息所在的队列设置了存活时间
# 消息本身设置了存活时间
# 两者共存时,以时间短的 ttl 为准

实现过程:

  • 先声明一组死信交换机和队列 (dl.direct 和 dl.queue)
@RabbitListener(bindings = @QueueBinding(
	value = @Queue(name = "dl.queue", durable = "true"),
	exchange = @Exchange(name = "dl.direct"),
	key = "dl"
))
public void listenDlQueue(String msg){
	log.info("消费者接收到了 dl.queue的延迟消息");
}
  • 声明投递消息的队列,并设置超时时间 (配置x-message-ttl 属性)
@Configuration
public class TTLMessageConfig {

	// 延时交换机
	@Bean
	public DirectExchange directExchange(){
		return new DirectExchange("ttl.direct");
	}
	
	// 延时队列
	@Bean
	public Queue ttlQueue(){
		return QueueBuilder
			.durable("ttl.queue") // 指定队列名称,并持久化
			.ttl(10000) // 设置队列超时时间,10秒
			.deadLetterExchange("dl.direct") // 指定死信交换机
			.deadLetterRoutingKey("dl") // 指定死信RoutingKey
			.build();
	}
	@Bean
	public Binding ttlBinding(){
		return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("ttl");
	}
}
  • 发送消息时也可以给消息本身设置超时时间
@Test
public void testTTLMessage(){
	// 1. 准备消息
	Message message = MessageBuilder
		.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
		.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
		.setExpiration("5000") // 设置消息超时时间
		.build();
	//消息TD,需要封装到correlationData中CorrelationData
	correlationData = new CorrelationData(UUID.randomUUID().toString());
	// 2. 发送消息
	rabbitTemplate.convertAndSend("ttl.direct","ttl", message, correlationData);
	// 3.记录日志
	log.info("消息已经成功发送!");
}

# 延迟消息插件(推荐使用)

原理是一种支持消息延迟功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列

  • 点击下载 (opens new window)与RabbitMQ版本对应的延迟消息插件
  • 将下载的插件存放在 D:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.6\plugins 目录下
  • 打开RabbitMQ sbin控制台,输入
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 安装成功后打开 RabbitMQ Management,看到一下场景即为安装成功

死信交换机

  • DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能,因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delaved属性为true即可


 













 



















 













@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = "delay.queue", durable = "true"),
		exchange = @Exchange(value = "delay.direct", delayed = "ture"),
		key = "hi"
))
public void listenDelayQueue(String msg) {
	log.info("接收到delay.queue的消息:{}", msg);
}

// 然后我们向这个delay为true的交换机中发送消息
@Test
void testSendDelayMessage() {
	rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor(){
		@Override
		public Message postProcessMessage(Message message) throws AmqpException {
			// 一定要添加一个header:x-delay,值为延迟的时间,单位为毫秒
			message.getMessageProperties().setDelay(10000);
			return message;
		}
	});
	log.info("消息发送成功!!");
}

// 延迟消息,忽略错误提示
@Slf4j
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
	                                                               throws BeansException {
        
		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
				// 是一个延迟消息,忽略这个错误提示
				if(message.getMessageProperties().getReceivedDelay()>0){
					return;
				}
                log.debug("收到return callback, exchange:{},key:{},msg:{},code:{},text:{}",
                       returnedMessage.getExchange(),
                       returnedMessage.getRoutingKey(),
                       returnedMessage.getMessage(),
                       returnedMessage.getReplyCode(),
                       returnedMessage.getReplyText());
            }
        });
    }
}

# 消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

# 增加更多消费者,提高消费速度
# 在消费者内开启线程池加快消息处理速度
# 扩大队列容积,提高堆积上限

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

注意

在3.12版本后,所有队列都是Lazy Queue模式,无法更改

3.12之前需要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:












 





// 创建惰性队列
@Bean
public Queue lazyQueue(){
	return QueueBuilder.durable("lazy.queue").lazy().build();
}


// 基于注解创建
@RabbitListener(queuesToDeclare = @Queue(
	name = "lazy.queue",
	durable = "true",
	arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
	log.info("接收到lazy.queue的消息:{}",msg);
}

# 消息的高可用

同其他中间件解决高可用的方法一样,那就是搭建集群。
RabbitMQ的是基于Erlang语言编写,而Erlang天然支持集群模式。RabbitMQ的集群有两种模式:

普通集群  # 是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
镜像集群  # 是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性

仲裁队列

镜像集群虽然支持主从,但主从同步并不是强一致的,在同步期间可能有数据丢失的风险。
因此在RabbitMQ3.8版本后,推出仲裁队列代替镜像集群,底层采用Raft协议确保主从数据一致。

# 普通集群

普通集群,或者叫标准集群 (classic cluster),具备下列特征:

# 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息
# 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
# 队列所在节点宕机,队列中的消息就会丢失

集群模式中的每个RabbitMQ节点使用Cookie来确定它们是否被允许相互通信

# 具有相同的共享秘密两个节点才能够通信,称为Erlang cookie。Cookie是最多255个字符的字母数字字符
# 从一个启动的MQ实例获取Cookie:docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

使用Docker搭建集群流程

# 1、在/tmp目录新建一个配置文件 rabbitmq.conf,文件内容如下:
loopback_users.guest = false # 禁用guest用户访问
listeners.tcp.default = 5672 # 访问端口
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1 # 节点名称
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

# 2、再创建一个配置文件用来存放Cookie信息
touch .erlang.cookie # 创建文件
echo "XSLBYZHRKDOLAMZYTNML" > .erlang.cookie # 写入cookie
# 修改cookie文件的权限为只读,不允许其他人修改
chmod 600 .erlang.cookie

# 3、创建实例文件夹
mkdir mq1 mq2 mq3
# 将配置文件拷贝到其他文件夹
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

# 4、准备一个docker网络
docker network create mq-net

# 5、启动容器
docker run -d --net mq-net \ # 启动第一个容器并设置网络
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:tag

docker run -d --net mq-net \ # 启动第二个容器并设置网络
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management

docker run -d --net mq-net \ # 启动第三个容器并设置网络
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:tag

访问8081端口,在OverView中看到三个节点
普通集群

# 镜像集群

镜像集群本质是主从模式,具备下面的特征:

# 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
# 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
# 一个队列的主节点可能是另一个队列的镜像节点
# 所有操作都是主节点完成,然后同步给镜像节点
# 主节点宕机后,镜像节点会被当做主节点

镜像集群有三个模式:

ha-mode ha-params 效果
精确模式 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像
all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I/O,磁盘I/O和磁盘空间使用情况。推荐使用精确模式,设置副本数为(N/2+1)
nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点

精确模式:

docker exec -it mq1 bash
rabbitmqctl set_policy hello "^two\."
                            '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

# rabbitmqctl set_policy:固定写法
# hello:策略名称,自定义
# "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
# '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
# "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
# "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
# "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息
# 如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

all模式:

docker exec -it mq2 bash
rabbitmqctl set_policy hello "^all\." '{"ha-mode":"all"}'

# hello:策略名称,自定义
# "^all\.":匹配所有以all.开头的队列名
# '{"ha-mode":"all"}':策略内容
# "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

nodes模式:

docker exec -it mq3 bash
rabbitmqctl set_policy hello "^nodes\."
                             '{"ha-mode":"nodes","ha-params":["rabbit@mq1", "rabbit@mq2"]}'

# hello:策略名称,自定义
# "^nodes\.":匹配队列的正则表达式,这里是任何以nodes.开头的队列名称
# '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
# "ha-mode":"nodes":策略模式,此处是nodes模式
# "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

# 仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

# 与镜像队列一样,都是主从模式,支持主从数据同步(默认的count为5)
# 使用非常简单,没有复杂的配置
# 主从同步基于Raft协议,强一致

仲裁队列

使用AMQP实现仲裁队列:

# 修改配置文件,配置节点信息
spring:
  rabbitmq:
  	addresses: 192.168.116.131:8071,192.168.116.131:8072,192.168.116.131:8073
    username: admin
    password: admin
    virtual-host: /
	
# 创建队列
@Configuration
public class QuorumConfig {
    @Bean
    public Queue quorumQueue(){
        return QueueBuilder
        .durable("quorum.queue2")
        .quorum()
        .build();
    }
}

# 启动消费者就可以看到已经创建出quorum.queue2队列了