rabbitmq消息阻塞情况分析;自动重试处理异常

现象描述

消费者因为代码问题出现了异常,此时默认是自动提交的消息,这个RuntimException会导致消息直接重新入队,再次投递(进入队首),此时会导致后面的消息被阻塞.

分析

auto自动确认分四种情况,第一种就是正常消费,其他三种则为异常情况

  1. 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  2. 当抛出ImmediateAcknowledgeAmqpException异常,则视为成功消费,确认该消息。
  3. 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  4. 其他的异常,则消息会被拒绝,且requeue = true

我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环,cpu磁盘io直接拉高.

解决方案

1-在消费者消费逻辑外面套个catch,把异常吃掉,然后把当前异常的消息再做额外处理
2-把mq重新入队关闭
3-抛出ImmediateAcknowledgeAmqpException/AmqpRejectAndDontRequeueException异常

我的解决方案

使用spring-retry重试3次后还是失败就记录到mysql中,作为后续补偿的记录.

需要依赖如下

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
</dependency>

启动类加上@EnableRetry注解
在这里插入图片描述

实现自动重试代码demo

package com.fchan.mq.process;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;

/**
 * ClassName: MyRabbitConsume <br/>
 * Description: <br/>
 * date: 2022/7/18 17:07<br/>
 *
 * @author fchen<br />
 */
@Component
public class MyRabbitNormalMessageProcess{
    
    Logger log = LoggerFactory.getLogger(MyRabbitNormalMessageProcess.class);

	//默认的重试就是3次,maxAttempts = 第一次正常请求 + 后续异常重试次数
    @Retryable(value = AmqpRejectAndDontRequeueException.class, maxAttempts = 3,backoff = @Backoff(delay = 2000L, multiplier = 1.5), recover = "sendNormalRetryable")
    public Object processMessage(Map<String,Object> data){
        log.info("处理消息中----------");
        if(Objects.equals(data.get("data"), "exception")){
            throw new AmqpRejectAndDontRequeueException("mq消费时出现异常");
        }
        log.info("收到normal信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");

        return "success";
    }

	//这里利用的就是spring aop,所以需要aspect依赖
	//入参除了第一个throwable其余需要和原方法一直,返回值也需要一致
    @Recover
    public Object sendNormalRetryable(Throwable throwable, Map<String,Object> data){
        log.error("重试次数上限,异常信息:{}", throwable.getMessage());
        log.info("入参map:{},插入db", data);
        return "exception process";
    }


}