IILeeのBlog

IILeeのBlog

Spring Boot RedisTemplate 实现发布/订阅

281
2021-07-01
Spring Boot RedisTemplate 实现发布/订阅

前言

Redis 本身支持消息的发布与订阅,本文讲解Spring Boot项目中如何实现消息发布/订阅。

内容

  • 方式一

配置如下:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Redis publish/subscribe wiring (approach 1): the receiver is a plain object
 * wrapped in a {@link MessageListenerAdapter}, which is then registered on the
 * listener container.
 *
 * @author IILee
 */
@Configuration
public class RedisPubSubConfig {

    /** Topic string the listener container subscribes to. */
    private static final String SUBSCRIBED_TOPIC = "demo-channels";

    /** Name of the receiver method handed to the adapter as its listener method. */
    private static final String HANDLER_METHOD = "process";

    /**
     * Builds the listener container and registers the adapter on the topic.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listenerAdapter   adapter that forwards messages to the receiver
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        final PatternTopic topic = new PatternTopic(SUBSCRIBED_TOPIC);
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, topic);
        return listenerContainer;
    }

    /**
     * Wraps the receiver bean in an adapter bound to its {@code process} method.
     *
     * @return the adapter registered on the listener container
     */
    @Bean
    public MessageListenerAdapter listenerAdapter() {
        final ProxyMessageReceiver delegate = proxyMessageReceiver();
        return new MessageListenerAdapter(delegate, HANDLER_METHOD);
    }

    /**
     * Declares the receiver as a bean.
     *
     * @return a new receiver instance
     */
    @Bean
    public ProxyMessageReceiver proxyMessageReceiver() {
        return new ProxyMessageReceiver();
    }
}

消费者如下:

/**
 * Message consumer for approach 1. It does not implement a listener interface;
 * the configuration wraps it in a {@code MessageListenerAdapter} created with
 * the method name {@code "process"}.
 */
public class ProxyMessageReceiver {
    // One shared logger per class instead of one per instance.
    private static final Logger LOG = LoggerFactory.getLogger(ProxyMessageReceiver.class);

    @Resource
    private MessageProcessService messageProcessService;

    /**
     * Handles one received message.
     *
     * @param body message payload as a string
     */
    public void process(String body) {
        // Left empty in this sample; the business handling goes here.
    }

}
  • 方式二

配置如下:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * Redis publish/subscribe wiring (approach 2): the receiver is registered on
 * the listener container directly, without an adapter in between.
 *
 * @author IILee
 */
@Configuration
public class RedisPubSubConfig {

    /** Topic string the listener container subscribes to. */
    private static final String SUBSCRIBED_TOPIC = "demo-channels";

    /**
     * Builds the listener container and registers the receiver on the topic.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        final PatternTopic topic = new PatternTopic(SUBSCRIBED_TOPIC);
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(proxyMessageReceiver(), topic);
        return listenerContainer;
    }

    /**
     * Declares the receiver as a bean.
     *
     * @return a new receiver instance
     */
    @Bean
    public ProxyMessageReceiver proxyMessageReceiver() {
        return new ProxyMessageReceiver();
    }

}

消费者代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import javax.annotation.Resource;

/**
 * Downstream message handler (covers delivering user messages, closing
 * session connections, and similar).
 *
 * <p>Each incoming message is decoded, parsed into a
 * {@code CallCentreMessageDTO} and handed to {@code MessageProcessService}.
 * A trace id is put into {@code TracingContext} before handling and removed
 * afterwards.
 *
 * @author IILee
 */
public class ProxyMessageReceiver implements MessageListener {
    // One shared logger per class instead of one per instance.
    private static final Logger LOG = LoggerFactory.getLogger(ProxyMessageReceiver.class);

    @Resource
    private MessageProcessService messageProcessService;

    /**
     * Handles one message delivered by the listener container. Exceptions
     * raised while handling are logged and not rethrown.
     *
     * @param message the received message (channel and body as raw bytes)
     * @param pattern the pattern passed by the container; not used here
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {

        String traceId = TracingContext.genIfAbsentTraceId();
        // Bind the trace id for the duration of this message.
        TracingContext.putTraceId(traceId);

        try {
            String channel = StringUtils.getStringUtf8(message.getChannel());
            String body = StringUtils.getStringUtf8(message.getBody());

            LOG.info("客服呼叫中心-处理下游消息开始, channel:{}, traceId:{}, 请求内容:{}", channel, traceId, body);

            // NOTE(review): if JsonUtils.parseObject can return null, process() receives null — confirm.
            CallCentreMessageDTO messageDTO = JsonUtils.parseObject(body, CallCentreMessageDTO.class);

            boolean success = messageProcessService.process(messageDTO);
            LOG.info("客服呼叫中心-处理下游消息结束, traceId:{} 处理结果:{}", traceId, success);
        } catch (Exception e) {
            // Parameterized logging; the trailing throwable is logged with its stack trace.
            // The label is "traceId" to match the value and the info logs above.
            LOG.error("客服呼叫中心-处理下游消息异常, traceId:{}", traceId, e);
        } finally {
            // Always clear the trace id, whether handling succeeded or failed.
            TracingContext.removeTraceId();
        }
    }
}

  • 生产者(发布者)代码
/**
 * Publisher-side service: sends string messages to Redis channels through a
 * {@code StringRedisTemplate}.
 */
@Service
public class RedisService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Sends a message to the given channel.
     *
     * @param channel name of the channel to publish to
     * @param message message content
     */
    public void sendChannelMess(String channel, String message) {
        this.stringRedisTemplate.convertAndSend(channel, message);
    }
}

后记

自测示例代码,生产环境需更详尽处理。