RabbitMQ(三)
2022-09-22 22:46:14

目录:
[TOC]

9.Routing(路由模型)

9.1模型认识

Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,某些场景下,我们希望不同的消息被不同的队列消费。

这时就要用到Direct类型的Exchange

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由Key)
  • 消息的发送方 在想Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接受到消息。

模型图

routing02

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个RoutingKey
  • X:Exchange【交换机】,接受生产者的消息,然后把消息递交给RoutingKey完全匹配的队列
  • C1:消费者,其所在队列指定了需要RoutingKey为的error消息
  • C2:消费者,其所在队列指定了需要RoutingKeyinfowarningerror的消息

9.2 创建生产者

9.2.1 编码

这里与之前的区别主要是:需要指定RoutingKey(路由Key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;

/**
* 路由模式 之 Direct
*/
public class Provider {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
//参数1 交换机的名称:随便起
//参数2 交换机的类型:固定的几种类型 direct,topic,headers 和fanout
channel.exchangeDeclare("logs_direct","direct");
//路由Key
String routingKey = "info";
//发送消息
channel.basicPublish("logs_direct",routingKey,null,("Info Message And routingKey is " + routingKey).getBytes());
RabbitMQUtils.closeConnetctionAndChannel(channel,connection);
}
}

9.2.2 运行结果

增加了一个logs_direct的交换机,类型为 direct

image-20201203144805945

9.3 创建消费者

9.3.1 编码

步骤:

  1. 通道声明交换机 channel.exchangeDeclare("logs_direct",BuiltinExchangeType.DIRECT);
  2. 创建一个临时队列 channel.queueDeclare().getQueue();
  3. 通道与队列绑定 queueBind(queueName,"logs_direct","error");

Consumer1(消费者1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package direct;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;

/**
* 路由模式 之 Direct
* 消费者
* 获取路由Key为 info 的消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception{
String exchangeName = "logs_direct";
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道声明交换机 参数1 交换机的名字 参数2 交换机的类型与生产者保持一致
// BuiltinExchangeType.DIRECT 是个枚举 就是 direct
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT);
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于RoutingKey绑定队列和交换机
//参数1 队列名称
//参数2 交换机名
//参数3 路由Key
channel.queueBind(queueName,exchangeName,"error");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("Consumer1 get Message = > 【 " + new String(body) + " 】");
}
});
}
}

Consumer2(消费者2)

在队列绑定时多绑定了两种类型,其他与消费者1无区别

1
2
3
4
5
6
7
8
9
10
//基于RoutingKey绑定队列和交换机
//参数1 队列名称
//参数2 交换机名
//参数3 路由Key
String exchangeName = "logs_direct";
channel.queueBind(queueName,exchangeName,"info");
channel.queueBind(queueName,exchangeName,"warning");
channel.queueBind(queueName,exchangeName,"error");
//下面的代码,变成输出消费者2
System.out.println("Consumer2 get Message = > 【 " + new String(body) + " 】");

9.3.2 查看结果

  1. 先运行两个消费者等待消息,再运行生产者

Consumer1

1
#没有输出

Consumer2

1
Consumer2 get Message = > 【 Info Message And routingKey is 【info】 】

因为Consumer1只绑定了路由Key【error】,所以接受不到info的消息

channel.queueBind(queueName,"logs_direct","error");

  1. 改变生产者的路由Key为error运行,再来查看结果

Consumer1

1
Consumer1 get Message = > 【 Info Message And routingKey is 【error】 】

Consumer2

1
Consumer2 get Message = > 【 Info Message And routingKey is 【error】 】

因为Consumer1Consumer2 都绑定了路由Key【error】,所以都能接受到消息

channel.queueBind(queueName,"logs_direct","error");

路由Key【waring】同理。

10. Topic (主题模型)

10.1 模型认识

Topic类型的Exchange与Direct区别:

  • 相同点:都是可以根据RoutingKey把消息路由到不同的队列。

  • 不同点:Topic类型的Exchange可以让队列在绑定RoutingKey的时候使用通配符!**

这种模型RoutingKey一般都是由一个或者多个单词组成,多个单词之间以”.”分割。例如:item.insert

topic

简而言之,Topic就是对Direct类型的增强,使用了通配符匹配路由Key。

9.2 创建生产者

10.2.1 编码

代码没有什么改变,主要将Exchange改为Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

/**
* 主题模式
* 创建生产者
*/
public class Provider {
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//定义 路由key
String routingKey = "user.save";
//定义 路由名
String exchangeName = "topics";
//BuiltinExchangeType.TOPIC 是一个枚举
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
//发布消息
channel.basicPublish(exchangeName,routingKey,null,("TOPIC Message And routingKey is 【" + routingKey + "】").getBytes());
//关闭资源
RabbitMQUtils.closeConnetctionAndChannel(channel,connection);
}
}

10.2.2 运行结果

增加了一个topics的交换机,类型为 topic

topic01

10.3 创建消费者

10.3.1 编码

关键代码:使用通配符 ‘‘*“ 绑定路由key

String routingKey = "user.*";

channel.queueBind(queue,exchangeName,routingKey);

Consumer1(消费者1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
* Topic 主题模式
* 创建消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "topics";
//*(星号)可以代替一个单词。
//#(哈希)可以替代零个或多个单词。
//user.* 就代表 user 和 1个单词 (比如user.del user.save 但是 user.save.ok 不会匹配)
String routingKey = "user.*";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
//创建临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,exchangeName,routingKey);
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("Consumer1 get Message = > 【 " + new String(body) + " 】");
}
});
}
}

Consumer2(消费者2)

使用通配符 ‘‘#“ 绑定路由key,其他重复代码就不贴了

1
2
String routingKey = "user.*";
channel.queueBind(queue,exchangeName,routingKey);

9.3.2 查看结果

  1. 先运行两个消费者等待消息,再运行生产者

  2. 生产者分别指定routingKeyuser.save , user.save.ok , user。运行了三次结果如下:

Consumer1

1
Consumer1 get Message = > 【 TOPIC Message And routingKey is 【user.save】 】

Consumer2

1
2
3
Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user.save】 】
Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user.save.ok】 】
Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user】 】

因为Consumer1只绑定了路由Key 【user.*】;,所以接受不到第2、3条消息

Consumer2绑定了路由Key 【user.#】;,所以都能接受

也可以自己再尝试其他 routingKey 和 通配符进行结果的验证

Prev
2022-09-22 22:46:14
Next