冰灯

云水禅心

  • 首页
  • 归档

  • 搜索
mybatis Dubbo Git Https ELK Elasticsearch logstash Springboot Nginx Java MQ

Springboot RabbitMQ 基础用法

发表于 2020-03-21 | 分类于 MQ | 0 | 阅读次数 543

由于公司的项目业务越来越繁杂,需要使用消息中间件的为服务进行一定的减压,所以就去了解些中间件,最后选了rabbitMQ。

一、消息中间件适合使用的几个场景(这里列举的不是全部):

1、系统间的解耦
比如某系统的数据需要同步到n个系统时,采取这种方式也是一种友好的实现,哪个系统需要数据就直接去订阅就行

2、异步通知消息
比如邮件通知,短信通知等等

3、高并发
如电商秒杀,促销活动等,可以适当的进行流量的控制,服务器的压力

目前我知道的有Kafka,RabbitMQ,ActiveMQ,阿里的RocketMQ

二、接下来我简单的学习下 RabbitMQ的用法

以Direct Exchange的方式举例:通过Routingkey精准的在Exchange找到相应的队列,另外还有 topic,fanout的方式

pom引入

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1、基础配置

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String testDirectExchange= "Test-DirectExchange";
    public static final String testDirectExchangeQueue= "Test-Queue";
    public static final String bindingDirectExchangeRoutingKey= "Test-RoutingKey";


    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        /**
         * 设置RabbitTemplate的ConfirmCallback和ReturnCallback的时候注意:
         * 不论在哪里设置,只能设置一个(即只能设置一次或不配置:可在生产者类注入RabbitTemplate后设置,或我是在这里直接配置)
         * 否则会报如下异常
         * Only one ConfirmCallback is supported by each RabbitTemplate
         */
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setConfirmCallback((correlationData, b,  s)->{
            System.out.println("correlationData = [" + correlationData + "], b = [" + b + "], s = [" + s + "]");
        });
        rabbitTemplate.setReturnCallback((message, i, s, s1, s2)->{
            System.out.println("message = [" + message + "], i = [" + i + "], s = [" + s + "], s1 = [" + s1 + "], s2 = [" + s2 + "]");
        });
        return rabbitTemplate;
    }

    /**
     * 默认的json转换器,不支持实体对象的
     * @return
     */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }


    /**
     * 定义Exchange:这里定义为direct,另外的还有 fanout,topic
     * @return
     */
    @Bean
    public DirectExchange testDirectExchange(){
        return new DirectExchange(testDirectExchange);
    }

    /**
     * 定义队列
     * @return
     */
    @Bean
    public Queue testDirectExchangeQueue(){
        return new Queue(testDirectExchangeQueue);
    }

    /**
     * 将队列绑定到Exchange,通过bindingKey来进行精确路由
     * @return
     */
    @Bean
    public Binding bindingDirectExchangeQueue(){
        return BindingBuilder.bind(testDirectExchangeQueue()).to(testDirectExchange()).with(bindingDirectExchangeRoutingKey);
    }

//    @Bean
//    public IMQService mqService(RabbitTemplate rabbitTemplate){
//        return new MQService(rabbitTemplate);
//    }

}

2、发布消息

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MQService implements IMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public MQService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
//        this.rabbitTemplate.setConfirmCallback(tihs);
//        this.rabbitTemplate.setReturnCallback(tihs);
    }

    /**
     *     也可以直接在此类MQService 实现RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback
     *     重写confirm和returnedMessage方法
     *     this.rabbitTemplate.setConfirmCallback(tihs);
     *     this.rabbitTemplate.setReturnCallback(tihs);
     *
     *     切记:每个rabbitTemplate只能分别设置一次这两个回调,否则报异常
     */
//    RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback() {
//        @Override
//        public void confirm(CorrelationData correlationData, boolean b, String s) {
//            System.out.println("correlationData = [" + correlationData + "], b = [" + b + "], s = [" + s + "]");
//        }
//    } ;
//
//    RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback() {
//        @Override
//        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
//            System.out.println("message = [" + message + "], i = [" + i + "], s = [" + s + "], s1 = [" + s1 + "], s2 = [" + s2 + "]");
//        }
//    };
//
//    public void sendMessage(MQMessage mqMessage) {
////        this.rabbitTemplate.setConfirmCallback(confirmCallback);
//        this.rabbitTemplate.setReturnCallback(returnCallback);
//        CorrelationData correlationData = new CorrelationData();
//        correlationData.setId(UUID.randomUUID().toString());
//        this.rabbitTemplate.convertAndSend(RabbitMQConfig.testDirectExchange,RabbitMQConfig.bindingDirectExchangeRoutingKey,mqMessage,correlationData);
//
//    }

    @Override
    public void sendExchangRoutingMessage(String exchange, String routingKey, MQMessage mqMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(exchange,routingKey,mqMessage,correlationData);

    }

    @Override
    public void sendRoutingMessage(String routingKey, MQMessage mqMessage) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend (routingKey,mqMessage,correlationData);
    }
}

3、消费者(springboot监听器的方式)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 队列监听器
 * 这里我直接在一个类里面监听了多个队列的形式
 * 也可以直接在类上面加上 @RabbitListener结合 在方法上加上@RabbitHandler处理收到的信息
 */
@Component
public class MessageListener {

    @RabbitListener(queues = RabbitMQConfig.testDirectExchangeQueue)
    public void testQueueMessage(TestMQMessage mqMessage){
        System.out.println("Test-Queue Receive Msg = [" + mqMessage.getMsg() + "]");
    }

//    @RabbitListener(queues = RabbitMQConfig.testTopicExchangeQueue)
//    public void testTopicQueueMessage(TestMQMessage mqMessage){
//        System.out.println("Test-Topic-Queue Receive Msg = [" + mqMessage.getMsg() + "]");
//    }

}
# mybatis # Dubbo # Git # Https # ELK # Elasticsearch # logstash # Springboot # Nginx # Java # MQ
JDK-Tool
java 体验下RabbitMQ的几种模式
冰灯

冰灯

雁过无痕,敲出人生

23 日志
9 分类
11 标签
RSS
Github StackOverflow
Creative Commons
Links
  • 点滴记录
  • Zero
© 2021 冰灯