RabbitMQ 消息队列

 

消息队列RabbitMQ

  • 消息队列的作用
    • 异步处理
    • 应用解耦
    • 流量控制
  • 重要概念
    • 消息代理 (message broker)
    • 目的地 (destination)
      • 队列 (queue) : 点对点消息通信
        • 消息发送者发送消息消息代理将其放进一个队列中,消息接收者从队列中获取消息内容,消息读取后移除队列
        • 消息只有一个唯一的发送者和接收者,但不是说只能有一个接收者
      • 主题 (topic) : 发布/订阅 消息通信
        • 发送者发送消息到主题,多个接收者监听主题,消息到达时接收者同时收到消息
    • 规范
      • JMS(java messsage service) java 消息服务:基于jvm消息代理的规范
      • AMQP(advanced message queuing protocol)高级消息队列协议,兼容JMS

RabbitMQ概念

  • Messsage : 消息是不具名的,由消息头和消息体组成,消息体是不透明的,消息头由可选的属性组成
  • Publisher: 消息的生产者,也就是向消息中间件发布消息的服务
  • Exchange:交换器,用来接受生产者发送的消息并将这些消息路由给服务器中的队列

RabbitMQ消息发布模型

graph TD;
    Publisher生产者-->Message头+体,route-key路由键,路由键指定将消息放入哪个存储队列;
    Message头+体,route-key路由键,路由键指定将消息放入哪个存储队列-->Broker消息代理中的Exchange交换机;
    Broker消息代理中的Exchange交换机-->绑定Queue消息存储队列;

RabbitMQ消息消费者模型

一个客户端只会建立一条连接

客户端通过Channel与Broker建立连接,一个Connection会有很多Channel

RabbitMQ安装

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

SpringBoot集成AMQP使用RabbitMQ

  • 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 将依赖导入后就自动配置了

    RabbitConnectionFactoryBean //连接工厂
    RabbitMessagingTemplate
    amqpAdmin
    rabbitTemplate
    
  • 给配置文件中配置 host和port和虚拟host,并且使用注解@EnableRabbit标记主类

    spring:
    	rabbitmq:
        	host: 192.168.23.182
    	    port: 5672
        	virtual-host: /
    
  • 创建交换机

    @Autowired
        AmqpAdmin amqpAdmin;
      
        @Test
        void contextLoads() {
            DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
            amqpAdmin.declareExchange(directExchange);
            log.info("exchange [{}] create success", "hello-java-exchange");
      
        }
    
  • 创建队列

    @Test
    void createQueue() {
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("create queue [{}]success", "hello-java-queue");
      
    }
    
  • 交换机和队列绑定:exchange指定的交换机与destination目的地进行绑定,使用 routingKey作为路由键

     @Test
        void createBinding() {
            Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                    "hello-java-exchange","hello.key",null);
            amqpAdmin.declareBinding(binding);
            log.info("交换机 {} 与队列 {} 绑定成功","hello-java-exchange","hello-java-queue");
        }
    
  • 向消息队列发送消息

        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 测试发送消息
         * 第一个参数为交换机名字
         * 第二个参数为路由键
         * 第三个参数为消息内容
         */
        @Test
        void sendMessageTest() {
            string = "hello world";
            //发送对象 如果发送的消息是一个对象,会使用序列化机制
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setCreateTime(new Date());
            orderReturnReasonEntity.setName("hahahahah");
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key",
                    orderReturnReasonEntity);
            log.info("发送消息{} 成功", orderReturnReasonEntity);
        }
    //最后的结果
    /*rO0ABXNyAEBjb20ucGhjLnBoY3N0b3JlLnN0b3Jlb3JkZXIub3JkZXIuZW50aXR5Lk9yZGVyUmV0dXJuUmVhc29uRW50aXR5AAAAAAAAAAECAAVMAApjcmVh
    dGVUaW1ldAAQTGphdmEvdXRpbC9EYXRlO0wAAmlkdAAQTGphdmEvbGFuZy9Mb25nO0wABG5hbWV0ABJMamF2YS9sYW5nL1N0cmluZztMAARzb3J0dAATTGph
    dmEvbGFuZy9JbnRlZ2VyO0wABnN0YXR1c3EAfgAEeHBzcgAOamF2YS51dGlsLkRhdGVoaoEBS1l0GQMAAHhwdwgAAAF7Lr+OdHhzcgAOamF2YS5sYW5nLkxv
    bmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwAAAAAAAAAAF0AAloYWhhaGFoYWhwcA==
    */
    
    • 如果需要实现发送json消息,编写配置类

      @Configuration
      public class MyRabbitConfig {
          @Bean
          public MessageConverter messageConverter() {
              return new Jackson2JsonMessageConverter();
          }
      }
      //最后的结果为
      //{"id":1,"name":"hahahahah","sort":null,"status":null,"createTime":1628577383709}
      
  • 基础使用:从消息队列接收消息:使用 @RabbitListener注解

        import com.rabbitmq.client.Channel;
    	import org.springframework.amqp.core.Message;
      
    	@RabbitListener(queues = {"hello-java-queue"})
        public void receiveMessage1(Message message,
                                    OrderReturnReasonEntity content,
                                    Channel channel) {
      
            log.info("receiveMessage 1监听到的消息为{} ", content);
        }
    
  • 进阶使用-1 @RabbitListener 标记在类上,说明要监听那些队列 @RabbitHandler 标记在方法上,重载区分不同的消息

    package com.phc.phcstore.storeorder.order.service.impl;
      
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.baomidou.mybatisplus.core.metadata.IPage;
    import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
    import com.phc.phcbstort.common.utils.PageUtils;
    import com.phc.phcbstort.common.utils.Query;
    import com.phc.phcstore.storeorder.order.dao.OrderItemDao;
    import com.phc.phcstore.storeorder.order.entity.OrderEntity;
    import com.phc.phcstore.storeorder.order.entity.OrderItemEntity;
    import com.phc.phcstore.storeorder.order.entity.OrderReturnReasonEntity;
    import com.phc.phcstore.storeorder.order.service.OrderItemService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
      
    import java.util.Map;
      
      
    @Service("orderItemService")
    @Slf4j
    @RabbitListener(queues = {"hello-java-queue"})
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
      
        @Override
        public PageUtils queryPage(Map<String, Object> params) {
            IPage<OrderItemEntity> page = this.page(
                    new Query<OrderItemEntity>().getPage(params),
                    new QueryWrapper<OrderItemEntity>()
            );
      
            return new PageUtils(page);
        }
      
        /**
         * queue:声明需要监听的所有队列
         * 方法参数可以写一下类型
         * 1. Message message 原生消息详细信息
         * 2. T<发送的消息类型> OrderReturnReasonEntity content
         * 3. Channel 当前传输数据的通道
         * <p>
         * queue: 很多人都可以来监听,只要收到消息,队列删除消息,只能有一个接受到消息
         * 1) 订单服务启动多个
         * 2) 只有一个消息完全处理完才能接受下一个消息
         * <p>
         * 最后的使用
         *
         * @RabbitListener 标记在类上,说明要监听那些队列
         * @@RabbitHandler 标记在方法上,重载区分不同的消息
         */
        @RabbitHandler
        public void receiveMessage1(OrderReturnReasonEntity content) {
            log.info("receiveMessage 1监听到的消息为{} ", content);
        }
      
        @RabbitHandler
        public void receiveMessage2(OrderEntity content) {
            log.info("receiveMessage 2监听到的消息为{}", content);
        }
    }
    
  • 进阶使用-2 消息发送

    package com.phc.phcstore.storeorder.order.controller;
      
    import com.phc.phcstore.storeorder.order.entity.OrderEntity;
    import com.phc.phcstore.storeorder.order.entity.OrderReturnReasonEntity;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
      
    import java.util.Date;
      
    @RestController
    @Slf4j
    public class TestRabbitMQController {
      
        @Autowired
        RabbitTemplate rabbitTemplate;
      
        @GetMapping("/sendMsg")
        public String test(@RequestParam(value = "num", defaultValue = "10") Integer num) {
            for (int i = 0; i < num; i++) {
                if (i % 2 == 0) {
                    OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                    orderReturnReasonEntity.setId(1L);
                    orderReturnReasonEntity.setCreateTime(new Date());
                    orderReturnReasonEntity.setName("偶数");
                    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key",
                            orderReturnReasonEntity);
                    log.info("发送消息{} 成功", orderReturnReasonEntity);
                } else {
                    OrderEntity orderEntity = new OrderEntity();
                    orderEntity.setBillContent("偶数");
                    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.key",
                            orderEntity);
                    log.info("发送消息{} 成功", orderEntity);
                }
            }
            return "success";
        }
    }
    
  • 进阶使用-3 打印出的日志

    2021-08-11 10:31:21.580  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功
    2021-08-11 10:31:21.583  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功
    2021-08-11 10:31:21.583  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功
    2021-08-11 10:31:21.583  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功
    2021-08-11 10:31:21.584  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功
    2021-08-11 10:31:21.584  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功
    2021-08-11 10:31:21.584  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功
    2021-08-11 10:31:21.584  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功
    2021-08-11 10:31:21.585  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 成功
    2021-08-11 10:31:21.586  INFO 9744 --- [nio-9000-exec-1] c.p.p.s.o.c.TestRabbitMQController       : 发送消息OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null) 成功
    2021-08-11 10:31:21.619  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 
    2021-08-11 10:31:21.625  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
    2021-08-11 10:31:21.626  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 
    2021-08-11 10:31:21.626  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
    2021-08-11 10:31:21.626  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 
    2021-08-11 10:31:21.627  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
    2021-08-11 10:31:21.627  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 
    2021-08-11 10:31:21.627  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
    2021-08-11 10:31:21.627  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 1监听到的消息为OrderReturnReasonEntity(id=1, name=偶数, sort=null, status=null, createTime=Wed Aug 11 10:31:21 CST 2021) 
    2021-08-11 10:31:21.628  INFO 9744 --- [ntContainer#0-1] c.p.p.s.o.s.impl.OrderItemServiceImpl    : receiveMessage 2监听到的消息为OrderEntity(id=null, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=偶数, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
    
  • 消息队列的可靠投递

    • 为保证消息不会丢失,需要使用事务消息,性能会下降约200倍,为了高并发情况下能确认消息的成功与失败,引入了消息确认机制

      • 生产端:发布者到服务器的确认 ,投递到queue未成功的回退机制
      • 接收端:ack机制(让服务器知道那些消息被消费了)
    • ConfirmCallback:从发布者到RabbitMQ服务器的确认

      • 添加配置

        spring:
          rabbitmq:
            publisher-confirms: true
        
      • 定制RabbitTemplate

        package com.phc.phcstore.storeorder.order.config;
              
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.connection.CorrelationData;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
        import org.springframework.amqp.support.converter.MessageConverter;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
              
        import javax.annotation.PostConstruct;
              
        /**
         * @Author: PengHaiChen
         * @Description:
         * @Date: Create in 14:35 2021/8/10
         */
        @Configuration
        @Slf4j
        public class MyRabbitConfig {
            @Bean
            public MessageConverter messageConverter() {
                return new Jackson2JsonMessageConverter();
            }
              
            @Autowired
            public RabbitTemplate rabbitTemplate;
              
            /**
             * 定制rabbitTemplate 设置确认回调
             *
             * @PostConstruct MyRabbitConfig对象创建完成后,执行这个方法
             */
            @PostConstruct
            public void initRabbitTemplate() {
                //消息抵达broker的确认回调
                rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                    /**
                     * 只要消息抵达Broker ack = true
                     * @param correlationData 当前消息的唯一关联数据
                     * @param ack 消息是否成功收到
                     * @param cause 失败的原因
                     */
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                        log.info("消息数据【{}】 消息接受状态【{}】 消息失败的原因【{}】", correlationData, ack, cause);
                    }
                });
            }
        }
        
    • ReturnCallback:消息抵达队列的回调

      • 添加配置

        spring:
          rabbitmq:
            publisher-returns: true #消息正确抵达queue确认
            template:
              mandatory: true #只要抵达队列 以异步发送 优先回调returns
        
      • 定制RabbitTemplate

                //消息抵达队列的回调
                rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                    /***
                     * 只要消息没有投递给指定的队列,就会触发这个回调
                     * @param message   投递失败的消息内容
                     * @param replyCode 回复的状态码
                     * @param replyText 回复的文本内容
                     * @param exchange  发送给的交换机
                     * @param routingKey 消息的路由键
                     */
                    @Override
                    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                        log.error("进入队列失败的消息==>{} ,回复码==>{} ,回复==>{} ,交换机==>{} , 路由键==>{} ",
                                message, replyCode, replyText, exchange, routingKey);
              
                    }
                });
        
      • 发送消息的时候需要带上 CorrelationData

        rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.key",orderEntity, new CorrelationData(UUID.randomUUID().toString()));
        
      • 日志

        2021-08-12 15:25:40.513 ERROR 8724 --- [nectionFactory1] c.p.p.s.order.config.MyRabbitConfig      : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=935c0b49-fc60-406a-81a3-0a3bf9d85356, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 
        2021-08-12 15:25:40.515 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig      : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=4fbc0016-706b-45fe-addb-ca93151f9e68, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 
        2021-08-12 15:25:40.516 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig      : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=3a62ff12-3cf6-4b7d-943b-cc3ec7e1058f, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 
        2021-08-12 15:25:40.520 ERROR 8724 --- [nectionFactory1] c.p.p.s.order.config.MyRabbitConfig      : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=bdd9113a-459c-4cdd-888e-6f309994f044, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 
        2021-08-12 15:25:40.521 ERROR 8724 --- [nectionFactory3] c.p.p.s.order.config.MyRabbitConfig      : 进入队列失败的消息==>(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":"偶数","billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=5eeb8218-4c8e-4b7f-836f-f7465544e496, __TypeId__=com.phc.phcstore.storeorder.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) ,回复码==>312 ,回复==>NO_ROUTE ,交换机==>hello-java-exchange , 路由键==>hello22.key 
        
    • Ack:接收者收到了消息,回复broker ack,但是接受者宕机了,这个时候就会发生消息丢失,我们需要去手动确认消息到达

      • 编写配置

        spring:
           rabbitmq:
            listener:
              simple:
                acknowledge-mode: manual #手动ack消息
        
      • 手动ack,在逻辑完成后手动调用签收和拒签的方法

            @RabbitHandler
            public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel) {
                log.info("receiveMessage 1监听到的消息为{} ", content);
                //模拟网络中断
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                if (deliveryTag % 2 == 0) {
                    try {
                        //签收货物,第二个参数为 非批量模式
                        channel.basicAck(deliveryTag, false);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }else {
                    try {
        	            //退货模式 request==true 是消息发回服务器,服务器重新入队
                        channel.basicNack(deliveryTag, false, true);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }