首页 > 后端开发 > 最新文章

掌控消息全链路(4)——RabbitMQ

CSDN博客 2026-04-25 12:11:40 人看过


1.事务

AMQP(高级消息队列协议)实现了事务机制,主要用于确保消息的原子性发布和确认。换言之,它允许你将多个操作(如发送消息、确认消息)绑定在一起,要么全部成功,要么全部失败

配置队列

@Configuration public class RabbitMQConfig {    @Bean("transQueue")    public Queue transQueue(){        return QueueBuilder.durable(Constants.TRANS_QUEUE).build();    } }

配置RabbitTemplate和事务管理器

@Configuration public class RabbitTemplateConfig {    @Bean("transRabbitTemplate")    public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        rabbitTemplate.setChannelTransacted(true);        return rabbitTemplate;    }    @Bean    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){        return new RabbitTransactionManager(connectionFactory);    } }

发送消息

@RestController @RequestMapping("/producer") public class ProducerController {    @Resource(name = "transRabbitTemplate")    private RabbitTemplate transRabbitTemplate;    @Transactional    @RequestMapping("/trans")    public String trans(){        transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test ---> 1");        int num = 5 / 0;        transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test ---> 2");        return "发送成功";    } }

在这里插入图片描述
Spring Boot的RabbitMQ自动配置默认会确认模式,但RabbitMQ不允许同一个通道同时使用事务模式和确认模式,所以需要确保publisher confirms被禁用

spring:  rabbitmq:    publisher-confirm-type: none    publisher-returns: false

2.消息限流

消息限流(Flow Control) 是RabbitMQ防止生产者发送消息速度超过消费者处理能力,导致消息积压和系统崩溃的保护机制

声明和配置交换器、队列和绑定关系

@Configuration public class RabbitMQConfig {    @Bean("qosQueue")    public Queue qosQueue(){        return QueueBuilder.durable(Constants.QOS_QUEUE).build();    }    @Bean("qosExchange")    public DirectExchange qosExchange(){        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();    }    @Bean("qosBinding")    public Binding qosBinding(@Qualifier("qosExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue){        return  BindingBuilder.bind(queue).to(directExchange).with("qos");    } }

限制每个消费者未确认的最大消息数

spring:  rabbitmq:    listener:      simple:        acknowledge-mode: manual # 消费者确认机制        prefetch: 5

配置消费者

@Component @Slf4j public class QosListener {    @RabbitListener(queues = Constants.QOS_QUEUE)    public void handMessage(Message message, Channel channel) throws IOException {        long deliveryTag = message.getMessageProperties().getDeliveryTag();        try{            log.info("接收到消息:{},deliveryTag:{}",                    new String(message.getBody(), StandardCharsets.UTF_8),                    deliveryTag);            log.info("处理成功");            // channel.basicAck(deliveryTag, true); // 消费者不确认消息        }catch(Exception e){            channel.basicNack(deliveryTag,true,true);        }    } }

发送消息

@RestController @RequestMapping("/producer") public class ProducerController {    @Resource(name = "rabbitTemplate")    private RabbitTemplate rabbitTemplate;    @RequestMapping("/qos")    public String qos(){        for (int i = 0; i < 20; i++) {            rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test:" + i);        }        return "发送成功";    } }

在这里插入图片描述

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章