之前项目中只是单纯的使用其中的一种方式,这里介绍下其他的方式
一、生产者
1、创建服务连接
private static ConnectionFactory connectionFactory = null;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.43.120");
connectionFactory.setPort(5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
// connectionFactory.setVirtualHost("/");
}
2、定义队列
public static void declareExchange(String exchangeType,String testQueue, String testExchange, String testRoutingkey) {
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(testExchange,exchangeType,true);
channel.queueDeclare(testQueue,false,false,false,null);
channel.queueBind(testQueue, testExchange, testRoutingkey);
System.out.println("declareExchange,exchangeType = [" + exchangeType + "], testQueue = [" + testQueue + "], testExchange = [" + testExchange + "], testRoutingkey = [" + testRoutingkey + "]");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void declareQueue(String testQueue) {
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(testQueue,false,false,false,null);
System.out.println("declareQueue,testQueue = [" + testQueue + "]");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
3、发布消息
public static void publishExchange(String msg, String testExchange, String testRoutingkey) {
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicPublish(
testExchange,
testRoutingkey,
null,
msg.getBytes()
);
System.out.println("publishExchange,msg = [" + msg + "], testQueue = [" + MQConfig.queue_test + "], testExchange = [" + testExchange + "], testRoutingkey = [" + testRoutingkey + "]");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void publishToQueue(String msg, String testQueue) {
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicPublish(
"",
testQueue,
null,
msg.getBytes()
);
System.out.println("publishToQueuen,msg = [" + msg + "], testQueue = [" + testQueue + "]");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
二、消费者
public static void comsumer(String name,String testQueue) {
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
while (true){
//自动确认回复
channel.basicConsume(testQueue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
dowork(name+"-"+consumerTag+"-comsumer: "+new String(body));
}
});
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
//具体的业务实现
public static void dowork(String msg){
System.out.println("msg = [" + msg + "]");
}
三、广播,主题,路由的体验
1、广播
/**
* 广播
* 不会校验RoutingKey
* 直接将消息发送到指定交换机的所有queue中
*/
private static void testFaoutExchange() {
MQProductor.declareExchange(MQConfig.type_exchange_faout,MQConfig.fanout_exchange_queue1_test,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey1_test);
MQProductor.declareExchange(MQConfig.type_exchange_faout,MQConfig.fanout_exchange_queue2_test,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey2_test);
for(int i=0;i<=100000;i++){
if(i%2==0){
MQProductor.publishExchange("faout-TestProductor[1]-"+i,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey1_test);
}else {
MQProductor.publishExchange("faout-TestProductor[2]-"+i,MQConfig.fanout_exchange_test,MQConfig.fanout_routingkey2_test);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、路由 精准匹配
/**
* routingkey路由 精准匹配
*/
private static void testDirectExchange() {
MQProductor.declareExchange(MQConfig.type_exchange_direct,MQConfig.direct_queue_test,MQConfig.direct_exchange_test,MQConfig.direct_routingkey_test);
MQProductor.declareExchange(MQConfig.type_exchange_direct,MQConfig.direct_queue2_test,MQConfig.direct_exchange_test,MQConfig.direct_routingkey2_test);
for(int i=0;i<=100000;i++){
if(i%2==0){
MQProductor.publishExchange("Direct-TestProductor[1]-"+i,MQConfig.direct_exchange_test,MQConfig.direct_routingkey_test);
}else {
MQProductor.publishExchange("Direct-TestProductor[2]-"+i,MQConfig.direct_exchange_test,MQConfig.direct_routingkey2_test);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3、主题模式
/**
* 主题模式
* 主要是模糊匹配
* 当生产者推送消息时,会在指定的交换机中通过Publish时指定的RoutingKey与已经绑定好queue的BindingKey进行模糊匹配,
* 当匹配到所有符合BindingKey,就会将此消息推送到相应BindingKey绑定的queue中
* BindingKey规则:
* .# 表示匹配0个或多个. (如 #.topic 能匹配的routingKey(topci, xx.topic,xx.xx.topic等;topic.# 也类似))
* .* 表示.个数要严格匹配(如*.topic 能匹配的routingKey(只能匹配 xx.topic,匹配不到 xx.xx.topic;topic.* 也类似))
*/
private static void testTopic() {
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue1_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey1_test);
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue2_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey2_test);
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue3_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey3_test);
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue4_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey4_test);
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue5_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey5_test);
MQProductor.declareExchange(MQConfig.type_exchange_topic,MQConfig.topic_queue6_test,MQConfig.topic_exchange_test,MQConfig.topic_routingkey6_test);
for(int j=0;j<=100000;j++){
if(j%3==0){
MQProductor.publishExchange("Topic-TestProductor[r1/r2]-"+j,MQConfig.topic_exchange_test,MQConfig.topic_routingkey_test+".2");
}else if(j%3==1){
MQProductor.publishExchange("Topic-Test r3/r4-TestProductor[r4]-"+j,MQConfig.topic_exchange_test,MQConfig.topic_routingkey_test);
}else if(j%3==2){
MQProductor.publishExchange("Topic-Test r3/r4-TestProductor[r5]-"+j,MQConfig.topic_exchange_test,"5."+MQConfig.topic_routingkey_test+".5"+".5");
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、多消费者监听同一队列(若是要求每个消息只允许被消费一次话,必须自己实现一个类似锁机制的功能来控制,如果这个消息被消费过,下一个消费者就不能再消费了)
/**
* 多个消费者消费一个queue
*/
public static void multiComsumerToqueue(){
new Thread(()->{
MQComsumer.comsumer("multi-Comsumer1",MQConfig.queue_test);
}).start();
new Thread(()->{
MQComsumer.comsumer("multi-Comsumer2",MQConfig.queue_test);
}).start();
}