RabbitMQ(二)
2022-09-22 22:46:10

目录:
[TOC]


上一篇,介绍了Rabbit的概念和安装。这一篇来说MQ的工作模型

6.HelloWorld(直连模型)

官网给的向导有这么一张图

img

咱们就先来学习一下第一个HelloWorld(直连模型),模型是这样的:

image20201201172023861.png

在上述模型中有以下概念:

  • P:生产者,发送消息的程序
  • C:消费者,取出消息,一直等待消息的到来
  • queue:消息队列,类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从中取出消息

使用场景:比如注册用户发短信的时候,可以使用这种简单的模型

6.1 准备工作

首先,需要

  • 一个用户。本例是admin
  • 一个虚拟主机(且这个用户有权访问这个虚拟主机)。本例是/ems

假如是按着上面操作到这里的,这些已经完成了。(消息队列不需要建)

另:我是把MQ部署在了我的Linux服务器上,所以Linux服务器还要开放5672端口,不然会导致:

==> java 连接消息队列 Connection timed out: connect 的异常

6.2 创建生产者

6.2.1 编码

  1. 创建一个Maven项目,目录结构如下:
    image-20201201161900873

  2. POM

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    <dependencies>
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
    <scope>compile</scope>
    </dependency>
    </dependencies>
  3. 写个Provider类,在helloworld包下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    package helloworld;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

    //创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接rabbitmq的主机
    connectionFactory.setHost("39.105.45.81");
    //设置连接的端口号
    connectionFactory.setPort(5672);
    //设置连接的虚拟主机
    connectionFactory.setVirtualHost("/ems");
    //设置访问虚拟主机的用户名密码
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("你的密码");

    //获取连接对象
    Connection connection = connectionFactory.newConnection();
    //获取连接的通道对象
    Channel channel = connection.createChannel();
    //通道绑定消息队列
    //参数1:队列名称 如果不存在则还会自动创建
    //参数2:durable 用来定义队列特性:队列(不包括消息)是否要持久化 true 持久化队列 false不持久化
    // (若持久化,在mq重启后还队列会存在,false则就不存在了) 注意:无论true false队列里的消息都会消失
    //参数3:exclusive 是否独占队列,true 这个队列只允许当前连接可用 false其他连接也可用
    //实际开发一般设为 false,期望多个连接共享一个队列
    //参数4:autoDelete 是否在消费完成后自动删除队列 true自动删除 false 不知道删除
    //当为true时,1.全部消费完成后,2.且没有消费者进行监听(断开连接)时,==》队列会被自动删除
    //参数5 额外附加参数
    //这里只是声明队列,消息不一定是向这里发送的。生产者和消费者要保持一致
    channel.queueDeclare("hello",false,false,false,null);

    //发布消息
    //参数1:交换机名称 参数2 队列名称 参数3 传递消息额外设置 参数4 消息的具体内容
    //参数2:这才是消息真正发送到的队列
    //参数3 为:MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化(前提队列也要持久化),然后重启MQ,消息依旧存在
    channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
    channel.close();
    connection.close();
    }

    /**声明消息队列的源代码注释
    * Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
    * flag to true and returns no result (as there will be no response from the server).
    * @param queue the name of the queue
    * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    * @param arguments other properties (construction arguments) for the queue
    * @throws java.io.IOException if an error is encountered
    */
    // void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    // Map<String, Object> arguments) throws IOException;


    }

6.2.2 查看结果

  • 运行

image20201201162432634.png

  • 控制台结果

    1
    2
    #程序没有报异常,正常结束就说明成功了^_^
    Process finished with exit code 0
  • 管理界面显示结果

    img

我运行了两次,所以消息总数是2。

6.3 创建消费者

6.3.1 编码

创建一个Consumer类,也是在helloworld包下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package helloworld;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

public static void main(String[] args) throws IOException, TimeoutException {
{

//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.105.45.81");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("qeeq741");

//创建连接对象
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//参数1: 消费的的那个队列的名称
//参数2:开始消息的自动确认机制
//参数3:消费时的回调接口
channel.basicConsume("hello",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("body = " + new String(body));
}
});
//假如不关闭就可以一直消费消息
// channel.close();
// connection.close();
}
}

//这里就不能在Test里写了
@Test
public void testConsumer() throws IOException, TimeoutException {}
}

6.3.2 查看结果

1
2
3
4
5
body = hello rabbitmq
body = hello rabbitmq
#再运行一次 Provider 发送“body = hello rabbitmq 第三个”
#消费者也会收到这个新消息
body = hello rabbitmq 第三个

注意在Consumer里,连接是没有关闭的。
这时可以再运行一次生产者,消费者也能立即消费这个新消息。

6.4 优化代码

由于生产者和和消费者都来创建ConnectionFactory,比较浪费时间,又是相同的冗余代码。

这里将创建工厂,和关闭资源的方法封装成了RabbitMQUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

public static ConnectionFactory connectionFactory;
static {
//重量级及资源, 类加载时只执行一次
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("39.105.45.81");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("qeeq741");
}

//提供创建连接的方法
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//关闭连接的方法
public static void closeConnetctionAndChannel(Channel channel,Connection connection){
try {
if(channel!=null) {
channel.close();
}
if(connection!=null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

所以Provider、Consumer就可以直接调用RabbitMQUtils了。替换比较简单,这里就不再粘贴代码了。
我的Gitee会有这部分的代码,可以在那里查看。

7. Work Queues(工作队列)

7.1 模型认识

第二个 Work Queues(工作队列)模型是这样的:

image20201202151120941.png

与第一种模型很相似,只不过有多个消费者来共同处理队列里的消息。

使用场景:一般在处理队列的任务比较耗时时使用这种模型

7.2 编码

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通过通道,声明工作队列
channel.queueDeclare("work",true,false,false,null);
//循环生产消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("","work",null,("Message 【" + i + "】").getBytes());
}
System.out.println("send ok");
//关闭资源
RabbitMQUtils.closeConnetctionAndChannel(channel,connection);

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
**
* 第一个消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(Thread.currentThread().getName() + " Consumer1 get message = " + new String(body));
}
});
}
}

/**
* 第二个消费者
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println(Thread.currentThread().getName() + " Consumer2 get message = " + new String(body));
}
});
}
}

7.3 查看结果

最好先运行两个消费者,在运行生产者。效果明显。

生产者

1
2
3
#发送完毕
send ok
Process finished with exit code 0

消费者1

1
2
3
4
5
pool-1-thread-4 Consumer1 get message = Message 【0】
pool-1-thread-4 Consumer1 get message = Message 【2】
pool-1-thread-4 Consumer1 get message = Message 【4】
pool-1-thread-4 Consumer1 get message = Message 【6】
pool-1-thread-4 Consumer1 get message = Message 【8】

消费者2

1
2
3
4
5
pool-1-thread-4 Consumer2 get message = Message 【1】
pool-1-thread-4 Consumer2 get message = Message 【3】
pool-1-thread-4 Consumer2 get message = Message 【5】
pool-1-thread-4 Consumer2 get message = Message 【7】
pool-1-thread-4 Consumer2 get message = Message 【9】

消费者1:0 2 4 6 8

消费者2:1 3 5 7 9

当我们不断改变生产者,发送消息的数量时,发现消费者1和消费者2总是一个挨着一个的消费消息。

官网的解释:

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

翻译:

默认情况下,RabbitMQ将按顺序将每条消息发送到下一个使用者。平均每个消费者将收到相同数量的消息。这种分发消息的方式称为循环(轮询)。和三个或更多的工人一起试试这个

也就是说多个消费者的情况,默认是一个替着一个循环,平均消费队列里的消息的。

比如队列里有消息:123456789… 有三个消费者
消费者1:147..
消费者2:258..
消费者3:369..
多个消费者 以此类推。

8. Fanout(扇出|广播)

第三个 Work Fanout(广播模式 模型是这样的:

fanout

在广播模式下,有以下约定:

  • 可以有多个消费者
  • 每个消费者有自己的Queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发送给哪个队列,生产者无法决定
  • 交换机把消息发给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

数据的流向如图的箭头所示。

官网的解释:

让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户的应用程序。
  • 队列是一个缓冲区,用于存储消息。
  • 消费者是接收消息的用户的应用程序。

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。

img

有几种交换类型可用:direct,topic,headers 和fanout。

8.1 创建生产者

8.1.1 编码

生产者的编码和以前的区别:

  1. 需要绑定交换机 channel.exchangeDeclare("logs","fanout");
  2. 消息放入交换机中 channel.basicPublish("logs","",null,"my fanout message".getBytes());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;

/**
* 扇出模式的生产者
*/
public class Provider {
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//将通道声明交换机
//参数1 交换机的名称:随便起
//参数2 交换机的类型:固定的几种类型 direct,topic,headers 和fanout
channel.exchangeDeclare("logs","fanout");
//发送消息
//广播模式时参数的填写
// 参数1 交换机名称要写
// 参数2 路由key不需要指定,空字符串就行
channel.basicPublish("logs","",null,"my fanout message".getBytes());
//关闭资源
RabbitMQUtils.closeConnetctionAndChannel(channel,connection);

}
}

与之前不同的主要是这两处代码:

  • channel.exchangeDeclare("logs","fanout");声明交换机
  • channel.basicPublish("logs","",null,"my fanout message".getBytes());参数设置

8.1.2 运行结果

image-20201203125509743

然后再界面中 显示出了一个新的交换机logs。type为fanout

8.2 创建消费者

8.2.1 编码

实际代码,一共创建了三个消费者 Consumer1 Consumer2Consumer3

这里只贴出Consumer1 的代码,其他两个消费者类似

消费者的编码和以前的区别:

  1. 需要绑定交换机 channel.exchangeDeclare("logs","fanout");
  2. 创建临时队列 channel.queueDeclare().getQueue();
  3. 绑定临时队列 channel.queueBind(queueName,"logs","");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package fanout;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
* 扇出模式的消费者
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道绑定交换机
channel.exchangeDeclare("logs","fanout");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("Consumer1 get Message ==> 【 " + new String(body) + " 】");
}
});
}
}

写其他消费者时,这句话打印的消费者记得换成2和3,以免造成误会

System.out.println("Consumer1 get Message ==> 【 " + new String(body) + " 】");

8.2.2 查看结果

**Consumer1 **

1
Consumer1 get Message ==>  【 my fanout message 】

**Consumer2 **

1
Consumer2 get Message ==>  【 my fanout message 】

**Consumer3 **

1
Consumer3 get Message ==>  【 my fanout message 】

也就是说三个消费者都收到了消息队列里的同一条消息


  • 以上就是Helloworld、WorkQueue、Fanout模式的介绍
  • 下一篇 来介绍路由模式
Prev
2022-09-22 22:46:10
Next