RabbitMQ官网的消息模型的第三种-发布/订阅模式

发布/订阅

(using the Go RabbitMQ client)

​ 在上一个教程中我们创建了一个work queue。work queue背后的假设是每个任务都只被交付给一个worker。在这一部分,我们将做一些完全不同的事情—我们将向多个消费者传递一条消息。这种模式被称为“发布/订阅”。

为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成—第一个将发出日志消息,第二个将接收并打印它们。

在我们的日志系统中, receiver程序的每个运行副本都会收到消息。这样我们就可以运行一个 receiver并将日志定向到磁盘;同时我们将能够运行另一个 receiver并在屏幕上查看日志。

本质上,发布的日志消息将被广播给所有 receiver(接收者)。

Exchanges (交换机)

在本教程的前面部分中,我们向队列发送消息和从队列接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。

让我们快速回顾一下先前教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。

相反,生产者只能将消息发送到交换机。交换机是非常简单的东西。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交换机必须确切知道如何处理接收到的消息。它应该被附加到特定的队列吗?还是应该将其附加到许多队列中?或者它应该被丢弃。这些规则由交换机的类型定义。

有几种交换机类型可用:direct, topic, headersfanout。我们将着重讨论最后一个——fanout。让我们创建一个这种类型的交换机,并给它起个名字叫logs

1
2
3
4
5
6
7
8
9
err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

第三种模型(fanout)

fanout 扇出, 也称为广播;

Putting it all together;

在广播模式下, 消息发送流程 是这样的:

  • 可以有多个消费者;
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机, 交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者创建交换机的类型必须是"fanout"类型;

交换机清单

要列出服务器上的交换机, 我们可以执行rabbitmqctl命令:

1630724536420

在此列表中,将有一些amq.*交换机和一个默认的(未命名)交换机。这些是默认创建的,但是你现在不太可能需要使用它们。

默认交换器

在本教程的前面部分中,我们还不知道交换机的存在,但仍然能够将消息发送到队列。之所以能这样做,是因为我们使用的是默认交换机,该交换机由空字符串("")标识。

回想一下我们之前是怎么发布消息的:

1
2
3
4
5
6
7
8
9
err = ch.Publish(
   "",     // exchange
   q.Name, // routing key
   false,  // mandatory
   false,  // immediate
   amqp.Publishing{
     ContentType: "text/plain",
     Body:        []byte(body),
 })

在这里,我们使用的是默认或者无名称的交换机, 消息将以route_key参数指定的名称路由到队列(如果存在)。

现在,我们可以改为发布到我们的命名交换机:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
err = ch.ExchangeDeclare(
  "logs",   // 使用命名的交换器
  "fanout", // 交换器类型
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

临时队列

​ 之前我们使用的是具有特定名称的队列(用过的"hello"和"work"),能够命名队列对我们来说至关重要——我们需要将Worker指向同一个队列。当你想在生产者和消费者之间共享队列时,给队列一个名称非常重要。

​ 但对于我们的记录器来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是它们的一部分。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,我们需要两件事。

​ 首先,当我们连接到Rabbit时,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦我们断开消费者的连接,队列就会自动删除。

​ 在amqp客户端中,当我们传递一个空字符串作为队列名称时,我们将使用随机生成的名称创建一个非持久队列:

1
2
3
4
5
6
7
8
q, err := ch.QueueDeclare(
  "",    // name, 空字符串作为队列名称
  false, // durable, 非持久队列
  false, // delete when unused
  true,  // exclusive, 独占队列,当前声明的队列连接关闭后即被删除
  false, // no-wait
  nil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。

你可以在队列指南中了解有关exclusive标志和其他队列属性的更多信息。

绑定

我们已经创建了一个扇出交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的关系称为绑定

1
2
3
4
5
6
7
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil,
)

从现在开始,logs交换机将会把消息添加到我们的队列中。

列出绑定关系

rabbitmqctl list_bindings

产生日志消息的生产程序与上一教程看起来没有太大不同,最重要的变化是,我们现在希望把消息发布到’logs’交换机上;

1630744370426

1630744396752

1630744446529

​ 如我们所见,在我们建立了连接之后,声明了交换机。此步骤是必须的,因为禁止发布到不存在的交换机;

如果没有队列绑定到交换机,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

​ 使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个consumer1.goconsumer2.go程序后,你应该看到类似以下内容:

1630744878390

对结果的解释很简单:数据从logs交换机进入了两个由服务器分配名称的队列。这正是我们想要的。

参考资料

  1. 李文周老师 Go语言客户端教程3
  2. RabbitMQ官网 发布订阅模式-Go客户端
  3. MQ消息中间件之RabbitMQ