xiaoheiAh's blog
RabbitMQ在保证生产端与消费端的数据安全上,提供了消息确认的机制来保证. 消费端到 broker
端的确认常叫做ack机制
, broker
到生产端常叫做confirm
.
Delivery Tag
是 RabbitMQ
来确认消息如何发送的标志. Consumer
在注册到 RabbitMQ
上后, RabbitMQ
通过 basic.deliver
方法向消费者推送消息, 这个方法中就带着可以在 Channel
中唯一识别消息的 delivery tag
. Delivery Tag
是channel
隔离的.
tag
是一个大于零的增长的整型, 客户端在确认消息时将其当做参数传回来就可以保证是同一条消息的确认了.
tag
是channel
隔离的, 所以必须在接受消息的channel
上确认消息收到,否则会抛 unknown delivery tag
的异常.
最大值: delivery tag
是 64 位的long
,最大值是 9223372036854775807
. tag
是channel
隔离的,理论上来说是不会超过这个值的.
消息确认有两种模式: 自动/手动.
自动模式会在消息一经发出就自动确认.这是在吞吐量和 可靠投递之间的权衡.如果在发送的过程中, TCP断掉了或是其他的问题,那消息就会丢掉了,这个问题需要考虑.还需要考虑的一个问题是: Consumer
消费速率如果不能跟上broker
的发送速率, 会导致Consumer
过载(消息堆积,内存耗尽),而在手动模式中可以通过prefetch
来控制消费端的速率.有些客户端会提供TCP的背压,不能处理时,就丢弃了.
手动模式需要Consumer
端在收到消息后调用:
basic.ack
: 消息处理好了,可以丢掉了basic.nack
: 可以批量reject
, 如果Consumer
设置了requeue
,消息还会重新回到broker
的队列中basic.reject
: 消息没有处理但是也需要删除由于消息的发送和接收是独立的且完全异步,消息的手动确认也是完全异步的.所以这里有一个未确认消息的滑动窗口.在消费端我们经常需要控制接收消息的数量,防止出现消息缓存buffer
越界的问题.此时我们就可以通过basic.qos
来设置prefetch count
, 该值定义了一个Channel
中能存放的消息条数上限,超过这个值,RabbitMQ
在收到至少一条ack
之前都不能再往Channel
上发送消息了.
这里需要注意前面说的滑动窗口: 意味着当Channel
满的时候,不会再往Channel
上发消息,但是当你ack
了一条,就会往Channel
上发一条,ack
了N条,就会发N条到Channel
上.
basic.get
设置prefetch
是无效的,请使用basic.consume
确认机制的选择和Prefetch
的值决定了消费端的吞吐量.一般来讲,增大Prefetch
值以及 自动确认 会提升推送消息的速率,但也会增加待处理消息的堆积,消费端内存压力也会上升.
如果Prefetch
无界,Consumer
在消费大量消息时没有ack
会导致消费端连接的那个节点内存压力上升.所以找到一个完美的Prefetch
值还是很重要的. 一般 100-300 左右吞吐量还不错,且消费端压力不大. 设置为 1 时,就很保守了,这种情况下吞吐量就很低,延迟较高.
网络有很多种失败的方式,并且需要花时间检测.所以客户端并不能保证消息可以正常的发送到broker
,正常的被处理.有可能丢了也有可能有延迟.
根据AMQP-0-9-1
, 只有通过 事务 的方式来保证.将Channel
设置为事务型的,每条消息都以事务形式推送提交.但是,事务是很重,会降低吞吐量,所以RabbitMQ
就换了种方式来实现: 通过模仿已有的Consumer
端的确认机制.
启用Confirm
,客户端调用confirm.select
即可.Broker
会返回confirm.select-ok
,取决于是否有no-wait
设置. Channel
如果设置了confirm.select
,说明处于confirm
模式,此时是不能设置为事务型Channel
,两者不可互通.
Broker
的应答机制同Consumer
一致,通过basic.ack
即可,也可批量ack
.
在某些情况下,broker
无法再接收消息,就会向发布端回执basic.nack
,意味着消息会被丢弃,发布端需要重新发布这些消息.当Channel
置为Confirm
模式后,后面收到的消息都将会confirm
或者nack
一次. 需要注意的几点:
无法路由的消息: 当确认消息不会被路由时, broker
会立即发出confirm
. 如果消息设置了强制(mandatory
)发送,basic.return
会在basic.ack
之前回执. nack
逻辑一致.
可路由的消息: 所有queue
接受了消息时返回basic.ack
,如果队列是持久化的,意味着持久化完成后才发出.对镜像队列(Mirrored Queues)
,意味着所有镜像都收到后发出.
RabbitMQ
的持久化通常是批量的,需要间隔几百毫秒来减少 fsync(2)
的调用次数或者等待 queue
是空闲状态的时候,这意味着,每一次basic.ack
的延迟可能达到几百毫秒.为了提高吞吐量最好是将持久化做成异步的,或者使用批量publish
,这个需要参考客户端的api实现.
大多数情况下, RabbitMQ
会根据消息发送的顺序依次回执(要求消息发送在同一个channel
上).但确认回执都是异步的,并且可以确认一条,或一组消息.确切的confirm
发送时间取决于: 消息是否需要持久化,消息的路由方式.意味着不同的消息的确认时间是不同的.也就意味着返回确认的顺序并不一定相同.应用方不能将其作为一个依据.