Java面试题~消息中间件RabbitMQ如何保证消息不丢失且100%投递成功

作者: 修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

摘要

RabbitMQ是一款可以用于实现消息通信、服务解耦的分布式中间件,在实际项目中具有许多典型的应用场景,如异步记录日志、业务服务模块异步通信解耦、高并发抢购场景下流量削峰、限流等等,本文我们将以实际项目中典型的业务场景:“发送邮件” 为案例,一起探讨RabbitMQ在发送消息时如何保证消息不丢失且保证其100%投递成功。

题外话~Java技术面时这无疑是一道高频必问之题

内容

一、 整体业务流程介绍

“发送邮件”这一业务场景想必各位小伙伴都经历过,在业务要求不是很严格的情况下,“邮件发送”可以采用异步的方式加以实现,传统的做法是直接在发送邮件的方法上加上注解 @Async 即可实现;

而在微服务、分布式时代,更多的是利用中间件加以实现,此时MQ便排上了用场,目前市面上比较典型的产品包括:ActiveMQRabbitMQKafka 以及 RocketMQ等等,在这里我们以RabbitMQ为主,并由此介绍一下RabbitMQ在发送消息期间如何保证消息不丢失以及保证其100%投递成功呢?

对这一问题,我们还是先给出答案吧,后续我们还会采用代码实战一一验证这些答案:

1)“发送确认”模式:即生产者通过MQ发送消息后,MQ需要将“已发送成功/失败”反馈给生产者,告知生产者消息已投递成功,此方式可确保消息正确地发送至RabbitMQ

 

2)“消费确认”模式:即消费者监听到MQ中队列的消息并执行完对应的业务逻辑后,需要发送“消息已被成功监听、消费”反馈给MQ,此方式可保证接收方正确接收并消费了消息,消费成功后消息将从队列中移除

 

3)“避免消息重复投递”:生产者在生产消息时,MQ内部会针对每条消息生成一个MsgId,该标识可以作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;

 

4)“消息消费时保证幂等性”:这一点可以利用业务本身的特性来实现,即每个业务实体一般都会有一个唯一的ID,就像数据库表中唯一的主键一样,在监听消费处理时根据ID作为去重的依据;

 

5)“持久化”:将队列、交换机、消息都设置为持久化模式,确保消息在投递、发送期间出现MQ服务宕机后重启恢复过来时消息依旧存在;

 

6)“消息消费重试机制”:指的是消费者在监听、消费、处理消息的过程中出现了异常,导致业务逻辑没有处理成功,此时可以开启“消息重入队列”机制,设置消息重入队列N 进行 重试消费;

 

7)“消息投递补偿机制”:指的是消息在生产、投递期间出现“投递失败”,也就是“发送不成功”的情况,此时可以将其加入到DB中,并开启定时任务,拉取那些投递不成功的消息,重新投递入队列,如此一来便可以保证消息不丢失且准备被投递。

 

交代完了答案,接下来我们以“发送邮件”这一业务场景为案例,结合实际的代码实战一一验证上述的答案,如下图所示为这一业务场景以及“RabbitMQ保证消息不丢失和100%投递成功”的整体实现流程图:   


二、整体业务流程核心代码实战

1)首先,需要在数据库中创建一数据库表msg_log,用于记录RabbitMQ消息的投递以及消费过程,其DDL如下所示:

CREATE TABLE `msg_log` (
`msg_id` varchar(155) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`msg` text COMMENT '消息体, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

紧接着,需要利用Mybatis逆向工程生成该数据库表对应的实体类EntitySQL操作接口Mapper以及Mapper.xml,在这里就不贴出来了,各位小伙伴自行生成即可(也可以在文末下载对应的源码直接拷贝出来使用即可)

2)接下来是创建用于发送邮件的控制器MailController,其完整源码如下所示:

@RestController
@RequestMapping("mail")
public class MailController {

@Autowired
private MailService mailService;

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Environment env;

@Autowired
private MsgLogMapper msgLogMapper;

private static final Snowflake snowflake=new Snowflake(3,2);

//发送邮件
@RequestMapping(value = "send/v2",method = RequestMethod.POST)
public BaseResponse sendMailV2(@RequestBody @Validated MailDto dto, BindingResult result){
String checkRes=ValidatorUtil.checkResult(result);
if (StringUtils.isNotBlank(checkRes)){
return new BaseResponse(StatusCode.InvalidParams.getCode(),checkRes);
}
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
MsgLog entity=new MsgLog();
String msgId=snowflake.nextIdStr();

entity.setMsgId(msgId);
entity.setExchange(env.getProperty("mq.email.v2.exchange"));
entity.setRoutingKey(env.getProperty("mq.email.v2.routing.key"));
entity.setStatus(Constant.MsgStatus.Sending.getStatus());
entity.setTryCount(0);
entity.setCreateTime(DateTime.now().toDate());

dto.setMsgId(msgId);
final String msg=new Gson().toJson(dto);

entity.setMsg(msg);
//在发送消息之前先创建并入库
msgLogMapper.insertSelective(entity);

//发送消息
rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}
}

从该代码中可以得知在RabbitMQ真正发送消息之前,需要将该消息插入数据库,并设置其状态为 “投递中”;与此同时,需要利用“雪花算法”工具生成该消息的全局唯一标识MsgId,并用其充当消息的标识;

之后便是将消息交给了RabbitMQ 执行发送逻辑,而为了能让RabbitMQ真正发送消息,我们需要在RabbitmqConfig配置类中创建相应的交换机、路由以及队列绑定;除此之外,需要在自定义注入RabbitTemplate  Bean组件中加入“发送确认”的处理逻辑,即监听 “消息是否真的投递成功”;

同时,也需要在自定义配置“监听器容器工厂” Bean组件中加入“消息确认消费模式”,在这里,我们采用的是“手动确认消费”模式,其完整源码如下所示:

//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {

private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

@Autowired
private MsgLogMapper msgLogMapper;

//多实例消费者 – 设定确认消费模式为手动确认
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//确认消费模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
return factory;
}

//消息发送操作组件自定义注入配置:设置 "生产需要确认"
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:消息唯一标识correlationData({}),消息确认结果:ack({}),失败原因:cause({})",correlationData,ack,cause);

//发送确认结果
if (correlationData!=null && StringUtils.isNotBlank(correlationData.getId())){
if (ack) {
//消息投递成功
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendSuccess.getStatus());
} else {
//消息投递失败
msgLogMapper.updateMsgSendStatus(correlationData.getId(), Constant.MsgStatus.SendFail.getStatus());
}
}
}
});
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
// 启动消息失败返回,比如路由不到队列时触发回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("消息丢失:交换器exchange({}),路由route({}),响应结果replyCode({}),响应信息replyText({}),消息message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}

//构建发送邮件消息模型-队列、交换机做持久化,消息到时候也设置持久化,将意味着MQ在发送消息的整个过程,将会被记录到持久化日志文件中
//可靠性投递
@Bean
public Queue emailQueueV2(){
return new Queue(env.getProperty("mq.email.v2.queue"),true);
}
@Bean
public DirectExchange emailExchangeV2(){
return new DirectExchange(env.getProperty("mq.email.v2.exchange"),true,false);
}
@Bean
public Binding emailBindingV2(){
return BindingBuilder.bind(emailQueueV2()).to(emailExchangeV2()).with(env.getProperty("mq.email.v2.routing.key"));
}
}

3)然后是创建用于“监听消费消息”的消费者MqListener,其监听消费处理消息的逻辑为:获取并解析出消息体的内容,根据MsgId查询消息当前的发送状态,如果处于“未投递”或者“投递失败”且“重试次数还没超过最大次数”时,则获取该消息体的真实内容,并将其交给MailService执行“发送邮件”的逻辑,其完整源代码如下所示:

@Component
public class MqListener {

private static final Logger log= LoggerFactory.getLogger(MqListener.class);

@Autowired
private MailService mailService;

@Autowired
private MsgLogMapper msgLogMapper;

//监听消费发送邮件消息-v2
@RabbitListener(queues = {"${mq.email.v2.queue}"},containerFactory = "multiListenerContainer")
public void consumeMailV2(Message message, Channel channel) throws Exception{
log.info("---监听消费发送邮件消息-v2---开始处理中...");
Long deliverTag=message.getMessageProperties().getDeliveryTag();

Integer tryCount=0;
String msgId="";
try {
MailDto dto=new Gson().fromJson(new String(message.getBody()), MailDto.class);
msgId=dto.getMsgId();
log.info("---监听到消息-v2:{}",dto);

//maxTry=msgLogMapper.selectMaxTry(dto.getMsgId());

/*此处可以加上一层分布式锁,保证超高并发时处理消息的原理操作 ~ 读者自行实现,有问题可以随时交流*/

//防止重复投递 - 保证幂等性
MsgLog msgLog=msgLogMapper.selectByPrimaryKey(dto.getMsgId());
if (msgLog!=null && !Constant.MsgStatus.ConsumeSuccess.getStatus().equals(msgLog.getStatus()) && msgLog.getTryCount()<Constant.MsgTryCount){
tryCount=msgLog.getTryCount();

//核心业务逻辑【消息处理~发送邮件(同步;异步)】
boolean res=mailService.sendSimpleEmail(msgId,dto.getSubject(),dto.getContent(),dto.getTos());
if (res){
//确认消费
channel.basicAck(deliverTag,false);
return;
}
}
}catch (Exception e){
e.printStackTrace();

//出现异常时可以走重试机制 ~ 重试次数为3次,每次间隔8s(自己灵活设定即可)
if (tryCount<Constant.MsgTryCount){
Thread.sleep(8000);

channel.basicNack(deliverTag,false,true);
msgLogMapper.updateMaxTry(msgId);
return;
}
}

//当走到这一步时代表消息已被消费(status=3)或者 重试次数达到上限 等情况-但不管是哪种情况,都需要将消息从队列中移除,防止下次项目重启时重新监听消费
channel.basicAck(deliverTag,false);
}
}

4)而mailServicesendSimpleEmail方法则主要用于执行真正的“发送邮件”逻辑,同时将最终的发送结果(业务逻辑处理结果)及时更新至数据库表“消息的发送状态”中,其完整的源码如下所示:

@Service
public class MailService {
private static final Logger log= LoggerFactory.getLogger(MailService.class);

@Autowired
private Environment env;

@Autowired
private JavaMailSender mailSender;

@Autowired
private MsgLogMapper msgLogMapper;

//TODO:发送简单的邮件消息
//@Async("threadPoolTaskExecutor")
public Boolean sendSimpleEmail(final String msgId,final String subject,final String content,final String ... tos) throws Exception{
Boolean res=true;
try {
SimpleMailMessage message=new SimpleMailMessage();
message.setSubject(subject);
message.setText(content);
message.setTo(tos);
message.setFrom(env.getProperty("mail.send.from"));
mailSender.send(message);

//int i=1/0;

log.info("----发送简单的邮件消息完毕--->");
}catch (Exception e){
log.error("--发送简单的邮件消息,发生异常:",e.fillInStackTrace());
res=false;
throw e;
}finally {
this.updateMsgSendStatus(msgId,res);
}
return res;
}

//TODO:更新消息处理的结果
private void updateMsgSendStatus(final String msgId,Boolean res){
if (StringUtils.isNotBlank(msgId)){
if (res){
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeSuccess.getStatus());
}else{
msgLogMapper.updateMsgManageSuccess(msgId, Constant.MsgStatus.ConsumeFail.getStatus());
}
}
}
}

5)最后是“定时任务补偿投递”机制,即开启一定时任务,拉取投递失败的消息重新进行投递,其源码如下所示:

@Component
public class MqScheduler {

private static final Logger log= LoggerFactory.getLogger(MqScheduler.class);

@Autowired
private MsgLogMapper msgLogMapper;

@Autowired
private RabbitTemplate rabbitTemplate;

//定时任务重新拉取并投递
@Scheduled(cron = "0/10 * * * * ?")
public void reSendMsg(){
try {
String status= Constant.MsgStatus.Sending.getStatus()+","+Constant.MsgStatus.SendFail.getStatus();
List<MsgLog> msgLogs=msgLogMapper.selectSendFailMsg(status);
if (msgLogs!=null && !msgLogs.isEmpty()){
msgLogs.forEach(msgLog -> {
if (StringUtils.isNotBlank(msgLog.getMsg()) && StringUtils.isNotBlank(msgLog.getExchange())
&& StringUtils.isNotBlank(msgLog.getRoutingKey())){
//发送消息
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),msgLog.getMsg(),
new CorrelationData(msgLog.getMsgId()));
log.info("----已成功重新投递消息,消息id={}",msgLog.getMsgId());
}
});
}
}catch (Exception e){
log.error("定时任务重新拉取投递失败的消息~重新进行投递~发生异常:",e.fillInStackTrace());
}
}
}

做完这些之后,便可以开始整个业务流程的测试了,在此期间需要分多种情况进行测试,第一种是正常投递、正常消费的情况;第二种是投递失败,定时任务定时拉取重新投递的情况;最后一种是消息确认消费失败进入重试的情况。  

三、整个业务流程的测试

1)正常投递、正常消费的情况

A.点击运行项目,在Postman输入相应的请求URL、请求体、请求方式 以及请求体的内容类型,如下图所示:


B.观察IDEA控制台的输出信息,会发现消息已经成功插入数据库,并投递成功,稍等一两秒后可以看到消息已经被监听消费,如下图所示:




C.稍等片刻,会发现邮件已经成功,与此同时,数据库中该条消息的状态已变更为“消费成功”,如下图所示:





D.打开自己的邮箱,会发现也已经成功收到了该邮件信息,如下图所示:


2)投递失败,定时任务定时拉取重新投递的情况

A.为了演示“消息投递失败/不成功”的情况,我们可以将Controller中的sendMailV2()方法里的“发送MQ消息”的逻辑注释起来,如下所示:

//发送消息
//rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRoutingKey(),msg,new CorrelationData(entity.getMsgId()));

这将意味着,前端Postman触发请求调用这个接口时,插入数据库中的消息的状态为“投递中”,即 status=0 的情况;除此之外,还可以偷偷地调整 消息对应的“交换机”或者“路由”的值,这样一来,RabbitmqConfigRabbitTemplate方法将会触发“消息投递失败 404”的错误,也就是 status=2 的情况;


B.下面我们就以第一种为主吧,将发送消息的代码逻辑注释起来,然后再次在Postman发起请求,此时会发现数据库表插入了一条 status=0 的消息,如下图所示:


与此同时,我们的定时器因为cron设定时间为 10s 来一次轮询,因此稍等片刻后,会发现该条消息重新被拉取出来重新进入投递,最终成功走到了最后了,如下图所示:


 


3)消息确认消费失败进入重试的情况

A.这种情况只有在“监听消费处理消息的过程出现异常” 才能触发“消息重入队列”机制,因此,我们只需要在真正处理消息的业务逻辑:“发送邮件”中嵌入一段 int i=1/0 的代码即可,如下图所示:



B.而我们都知道,在执行这段代码之后,sendSimpleEmail() 势必会抛出异常,之后会更新数据库表中该消息的状态,即“消费失败”,之后便会触发外层MqListenercomsumeMailV2方法的catch逻辑,如下所示:   

catch (Exception e){
e.printStackTrace();

//出现异常时可以走重试机制
if (tryCount<Constant.MsgTryCount){
Thread.sleep(8000);

channel.basicNack(deliverTag,false,true);
msgLogMapper.updateMaxTry(msgId);
return;
}
}


C.之后,消息便会“重入队列”,当然啦,每次重入队列时,时间间隔是8s(这个按照实际情况设定即可,这种写法还是比较粗糙~), 数据库表该消息的“最大重试次数”将会加1,直到超过 最大的重试次数(这里是 3次),便不会再进入队列了,而是会从队列中永久删除;于此同时,IDEA控制台会报出相应的异常信息,最终数据库表中该条消息的try_count也达到了最大重试次数:3次,如下图所示:




四、总结

至此,我们已经完成了整个业务流程的代码实战以及测试了,总结的话就不说了,在本文的一开始我们就已经给出了答案了,简而言之就是:发送和消费需要做确认;队列、交换机和消息需要做持久化;允许消息投递失败时定时任务重新投递;允许消息消费失败时重入队列进行消费


最后想必是大伙儿关心的源码下载了,关注下方微信公众号:“程序员实战基地”,回复:mq实战,即可下载该源码;如果需要交流技术,可以加下方debug的微信:debug0868