Spring Boot RedisTemplate 实现发布/订阅
前言
Redis 本身支持消息的发布与订阅,本文讲解Spring Boot项目中如何实现消息发布/订阅。
内容
- 方式一
配置消息:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author IILee
*/
@Configuration
public class RedisPubSubConfig {
/**
* 初始化监听器
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("demo-channels"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(proxyMessageReceiver(), "process");
}
@Bean
public ProxyMessageReceiver proxyMessageReceiver() {
return new ProxyMessageReceiver();
}
}
消费者如下:
public class ProxyMessageReceiver {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
@Resource
private MessageProcessService messageProcessService;
public void process(String body) {
}
}
- 方式二
配置如下:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
/**
* @author IILee
*/
@Configuration
public class RedisPubSubConfig {
/**
* 初始化监听器
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(proxyMessageReceiver(), new PatternTopic("demo-channels"));
return container;
}
@Bean
public ProxyMessageReceiver proxyMessageReceiver() {
return new ProxyMessageReceiver();
}
}
消费者代码如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import javax.annotation.Resource;
/**
* 下游消息处理类(包括:用户消息下发、关闭会话连接等)
* @author IILee
*/
public class ProxyMessageReceiver implements MessageListener {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
@Resource
private MessageProcessService messageProcessService;
@Override
public void onMessage(Message message, byte[] pattern) {
String traceId = TracingContext.genIfAbsentTraceId();
//trace
TracingContext.putTraceId(traceId);
try {
String channel = StringUtils.getStringUtf8(message.getChannel());
String body = StringUtils.getStringUtf8(message.getBody());
LOG.info("客服呼叫中心-处理下游消息开始, channel:{}, traceId:{}, 请求内容:{}", channel, traceId, body);
CallCentreMessageDTO messageDTO = JsonUtils.parseObject(body, CallCentreMessageDTO.class);
boolean success = messageProcessService.process(messageDTO);
LOG.info("客服呼叫中心-处理下游消息结束, traceId:{} 处理结果:{}", traceId, success);
} catch (Exception e) {
LOG.error(String.format("客服呼叫中心-处理下游消息异常, messageId:%s", traceId), e);
} finally {
TracingContext.removeTraceId();
}
}
}
- 消费者代码
@Service
public class RedisService {
@Resource
private StringRedisTemplate stringRedisTemplate;
//向通道发送消息的方法
public void sendChannelMess(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
}
后记
自测示例代码,生产环境需更详尽处理。