消息队列RabbitMQ
- 消息队列的作用
- 异步处理
- 应用解耦
- 流量控制
- 重要概念
- 消息代理 (message broker)
- 目的地 (destination)
- 队列 (queue) : 点对点消息通信
- 消息发送者发送消息消息代理将其放进一个队列中,消息接收者从队列中获取消息内容,消息读取后移除队列
- 消息只有一个唯一的发送者和接收者,但不是说只能有一个接收者
- 主题 (topic) : 发布/订阅 消息通信
- 发送者发送消息到主题,多个接收者监听主题,消息到达时接收者同时收到消息
- 队列 (queue) : 点对点消息通信
- 规范
- JMS(java messsage service) java 消息服务:基于jvm消息代理的规范
- AMQP(advanced message queuing protocol)高级消息队列协议,兼容JMS
RabbitMQ概念
- Messsage : 消息是不具名的,由消息头和消息体组成,消息体是不透明的,消息头由可选的属性组成
- Publisher: 消息的生产者,也就是向消息中间件发布消息的服务
- Exchange:交换器,用来接受生产者发送的消息并将这些消息路由给服务器中的队列
RabbitMQ消息发布模型
graph TD;
Publisher生产者-->Message头+体,route-key路由键,路由键指定将消息放入哪个存储队列;
Message头+体,route-key路由键,路由键指定将消息放入哪个存储队列-->Broker消息代理中的Exchange交换机;
Broker消息代理中的Exchange交换机-->绑定Queue消息存储队列;
RabbitMQ消息消费者模型
一个客户端只会建立一条连接
客户端通过Channel与Broker建立连接,一个Connection会有很多Channel
RabbitMQ安装
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
SpringBoot集成AMQP使用RabbitMQ
-
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
将依赖导入后就自动配置了
RabbitConnectionFactoryBean //连接工厂 RabbitMessagingTemplate amqpAdmin rabbitTemplate
-
给配置文件中配置 host和port和虚拟host,并且使用注解
@EnableRabbit
标记主类spring: rabbitmq: host: 192.168.23.182 port: 5672 virtual-host: /
-
创建交换机
@Autowired AmqpAdmin amqpAdmin; @Test void contextLoads() { DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false); amqpAdmin.declareExchange(directExchange); log.info("exchange [{}] create success", "hello-java-exchange"); }
-
创建队列
@Test void createQueue() { Queue queue = new Queue("hello-java-queue", true, false, false); amqpAdmin.declareQueue(queue); log.info("create queue [{}]success", "hello-java-queue"); }
-
交换机和队列绑定:exchange指定的交换机与destination目的地进行绑定,使用 routingKey作为路由键
@Test void createBinding() { Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange","hello.key",null); amqpAdmin.declareBinding(binding); log.info("交换机 {} 与队列 {} 绑定成功","hello-java-exchange","hello-java-queue"); }
-
向消息队列发送消息
@Autowired RabbitTemplate rabbitTemplate; /** * 测试发送消息 * 第一个参数为交换机名字 * 第二个参数为路由键 * 第三个参数为消息内容 */ @Test void sendMessageTest() { string = "hello world"; //发送对象 如果发送的消息是一个对象,会使用序列化机制 OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setCreateTime(new Date()); orderReturnReasonEntity.setName("hahahahah"); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key", orderReturnReasonEntity); log.info("发送消息{} 成功", orderReturnReasonEntity); } //最后的结果 /*rO0ABXNyAEBjb20ucGhjLnBoY3N0b3JlLnN0b3Jlb3JkZXIub3JkZXIuZW50aXR5Lk9yZGVyUmV0dXJuUmVhc29uRW50aXR5AAAAAAAAAAECAAVMAApjcmVh dGVUaW1ldAAQTGphdmEvdXRpbC9EYXRlO0wAAmlkdAAQTGphdmEvbGFuZy9Mb25nO0wABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZztMAARzb3J0dAATTGph dmEvbGFuZy9JbnRlZ2VyO0wABnN0YXR1c3EAfgAEeHBzcgAOamF2YS51dGlsLkRhdGVoaoEBS1l0GQMAAHhwdwgAAAF7Lr+OdHhzcgAOamF2YS5sYW5nLkxv bmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwAAAAAAAAAAF0AAloYWhhaGFoYWhwcA== */
-
如果需要实现发送json消息,编写配置类
@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } } //最后的结果为 //{"id":1,"name":"hahahahah","sort":null,"status":null,"createTime":1628577383709}
-
-
基础使用:从消息队列接收消息:使用
@RabbitListener
注解import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; @RabbitListener(queues = {"hello-java-queue"}) public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) { log.info("receiveMessage 1监听到的消息为{} ", content); }
-
进阶使用-1 @RabbitListener 标记在类上,说明要监听那些队列 @RabbitHandler 标记在方法上,重载区分不同的消息
package com.phc.phcstore.storeorder.order.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.phc.phcbstort.common.utils.PageUtils; import com.phc.phcbstort.common.utils.Query; import com.phc.phcstore.storeorder.order.dao.OrderItemDao; import com.phc.phcstore.storeorder.order.entity.OrderEntity; import com.phc.phcstore.storeorder.order.entity.OrderItemEntity; import com.phc.phcstore.storeorder.order.entity.OrderReturnReasonEntity; import com.phc.phcstore.storeorder.order.service.OrderItemService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.util.Map; @Service("orderItemService") @Slf4j @RabbitListener(queues = {"hello-java-queue"}) public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService { @Override public PageUtils queryPage(Map<String, Object> params) { IPage<OrderItemEntity> page = this.page( new Query<OrderItemEntity>().getPage(params), new QueryWrapper<OrderItemEntity>() ); return new PageUtils(page); } /** * queue:声明需要监听的所有队列 * 方法参数可以写一下类型 * 1. Message message 原生消息详细信息 * 2. T<发送的消息类型> OrderReturnReasonEntity content * 3. Channel 当前传输数据的通道 * <p> * queue: 很多人都可以来监听,只要收到消息,队列删除消息,只能有一个接受到消息 * 1) 订单服务启动多个 * 2) 只有一个消息完全处理完才能接受下一个消息 * <p> * 最后的使用 * * @RabbitListener 标记在类上,说明要监听那些队列 * @@RabbitHandler 标记在方法上,重载区分不同的消息 */ @RabbitHandler public void receiveMessage1(OrderReturnReasonEntity content) { log.info("receiveMessage 1监听到的消息为{} ", content); } @RabbitHandler public void receiveMessage2(OrderEntity content) { log.info("receiveMessage 2监听到的消息为{}", content); } }
-
进阶使用-2 消息发送
package com.phc.phcstore.storeorder.order.controller; import com.phc.phcstore.storeorder.order.entity.OrderEntity; import com.phc.phcstore.storeorder.order.entity.OrderReturnReasonEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController @Slf4j public class TestRabbitMQController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg") public String test(@RequestParam(value = "num", defaultValue = "10") Integer num) { for (int i = 0; i < num; i++) { if (i % 2 == 0) { OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(); orderReturnReasonEntity.setId(1L); orderReturnReasonEntity.setCreateTime(new Date()); orderReturnReasonEntity.setName("偶数"); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key", orderReturnReasonEntity); log.info("发送消息{} 成功", orderReturnReasonEntity); } else { OrderEntity orderEntity = new OrderEntity(); orderEntity.setBillContent("偶数"); rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key", orderEntity); log.info("发送消息{} 成功", orderEntity); } } return "success"; } }
-
进阶使用-3 打印出的日志
2021-08-11 10:31:21.580 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功 2021-08-11 10:31:21.583 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功 2021-08-11 10:31:21.583 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功 2021-08-11 10:31:21.583 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功 2021-08-11 10:31:21.584 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功 2021-08-11 10:31:21.584 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功 2021-08-11 10:31:21.584 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功 2021-08-11 10:31:21.584 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功 2021-08-11 10:31:21.585 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功 2021-08-11 10:31:21.586 INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功 2021-08-11 10:31:21.619 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 2021-08-11 10:31:21.625 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 2021-08-11 10:31:21.626 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 2021-08-11 10:31:21.626 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 2021-08-11 10:31:21.626 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 2021-08-11 10:31:21.627 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 2021-08-11 10:31:21.627 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 2021-08-11 10:31:21.627 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 2021-08-11 10:31:21.627 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 2021-08-11 10:31:21.628 INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
-
消息队列的可靠投递
-
为保证消息不会丢失,需要使用事务消息,性能会下降约200倍,为了高并发情况下能确认消息的成功与失败,引入了消息确认机制
- 生产端:发布者到服务器的确认 ,投递到queue未成功的回退机制
- 接收端:ack机制(让服务器知道那些消息被消费了)
-
ConfirmCallback:从发布者到RabbitMQ服务器的确认
-
添加配置
spring: rabbitmq: publisher-confirms: true
-
定制RabbitTemplate
package com.phc.phcstore.storeorder.order.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * @Author: PengHaiChen * @Description: * @Date: Create in 14:35 2021/8/10 */ @Configuration @Slf4j public class MyRabbitConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Autowired public RabbitTemplate rabbitTemplate; /** * 定制rabbitTemplate 设置确认回调 * * @PostConstruct MyRabbitConfig对象创建完成后,执行这个方法 */ @PostConstruct public void initRabbitTemplate() { //消息抵达broker的确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达Broker ack = true * @param correlationData 当前消息的唯一关联数据 * @param ack 消息是否成功收到 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息数据【{}】 消息接受状态【{}】 消息失败的原因【{}】", correlationData, ack, cause); } }); } }
-
-
ReturnCallback:消息抵达队列的回调
-
添加配置
spring: rabbitmq: publisher-returns: true #消息正确抵达queue确认 template: mandatory: true #只要抵达队列 以异步发送 优先回调returns
-
定制RabbitTemplate
//消息抵达队列的回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /*** * 只要消息没有投递给指定的队列,就会触发这个回调 * @param message 投递失败的消息内容 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 发送给的交换机 * @param routingKey 消息的路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("进入队列失败的消息==>{} ,回复码==>{} ,回复==>{} ,交换机==>{} , 路由键==>{} ", message, replyCode, replyText, exchange, routingKey); } });
-
发送消息的时候需要带上
CorrelationData
rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.key",orderEntity, new CorrelationData(UUID.randomUUID().toString()));
-
日志
2021-08-12 15:25:40.513 ERROR 8724 --- [nectionFactory1] c.p.p.s.order.config.MyRabbitConfig : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=935c0b49-fc60-406a-81a3-0a3bf9d85356, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 2021-08-12 15:25:40.515 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=4fbc0016-706b-45fe-addb-ca93151f9e68, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 2021-08-12 15:25:40.516 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=3a62ff12-3cf6-4b7d-943b-cc3ec7e1058f, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 2021-08-12 15:25:40.520 ERROR 8724 --- [nectionFactory1] c.p.p.s.order.config.MyRabbitConfig : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=bdd9113a-459c-4cdd-888e-6f309994f044, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 2021-08-12 15:25:40.521 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=5eeb8218-4c8e-4b7f-836f-f7465544e496, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key
-
-
Ack:接收者收到了消息,回复broker ack,但是接受者宕机了,这个时候就会发生消息丢失,我们需要去手动确认消息到达
-
编写配置
spring: rabbitmq: listener: simple: acknowledge-mode: manual #手动ack消息
-
手动ack,在逻辑完成后手动调用签收和拒签的方法
@RabbitHandler public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) { log.info("receiveMessage 1监听到的消息为{} ", content); //模拟网络中断 long deliveryTag = message.getMessageProperties().getDeliveryTag(); if (deliveryTag % 2 == 0) { try { //签收货物,第二个参数为 非批量模式 channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } }else { try { //退货模式 request==true 是消息发回服务器,服务器重新入队 channel.basicNack(deliveryTag, false, true); } catch (IOException e) { e.printStackTrace(); } } }
-
-