0%

RabbitMQ-消息队列学习笔记①

前言

这个是个人学习RabbitMQ消息队列时所总结的一部分学习笔记,关于RabbitMQ的更深入内容后续学习继续补充;

1. 消息队列概述

1.1 MQ概念

MQ(Message Queue)本质上是一个FIFO的队列,只不过队列中存放的内容是消息,用于上下游传递消息,用于上下有“逻辑解耦+物理解耦”的消息通信服务,使用MQ之后消息发送上有只需要依赖MQ而不再需要依赖其他服务;

1.2 MQ作用

1.2.1 流量消峰

以订单系统为例,加入某订单系统最多能处理一万次订单,在正常时段处理能力足够,但是在高峰期就会有订单超过一万后不允许继续下单的限制,当使用MQ做缓冲时,先把请求发送到消息队列而不是直接发送到订单系统,消息队列把短时间集中的订单分散到一段时间来处理达到流量消峰的目的,虽然相比平时下单返回的结果时间会变慢,但是不至于订单系统崩溃

1.2.2 应用解耦

以电商系统为例,订单系统如果调用物流系统和支付系统,任何一个子系统出故障都会导致下单操作异常,当转变为基于MQ的方式后,可以减少系统间调用的问题,比如物流系统发生故障需要几分钟来修复,在这几分钟的时间里,订单系统发送的需要物流系统处理的消息先被缓存在消息队列中,用户的下单操作可以正常完成,当物流系统恢复后,物流系统继续处理订单消息即可,中途用户感受不到物流系统的故障

1.2.3 异步处理

以服务调用为例,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后发送一条消息给MQ,MQ会将这消息转发给A,这样A既不用循环调用B的查询api,又能去处理其他业务的同时及时得到B异步处理完成的消息;

1.3 MQ核心概念与名词

1.3.1 RabbitMQ概念

RabbitMQ是一个消息中间件,用于接收、存储和转发消息数据;

1.3.2 四个核心概念

生产者:产生数据发送消息的程序;

消费者:等待接收消息的程序;(同一个应用程序既可以是生产者也可以是消费者)

交换机:RabbitMQ的接收和分发消息的组件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中,由交换机类型决定如何处理它接收到的消息;

队列:RabbirMQ内部使用的数据结构,消息只能存储在队列中,本质上是一个消息缓冲区;一个交换机可以与多个队列建立绑定关系,通常一个队列对应一个消费者,当一个队列对应多个消费者时也只会有其中一个消费者能收到消息;

2. 生产消费模型

2.1 简单队列模式

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//记得先引入rabbitmq依赖客户端amap-client、操作文件流依赖commons-io

//0.队列名称
public static final String QUEUE_NAME = "自定义队列名"

//1.创建连接工厂:
ConnectionFactory factory = new ConnectionFactory( );

//2.工厂IP连接RabbitMQ队列
factory.setHost(主机IP地址);

//3.用户名及密码
factory.setUsername(用户名);
factory.setPassword(密码);

//4.创建连接
Connection connection = factory.newConnection( );

//5.获取信道(一个连接中可以有多个信道)
Channel channel = connnection.createChannel( );

//6.生成队列
channel.quereDeclare(发送的队列名, 队列是否持久化, 是否只供一个消费者进行消费和进行消息共享, 断开连接后是否自动删除队列, 其他参数);

//7.发送消息
channel.basicPublish(发送到哪个交换机, 路由的Key值, 其它参数信息, 发送消息的消息体);

消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//0.队列名称
public static final String QUEUE_NAME = "自定义队列名"

//1.创建连接工厂:
ConnectionFactory factory = new ConnectionFactory( );

//2.工厂IP连接RabbitMQ队列
factory.setHost(主机IP地址);

//3.用户名及密码
factory.setUsername(用户名);
factory.setPassword(密码);

//4.创建连接
Connection connection = factory.newConnection( );

//5.获取信道
Channel channel = connnection.createChannel( );

//6.接收消息
channel.basicConsume(消费的队列名, 消费成功后是否自动应答, 消费者未成功消费的回调, 消费者取消消费的回调);

2.2 工作队列模式

含义:多个工作线程作为消费者去接收消息,但要注意的是,一个消息只能被处理一次不能被处理多次,因此采用轮询分发消息的模式(即公平分发,与之对应的是不公平分发),不同的工作线程之间是竞争关系;

2.2.1 消息应答

概念:消费者在接收到消息并且处理该消息之后,发送ACK确认告诉RabbitMQ已经处理,RabbitMQ才可以把消息删除;(引入消息应答的目的是保证消息在发送过程中不丢失

自动应答模式(默认采用)

  • 消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡
  • 没有对传输的消息数量进行限制,可能导致消费者处消息积压(建议这种模式使用在消费者可以高效并以某种速率能够处理这些消息的情况下);

手动应答模式:可以批量应答并且减少网络拥堵;

手动应答的方法如下:

  1. channel.basicAck (用于肯定确认)
  2. channel.basicNack (用于否定确认)
  3. channel.basicReject (用于否定确认,不处理该消息直接拒绝),与basicNack相比没有批量处理参数,只应答当前消息

消息自动重新入队:消费者由于某些原因失去连接导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将其重新排队;如果此时其他消费者可以处理,它将该消息重新发给另一个消费者处理,确保消息不会丢失;(开启手动应答才会使用)

2.3 持久化

2.3.1 队列持久化

默认创建的队列是非持久化的,RabbitMQ重启该队列就会被删除,实现队列持久化需要在声明队列前把durable参数设置为持久化;

1
2
//声明参数durable为true队列持久化
channel.quereDeclare(发送的队列名, true, 是否只供一个消费者进行消费和进行消息共享, 断开连接后是否自动删除队列, 其他参数);

2.3.2 消息持久化

1
2
//在发送消息时添加MessageProperties.PERSISTENT_TEXT_PLAIN属性即可把消息保存在磁盘上,实现消息持久化
channel.basicPublish(发送到哪个交换机, 路由的Key值, MessageProperties.PERSISTENT_TEXT_PLAIN, 发送消息的消息体);

2.3.3 不公平分发

1
channel.basicQos(int prefetchCount = 1)

注意:prefetchCount设置为0是轮询分发,设置为1就是不公平分发;将prefetchCount设置成其他数字可以设置为预取值;

2.4 发布确认模式

发布确认模式可分为以下几种:单个发布确认、批量发布确认、异步批量确认;

  • 单个发布确认属于一种同步发布确认模式,特点是收到单个消息就马上进行一次发布确认
  • 批量发布确认特点是批量发送指定数量的消息后才进行一次发布确认,但消息出现丢失时无法确定具体哪个消息未被确认
  • 异步批量确认属于一种异步发布确认模式,特点是无论是否收到都进行确认应答,确认收到时回调ackCallback,未确认收到回调nackCallback

3. 交换机模型

3.1 绑定

绑定(blinding)即声明交换机和具体哪个队列之间建立关系;

3.2 扇出交换机(发布订阅模式)

扇出(Fanout)是交换机类型之一,即将接收到的所有消息广播到它知道的所有队列中;

扇出交换机消费者代码如下:

1
2
3
4
5
6
7
8
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个临时队列,当消费者断开与队列连接时,队列就会自动删除
channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

channel.basicConsume();

3.3 直接(Direct)交换机(路由模式)

3.4 主题(Topic)交换机

4. 死信队列

概念:死信即无法被消费的信息,死信队列即存放这些消息的队列;

应用场景:为了保证订单业务的消息数据不丢失,当消息消费发生异常时就将消息投入到死信队列中,后续恢复时该消息可以继续消费(如用户在商城下单成功并点击去支付后,在指定的时间未支付时自动失效)

4.1 死信来源

死信的来源主要有三个:

  1. 消息TTL过期
  2. 队列达到最大长度,无法再添加数据到MQ中
  3. 消息被拒绝

4.2 延迟队列

概念:延迟队列即死信来源为消息TTL过期的情况,是用来存放需要在指定时间被处理的元素的队列