SpringBoot下配置好RabbitMQ发现消息的消费处理是单线程处理的,在有些业务逻辑下是需要多线程并发消费消息的。经过一番研究,总结下实现多线程并发执行消费消息的配置:给ConnectionFactory指定自定义的线程池(TaskExecutor):
新增RabbitmqConfig.java
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * @author Administrator * @date 2022/6/29 15:19 */ @Configuration public class RabbitmqConfig { @Bean("batchQueueRabbitListenerContainerFactory") public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //确认方式,manual为手动ack. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //每次处理数据数量,提高并发量 //factory.setPrefetchCount(250); //设置线程数 //factory.setConcurrentConsumers(30); //最大线程数 //factory.setMaxConcurrentConsumers(50); /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */ factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(1); // 这里需要设置为1,这样消息会给到taskExecutor处理,测试发现,如果设置的很大,反而消息会直接在里面单线程处理 factory.setPrefetchCount(250); //factory.setDefaultRequeueRejected(true); //使用自定义线程池来启动消费者。 factory.setTaskExecutor(taskExecutor()); return factory; } @Bean("myTaskExecutor") @Primary public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(100); // 设置最大线程数 executor.setMaxPoolSize(100); // 设置队列容量 executor.setQueueCapacity(0); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(300); // 设置默认线程名称 executor.setThreadNamePrefix("thread-mq-queue-"); // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }
在处理消息的类上,指定factory:
@Component @Slf4j public class ReceiveConfig { private final Logger logger = LoggerFactory.getLogger(ReceiveConfig.class); @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = "some_queue_name", concurrency = "1-3",containerFactory = "batchQueueRabbitListenerContainerFactory") @RabbitHandler public void handleSomeQueue(@Payload SomeMessage someMessage, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); logger.info("消费的消息来⾃的队列名为:" + message.getMessageProperties().getConsumerQueue()); logger.info("执⾏"+ message.getMessageProperties().getConsumerQueue() +"中的消息的业务流程..."); // 处理消息 // 提交单条消息 channel.basicAck(deliveryTag,false); } }
文章评论