由于公司的项目业务越来越繁杂,需要使用消息中间件为服务进行一定的减压,所以就去了解了一些中间件,最后选了RabbitMQ。
一、消息中间件适合使用的几个场景(这里列举的不是全部):
1、系统间的解耦
比如某系统的数据需要同步到n个系统时,采取这种方式也是一种友好的实现,哪个系统需要数据就直接去订阅就行
2、异步通知消息
比如邮件通知,短信通知等等
3、高并发
如电商秒杀、促销活动等,可以适当地进行流量控制,减轻服务器的压力
目前我知道的消息中间件有Kafka、RabbitMQ、ActiveMQ,以及阿里的RocketMQ
二、接下来我简单的学习下 RabbitMQ的用法
以Direct Exchange的方式举例:通过Routingkey精准的在Exchange找到相应的队列,另外还有 topic,fanout的方式
pom引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1、基础配置
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ infrastructure used by this example:
 * a {@link RabbitAdmin}, a JSON-converting {@link RabbitTemplate} with publisher
 * confirm/return callbacks, and one direct exchange bound to one queue.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the direct exchange declared by {@link #testDirectExchange()}. */
    public static final String testDirectExchange = "Test-DirectExchange";

    /** Name of the queue declared by {@link #testDirectExchangeQueue()}. */
    public static final String testDirectExchangeQueue = "Test-Queue";

    /** Routing key that binds the queue to the direct exchange. */
    public static final String bindingDirectExchangeRoutingKey = "Test-RoutingKey";

    /**
     * Creates the admin used to declare the exchange, queue and binding beans below.
     *
     * @param connectionFactory the broker connection factory
     * @return an auto-starting {@link RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * Creates the template used for publishing.
     *
     * <p>Each {@link RabbitTemplate} accepts a ConfirmCallback and a ReturnCallback only once.
     * They may be set here, or in the producer class after the template is injected, but not
     * in both places; otherwise the template fails with
     * "Only one ConfirmCallback is supported by each RabbitTemplate".
     *
     * @param connectionFactory the broker connection factory
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());

        // Without the mandatory flag the broker silently drops unroutable messages and the
        // return callback registered below is never invoked.
        rabbitTemplate.setMandatory(true);

        // NOTE(review): confirms are only delivered when publisher confirms are enabled on the
        // ConnectionFactory - verify the application's connection settings.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("correlationData = [" + correlationData + "], b = [" + ack + "], s = [" + cause + "]");
        });
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("message = [" + message + "], i = [" + replyCode + "], s = [" + replyText + "], s1 = [" + exchange + "], s2 = [" + routingKey + "]");
        });
        return rabbitTemplate;
    }

    /**
     * Creates the JSON message converter with its default settings.
     *
     * <p>NOTE(review): the original author remarks that the default converter does not support
     * entity objects - confirm against the message types actually sent.
     *
     * @return a default {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Declares the exchange. A direct exchange is used here; fanout and topic are the other
     * common kinds.
     *
     * @return the direct exchange named {@link #testDirectExchange}
     */
    @Bean
    public DirectExchange testDirectExchange() {
        return new DirectExchange(testDirectExchange);
    }

    /**
     * Declares the queue.
     *
     * @return the queue named {@link #testDirectExchangeQueue}
     */
    @Bean
    public Queue testDirectExchangeQueue() {
        return new Queue(testDirectExchangeQueue);
    }

    /**
     * Binds the queue to the exchange so that messages are routed by exact match on the
     * binding key.
     *
     * @return the binding between the queue and the direct exchange
     */
    @Bean
    public Binding bindingDirectExchangeQueue() {
        return BindingBuilder.bind(testDirectExchangeQueue())
                .to(testDirectExchange())
                .with(bindingDirectExchangeRoutingKey);
    }

    // Alternative to annotating MQService with @Component: register it as a bean here.
    // @Bean
    // public IMQService mqService(RabbitTemplate rabbitTemplate){
    //     return new MQService(rabbitTemplate);
    // }
}
2、发布消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Publishes {@link MQMessage} instances through a {@link RabbitTemplate}, attaching a freshly
 * generated correlation id to every message.
 */
@Component
public class MQService implements IMQService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Creates the service with the template it publishes through.
     *
     * @param rabbitTemplate the template used for all sends
     */
    @Autowired
    public MQService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /*
     * Alternative way to register the callbacks: let this class implement
     * RabbitTemplate.ConfirmCallback and RabbitTemplate.ReturnCallback, override confirm(...)
     * and returnedMessage(...), and register it in the constructor:
     *
     *     this.rabbitTemplate.setConfirmCallback(this);
     *     this.rabbitTemplate.setReturnCallback(this);
     *
     * Remember: each RabbitTemplate accepts each of these two callbacks only once; setting
     * one a second time throws an exception.
     */
    // RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback() {
    //     @Override
    //     public void confirm(CorrelationData correlationData, boolean b, String s) {
    //         System.out.println("correlationData = [" + correlationData + "], b = [" + b + "], s = [" + s + "]");
    //     }
    // } ;
    //
    // RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback() {
    //     @Override
    //     public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    //         System.out.println("message = [" + message + "], i = [" + i + "], s = [" + s + "], s1 = [" + s1 + "], s2 = [" + s2 + "]");
    //     }
    // };
    //
    // public void sendMessage(MQMessage mqMessage) {
    ////     this.rabbitTemplate.setConfirmCallback(confirmCallback);
    //     this.rabbitTemplate.setReturnCallback(returnCallback);
    //     CorrelationData correlationData = new CorrelationData();
    //     correlationData.setId(UUID.randomUUID().toString());
    //     this.rabbitTemplate.convertAndSend(RabbitMQConfig.testDirectExchange,RabbitMQConfig.bindingDirectExchangeRoutingKey,mqMessage,correlationData);
    //
    // }

    /**
     * Converts and sends a message to the given exchange with the given routing key.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key used by the exchange
     * @param mqMessage  payload to convert and send
     */
    @Override
    public void sendExchangRoutingMessage(String exchange, String routingKey, MQMessage mqMessage) {
        this.rabbitTemplate.convertAndSend(exchange, routingKey, mqMessage, newCorrelationData());
    }

    /**
     * Converts and sends a message with the given routing key, without naming an exchange.
     *
     * @param routingKey routing key for the message
     * @param mqMessage  payload to convert and send
     */
    @Override
    public void sendRoutingMessage(String routingKey, MQMessage mqMessage) {
        this.rabbitTemplate.convertAndSend(routingKey, mqMessage, newCorrelationData());
    }

    /** Builds correlation data carrying a random UUID as its id. */
    private static CorrelationData newCorrelationData() {
        return new CorrelationData(UUID.randomUUID().toString());
    }
}
3、消费者(springboot监听器的方式)
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Queue listener.
 *
 * <p>This single class is meant to host the listeners for several queues, one
 * {@code @RabbitListener} method per queue. An alternative is to put {@code @RabbitListener}
 * on the class itself and mark the handling methods with {@code @RabbitHandler}.
 */
@Component
public class MessageListener {

    /**
     * Handles messages arriving on the queue named by
     * {@code RabbitMQConfig.testDirectExchangeQueue} by printing their text to standard output.
     *
     * @param mqMessage the received message
     */
    @RabbitListener(queues = RabbitMQConfig.testDirectExchangeQueue)
    public void testQueueMessage(TestMQMessage mqMessage) {
        final String line = "Test-Queue Receive Msg = [" + mqMessage.getMsg() + "]";
        System.out.println(line);
    }

    // Example of a second listener in the same class (topic exchange queue):
    // @RabbitListener(queues = RabbitMQConfig.testTopicExchangeQueue)
    // public void testTopicQueueMessage(TestMQMessage mqMessage){
    //     System.out.println("Test-Topic-Queue Receive Msg = [" + mqMessage.getMsg() + "]");
    // }
}