SpringBoot下配置好RabbitMQ发现消息的消费处理是单线程处理的,在有些业务逻辑下是需要多线程并发消费消息的。经过一番研究,总结下实现多线程并发执行消费消息的配置:给ConnectionFactory指定自定义的线程池(TaskExecutor):
新增RabbitmqConfig.java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author Administrator
* @date 2022/6/29 15:19
*/
@Configuration
public class RabbitmqConfig {
@Bean("batchQueueRabbitListenerContainerFactory")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//确认方式,manual为手动ack.
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//每次处理数据数量,提高并发量
//factory.setPrefetchCount(250);
//设置线程数
//factory.setConcurrentConsumers(30);
//最大线程数
//factory.setMaxConcurrentConsumers(50);
/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1); // 这里需要设置为1,这样消息会给到taskExecutor处理,测试发现,如果设置的很大,反而消息会直接在里面单线程处理
factory.setPrefetchCount(250);
//factory.setDefaultRequeueRejected(true);
//使用自定义线程池来启动消费者。
factory.setTaskExecutor(taskExecutor());
return factory;
}
@Bean("myTaskExecutor")
@Primary
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(100);
// 设置最大线程数
executor.setMaxPoolSize(100);
// 设置队列容量
executor.setQueueCapacity(0);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(300);
// 设置默认线程名称
executor.setThreadNamePrefix("thread-mq-queue-");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
在处理消息的类上,指定factory:
@Component
@Slf4j
public class ReceiveConfig {
private final Logger logger = LoggerFactory.getLogger(ReceiveConfig.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "some_queue_name", concurrency = "1-3",containerFactory = "batchQueueRabbitListenerContainerFactory")
@RabbitHandler
public void handleSomeQueue(@Payload SomeMessage someMessage, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
logger.info("消费的消息来⾃的队列名为:" + message.getMessageProperties().getConsumerQueue());
logger.info("执⾏"+ message.getMessageProperties().getConsumerQueue() +"中的消息的业务流程...");
// 处理消息
// 提交单条消息
channel.basicAck(deliveryTag,false);
}
}
文章评论