目录: [TOC]
9.Routing(路由模型) 9.1模型认识
在Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到Direct类型的Exchange
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey
(路由Key)
消息的发送方 在想Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey
进行判断,只有队列的RoutingKey
与消息的RoutingKey
完全一致,才会接受到消息。
模型图 :
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个RoutingKey
X:Exchange【交换机】,接受生产者的消息,然后把消息递交给RoutingKey
完全匹配的队列
C1:消费者,其所在队列指定了需要RoutingKey
为的error
消息
C2:消费者,其所在队列指定了需要RoutingKey
为info
、warning
、error
的消息
9.2 创建生产者 9.2.1 编码 这里与之前的区别主要是:需要指定RoutingKey
(路由Key)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package direct;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import utils.RabbitMQUtils;import java.io.IOException;public class Provider { public static void main (String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct" ,"direct" ); String routingKey = "info" ; channel.basicPublish("logs_direct" ,routingKey,null ,("Info Message And routingKey is " + routingKey).getBytes()); RabbitMQUtils.closeConnetctionAndChannel(channel,connection); } }
9.2.2 运行结果 增加了一个logs_direct
的交换机,类型为 direct
9.3 创建消费者 9.3.1 编码
步骤:
通道声明交换机 channel.exchangeDeclare("logs_direct",BuiltinExchangeType.DIRECT);
创建一个临时队列 channel.queueDeclare().getQueue();
通道与队列绑定 queueBind(queueName,"logs_direct","error");
Consumer1(消费者1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package direct;import com.rabbitmq.client.*;import utils.RabbitMQUtils;import java.io.IOException;public class Consumer1 { public static void main (String[] args) throws Exception{ String exchangeName = "logs_direct" ; Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,exchangeName,"error" ); channel.basicConsume(queueName,true ,new DefaultConsumer (channel){ @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { super .handleDelivery(consumerTag, envelope, properties, body); System.out.println("Consumer1 get Message = > 【 " + new String (body) + " 】" ); } }); } }
Consumer2(消费者2)
在队列绑定时多绑定了两种类型,其他与消费者1无区别
1 2 3 4 5 6 7 8 9 10 String exchangeName = "logs_direct" ;channel.queueBind(queueName,exchangeName,"info" ); channel.queueBind(queueName,exchangeName,"warning" ); channel.queueBind(queueName,exchangeName,"error" ); System.out.println("Consumer2 get Message = > 【 " + new String (body) + " 】" );
9.3.2 查看结果
先运行两个消费者等待消息,再运行生产者
Consumer1
Consumer2
1 Consumer2 get Message = > 【 Info Message And routingKey is 【info】 】
因为Consumer1 只绑定了路由Key【error】,所以接受不到info的消息
channel.queueBind(queueName,"logs_direct","error");
改变生产者的路由Key为error
运行,再来查看结果
Consumer1
1 Consumer1 get Message = > 【 Info Message And routingKey is 【error】 】
Consumer2
1 Consumer2 get Message = > 【 Info Message And routingKey is 【error】 】
因为Consumer1 、Consumer2 都绑定了路由Key【error】,所以都能接受到消息
channel.queueBind(queueName,"logs_direct","error");
路由Key【waring】同理。
10. Topic (主题模型) 10.1 模型认识 Topic类型的Exchange与Direct区别:
这种模型RoutingKey一般都是由一个或者多个单词组成,多个单词之间以”.”分割。例如:item.insert
简而言之,Topic就是对Direct类型的增强,使用了通配符匹配路由Key。
9.2 创建生产者 10.2.1 编码 代码没有什么改变,主要将Exchange
改为Topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import utils.RabbitMQUtils;public class Provider { public static void main (String[] args) throws Exception{ Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); String routingKey = "user.save" ; String exchangeName = "topics" ; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); channel.basicPublish(exchangeName,routingKey,null ,("TOPIC Message And routingKey is 【" + routingKey + "】" ).getBytes()); RabbitMQUtils.closeConnetctionAndChannel(channel,connection); } }
10.2.2 运行结果 增加了一个topics
的交换机,类型为 topic
10.3 创建消费者 10.3.1 编码
关键代码:使用通配符 ‘‘*“ 绑定路由key
String routingKey = "user.*";
channel.queueBind(queue,exchangeName,routingKey);
Consumer1(消费者1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package topic;import com.rabbitmq.client.*;import utils.RabbitMQUtils;import java.io.IOException;public class Consumer1 { public static void main (String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "topics" ; String routingKey = "user.*" ; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,exchangeName,routingKey); channel.basicConsume(queue,true ,new DefaultConsumer (channel){ @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { super .handleDelivery(consumerTag, envelope, properties, body); System.out.println("Consumer1 get Message = > 【 " + new String (body) + " 】" ); } }); } }
Consumer2(消费者2)
使用通配符 ‘‘#“ 绑定路由key,其他重复代码就不贴了
1 2 String routingKey = "user.*" ;channel.queueBind(queue,exchangeName,routingKey);
9.3.2 查看结果
先运行两个消费者等待消息,再运行生产者
生产者分别指定routingKey 为user.save
, user.save.ok
, user
。运行了三次结果如下:
Consumer1
1 Consumer1 get Message = > 【 TOPIC Message And routingKey is 【user.save】 】
Consumer2
1 2 3 Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user.save】 】 Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user.save.ok】 】 Consumer2 get Message = > 【 TOPIC Message And routingKey is 【user】 】
因为Consumer1 只绑定了路由Key 【user.*】;,所以接受不到第2、3条消息
而Consumer2 绑定了路由Key 【user.#】;,所以都能接受
也可以自己再尝试其他 routingKey 和 通配符进行结果的验证