熬夜整理的RabbitMQ知识点相当齐全的文章

作者: 修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。



在如今微服务、分布式时代,不懂一点消息队列、服务解耦、异步通信,都不好意思说自己做过Java高并发、分布式项目了;趁着放假,debug熬夜整理了消息中间件RabbitMQ相关的知识点,对于没撸过这一技术栈的小伙伴而言可以说是福利了,而对于已经撸过的小伙伴而言不妨再过一遍,毕竟温故而知新嘛!


以下为本文的目录,先一睹为快:

  • 一、基本概念
  • 二、RabbitMQ底层架构
  • 三、如何在Spring Boot项目中使用
  • 四、福利奉上!


话不多说,咱们直接进入正题!

一、简介

1.RabbitMQ,一款由Erlang语言开发的、基于AMQP(高级消息队列协议)实现的开源消息代理软件,俗称“消息中间件”;

2.那何谓“AMQP”呢,解释为中文:高级消息队列协议,是应用层协议的一个开放标准,专门面向消息而设计的,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制;

3.据说这RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现还挺不俗,具体特点有:

(1)可靠性:RabbitMQ使用一些机制来保证可靠性,如持久化、消费确认、发布确认;

(2)灵活的路由:在消息进入队列之前,通过交换器Exchange 来路由消息的;对于典型的路由功能,RabbitMQ 已经提供了一些内置Exchange来实现;针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange

(3)消息集群:多个RabbitMQ 服务器可以组成一个集群,形成一个逻辑上的Broker(消息代理服务器);

(4)高可用:队列可以在集群中的机器节点进行镜像备份,当部分节点出问题时队列仍然可用;

(5)多种协议支持;RabbitMQ 支持多种消息队列协议,比如 STOMPMQTT等等;

(6)多语言客户端:RabbitMQ 几乎支持所有常用的开发语言,比如 Java.NETRuby 等;

(7)Web控制台管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息代理服务器 Broker 中的方方面面(包括队列、交换器、路由、消息消费情况、队列绑定情况、消费者实例情况、使用账户等等)

(8)跟踪机制:RabbitMQ 提供了消息跟踪机制,在消息传输期间出现异常时使用者可以找出到底发生了什么事!

(9)插件机制:RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件,比如RabbitMQ自带的“延迟队列插件”就相当好用!


二、RabbitMQ底层架构与基本概念

1.如下图所示为RabbitMQ底层的架构图,消息在底层传输期间几乎经历了其中所涉及的每个组件!


2.下面介绍下这些组件的具体含义:

1Message:消息,就像万物皆对象一样,万物皆可充当消息!因此消息可以理解为任意的事物、任意的数据;它由消息头Header和消息体Body组成;

其中消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等;

2Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序;

3Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,简而言之:即将消息路由给队列;

4Binding:绑定,用于将消息队列绑定、关联到指定的交换器;即用于消息队列和交换器之间的关联;那如何将其关联、绑定在一起呢,通过路由键RoutingKey实现;

即通过 路由键RoutingKey 将 交换器Exchange 和 消息队列 Queue 连接起来的,从另外一个角度上看,也可以将交换器Exchange理解成一个由绑定构成的路由表。

5Queue:消息队列,用于保存消息直到发送给消费者;可以理解为它是消息的容器;一个消息可投入一个或多个队列;消息一直在队列里面,直到消费者连接到这个队列并将其取走 这个 消息才算真正走到了最后!

6Connection:网络连接,比如一个 TCP 连接;

7Channel:信道,多路复用连接中的一条独立的双向数据流通道;信道是建立在真实TCP 连接之上的虚拟连接;AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成,因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接

8Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序;

9Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象,虚拟主机是共享相同的身份认证和加密环境的独立服务器域;

10Broker:表示消息队列服务器实体;


3. 交换器Exchange的类型: 交换器分发消息时根据类型的不同,分发策略也有所不同,目前共四种类型:directfanouttopicheaders headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

1)基于Direct类型交换器的传输模式:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中,它是完全匹配、单播的模式;如下图所示:



2)基于Fanout类型交换器的广播分发模式:每个分发到 Fanout类型交换器的消息都会分到所有绑定的队列上去,很像子网广播,每台子网内的主机都获得了一份复制的消息,Fanout 类型转发消息是最快的; 



3)基于Topic类型交换器的模式匹配分发模式:Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上,它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开,它同样也会识别两个通配符:

   符号“#”和符号“*”,其中 # 匹配 0 个或多个单词,而 * 匹配不多不少一个单词,形象如下图:


三、如何在Spring Boot项目中使用

  说一千道一百,最终还是得将其用到实际的项目并用实际的开发语言将它用起来,下面将基于Java Spring Boot介绍下如何在Spring Boot搭建的项目使用RabbitMQ,其实,很简单,只要把握住上面RabbitMQ的系统架构图即可!

1)消息(在Java中就是一实体类):

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BlogIndexMsg implements Serializable{
private Integer type;
private Integer blogId;
}


2)生产者(发送消息):

@Component
@Slf4j
public class BlogMqService {
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private Environment env;

@Autowired
private ObjectMapper objectMapper;

//用于首页微博/朋友圈的相关操作对应的mq服务
public void sendIndexBlogMsg(final BlogIndexMsg msg){
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq. exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq. routing.key.name"));
Message message= MessageBuilder.withBody(objectMapper.writeValueAsBytes(msg))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
log.error("用于首页微博/朋友圈的相关操作对应的mq服务-发生异常:{} ",msg,e);
}
}
}


3)消费者:

@Component
@Slf4j
public class BlogMqListener {

@Autowired
private ObjectMapper objectMapper;

//消息监听器
@RabbitListener(queues = {"${mq. queue.name}"})
public void consumeBlogIndexMsg(@Payload Message msg){
try {
BlogIndexMsg indexMsg=objectMapper.readValue(msg.getBody(),BlogIndexMsg.class);
log.info("消息监听器监听到的消息:{}",indexMsg);

//执行具体的业务逻辑代码

}catch (Exception e){
log.error("消息监听器-发生异常:",e);
}
}
}


4)消息队列与交换器的绑定(即消息模型的构建):

@Configuration
public class RabbitConfig {

private static final Logger log= LoggerFactory.getLogger(RabbitConfig.class);

@Autowired
private Environment env;

//构建消息模型(消息绑定)
@Bean
public Queue blogIndexQueue(){
return new Queue(env.getProperty("mq.queue.name"),true);
}

@Bean
public TopicExchange blogIndexExchange(){
return new TopicExchange(env.getProperty("mq.exchange.name"),true,false);
}

@Bean
public Binding blogIndexBinding(){
return BindingBuilder.bind(blogIndexQueue()).to(blogIndexExchange()).with(env.getProperty("mq. routing.key.name"));
}
}


然后在代码的某个地方调用 new BlogMqService().sendIndexBlogMsg()即可!!!


四、福利奉上!

关于RabbitMQ更多详细的介绍,诸位可以观看debug录制的 “RabbitMQ实战视频教程”,地址如下所示:https://www.fightjava.com/web/index/course/detail/4


在那里不仅仅介绍了本文所提及的所有知识点,还包括了:消费确认模式、高并发下消息限流、服务解耦、异步通信、死信队列等技术干货,课程目录如下图所示:


总结:

好了,本文就介绍到这里吧,快过春节了,提前祝大家新春快乐,身体健康,来年撸码更硬!!!

如果本文对你有帮助,请关注公众号,并动动手指收藏、点赞、以及转发哦!!!