Messaging and Integration Patterns(消息传递和相应集成模式)
Fundamentals of a messaging system(一个消息系统的基础)
一般来说消息系统有以下 4 个基础:
- 消息传递的方向,是单向的还是“请求/响应”的方向。
- 消息的目的,这也决定了消息的内容
- 消息的时间性,是同步还是异步
- 消息的分发,是直接发送还是通过代理
Message types(消息的类型)
Command Message(命令消息)
这类消息的目的是在接收方执行一个动作或任务,因此消息通常包含了执行任务所需要的参数(操作的名称+运行的参数)。命令消息可以用来实现 RPC
、分布式计算、请求数据等。RESTful HTTP
请求就是命令的例子,HTTP
的动作都有特定的含义:GET
表示获取资源,POST
表示创建新资源,PUT
表示更新,DELETE
表示删除。
Event Message(事件消息)
事件消息是用来通知其他组件有事情发横了,通常包含了事件的类型,甚至有时候包含了一些细节如上下文、创建者。
Document Message(文档消息)
主要用于组件和机器之间传递数据。和命令消息的区别就是不含如何处理命令的任何内容,和事件消息的区别就是没有特定的事件的关联。通常对命令消息的回复是文档消息。
Asynchronous messaging and queues(异步消息队列)
异步消息类似于 SMS
,发邮件时不要求对方已连接互联网,可能立即或者一段时间后收到回复,甚至是没有回复,这样我们就可以不用等待回复继续发送下一封邮件。总之,就是以更少的资源获得更好的并发性。
另一个优点是可以将消息存储并尽快或延迟发送,当接收方太忙或者我们想要保证传送时非常有用。这可以使用消息队列来实现:
Peer-to-peer or broker-based messaging(点对点通信或基于代理的消息)
消息可以点对点直接发送,也可以使用中心化的消息代理系统转发。
在点对点体系中要求每个节点知道接收方得地址和端口,并且得使用相同的协议和消息格式,限制太多;而消息代理则不需要,每个节点完全独立,还可以有多种协议。
Publish/subscribe pattern(发布/订阅 模式)
”发布/订阅“ 模式也有点对点架构和代理架构:
Building a minimalist real-time chat application(构建一个微型实时聊天应用)
为了展示 “发布/订阅” 模式如何帮助我们构建一个分布式架构,我们通过构建一个简单的基于 WebSockets
实时聊天应用来说明。
Implementing the server side
1 | const WebSocketServer = require('ws').Server |
Implementing the client side
1 |
|
Running and scaling the chat application(运行并扩展聊天应用)
1 | node app 8080 |
启动两个服务,当在一个客户端发送消息时,只是将消息广播给本地连接的客户端了,另一个连着另外一个服务器的客户端并收不到消息,这和我们的意愿是相悖的。
Using Redis as a message broker(使用 Redis 作为消息代理)
了解 Redis
请查看 http://redis.io/topics/quickstart。
修改服务器代码,并引入 redis
模块,代码可查看 []:
1 | const WebSocketServer = require('ws').Server |
然后再启动多个服务就发现可以正常通信了。
详细代码可以查看 https://github.com/1016990109/front_end_practice/tree/master/src/node/design-pattern/basic-chat
Peer-to-peer publish/subscribe with ØMQ(使用 ØMQ 实现点对点发布订阅)
Introducing ØMQ
在 ØMQ
中,我们有两种专门为此设计的套接字:PUB
和 SUB
。典型的模式是将 PUB
套接字绑定到一个端口,该端口将开始侦听来自其他 SUB
套接字的订阅。订阅可以有一个过滤器,指定将传递到 SUB
套接字的消息。该过滤器是一个简单的二进制缓冲区(所以它也可以是一个字符串),它将与消息的开头(这也是一个二进制缓冲区)相匹配。当通过 PUB
套接字发送一条消息时,它将被广播到所有连接的 SUB
套接字,但仅在应用了它们的订阅过滤器之后。仅当使用连接的协议时,过滤器才会应用到发布方,例如 TCP
。
Designing a peer-to-peer architecture for the chat server(为聊天服务设计一个点对点架构)
Using the ØMQ PUB/SUB sockets(使用 ØMQ 发布订阅套接字)
修改服务端代码:
1 | const WebSocketServer = require('ws').Server |
- 创建
pub
套接字,绑定到对应参数的端口。 - 创建
sub
套接字,连接到应用其他实例的pub
套接字。 - 然后通过
chat
作为过滤器创建实际的发布订阅。 - 当我们的
WebSocket
收到新小时时,广播给所有连接的客户端,也通过pub
套接字发布它。注意内容是chat ${msg}
前面加了chat
,所以发布到所有使用chat
订阅者。 sub
套接字收到消息则做处理,取得需要的消息,并广播到所有连接当前WebSocket
的客户端。
尝试运行:
1 | node app --http 8080 --pub 5000 --sub 5001 --sub 5002 |
详细代码可以查看 https://github.com/1016990109/front_end_practice/tree/master/src/node/design-pattern/chat-zmq
Durable subscribers(持久订阅者)
消息传递系统中的一个重要抽象是消息队列(MQ
)。对于消息队列,消息的发送者和接收者不需要同时处于活动状态和连接状态以建立通信,因为排队系统负责存储消息直到目的地能够接收他们。这种行为与 set and forget
范式相反,订户只能在消息系统连接期间才能接收消息。
一个能够始终可靠地接收所有消息的订阅者,即使是在没有收听这些消息时发送的消息,也被称为持久订阅者。
MQTT
协议为发送方和接收方之间交换的消息定义了服务质量(QoS
)级别。这些级别对描述任何其他消息系统(不仅仅是 MQTT )的可靠性也非常有用。如下描述:
QoS0
,最多一次:也被称为“设置并忘记”,消息不会被保留,并且传送未被确认。这意味着在接收机崩溃或断开的情况下,信息可能会丢失。QoS1
,至少一次:保证至少收到一次该消息,但如果在通知发件人之前接收器崩溃,则可能发生重复。这意味着消息必须在必须再次发送的情况下持续下去。QoS2
,正好一次:这是最可靠的QoS
; 它保证该消息只被接收一次。 这是以用于确认消息传递的更慢和更数据密集型机制为代价的。
Redis
的发布/订阅命令实现了一个设置和遗忘机制( QoS0
)。但是,Redis
仍然可以用于使用其他命令的组合来实现持久订阅者(不直接依赖其发布/订阅实现)。
Instroducing AMQP(AMQP 介绍)
AMQP
是许多消息队列系统支持的开放标准协议。除了定义通用通信协议外,它还提供了描述路由,过滤,排队,可靠性和安全性的模型。在 AMQP
中,有三个基本组成部分:
Queue
(队列):负责存储客户端消费的消息的数据结构。我们的应用程序推送消息到队列,供给一个或多个消费者。如果多个使用者连接到同一个队列,则这些消息会在它们之间进行负载平衡。 队列可以是以下之一:Durable
(持久队列) :这意味着如果代理重新启动,队列会自动重新创建。一个持久的队列并不意味着它的内容也被保留下来;实际上,只有标记为持久性的消息才会保存到磁盘,并在重新启动的情况下进行恢复。Exclusive
(专有队列) :这意味着队列只能绑定到一个特定的用户连接。当连接关闭时,队列被销毁。Auto-delete
(自动删除队列) :这会导致队列在最后一个用户断开连接时被删除。
Exchange
(交换机) :这是发布消息的地方。交换机根据它实现的算法将消息路由到一个或多个队列:Direct exchange
(直接交换机) :通过匹配路由键(例如,chat.msg
)整个消息来路由消息。Topic exchange
(主题交换机) :它使用与路由密钥相匹配的类似glob
的模式分发消息(例如,chat.#
匹配以chat
开始的所有路由密钥)。Fanout exchange
(扇出交换机) :它向所有连接的队列广播消息,忽略提供的任何路由密钥。
Binding
(绑定) :这是交换机和队列之间的链接。它还定义了路由键或用于过滤从交换机到达的消息的模式。
可以在
RabbitMQ
网站上找到AMQP
模型的详细介绍: https://www.rabbitmq.com/tutorials/amqp-concepts.html
下图展示了组件如何组合:
Durable subscribers with AMQP and RabbitMQ(使用 AMQP 和 RabbitMQ 实现持久订阅者)
现在让我们使用微服务方法扩展我们的小聊天应用程序。让我们添加一个历史记录服务,将我们的聊天消息保存在数据库中,这样当客户端连接时,我们可以查询服务并检索整个聊天记录。我们将使用 RabbitMQ broker
和 AMQP
将历史记录服务器与聊天服务器相集成。
下图显示了我们的架构:
我们将使用熟悉的 LevelUP
作为历史记录服务的存储引擎,而我们将使用 amqplib
,并通过 AMQP
协议连接到 RabbitMQ
。现在让我们实施我们的历史记录服务器!我们将创建一个独立的应用程序(典型的微服务),它在模块 historySvc.js
中实现。该模块由两部分组成:向客户端展示聊天记录的 HTTP
服务器,以及负责捕获聊天消息并将其存储在本地数据库中的 AMQP
使用者。
1 | const level = require('level') |
- 我们首先与
AMQP
代理建立连接,在我们的例子中是RabbitMQ
。然后,我们创建一个channel
,该channel
类似于保持我们通信状态的会话。 - 接下来,我们建立了我们的会话,名为
chat
。正如我们已经提到的那样,这是一种扇出交换机。assertExchange()
命令将确保代理中存在交换,否则它将创建它。 - 我们还创建了我们的队列,名为
chat_history
。默认情况下,队列是持久的;不是排他性的,也不会自动删除,所以我们不需要传递任何额外的选项来支持持久订阅者。 - 接下来,我们将队列绑定到我们以前创建的交换机。在这里,我们不需要任何其他特殊选项,例如路由键或模式,因为交换机是扇出类型的交换机,所以它不执行任何过滤。
- 最后,我们可以开始监听来自我们刚创建的队列的消息。我们将使用时间戳记作为密钥(https://npmjs.org/package/monotonic-timestamp)在
LevelDB
数据库中收到的每条消息保存,以保持消息按日期排序。看到我们使用channel.ack(msg)
来确认每条消息,并且只有在消息成功保存到数据库后,也很有趣。如果代理没有收到ACK
(确认),则该消息将保留在队列中以供再次处理。这是AMQP
将服务可靠性提升到全新水平的另一个重要特征。如果我们不想发送明确的确认,我们可以将选项{noAck:true}
传递给channel.consume()
API
。
然后更改我们的 app.js
1 | const WebSocketServer = require('ws').Server |
正如我们所提到的,我们的聊天服务器不需要成为持久的订阅者。所以当我们创建我们的队列时,我们传递选项 {exclusive:true}
,指示队列被限制到当前连接,因此一旦聊天服务器关闭,它就会被销毁。
启动应用:
1 | node app 8080 |
现在看看我们的系统,特别是历史服务如何在停机的情况下运行,这一点很有意思。如果我们停止历史记录服务器并继续使用聊天应用程序的 Web UI
发送消息,我们将会看到,当历史记录服务器重新启动时,它将立即收到所有错过的消息。
Pipelines and task distribution patterns(管道和任务分配模式)
有时候我们的需求并不是把消息传递给每一个消费者,例如 Map/Reduce
之类的架构,需要将 Map
的任务分配给不同的 worker
,再将最终结果组合。那么继续使用“发布/订阅”模式就不行了,这里我们介绍管道和任务分配模式:
The ØMQ fanout/fanin pattern (ØMQ 扇出/扇出模式)
我们已经发现了 ØMQ
在构建点对点分布式体系结构方面的一些优势。在前一节中,我们使用 PUB
和 SUB
套接字向多个消费者传播单个消息;现在我们将看到如何使用称为 PUSH
和 PULL
的另一对套接字来构建并行管道。
PUSH/PULL socket
直观地说,我们可以说 PUSH
套接字用于发送消息,而 PULL
套接字是用于接收的。这似乎是一个微不足道的组合;然而,它们有一些很好的特性,使它们成为构建单向通信系统的完美选择:
- 两者都可以在
connet
模式或bind
模式下工作。换句话说,我们可以构建一个PUSH
套接字并将其绑定到本地端口,以监听来自PULL
套接字的传入连接,反之亦然,PULL
套接字可以监听来自PUSH
套接字的连接。消息总是以相同的方向传播,从PUSH
到PULL
;它只是连接的发起者可能是不同的。绑定模式是耐用节点(例如任务生产者和接收器)的最佳解决方案,而连接模式对于瞬态节点(例如任务工作者)来说是完美的。这使得瞬时节点的数量可以任意变化,而不会影响其它正在使用的节点。 - 如果有多个
PULL
套接字连接到单个PUSH
套接字,则消息均匀分布在所有的PULL
套接字中;在实践中,它们是负载均衡的(点对点负载平衡!)。另一方面,从多个PUSH
套接字接收消息的PULL
套接字将使用公平排队系统处理消息,这意味着它们将从所有负载是均衡的。 - 通过没有任何连接
PULL
套接字的PUSH
套接字发送的消息不会丢失;他们排队等待生产者,直到一个节点联机并开始提取消息。
Building a distributed hashsum cracker with ØMQ(使用 ØMQ 构建一个分布式的 hashsum cracker)
hashsum cracker
,一个使用暴力破解技术来尝试将给定的 hashsum
(MD5
,SHA1
等) 与给定字母表中每个可能的字符变体进行匹配的系统。这个算法的负载量是很高的。
对于我们的应用程序,我们希望通过一个节点来实现典型的并行管道,以在多个 worker
之间创建和分配任务,以及一个节点来收集所有结果。我们刚刚描述的系统可以使用以下体系结构在 ØMQ
中实现:
在我们的体系结构中,我们有一个 ventilator
,用于生成给定字母表中所有可能的字符变体,并将它们分发给一组 worker
,然后计算每个给定变体的哈希函数并尝试将其与输入的哈希函数进行匹配。如果找到匹配项,则结果将发送到结果收集器节点(sink
)。
重点是 ventilator
和 sink
,而 worker
节点是随时在变化中的。这意味着每个 worker
将其 PULL
套接字连接到 ventilator
,并将其 PUSH
套接字连接到 ventilator
;通过这种方式,我们可以在不改变 ventilator
和 sink
中的任何参数的情况下,启动和停止我们想要的 worker
数量。
Implementing the ventilator
1 | const zmq = require('zmq') |
如何给 worker
分配任务:
- 我们首先创建一个
PUSH
套接字,并将其绑定到本地端口 5000 ;这是worker
的PULL
套接字将连接以接收任务的地方。 - 我们将每个批次生成的变体进行分组,然后制作一条消息,其中包含匹配的散列和要检查的一批单词。这实质上是
worker
将接受的任务对象。当我们通过ventilator
套接字调用send()
时,消息将按循环分配传递给下一个可用的worker
。
Implementing worker
1 | const zmq = require('zmq') |
正如我们所说的,我们的 worker
在我们的体系结构中代表了一个临时节点,因此,它的套接字应连接到远程节点,而不是侦听传入连接。这正是我们在 worker
中所做的,我们创建了两个套接字:
- 连接到
ventilator
的PULL
套接字 - 用于接收任务连接到接收器的
PUSH
套接字,用于传播结果
除此之外,我们的 worker
完成的工作非常简单:对于收到的每条消息,我们迭代它包含的一批单词,然后对每个单词计算 SHA1
校验和,并尝试将其与针对消息传递的 searchHash
进行匹配。当找到匹配时,结果被转发到接收器。
Implementing sink
1 | const zmq = require('zmq') |
我们现在准备运行我们的应用程序;让我们开始几个 worker
和 sink
:
1 | node worker |
然后启动 ventilator
,指定要生成的单词的最大长度以及我们希望匹配的 SHA1
校验和。以下是参数的示例列表:
1 | node ventilator 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b |
当运行上述命令时,ventilator
将开始生成所有可能的单词,其长度至多为四个字符,并将它们分配给我们开始的工作人员,以及我们提供的校验和。计算结果(如果有的话)将显示在接收器应用程序的终端中。
Pipelines and competing consumers in AMQP(AMQP 中管道和竞争消费者)
Point-to-point communications and competing consumers(点对点通信和竞争消费者)
在点对点的配置中,管道模式是非常容易理解的,但是如果使用消息代理,那么就很难去理解各个节点之间的关系了,我们并不知道另一边是谁在监听消息。如果我们想要使用 AMQP
来实现管道和任务分配模式,我们就必须确保每一个消息只能被一个消费者接收,但是这在交换机与多个队列绑定时是无法保证的。所以解决方案就是,将消息直接发给某一个队列,这样就能保证只有一个队列能接收到消息了。这样我们的目标就已经实现一半了。
Implementing the hashsum cracker with AMQP
Implementing the producer
1 | const amqp = require('amqplib') |
Implementing the worker
1 | const amqp = require('amqplib') |
Implementing the result collector
1 | const amqp = require('amqplib') |
Running the application
运行应用:
1 | node worker |
Request/reply patterns
单向异步消息可以在并发行和效率上带来很大帮助,但有时候我们也需要一个“请求/回复”模式来帮助我们解决剩下的问题。
Correlation identifier(相关 ID)
该模式包括标记每个请求的标识符(ID
),然后由接收方附加到响应中;通过这种方式,请求的发送者可以关联这两个消息并将响应返回给正确的处理程序。这优雅地解决了存在单向异步通道的问题,消息可以随时在任何方向传播:
Implementing a request/reply abstraction using correlation identifier(使用关联 ID 实现 “请求/回复” 抽象)
Abstracting request(抽象请求)
1 | const uuid = require('node-uuid') |
request
函数创建一个闭包。该模式的神奇之处在于idToCallbackMap
变量,它存储了传出请求与其回复处理程序之间的关联。- 一旦工厂被调用,我们所做的第一件事就是开始监听收到的消息。如果消息的关联
ID
(包含在inReplyTo
属性中)与idToCallbackMap
变量中包含的任何ID
相匹配,我们知道我们刚收到一个回复,因此我们获得了对相关响应处理程序的引用,并且用 消息中包含的数据。 - 最后,我们返回我们将用来发送新请求的函数。 其工作是使用
node-uuid
生成关联ID
,然后将请求数据包装起来,并指定关联ID correlationId
和消息类型type
。
Abstracting reply(抽象回复)
1 | module.exports = channel => { |
Trying the full request/reply cycle(尝试完整的 “请求/回复” 环路)
1 | const replier = require('child_process').fork(`${__dirname}/replier.js`) |
至此我们的应用就完成了。
Return address(返回地址)
关联 ID
是在单向信道之上创建 “请求/回复” 通信的基本模式;然而,当我们的消息架构拥有多个通道或队列,或者可能有多个请求者时,这还不够。在这些情况下,除了关联 ID
之外,我们还需要知道返回地址,这是允许回复者将回复发送回请求的原始发件人的一条信息。
Implementing the return address with AMQP
实现:
1 | // amqpRequest.js |