RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签收机制,所以有必要把这两个概念搞清楚。
1、消息确认机制主要是生产者使用的机制,用来确认消息是否被成功消费。
配置如下:
spring: rabbitmq: address: 192.168.x.x:xxxx virtual-host: / username: guest password: guest connection-timeout: 5000 publisher-confirms: true # 消息成功确认 publisher-returns: true # 消息失败确认 template: mandatory: true # 手动签收机制这样,当你实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,就可以针对性地进行消息确认的日志记录,之后做进一步的消息发送补偿,以达到接近100%投递的目的。
伪代码如下:
@Component@Slf4jpublic class RabbitMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { /** * 发送消息 */ public void sendOrder(Order order) { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); // 发送消息 rabbitTemplate.convertAndSend(xx, xx, order, xx); } /** * 成功接收后的回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { // 如果成功接收了,这里可以对日志表的消息收发状态做更新。 // .... } /** * 失败后的回调 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { // 如果失败了,这里可以对日志表的消息收发状态做更新,之后通过任务调度去补偿发送。 // .... }}2、消息签收机制RabbitMQ的消息是自动签收的,你可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。
企业级开发中,RabbitMQ我们基本都开启手动签收方式,这样可以有效避免消息的丢失。
前文中已经在生产者开启了手动签收机制,那么作为消费方,也要设置手动签收。
配置如下:
spring: rabbitmq: address: 192.168.x.x:xxxx virtual-host: / username: guest password: guest connection-timeout: 5000 listener: simple: concurrency: 5 # 并发数量 max-concurrency: 10 # 最大并发数量 acknowledge-mode: manual # 开启手动签收 prefetch: 1 # 限制每次只消费一个(一个线程),上面配置5,也就是能一次接收5个消费监听时,手动签收就一行代码,伪代码如下:
@RabbitListener(xxx)public void onOrderMessage(@Payload Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { // .... // 手动签收 channel.basicAck(tag, false); }消息丢失两个概念搞清楚后,就可以来学习消息丢失的问题和处理方案了。
1、出现原因消息丢失的原因无非有三种:
1)、消息发出后,中途网络故障,服务器没收到;
2)、消息发出后,服务器收到了,还没持久化,服务器宕机;
3)、消息发出后,服务器收到了,消费方还未处理业务逻辑,服务却挂掉了,而消息也自动签收,等于啥也没干。
这三种情况,(1) 和 (2)是由于生产方未开启消息确认机制导致,(3)是由于消费方未开启手动签收机制导致。
2、解决方案1)、生产方发送消息时,要try...catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;
2)、生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;
3)、消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。
其实这就是前面阐述两个概念时已经讲过的内容,也是接近100%消息投递的企业级方案之一,主要目的就是为了解决消息丢失的问题。
消息重复1、出现原因消息重复大体上有两种情况会出现:
1)、消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;
2)、消息消费失败,由于retry重试机制,重新入队又将消息发送出去。
2、解决方案网上大体上能搜罗到的方法有三种:
1)、消费方业务接口做好幂等;
2)、消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;
3)、消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。
我这里只推荐第一种,业务方法幂等这是最直接有效的方式,(2)还要和数据库产生交互,(3)有可能导致第一次消费失败但第二次消费成功的情况被砍掉。
消息积压1、出现原因消息积压出现的场景一般有两种:
1)、消费方的服务挂掉,导致一直无法消费消息;
2)、消费方的服务节点太少,导致消费能力不足,从而出现积压,这种情况极可能就是生产方的流量过大导致。
2、解决方案1)、既然消费能力不足,那就扩展