SpringBoot下RabbitMQ实现多线程并发执行消费消息

2022-07-04 3050点热度 0人点赞 0条评论

SpringBoot下配置好RabbitMQ发现消息的消费处理是单线程处理的,在有些业务逻辑下是需要多线程并发消费消息的。经过一番研究,总结下实现多线程并发执行消费消息的配置:给ConnectionFactory指定自定义的线程池(TaskExecutor):

新增RabbitmqConfig.java

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


/**
 * @author Administrator
 * @date 2022/6/29 15:19
 */
@Configuration
    public class RabbitmqConfig {
    @Bean("batchQueueRabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //确认方式,manual为手动ack.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //每次处理数据数量,提高并发量
        //factory.setPrefetchCount(250);
        //设置线程数
        //factory.setConcurrentConsumers(30);
        //最大线程数
        //factory.setMaxConcurrentConsumers(50);
        /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1); // 这里需要设置为1,这样消息会给到taskExecutor处理,测试发现,如果设置的很大,反而消息会直接在里面单线程处理
        factory.setPrefetchCount(250);
        //factory.setDefaultRequeueRejected(true);
        //使用自定义线程池来启动消费者。
        factory.setTaskExecutor(taskExecutor());
        return factory;
    }

    @Bean("myTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(100);
        // 设置最大线程数
        executor.setMaxPoolSize(100);
        // 设置队列容量
        executor.setQueueCapacity(0);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("thread-mq-queue-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

在处理消息的类上,指定factory:

@Component
@Slf4j
public class ReceiveConfig {

    private  final Logger logger = LoggerFactory.getLogger(ReceiveConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "some_queue_name", concurrency = "1-3",containerFactory = "batchQueueRabbitListenerContainerFactory")
    @RabbitHandler
    public void handleSomeQueue(@Payload SomeMessage someMessage, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        logger.info("消费的消息来⾃的队列名为:" + message.getMessageProperties().getConsumerQueue());
        logger.info("执⾏"+ message.getMessageProperties().getConsumerQueue() +"中的消息的业务流程...");
        
        // 处理消息
        // 提交单条消息
        channel.basicAck(deliveryTag,false);
    }
}

 

admin

这个人很懒,什么都没留下

文章评论

您需要 登录 之后才可以评论