Java秒杀系统(十七):秒杀逻辑优化之RabbitMQ接口限流一

作者: 修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。



摘要:本篇博文是“Java秒杀系统实战系列文章”的第十七篇,我们将继续秒杀系统的优化之路。在本篇文章中我们将基于RabbitMQ异步通信、FIFO(先进先出)、接口限流的特性,在执行秒杀核心的处理逻辑之前架上一层“限流”的处理逻辑,从而让瞬时产生的,犹如波涛汹涌、潮水般的请求流量变得井井有条、有序性地到达后端的秒杀接口! 

内容:在前面的篇章中,我们主要是从秒杀的核心处理逻辑着手进行优化,先后从数据库级别Sql的优化、分布式锁的引入、分布式唯一ID算法的引入以及业务服务模块的异步通信、服务解耦等方式对秒杀核心业务逻辑的处理进行了大幅度的调整、优化,本文Debug将介绍一种独立于“秒杀核心业务逻辑”的方式对秒杀系统进行优化。 

首先,我们先来回顾一下秒杀系统整体的秒杀业务逻辑的处理:


当前端高并发产生多线程请求时,正常情况下,前端会在瞬时产生犹如波涛汹涌、潮水般的请求流量到达后端的秒杀接口,此时如果我们的代码处理逻辑并不那么迅速,那么很有可能将应对不了这股蜂拥而至的高并发流量,最终可能出现各种各样乱七八糟的问题,前面所讲的“库存超卖”,“重复秒杀”便是一种体现。

而且,在某种程度上,这些请求流量对于我们来讲有两点我们需要去注意的:

(1) 这些请求流量是透明的,我们后端接口压根不知道、也不需要知道请求对应的用户是哪位;

(2) 这些请求最终并非全部都是有效的,即如果待秒杀的商品数量为N,而请求的流量远远大于N,则很明显,在秒杀开始后将有很大一部分的请求流量对于我们后端接口而言是木有多大用处的,因为秒杀进行到一定时间时,N很有可能已经等于0了。

基于这两点设想,我们希望对这些请求流量进行“规范化”,当前端高并发产生多线程请求流量时,我们希望将这些请求压入“队列”,使得这些请求可以“乖乖的”等待被处理,即变得井井有条、有序性地到达后端的秒杀接口,而不是像无头苍蝇般、一窝蜂的插队处理! 

众所周知,RabbitMQ是一款MQ中间件,MQ正是Message Queue,即消息队列的简称,而我们都知道队列的特点是先进先出,即FIFO;它可以实现先进入队列的消息先被消费处理、后进入队列的消息后被消费处理,因此,RabbitMQ的这一特性将可以助我们实现“接口限流”的作用,其最终的效果如下图所示:   


下面我们就进入代码实战环节:

(1) 首先,我们需要在RabbitmqConfig配置类中创建限流用的队列、交换机和路由消息模型,其代码如下所示:

//TODO:RabbitMQ限流专用

@Bean
public Queue executeLimitQueue(){
Map<String, Object> argsMap=Maps.newHashMap();
//限制channel中队列同一时刻通过的消息数量
argsMap.put("x-max-length", env.getProperty("spring.rabbitmq.listener.simple.prefetch",Integer.class));
return new Queue(env.getProperty("mq.kill.item.execute.limit.queue.name"),true,false,false,argsMap);
}

@Bean
public TopicExchange executeLimitExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.execute.limit.queue.exchange"),true,false);
}

@Bean
public Binding executeLimitBinding(){
return BindingBuilder.bind(executeLimitQueue()).to(executeLimitExchange()).with(env.getProperty("mq.kill.item.execute.limit.queue.routing.key"));
}

其中,读取环境变量的对象实例读取的配置参数是配置为配置文件application.properties中的,如下所示:   

#RabbitMQ限流专用
mq.kill.item.execute.limit.queue.name=${mq.env}.kill.item.execute.limit.queue
mq.kill.item.execute.limit.queue.exchange=${mq.env}.kill.item.execute.limit.exchange
mq.kill.item.execute.limit.queue.routing.key=${mq.env}.kill.item.execute.limit.routing.key

(2) 接着,我们需要在RabbitSenderService服务类中开发一个用于转移巨大用户流量的、将请求流量或者消息发送至RabbitMQ的功能方法,其完整源代码如下所示:  

//秒杀时异步发送Mq消息
public void sendKillExecuteMqMsg(final KillDto killDto){
try {
if (killDto!=null){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.execute.limit.queue.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.execute.limit.queue.routing.key"));
rabbitTemplate.convertAndSend(killDto, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillDto.class);
return message;
}
});
}
}catch (Exception e){
log.error("秒杀时异步发送Mq消息-发生异常,消息为:{}",killDto,e.fillInStackTrace());
}
}

(3) 转移消息进入队列之后,消息将变得井井有序、规范化地等待被监听,消费处理,其处理逻辑即为“秒杀接口的核心处理逻辑”,完整源代码于RabbitReceiverService服务类中,如下所示:  

//秒杀时异步接收Mq消息-监听者
@RabbitListener(queues = {"${mq.kill.item.execute.limit.queue.name}"},containerFactory = "multiListenerContainer")
public void consumeKillExecuteMqMsg(KillDto dto){
try {
if (dto!=null){
//采用任何一种加分布锁的处理方法都是可行的 killItemV5也行
killService.killItemV4(dto.getKillId(),dto.getUserId());
}
}catch (Exception e){
log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
}
}

RabbitMQ限流、转移以及在接收处理层面的代码开发已经完成了,下篇文章我们将将其整合至秒杀的业务逻辑当中!  

补充:

1、目前,这一秒杀系统的整体构建与代码实战已经全部完成了,该秒杀系统对应的视频教程的链接地址为:https://www.fightjava.com/web/index/course/detail/6,可以点击链接进行试看以及学习,实战期间有任何问题都可以留言或者与Debug联系、交流!

2、另外,Debug也开源了该秒杀系统对应的完整的源代码以及数据库,其地址可以来这里下载:https://gitee.com/steadyjack/SpringBoot-SecondKill 记得Fork跟Star啊!!!

3、最后,不要忘记了关注一下Debug的技术微信公众号: