冰灯

云水禅心

  • 首页
  • 归档

  • 搜索
mybatis Dubbo Git Https ELK Elasticsearch logstash Springboot Nginx Java MQ

java 体验下RabbitMQ的几种模式

发表于 2020-03-21 | 分类于 MQ | 0 | 阅读次数 390

之前项目中只是单纯地使用了其中的一种方式,这里介绍一下其他几种方式。

一、生产者

1、创建服务连接

    // Shared factory that the methods below use to open broker connections.
    // NOTE(review): host, port and credentials are hard-coded demo values.
    private static ConnectionFactory connectionFactory = null;

    static {
        // Configure a local instance first, then publish it to the static field.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root");
        factory.setHost("192.168.43.120");
        factory.setPort(5672);
        // The virtual host is left unset here; the original kept its
        // setVirtualHost("/") call commented out.
        connectionFactory = factory;
    }

2、定义交换机和队列


    /**
     * Declares an exchange and a queue, then binds the queue to the exchange.
     *
     * <p>The exchange is declared with {@code durable = true}; the queue is declared
     * with durable, exclusive and auto-delete all {@code false} and no extra arguments.
     * A connection and channel are opened for this call and closed again before it
     * returns, so repeated calls do not leak broker connections.
     *
     * <p>I/O and timeout failures are printed to stderr and not rethrown.
     *
     * @param exchangeType   exchange type passed to {@code exchangeDeclare}
     * @param testQueue      name of the queue to declare and bind
     * @param testExchange   name of the exchange to declare
     * @param testRoutingkey routing key used for the queue binding
     */
    public static void declareExchange(String exchangeType,String testQueue, String testExchange, String testRoutingkey) {
        // try-with-resources closes the channel first and then the connection,
        // on both the success and the failure path.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(testExchange, exchangeType, true);
            channel.queueDeclare(testQueue, false, false, false, null);
            channel.queueBind(testQueue, testExchange, testRoutingkey);
            System.out.println("declareExchange,exchangeType = [" + exchangeType + "], testQueue = [" + testQueue + "], testExchange = [" + testExchange + "], testRoutingkey = [" + testRoutingkey + "]");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    /**
     * Declares a queue without binding it to any named exchange.
     *
     * <p>The queue is declared with durable, exclusive and auto-delete all
     * {@code false} and no extra arguments. The connection and channel opened for
     * the call are closed before it returns, so repeated calls do not leak broker
     * connections.
     *
     * <p>I/O and timeout failures are printed to stderr and not rethrown.
     *
     * @param testQueue name of the queue to declare
     */
    public static void declareQueue(String testQueue) {
        // try-with-resources closes the channel first and then the connection,
        // on both the success and the failure path.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(testQueue, false, false, false, null);
            System.out.println("declareQueue,testQueue = [" + testQueue + "]");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }




3、发布消息
/**
 * Publishes one message to the given exchange with the given routing key.
 *
 * <p>A connection and channel are opened for this single publish and closed
 * before the method returns; previously both were left open on every call,
 * leaking one connection per published message.
 *
 * <p>I/O and timeout failures are printed to stderr and not rethrown.
 *
 * @param msg            message text; encoded with {@code String.getBytes()},
 *                       i.e. the JVM's default charset
 * @param testExchange   exchange to publish to
 * @param testRoutingkey routing key attached to the message
 */
public static void publishExchange(String msg, String testExchange, String testRoutingkey) {
    // try-with-resources closes the channel first and then the connection,
    // on both the success and the failure path.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // No message properties are set (null).
        channel.basicPublish(testExchange, testRoutingkey, null, msg.getBytes());
        // NOTE(review): the "testQueue" value logged below is MQConfig.queue_test,
        // which this method does not otherwise use — confirm it is intended.
        System.out.println("publishExchange,msg = [" + msg + "], testQueue = [" + MQConfig.queue_test + "], testExchange = [" + testExchange + "], testRoutingkey = [" + testRoutingkey + "]");
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}

/**
 * Publishes one message using the empty exchange name and the queue name as
 * the routing key.
 *
 * <p>A connection and channel are opened for this single publish and closed
 * before the method returns; previously both were left open on every call,
 * leaking one connection per published message.
 *
 * <p>I/O and timeout failures are printed to stderr and not rethrown.
 *
 * @param msg       message text; encoded with {@code String.getBytes()},
 *                  i.e. the JVM's default charset
 * @param testQueue queue name, used as the routing key
 */
public static void publishToQueue(String msg, String testQueue) {
    // try-with-resources closes the channel first and then the connection,
    // on both the success and the failure path.
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // "" selects the broker's default exchange; no message properties are set.
        channel.basicPublish("", testQueue, null, msg.getBytes());
        System.out.println("publishToQueuen,msg = [" + msg + "], testQueue = [" + testQueue + "]");
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
    
}

二、消费者

 /**
     * Registers one auto-acknowledging consumer on the given queue.
     *
     * <p>Each delivery is handed to {@link #dowork(String)} as
     * {@code name-consumerTag-comsumer: body}. The consumer is registered exactly
     * once: the previous version called {@code basicConsume} inside an endless
     * loop, which kept registering new consumers on the same channel.
     *
     * <p>The method returns after registration. The connection and channel are
     * intentionally left open, because closing them would cancel the consumer.
     * I/O and timeout failures are printed to stderr and not rethrown.
     *
     * @param name      label prepended to every message passed to {@code dowork}
     * @param testQueue queue to consume from
     */
 public static void comsumer(String name,String testQueue) {
        try {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Second argument true = auto-acknowledge. basicConsume only registers
            // the callback; handleDelivery is invoked later for each message.
            channel.basicConsume(testQueue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Body is decoded with the JVM's default charset.
                    dowork(name+"-"+consumerTag+"-comsumer: "+new String(body));
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
/**
     * Concrete business handling for a consumed message: prints it to stdout
     * wrapped as {@code msg = [...]}.
     *
     * @param msg text to print
     */
    public static void dowork(String msg){
        StringBuilder line = new StringBuilder("msg = [");
        line.append(msg).append("]");
        System.out.println(line.toString());
    }

三、广播,主题,路由的体验

1、广播

/**
     * Fanout (broadcast) demo.
     * A fanout exchange does not check the routing key; it sends each message
     * to every queue bound to the exchange.
     *
     * Declares two queues on the same fanout exchange, then publishes messages
     * for i = 0..100000, alternating between the two routing keys and sleeping
     * 3 seconds after each publish. If the thread is interrupted while sleeping,
     * the interrupt flag is restored and publishing stops.
     */
    private static void testFaoutExchange() {
        MQProductor.declareExchange(MQConfig.type_exchange_faout,MQConfig.fanout_exchange_queue1_test,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey1_test);
        MQProductor.declareExchange(MQConfig.type_exchange_faout,MQConfig.fanout_exchange_queue2_test,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey2_test);
        for(int i=0;i<=100000;i++){
            if(i%2==0){
                MQProductor.publishExchange("faout-TestProductor[1]-"+i,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey1_test);
            }else {
                MQProductor.publishExchange("faout-TestProductor[2]-"+i,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey2_test);
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Swallowing the interrupt would make this loop impossible to
                // cancel; restore the flag for the caller and stop publishing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

2、路由 精准匹配

 /**
     * Direct exchange demo: routing by exact routing-key match.
     *
     * Declares two queues on the same direct exchange, each bound with its own
     * routing key, then publishes messages for i = 0..100000, alternating
     * between the two keys and sleeping 3 seconds after each publish. If the
     * thread is interrupted while sleeping, the interrupt flag is restored and
     * publishing stops.
     */
    private static void testDirectExchange() {
        MQProductor.declareExchange(MQConfig.type_exchange_direct,MQConfig.direct_queue_test,MQConfig.direct_exchange_test,MQConfig.direct_routingkey_test);
        MQProductor.declareExchange(MQConfig.type_exchange_direct,MQConfig.direct_queue2_test,MQConfig.direct_exchange_test,MQConfig.direct_routingkey2_test);
        for(int i=0;i<=100000;i++){
            if(i%2==0){
                MQProductor.publishExchange("Direct-TestProductor[1]-"+i,MQConfig.direct_exchange_test,MQConfig.direct_routingkey_test);
            }else {
                MQProductor.publishExchange("Direct-TestProductor[2]-"+i,MQConfig.direct_exchange_test,MQConfig.direct_routingkey2_test);
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Swallowing the interrupt would make this loop impossible to
                // cancel; restore the flag for the caller and stop publishing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

3、主题模式

/**
     * Topic exchange demo: routing by pattern match.
     * When a producer publishes, the exchange compares the message's routing key
     * with the binding key of every bound queue and delivers the message to each
     * queue whose binding key matches.
     * Binding key rules (words are separated by "."):
     * "#" matches zero or more words (e.g. "#.topic" matches "topic", "xx.topic",
     *     "xx.xx.topic"; "topic.#" works the same way on the other side).
     * "*" matches exactly one word (e.g. "*.topic" matches "xx.topic" but not
     *     "xx.xx.topic"; "topic.*" works the same way on the other side).
     *
     * Declares six queues on the same topic exchange, then publishes messages for
     * j = 0..100000, rotating through three routing-key shapes and sleeping
     * 3 seconds after each publish. If the thread is interrupted while sleeping,
     * the interrupt flag is restored and publishing stops.
     */
    private static void testTopic() {
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue1_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey1_test);
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue2_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey2_test);
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue3_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey3_test);
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue4_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey4_test);
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue5_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey5_test);
        MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue6_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey6_test);

        for(int j=0;j<=100000;j++){
            if(j%3==0){
                // Base key with one extra trailing word.
                MQProductor.publishExchange("Topic-TestProductor[r1/r2]-"+j,MQConfig.topic_exchange_test,MQConfig.topic_routingkey_test+".2");
            }else if(j%3==1){
                // Base key unchanged.
                MQProductor.publishExchange("Topic-Test r3/r4-TestProductor[r4]-"+j,MQConfig.topic_exchange_test,MQConfig.topic_routingkey_test);
            }else {
                // j is never negative, so the only remaining case is j%3==2:
                // base key with one leading and two trailing words.
                MQProductor.publishExchange("Topic-Test r3/r4-TestProductor[r5]-"+j,MQConfig.topic_exchange_test,"5."+MQConfig.topic_routingkey_test+".5"+".5");
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Swallowing the interrupt would make this loop impossible to
                // cancel; restore the flag for the caller and stop publishing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

4、多消费者监听同一队列(RabbitMQ 默认会把同一队列中的消息轮流分发给各个消费者,每条消息只会投递给其中一个消费者;但在消息被重新投递等情况下,若是要求每个消息只允许被消费一次的话,仍必须自己实现一个类似锁机制(幂等控制)的功能来控制:如果这个消息被消费过,下一个消费者就不能再消费了)

/**
     * Starts two threads, each of which runs a consumer on the same queue
     * ({@code MQConfig.queue_test}).
     */
    public static void multiComsumerToqueue(){
        String[] consumerNames = {"multi-Comsumer1", "multi-Comsumer2"};
        // Threads are started in array order, one consumer per thread.
        for (String consumerName : consumerNames) {
            Thread worker = new Thread(() -> MQComsumer.comsumer(consumerName, MQConfig.queue_test));
            worker.start();
        }
    }
# mybatis # Dubbo # Git # Https # ELK # Elasticsearch # logstash # Springboot # Nginx # Java # MQ
Springboot RabbitMQ 基础用法
MQ报错 not_a_dets_file recovery.dets
冰灯

冰灯

雁过无痕,敲出人生

23 日志
9 分类
11 标签
RSS
Github StackOverflow
Creative Commons
Links
  • 点滴记录
  • Zero
© 2021 冰灯