众所周知,RabbitMQ在保证消息可靠投递的实现过程中有个参数mandatory
。该参数的作用是,当消息的mandatory设置为true时,消息投递到Exchange之后,如果Exchange无法将该消息路由到任何一个队列,那么该消息将返回给生产者。当设置为false,RabbitMQ将直接丢弃该消息。
定义一个Exchange,不绑定任何Queue,向该Exchange投递mandatory为true消息,那么肯定是路由失败并且需要返回生产者。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.application.name=server-shop-sender
server.port=8600
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true
package com.shenmazong.servershopsender.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
/**
* @author 军哥
* @version 1.0
* @description: 消息发送失败的回调函数
* @date 2021/8/4 15:17
*/
@Slf4j
@Component
public class RabbitTemplateEnhance implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
log.info("增强 RabbitTemplate");
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
// return回调函数
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息被退回:{}", returnedMessage);
}
});
return rabbitTemplate;
}
return bean;
}
}
package com.shenmazong.servershopsender.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 军哥
* @version 1.0
* @description: RabbitMQ Configuration
* @date 2021/8/4 15:30
*/
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "ack_exchage";
public static final String QUEUE_NAME = "ack_exchage";
/**
* @description 创建队列,并设置持久性
* @author 军哥
* @date 2021/8/4 15:43
* @version 1.0
*/
@Bean
public Queue ackQueue() {
return new Queue(QUEUE_NAME, true);
}
/**
* @description 创建交换机,并设置持久性
* @author 军哥
* @date 2021/8/4 15:45
* @version 1.0
*/
@Bean
public DirectExchange ackExchange() {
return new DirectExchange(EXCHANGE_NAME, true, false);
}
// @Bean
// public Binding ackBinding() {
// return BindingBuilder.bind(ackQueue()).to(ackExchange()).with("");
// }
}
package com.shenmazong.servershopsender.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.shenmazong.servershopsender.pojo.TbUserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import static com.shenmazong.servershopsender.config.RabbitConfig.EXCHANGE_NAME;
/**
* @author 军哥
* @version 1.0
* @description: TODO
* @date 2021/8/4 15:18
*/
@RestController
@Slf4j
public class IndexController {
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping(value = "/send")
public Object send(@RequestParam("msg") String msg) throws JsonProcessingException {
log.info("准备发送发送消息");
TbUserInfo tbUserInfo = new TbUserInfo();
tbUserInfo.setUserId(666);
tbUserInfo.setUserName("赵云");
tbUserInfo.setUserAge(18);
tbUserInfo.setNickName("赵子龙");
//--1 转换对象为JSON
ObjectMapper mapper = new ObjectMapper();
String user = mapper.writeValueAsString(tbUserInfo);
Message message = MessageBuilder
.withBody(user.getBytes(StandardCharsets.UTF_8))
.setContentEncoding(StandardCharsets.UTF_8.displayName())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
return "OK";
}
}
package com.shenmazong.demorabbitsend.listen;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_QUEUE;
/**
* @author 军哥
* @version 1.0
* @description: 从队列接收消息
* @date 2021/8/6 11:50
*/
@Component
@Slf4j
public class WorkListener {
@RabbitListener(queues = EXCHANGE_NAME, ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
String msg = new String(body);
log.info("onMessage=" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}