在工作中使用Rabbitmq传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败;
如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了ConfirmCallback与ReturnCallback,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;
ConfirmCallback与ReturnCallback也被称为Rabbitmq的消息确认机制;
首先,下面为消息从生产者 ——> 消费者的流程图:
不过如果应用到生产环境中会出现两个问题:
生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
交换器接收到的消息,并没有发送到队列中,而生产者却不知道;
为了解决以上两个问题,系统引入了ConfirmCallback与ReturnCallback:
也就是说,前者是为了监听消息是否到达了Exchange,后者是为了监听消息是否到达了队列,如果这两个步骤遇到了问题,则生产者也好做出相应处理(例如:消息补偿,不过这并不是本篇的重点);
如果消息在消费端消费失败了怎么办?
失败就失败了,在实际场景中,数据库是需要为发送成功的消息做标记的,如果消息没有做标记(消费失败),则会采用定时任务重新发送,不过会涉及到幂等性的问题,这里会另起一篇文章:基于RabbitMQ实现最终一致性解决方案,在此不再赘述;
@PostConstruct
public void init() {
//消息未送达队列触发回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JSON.toJSONString(message), replyCode, replyText, exchange, routingKey);
MqMsg msg = JSON.parseObject(new String(message.getBody()), MqMsg.class);
// 更新数据库 设置消息的状态为发送失败
});
//消息进入到Exchange触发回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = Objects.requireNonNull(correlationData.getId());
if (!ack) {
log.error("消息未发送成功,返回信息:{}", cause);
//设置消息的状态为发送失败
} else {
// 更新数据库 设置消息的状态为发送成功
}
});
}
@ResponseBody
@GetMapping("/send")
public String send() {
UserVo userVo = new UserVo();
//组装消息内容
MessageProperties properties = new MessageProperties();
//消息唯一ID,用力防止幂等性
properties.setMessageId(userVo.getId().toString());
Message message = new Message(JSON.toJSONString(userVo).getBytes(StandardCharsets.UTF_8), properties);
// 发送消息时,需要根据业务设置唯一id,发送方确认时,还需要使用唯一id去修改数据状态
rabbitTemplate.convertAndSend("demoExchange", "demoRoutingKey", message);
return "发送成功";
}
@Slf4j
@Component
@RabbitListener(queues = "demo_data_queue")
public class HelloReceiver {
int status = 0;
@RabbitHandler
public void process(JSONObject jsonObject, Channel channel, Message message) throws Exception {
// 单条消息的大小限制,一般设为0或不设置,不限制大小
int prefecthSize = 0;
// 不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack 注意在自动应答下不生效
int prefecthCount = 1;
// 表示是否应用于channel上,即是channel级别还是consumer级别
boolean global = false;
channel.basicQos(prefecthSize,prefecthCount,global);
log.info("收到消息:{}", jsonObject);
Thread.sleep(10000);
try {
log.info("message:{}", message.getMessageProperties().getDeliveryTag());
} catch (Exception e) {
status = 1;
e.printStackTrace();
log.info("message:{}", message.getMessageProperties().getDeliveryTag());
} finally {
// 在这里执行成功或失败
if (status == 0) {
//成功消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (status == 1) {
//丢弃这条消息,如果最后一个参数设置为true的话,消息将重回队列末尾,重复消费
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
}