SpringBoot整合Redis实现类似MQ发布订阅功能

2022-12-21 964点热度 0人点赞 0条评论

发布订阅模式简介

发布订阅模式在我们日常的开发工作中算是比较常用的一种方式,发布订阅模式有可以被称为观察者模式,它定义的是一种一对多的关系模式,可以让多个订阅者同时监听到同一个发布者的消息,这个发布者又被称为是主题对象。然后通过发布者发布的消息,来通知观察者能够实时的更新自己获取到的消息内容。

有人说发布订阅模式与观察者模式是有区别的,这里我们先不对两者做展开的讨论,在后面我们会介绍相关的内容。这里我们首先来看发布订阅模式,如下图所示。

如图所示,在发布订阅模式中有两个关键性的节点,发布者与订阅者。我们可以将发布者看做是消息的生产方,而将订阅者看做是消息的消费方。下面我们就来看看在Spring Boot 项目中如何整合Redis来实现发布订阅。

Redis中的发布订阅

在Redis中为我们提供了发布订阅的功能,我们可以用这个功能来实现发布订阅模式的消息传入。该机制主要包括了三个部分,发布者、订阅者和Channel。这里的Channel 就是上图中所体现的消息代理。

消息发布

PUBLISH + channel + message #将信息 message 发送到指定的频道 channel

消息订阅

SUBSCRIBE channel

Spring Boot 整合Redis实现发布订阅

第一步、我们需要先建立一个Spring Boot的项目并且在POM文件中添加如下的配置内容。

<!-- redis 缓存操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis 缓存操作 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
<!-- redis 缓存操作 -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

第二步、增加Redis相关的配置,代码如下。

@Configuration
@EnableCaching
public class RedisConfig {
@Bean
@SuppressWarnings(value = { "unchecked", "rawtypes" })
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
{
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
serializer.setObjectMapper(mapper);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(serializer);
// Hash的key也采用StringRedisSerializer的序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
}
@Configuration @EnableCaching public class RedisConfig { @Bean @SuppressWarnings(value = { "unchecked", "rawtypes" }) public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); serializer.setObjectMapper(mapper); // 使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(serializer); // Hash的key也采用StringRedisSerializer的序列化方式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } }
@Configuration
@EnableCaching
public class RedisConfig {
    @Bean
    @SuppressWarnings(value = { "unchecked", "rawtypes" })
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
    {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        serializer.setObjectMapper(mapper);

        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);

        // Hash的key也采用StringRedisSerializer的序列化方式
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);

        template.afterPropertiesSet();
        return template;
    }
}

第三步、在application.yml配置文件中添加Redis相关的连接内容。

# redis 配置
spring:
redis:
# 地址
host: 192.168.1.2
port: 6379
# 密码
password: redis
# 连接超时时间
timeout: 10s
lettuce:
pool:
# 连接池中的最小空闲连接
min-idle: 0
# 连接池中的最大空闲连接
max-idle: 8
# 连接池的最大数据库连接数
max-active: 8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1ms
# redis 配置 spring: redis: # 地址 host: 192.168.1.2 port: 6379 # 密码 password: redis # 连接超时时间 timeout: 10s lettuce: pool: # 连接池中的最小空闲连接 min-idle: 0 # 连接池中的最大空闲连接 max-idle: 8 # 连接池的最大数据库连接数 max-active: 8 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms
# redis 配置
spring:  
  redis:
    # 地址
    host: 192.168.1.2
    port: 6379
    # 密码
    password: redis
    # 连接超时时间
    timeout: 10s
    lettuce:
      pool:
        # 连接池中的最小空闲连接
        min-idle: 0
        # 连接池中的最大空闲连接
        max-idle: 8
        # 连接池的最大数据库连接数
        max-active: 8
        # #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1ms

第四步、编写消息发送者代码,在RedisTemplate中提供了一个convertAndSend()的方法用来发送消息。

public void setTest(){
redisTemplate.convertAndSend("redis.user","testssss");
}
public void setTest(){ redisTemplate.convertAndSend("redis.user","testssss"); }
public void setTest(){
     redisTemplate.convertAndSend("redis.user","testssss");
}

消息接收者

  • 第一步、在配置类中增加消息订阅者的相关配置内容代码如下。
@Configuration
public class RedisSubConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener
container.addMessageListener(listener, new ChannelTopic("redis.user"));
//container.addMessageListener(listener, new ChannelTopic("redis.news"));
return container;
}
}
@Configuration public class RedisSubConfig { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisMessageListener listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); //订阅频道redis.news 和 redis.life 这个container 可以添加多个 messageListener container.addMessageListener(listener, new ChannelTopic("redis.user")); //container.addMessageListener(listener, new ChannelTopic("redis.news")); return container; } }
@Configuration
public class RedisSubConfig {
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory, 
                                                    RedisMessageListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        //订阅频道redis.news 和 redis.life  这个container 可以添加多个 messageListener
        container.addMessageListener(listener, new ChannelTopic("redis.user"));
        //container.addMessageListener(listener, new ChannelTopic("redis.news"));
        return container;
    }
}
  • 第二步、继承
    org.springframework.data.redis.connection.MessageListener 实现一个消息监听的类RedisMessageListener,并且通过@Component注解将其注入到容器中。代码如下
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取消息
byte[] messageBody = message.getBody();
// 使用值序列化器转换
Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
// 获取监听的频道
byte[] channelByte = message.getChannel();
// 使用字符串序列化器转换
Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
// 渠道名称转换
String patternStr = new String(pattern);
System.out.println(patternStr);
System.out.println("---频道---: " + channel);
System.out.println("---消息内容---: " + msg);
}
}
@Component public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { // 获取消息 byte[] messageBody = message.getBody(); // 使用值序列化器转换 Object msg = redisTemplate.getValueSerializer().deserialize(messageBody); // 获取监听的频道 byte[] channelByte = message.getChannel(); // 使用字符串序列化器转换 Object channel = redisTemplate.getStringSerializer().deserialize(channelByte); // 渠道名称转换 String patternStr = new String(pattern); System.out.println(patternStr); System.out.println("---频道---: " + channel); System.out.println("---消息内容---: " + msg); } }
@Component
public class RedisMessageListener implements MessageListener {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取消息
        byte[] messageBody = message.getBody();
        // 使用值序列化器转换
        Object msg = redisTemplate.getValueSerializer().deserialize(messageBody);
        // 获取监听的频道
        byte[] channelByte = message.getChannel();
        // 使用字符串序列化器转换
        Object channel = redisTemplate.getStringSerializer().deserialize(channelByte);
        // 渠道名称转换
        String patternStr = new String(pattern);
        System.out.println(patternStr);
        System.out.println("---频道---: " + channel);
        System.out.println("---消息内容---: " + msg);

    }
}

完成以上操作之后我们就可以启动项目调用相关的接口来进行发布定于的测试,结果如下。会看到控制台中有如下的输入内容,则证明整个的发布订阅功能已经实现了。

redis.user
---频道---: redis.user
---消息内容---: testssss

admin

这个人很懒,什么都没留下

文章评论

您需要 登录 之后才可以评论