`
February 26, 2024 本文阅读量

Nats设计与实现

分析 nats 的设计和实现,理解其中的关键流程,并记录阅读 nats 源码中的关键函数。

NATS设计与实现

NATS 就是一个消息中间件,提供了 Pub/Sub 核心数据流,并基于此构建了 Request/Reply API 和 JetStream 用来提供可靠的分布式存储能力,和更高的 QoS(至少一次 + ACK)。

1. 核心概念

序号 名词(ENG) 名词(zh-CN) 解释
1 Publish 发布 发布动作,往 subject 中投递一条消息
2 Subscribe 订阅 订阅动作,表示想要接受发布相应 subject 的消息
3 Subject 主题 唯一标识,用于标识一种或者一类事件的概念。订阅时,可以使用 nats 约定的通匹配符来接收一类 subjects,如 orders.> 。
4 Core Nats NATS 核心 CORE NATS 提供了以下能力:

- PUB / SUB https://docs.nats.io/nats-concepts/core-nats/pubsub

- Request / Reply https://docs.nats.io/nats-concepts/core-nats/reqreply

- Queue Groups https://docs.nats.io/nats-concepts/core-nats/queue


提供 最多一次 的消息传递保证
5 Request / Reply 请求 / 回复 NATS 基于 PUB / SUB 实现的对 request 异步回复的功能,依赖于消息中的 reply 字段。reply 是内置实现,随机生成一个 “inbox” subject。



订阅者中间也是存在 Queue Group 概念的。

注意:发起 request 如果没有reply,那么服务端会返回 No Responders 消息。
6 Queue groups 队列组 订阅者使用同一个队列名称,它们就会成为一个队列组。每次队列组接收到消息时,只有队列组中_随机选择_的一个订阅者会消费一条消息,内置的负载均衡。
7 Message 消息 or 事件 一条消息包含了以下内容:

- Subject.

- A payload in the form of a byte array.

- Any number of header fields.

- An optional ‘reply’ address field.
8 Subject wildcards

https://docs.nats.io/nats-concepts/subjects#wildcards
主题通配符 订阅者可以使用这些通配符_通过单个订阅收听多个主题。_

注意:但发布者将始终使用完全指定的主题,而不使用通配符。
9 JetStream

https://docs.nats.io/nats-concepts/jetstream
无 / 流 NATS 中一个功能特性,它是一个内置的分布式存储,在 CORE nats 的基础上扩展了更多的功能和更高的 QoS。功能上:

- Stream:将 NATS 消息存储在流中,提供多种保留策略、限制、丢弃策略和主题映射功能。

- Consumer: 让客户端应用订阅或拉取流中的消息,支持多种重放策略、确认机制和流控功能

- Persistence: 将流的数据复制到多个 NATS 服务器,提供容错能力和加密存储功能

- KV Store: 可以将消息与键关联,提供存储、检索、删除和监听键值变化的功能。

- Object Store: JetStream 可以存储任意大小的对象(如文件),提供分块、校验和、元数据等功能。


其中最核心(开发常用)的概念就是:stream 和 consumer。
10 Stream

https://docs.nats.io/nats-concepts/jetstream/streams
流即是消息存储,它定义了消息的存储方式以及保留的限制。

更具体的内容参见 #5.1 Stream 配置解析
11 Consumer 消费者 消费者作为客户端的接口,使用存储在流中的消息,跟踪客户端传递和确认的消息。Nats 同时支持 pullpush 两种消费模式;consumer 还提供了 durable 配置,用于持久化 consumer 消费信息(除非设置了 InactiveThreshold)

更具体的内容参见 #5.2 Consumer 配置解析
12 Replay / Redelivery
13 Raft Group



https://docs.nats.io/running-a-nats-service/configuration/clustering/jetstream_clustering
RAFT 组 对于特定内容达成一致的分组,nats 中有 meta, stream, consumer 几种组。

Meta: 全部节点都是组成员。负责:JetStream API + 集群管理。



Stream: (根据 replicas 配置选择组内的服务器成员)。负责:stream 数据



Consumer: (根据 stream group的成员来确定消费者组内的成员)。负责:消费者状态

14 KV Store 键值存储
15 Object Store 对象存储

2. 软件架构(集群模式)

Nats 自身是提供了多种运行方式:

  • 单机 只运行了一个 nats 实例。

  • 普通集群: 运行了 3 / 5 个实例,其中一个为集群 leader,此时集群如下:

  • 超级集群: 存在多个集群,集群之间可以通过 网关 来传播消息__网关提供了三种传播机制:这里 A / B 分别代表两个集群。

    • Optimistic Mode 乐观模式

       当 A 中的发布者发布“foo”时,A 网关将检查集群 B 是否已注册对“foo”没有兴趣。如果没有,则将“foo”转发给B。如果B收到“foo”后,在“foo”上没有订阅者,则B将向A发送一条网关协议消息,表示它对“foo”没有兴趣,从而阻止将来的消息关于“foo”被转发。

        当后续 B 中有订阅者关注 “foo”, 那么 B 会发送一条网关协议,用来取消对 “foo” 没兴趣的设置。

    • Interest-only Mode 兴趣(关注)模式

        当 A 上的网关发送许多关于 B 不感兴趣的各种主题的消息时。 B 发送一条网关协议消息,要求 A 乐观地停止发送,如果已知对该主题感兴趣,则发送。随着 B 上的订阅不断增多,B 将向 A 更新其主题兴趣。

    • Queue Subscriptions 队列订阅模式

        服务器将始终首先尝试为本地队列订阅者提供服务,并且仅在未找到本地队列订阅者时进行故障转移。服务器将选择 RTT 最低的集群。

  • 叶子结点/集群(代理模式):

    透明地从本地客户端路由消息到一个或多个远程 NATS 系统。叶子结点采用本地的认证进行认证,连接远程 NATS 结点时,采用远程 NATS 系统的认证。通常用来降低 local 服务的延迟和流量。 注意:如果集群中一个节点配置为叶子结点,那么其余结点也要配置为叶子结点。

一般场景下最常用的就是集群模式。

Qs:

运维时怎么保证集群各节点的负载均衡?怎么保证集群的高可用?

3. Nats 客户端协议

https://docs.nats.io/reference/reference-protocols/nats-protocol

Nats 协议是一个_文本协议_ 这意味着,我们通过 telnet 就可以与之交互,通过抓包工具也可以很轻松的分析客户端和服务器之间的交互行为

Op 操作 发送方 操作场景描述 注意事项
INFO 服务器 当客户端建立连接时,或者服务器集群拓扑发生变化时,服务器发送自身的信息,配置和安全要求给客户端
CONNECT 客户端 当客户端收到服务器的 INFO 消息后,客户端发送自身的信息和安全信息给服务器,以完成连接 verbose 字段默认为 false,表示服务器不会对每个消息回复 +OK
PUB 客户端 当客户端想要发布一个消息给指定的主题时,客户端发送 PUB 消息,可选地提供一个回复主题 消息内容是可选的,如果没有内容,需要把内容大小设置为 0,并且仍然需要第二个 CRLF
HPUB 客户端 和 PUB 相同,但是消息内容包含了 NATS 头部信息 消息内容是可选的,如果没有内容,需要把总消息大小设置为头部大小,并且仍然需要第二个 CRLF
SUB 客户端 当客户端想要订阅一个主题时,客户端发送 SUB 消息,可选地加入一个分布式队列组 主题名称必须是合法的,不能包含空格或者分隔符
UNSUB 客户端 当客户端想要取消订阅一个主题时,客户端发送 UNSUB 消息,可选地指定一个消息数量,达到后自动取消订阅
MSG 服务器 当服务器向客户端发送一个应用消息时,服务器发送 MSG 消息,包含主题,sid,可选的回复主题和内容
HMSG 服务器 和 MSG 相同,但是消息内容包含了 NATS 头部信息
PING 服务器或客户端 当服务器或客户端想要检测对方是否存活时,发送 PING 消息 服务器会定期发送 PING 消息给客户端,如果客户端没有及时回复 PONG,服务器会断开连接
PONG 服务器或客户端 当服务器或客户端收到 PING 消息时,回复 PONG 消息 服务器会把正常的流量当作 PING/PONG 的代理,所以如果客户端有消息流动,可能不会收到服务器的 PING
+OK 服务器 当服务器收到客户端的合法消息时,如果 verbose 字段为 true,服务器回复 +OK 消息 大多数客户端会把 verbose 字段设置为 false
-ERR 服务器 当服务器遇到协议错误,授权错误,或者其他运行时错误时,服务器发送 -ERR 消息给客户端 大多数这些错误会导致服务器关闭连接,客户端需要异步处理这些错误

从表里没有看到 Request 对不对?那是因为 Request 是基于 Pub / Sub 实现的 API,因此不在通信协议中,属于应用层的功能。

另外也没有看到 JetStream 相关的 Op 对不对?翻下代码就可以发现,它是基于 Request 机制实现的 pub,但通过前面的介绍,我们已经知道 CORE Nats 的 Reply 其实是 Subscriber 回复的,而 JetStream 是提供可靠存储的 ACK 并不能依赖于 Consumer ,所以这里服务器一定是有特殊处理。我们在 #6.5 JetStream 消息的投递和消费 一节中再详细展开。

4. NATS-Server 配置解析

https://docs.nats.io/running-a-nats-service/configuration

Nats 端口列表

默认端口号 作用 补充说明
443 WebSocket 协议端口,通过 websocket 来进行交互
1883 MQTT 协议支持
4222 NATS 自身端口
4111 叶节点允许本地客户端通过端口 4111 连接,且不需要任何认证
6222 集群路由监听端口
7222 网关监听端口
7422 叶子结点监听端口
8222 监控服务端口

5. JetStream 配置解析

Nats 的 jetstream 是通过 raft 来解决分布式一致性问题,以此实现 stream 相关的功能。

上图中展示了 jetstream 的一些关键点:

  • Stream 可以存储多个 subject的消息。
  • 消费者有多种消费模式(pull/push),还可以过滤stream 中的 subject 进行消费。
  • 消费有多种确认模式(ack)

5.1 Stream 配置清单

https://docs.nats.io/nats-concepts/jetstream/streams#configuration

我们当前使用的 nats 版本对应 2.9.21 下表配置中,Metadata,compression, FirstSeq 和 SubjectTransform 暂不可用。

配置选项 描述 版本 可编辑
Name 流的名称,必须是唯一的 2.2.0
Storage 流的存储类型,可以是 File(默认)或 Memory 2.2.0
Subjects 流订阅的主题列表,可以使用通配符 2.2.0
Replicas 流的副本数量,必须大于等于 1 2.2.0
Retention 流的保留策略,决定了何时删除消息 2.2.0
MaxAge 流允许的最大消息存活时间,0 表示无限制 2.2.0
MaxBytes 流允许的最大字节数,-1 表示无限制 2.2.0
MaxMsgs 流允许的最大消息数量,-1 表示无限制 2.2.0
MaxMsgSize 流允许接收的最大消息体 2.2.0
MaxConsumers 流允许的最大消费者数量,-1 表示无限制 2.2.0
NoAck 流是否禁用确认机制,如果为 true,则不需要消费者确认消息。 2.2.0
Retention 声明流的保留策略 2.2.0
Discard 流达到限制时的丢弃策略,可以是 DiscardOld(默认)、DiscardNew 或 DiscardNewPerSubject 2.2.0
DuplicateWindow 流的去重窗口,用于检测和删除重复的消息,0 表示禁用去重 2.2.0
Placement 流的放置选项,可以指定集群和标签 2.2.0
Mirror 流的镜像选项,可以指定源流和过滤条件 2.2.0 已设置不可修改
Sources 流的源选项,可以指定一个或多个源流和过滤条件 2.2.0
MaxMsgsPerSubject 流允许的每个主题的最大消息数量,0 表示无限制 2.3.0
Description 描述信息 2.3.3
Sealed stream 封存,不允许删除。 2.6.2 可修改一次
DenyDelete 限制通过 API 从 stream 中删除消息。 2.6.2
DenyPurge 限制通过 API 从 stream 中清除消息。 2.6.2
AllowRollup 流是否允许滚动更新,如果为 true,则可以使用 Nats-Rollup 头部删除旧的消息 2.6.2
RePublish 存储在 stream 会马上重新往配置的 subject 发布消息 2.8.3
AllowDirect ???
MirroDirect ??? 2.9.0
DiscardNewPerSubject 如果为 true, 丢弃策略为 DiscardNew 时,则丢弃每个 subject 新来的消息。 2.9.0
Metadata ???? 2.10.0
Compression 文件存储压缩算法,s2 = snappy 2.10.0
FirstSeq 指定 stream 的初始序列号 2.10.0
SubjectTransform 流的主题转换选项,可以指定源主题和目标主题,用于在存储消息时修改主题 2.10.0

5.2 Consumer 配置清单

https://docs.nats.io/nats-concepts/jetstream/consumers

5.2.1 通用配置
配置选项 描述 版本 是否可编辑
Durable 订阅绑定到使用者,直到被显示删除 2.2.0
FilterSubject 过滤 consumer 消费的主题,不能和 FilterSubjects 同时使用 2.2.0
AckPolicy 客户端确认策略:

AckExplicit: 默认策略,每条消息单独确认。

AckNone: 不缺人任何消息,服务器发送时即确认。

AckAll:收到系列消息,只需要确认最后一条。
2.2.0
AckWait 一旦任何单个消息被传递给消费者,服务器将等待其确认的持续时间。如果没有及时收到确认,消息将被重新发送。 2.2.0
DeliverPolicy 消费者接受消息的策略:

DeliverAll默认策略,将从最早可用的消息开始消费

DeliverLast: 开始消费后,消费加入到 stream 中的最新一条消息(如果过滤,则是匹配的最后一条)

DeliverNew: 开始消费后,只会开始接收在消费者创建之后创建的消息。

DeliverByStartSequence: 匹配序列号的第一条或者下一条seq >= OptStartSeq

DeliverByStartTime: 匹配时间的第一条或者下一条 time >= OptStartTime

DeliverLastPerSubject: 开始消费后,消费(匹配的)每个 subject 的最新一条消息。
2.2.0
OptStartSeq DeliverByStartSequence 配合使用 2.2.0
OptStartTime DeliverByStartTime 配合使用 2.2.0
Description 消费者描述
InactiveThreshold Consumer 超过这个时间未活动会被清理,在 2.9 以前仅对 临时消费者生效。 2.2.0
MaxAckPending 定义为确认的消息的最大数量,一旦到达这个限制,那么将暂定投递消息。 2.2.0
MaxDeliver 尝试传送消息的最大次数。消费者未确认(NAck或者未发送确认)时会重新传递。 2.2.0
ReplayPolicy ReplayOriginal:模拟接收到消息的时间向 consumer 推送。

ReplayInstant:消息将尽快推送到客户端。
2.2.0
Replicas 设置 consumer group 的副本数,默认继承 stream。 2.8.3
MemoryStorage 设置将 Consumer 的状态保留在内存中,而不是继承 stream 的存储类型。 2.8.3
SampleFrequency 采样率:??? 2.2.0
Metadata 用于关联消费者的元数据 2.10.0
FilterSubjects 类似于 FilterSubject,但是多个 2.10.0
5.2.2   拉模式 - 专属配置
配置选项 描述 版本 是否可编辑
MaxWaiting 最大拉取请求数 2.2.0
MaxRequestExpires 单个拉取请求等待消息可供拉取的最长时间 2.2.0
MaxRequestBatch 单个请求的最大数量 (MaxRequestMaxBytes 共同作用,先到先限制) 2.7.0
MaxRequestMaxBytes 单个请求的最大字节数 2.8.3
5.2.3   推模式 - 专属配置
配置选项 描述 版本 是否可编辑
DeliverSubject 设置服务器推送的主题,将隐式的设置消费者是基于推送。 2.2.0
DeliverGroup 类似于 queue group 2.2.0
FlowControl 启用滑动窗口协议(服务端和客户端通信交换),用于控制服务器向客户端推送多少消息。 2.2.0
IdleHeartbeat 服务器定时向客户端发送状态消息(在心跳周期内没有新消息发送),客户端可以感知到服务器的状态。 2.2.0
RateLimit 限制向消费者发送消息的的速率 bits Per Second 2.2.0
HeadersOnly 仅传送流中的消息标头,而不传送正文。其中会携带 Nats-Msg-Size 来标识负载的大小。 2.6.2

6. 关键流程

这里梳理了一些 Nats 的关键流程,用来理解 nats 中的设计和实现原理。

6.1  服务启动 / 集群

 集群 gossip 流程示意图:

nats-server -D -p 4222 -cluster nats://localhost:6222
nats-server -D -p 4333 -cluster nats://localhost:6333 -routes nats://localhost:6222
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6333

  流程解释说明:

  • nats-1 启动
  • nats-2 连接到 nats-1, nats-1 回复 INFO 消息,并将 nats-2 添加到自己的路由,同时建立到 nats-2 的连接。
  • nats-3 连接到 nats-2, nats-2 回复 INFO 消息,并将 nats-3 添加到自己的路由,同时建立到 nats-3 的连接。这里 nats-2 会将新路由( nats-3 )传播(INFO 消息)到自己已知的服务节点,也就是 nats-1, 而 nats-1 收到 INFO 消息后,会向 nats-3 建立连接,与前面的流程类似。

6.2 PUB / SUB

  整体流程概览如下:

6.2.1  SUB

  当客户端新增订阅时,会向服务器发送一条 SUB 消息,服务器则会更新客户端(及对应账户)的订阅关系,同时向其余集群中的 route (服务节点) 发送 RS+ (集群通讯协议)消息以更新订阅。

 6.2.2 PUB

  客户端发布一条消息,服务器内部匹配(有缓存设计以提高匹配效率)相关订阅客户端。将消息加入到客户端的发送缓冲区(注意:订阅中区分 普通订阅和 队列,队列模式则需要随机选中一个客户端),当相应的 route 节点收到消息后,会再从本地的订阅列表中匹配当前节点中的客户端推送消息。

6.3  Request / Reply

从 Nats 设计的协议已经知道,Request 和 Reply 两个 API 是基于 PUB / SUB 机制。PUB 参数中可以携带一个 reply 参数,该 reply 参数是发布时自动创建的一个 _INBOX subject。服务端在推送消息时,会将 reply 放到 MSG 消息头传递给订阅者。

// This processes the sublist results for a given message.  
// Returns if the message was delivered to at least target and queue filters.  
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, subject, reply []byte, flags int) (bool, [][]byte) {
	// ...
	
	var creply = reply
	
	// ...
	
	// Loop over all normal subscriptions that match.  
	for _, sub := range r.psubs {  
		...
		
	    // Normal delivery  
	    mh := c.msgHeader(dsubj, creply, sub)  
	    c.deliverMsg(prodIsMQTT, sub, acc, dsubj, creply, mh, msg, rplyHasGWPrefix)
	}
}

6.4  新建 jetstream

在开始之前,我们需要对 multi-raft 有个大致的概念,示意图如下:

stream raft group 就对应上图中的一个 raft group。与此同时,集群中所有开启 jetstream 功能的节点还会组成一个 meta raft group, 用来管理 jestream API 的执行和 jetstream 集群拓扑(节点上线离线)。

从 nats-cli 的源码入手,我们可以找到:

// NewStreamFromDefault creates a new stream based on a supplied template and optionsfunc (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error) {  
	// ...
	
    var resp api.JSApiStreamCreateResponse  
    err = m.jsonRequest(fmt.Sprintf(api.JSApiStreamCreateT, name), &cfg, &resp)  
    if err != nil {  
       return nil, err  
    }  
  
    return m.streamFromConfig(&resp.Config, resp.StreamInfo), nil  
} 

Manager 的含义是 JetStreamManager,代表与服务端 JS API 交互,其中 api. JSApiStreamCreateT string = "$JS.API.STREAM.CREATE.%s" 可以发现底层还是通过 PUB / SUB 机制实现的交互。

在 nats-server 中检索可以发现,在 nats-server 内部定义了 JetStream 相关API 的 msgHandler。

func (s *Server) setJetStreamExportSubs() error {  
    // Start the go routine that will process API requests received by the  
    // subscription below when they are coming from routes, etc..    s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")  
    s.startGoRoutine(s.processJSAPIRoutedRequests)  
  
    // This is the catch all now for all JetStream API calls.  
    if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {  
       return err  
    }  
  
	// ...
  
    // API handles themselves.  
    pairs := []struct {  
       subject string  
       handler msgHandler  
    }{  
       {JSApiAccountInfo, s.jsAccountInfoRequest},  
       {JSApiTemplateCreate, s.jsTemplateCreateRequest},  
       {JSApiTemplates, s.jsTemplateNamesRequest},  
       {JSApiTemplateInfo, s.jsTemplateInfoRequest},  
       {JSApiTemplateDelete, s.jsTemplateDeleteRequest},  
       {JSApiStreamCreate, s.jsStreamCreateRequest},  
       {JSApiStreamUpdate, s.jsStreamUpdateRequest},  
       {JSApiStreams, s.jsStreamNamesRequest},  
       {JSApiStreamList, s.jsStreamListRequest},  
       {JSApiStreamInfo, s.jsStreamInfoRequest},  
       {JSApiStreamDelete, s.jsStreamDeleteRequest},  
       {JSApiStreamPurge, s.jsStreamPurgeRequest},  
       {JSApiStreamSnapshot, s.jsStreamSnapshotRequest},  
       {JSApiStreamRestore, s.jsStreamRestoreRequest},  
       {JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},  
       {JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},  
       {JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},  
       {JSApiMsgDelete, s.jsMsgDeleteRequest},  
       {JSApiMsgGet, s.jsMsgGetRequest},  
       {JSApiConsumerCreateEx, s.jsConsumerCreateRequest},  
       {JSApiConsumerCreate, s.jsConsumerCreateRequest},  
       {JSApiDurableCreate, s.jsConsumerCreateRequest},  
       {JSApiConsumers, s.jsConsumerNamesRequest},  
       {JSApiConsumerList, s.jsConsumerListRequest},  
       {JSApiConsumerInfo, s.jsConsumerInfoRequest},  
       {JSApiConsumerDelete, s.jsConsumerDeleteRequest},  
    }  
  
    js.mu.Lock()  
    defer js.mu.Unlock()  
  
    for _, p := range pairs {  
       sub := &subscription{subject: []byte(p.subject), icb: p.handler}  
       if err := js.apiSubs.Insert(sub); err != nil {  
          return err  
       }  
    }  
  
    return nil  
}

在 client 消息分发(deliverMsg) 的逻辑中会对匹配的 subscription 执行 icb (msgHandler)。

在集群模式下,这些 JetStream API 的订阅也是会通过 route 客户端传导到集群中的每个节点。相当于每个节点都订阅/处理,JS API 的消息,但是只有 leader 节点可以处理。

jsStreamCreateRequest 后续代码逻辑(集群模式下, 单机只需要在本地操作即可):

jsStreamCluster(jetstream meta group) 的启动路径: Reload / Start -> enableJetStream -> enableJetStreamClustering -> startRaftNode/setupMetaGroup -> monitorCluster

  • 通过 meta raft group 提交一条新增 stream 的日志消息(其中已经设置 raftgroup 的基本信息)参见 jsClusteredStreamRequest 函数
  • jetstream 集群中的节点接收到该日志后(monitorCluster),会执行(applyMetaEntries)其中的操作 (assignStreamOp)添加 stream 执行逻辑参见 processStreamAssignment -> processClusterCreateStream
    • 新建一个 stream raft group 启动 raft 节点
    • 内部创建一个 stream 数据结构(mset= stream)
    • 启动监视协程 monitorStream,处理 raft group 中的事件(快照,领导者变更,日志应用)

6.5  JetStream 消息的投递和消费

最简单的开始使用 JetStream 消费者的代码如下:

// connect to nats server
nc, _ := nats.Connect(nats.DefaultURL)

// create jetstream context from nats connection
js, _ := jetstream.New(nc)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// get existing stream handle
stream, _ := js.Stream(ctx, "foo")

// retrieve consumer handle from a stream
cons, _ := stream.Consumer(ctx, "cons")

// consume messages from the consumer in callback
cc, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Println("Received jetstream message: ", string(msg.Data()))
    msg.Ack()
})
defer cc.Stop()
6.5.1 消费者创建

消费者可以显式或者隐式的创建,如果隐式创建往往都代表临时,也就是如果消费者不活跃则会被服务端删除。之后,客户端开始订阅消费者下发消息的 subject,push 方式等服务端推送,而 pull 模式则需要定时往服务端发送一条消息,触发消息下推。

从 nats.go 的源码出发,StreamConsumerManager 接口定义了一个 JetStream 中 Consumer 的相关操作。我们首先看下 consumer 的创建,同样的客户端是通过 JS API 来完成调用:

服务端统一使用了 jsConsumerCreateRequest 进行处理,其中逻辑与添加 stream 类似:jetstream集群模式下会创建一个raft group 设置好 leader, 并通过 meta group 来提交新增 consumer 的日志,每个节点会处理该消息(processConsumerAssignment->processClusterCreateConsumer):

  • 获取到流
  • 通过 addConsumerWithAssignment 添加一个 consumer
  • (leader)发送响应

这个操作会在 stream raft group 的 部分/全部(取决于 consumer 配置的 Replicas)节点中创建一个 raft group (consumer) 用来保存 consumer 的相关信息(消费点位)。同时在 stream 对应的节点中添加 consumer 实例。

再回到客户端,consumer 创建(查询)后,获取到 consume 的信息,从中获取到 delivery subject(代表 consumer 消费的消息会从这个 subject 推送出来),后续客户端会订阅这个这个 subject。

这里可以说明服务端 consumer 和 客户端 consumer 没有必然的的联系,后续另外的客户端使用同一个 consumer 信息时,也只是共享 consumer 的配置,订阅相同的 subject,回到了 CORE NATS 提供的数据模型。

6.5.2 消息消费

consumer 提供了两种模式 pull (PullSubscribe) 和 push(Subscribe) 模式,虽然叫做 pull 和 push,但是这两种机制的底层实现还是基于 nats 的 pub / sub 机制。push 等价于 subscribe,服务端会将消息直接推送到客户端;pull 则是客户端主动调用 JS API ”请求“,服务端 “响应” 数据给客户端消费。参见下图抓包数据分析截图(提前创建了 stream 和 consumer,指定了拉模式)

nats.go 的 jetstream 包针对 pull consumer 提供了丰富的API(尤其是 Consume)让 pull consumer 可以和 push consumer 用相似的方式来持续的处理消息。

6.5.2.1 pull 模式

pullConsumer Consume 方法实现如下(已精简),与 PullSubscribe.Fetch 的原理类似(请求 JS API),区别在于 API 的形式表现不同:

func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
	// 解析消费选项
	consumeOpts, err := parseConsumeOpts(false, opts...)
	if err != nil {
		return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
	}
	// pull subject 是 JS.API 下的一个 subject 
	// apiRequestNextT string = "CONSUMER.MSG.NEXT.%s.%s"
	subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name))
	sub := &pullSubscription{
		consumer:    p,
		errs:        make(chan error, 1),
		done:        make(chan struct{}, 1),
		fetchNext:   make(chan *pullRequest, 1),
		consumeOpts: consumeOpts,
	}
	
	// 创建一个 inbox subject 作为 pull 请求的 reply,internalHandler 就是 MsgHandler
	inbox := p.jetStream.conn.NewInbox()
	sub.subscription, err = p.jetStream.conn.Subscribe(inbox, internalHandler)
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			select {
				case status, ok := <-sub.connStatusChanged:
				case err := <-sub.errs:
					if errors.Is(err, ErrNoHeartbeat) {
						  sub.fetchNext <- &pullRequest{  
						    Expires:   sub.consumeOpts.Expires,  
						    Batch:     batchSize,  
						    MaxBytes:  sub.consumeOpts.MaxBytes,  
						    Heartbeat: sub.consumeOpts.Heartbeat,  
						  }
					}
			}
		}
	}()
	// 根据 fetchNext 通知向服务端发送 pull 请求
	// fetchNext 是由一个 consumeOpts 的心跳设置的一个 timer 触发 ErrNoHeartbeat 来激活
	go sub.pullMessages(subject)
	
	return sub, nil
}
6.5.2.2 push 模式

使用 DeliverSubject (没有设置会自动生成一个) 来设置消费者订阅的主题,同时创建一个对该主题的订阅,用于接收处理消息。push 模式同样会在服务器创建消费者。

小结:对于客户端来说消费 stream 中的消息,其实就是产生一个订阅主题,客户端会如同普通的 SUB 客户端一样消费这里的消息。但在此之前,需要根据场景和选项在服务器创建一个 consumer,告诉服务端自己的消费场景(是否持久化,关注stream中的哪些 subject, 消息的分发策略 等等)

6.5.3 消息投递推送

通过前面的消息消费我们可以知道,pullConsumer 拉取消息时会往一个形如 CONSUMER.MSG.NEXT 的主题中发起请求,通过在服务端代码检索可以发现,consumer 结构内保存了这么一个 subject, 并关联了对应的 icb(processNextMsgReq),而这个动作是在 consumer 创建时,在 leader 节点上设置的(consumer.setLeader)。

这里要注意,这个请求只能被 consumer group 中的 leader 处理。

与此同时 consumer leader 节点还设置了(与消费相关的操作):

  • ack订阅、请求订阅 和 流控订阅等
  • 如果消费者是推模式,那么会注册“兴趣通知”
  • 同时会启动定时器来清除不活跃的消费者
  • 启动消息推送的 loopAndGatherMsgs 逻辑,往 consumer.outq -> stream.outq 发送消息, stream.outq 在 stream.internalLoop 中执行往客户端推送的逻辑(通过 stream 的内部客户端往 subject 推送一条消息,与 pub 流程类似再由 CORE NATS 完成消息的分发)。

到这里总结下这个环节的问题:

  • jetstream publish 的消息经过了什么样的流程?
    • 消息到服务端后会检查 subject 相关的订阅,而 stream(leader)设置时已经配置了一个内部订阅订阅会传播到集群中 processInboundJetStreamMsg 的目的就是将将消息提交到 raft group, 最终通过 processJetStreamMsg 执行。
    • stream 组提交后,各个副本保存,同时通知相关的 consumer
    • server consumer(leader) 通过 loopAndGatherMsgs 往 subject 中推送消息
    • server consumer 订阅了客户端 ack 消息,用于更新消费者位点等数据
  • 是先保存还是先推送? 在stream 组内部提交保存后推送
  • consumer 的消息是从 leader 来还是可以从 follower 直接读取? leader
  • 服务端中 stream 和 consumer 这两个结构在其中承担什么样的职责?
    • stream 代表 stream (包括 store / raft group / stream config)
      • 负责 jetstream 相关API的请求处理
      • stream 消息 复制/存储
      • 通知/唤醒等待的 consumer
    • consumer 代表 消费者(包括消费配置 / raft group ) 注意它不等于客户端实例
      • consumer 组内的数据保存(raft group)
      • API 处理(拉消费的 API)
      • 消息推送
      • 消费确认处理

JetStream 消息投递API 也是基于 PUB / SUB 机制实现的,但是存在的区别是: JetStream 是存在确认机制的,而 Core Nats 的 Publish API 并没有。因此要严格保证消息发布到 stream 中,需要使用 JetStream 的 API 来发送。

7. 扩展

7.1 RAFT Consensus Protocol

Consensus is a fundamental problem in fault-tolerant distributed systems. Consensus involves multiple servers agreeing on values. Once they reach a decision on a value, that decision is final.

共识问题是分布式系统容错中的基础性问题,它描述的是:在多个服务节点间对某个值达成一致,一旦达成一致那么这个值就是最终的结果。RAFT 就是解决共识问题的一种算法,它以简单著名。

常见的一种实现是,通过 复制/多副本 的方式来解决分布式问题中的 可靠性 问题。常见的,我们的系统提供了3个副本,当其中一个节点宕机时,也不影响系统对外提供服务。而 RAFT 则提供相应的机制(通过当选的领导者达成共识)来解决 复制/多副本 过程中的一致性问题。

Raft 可视化介绍:https://thesecretlivesofdata.com/raft/

它其中有几个概念:

  • Leader Election(选举)产生唯一的一个 leader 角色。
    • Leader
    • Candidate
    • Follower
    • Vote
    • Term
  • Log Replication(日志复制)用于服务器之间保持一致要素。
  • Replicated State Machine(复制状态机)_每个服务器存储一个包含一系列指令的日志,并且按顺序执行指令。由于日志都包含相同顺序的指令,状态机会按照相同的顺序执行指令,由于状态机是确定的(deterministic),因此状态机会产生相同的结果。

7.2   Gossip Protocol

它基于流行病传播方式的节点或者进程之间信息交换的协议。以给定的频率,每台计算机随机选择另一台计算机,并共享任何消息。

它的定义如下:

  • 如果有某一项信息需要在整个网络中所有节点中传播,那从信息源开始,选择一个固定的传播周期(譬如 1 秒),随机选择它相连接的 k 个节点(称为 Fan-Out)来传播消息。
  • 每一个节点收到消息后,如果这个消息是它之前没有收到过的,将在下一个周期内,选择除了发送消息给它的那个节点外的其他相邻 k 个节点发送相同的消息,直到最终网络中所有节点都收到了消息,尽管这个过程需要一定时间,但是理论上最终网络的所有节点都会拥有相同的消息。

它对网络节点的 连通性和稳定性 几乎没有任何要求,它一开始就将网络某些节点只能与一部分节点_部分连通(Partially Connected Network)而不是以全连通网络_(Fully Connected Network)作为前提;没有任何中心化节点或者主节点的概念。

相应的它的缺点是:无法准确地预计到需要多长时间才能达成全网一致;也存在重复传播的概率,消息在网路中冗余。

由此,Gossip 设计了两种可能的消息传播模式:反熵(Anti-Entropy)和传谣(Rumor-Mongering):

  • 反熵:会同步节点的全部数据,以消除各节点之间的差异,目标是整个网络各节点完全的一致。
  • 传谣:仅仅发送新到达节点的数据,即只对外发送变更信息。

在 Nats 中,gossip 被用于实现 集群服务发现 https://docs.nats.io/reference/reference-protocols/nats-server-protocol。routes 里指定了一个种子服务器,新启动的服务器连接上种子服务器后,就可以获取到全部的服务器列表。在服务器的配置里,可以不用知道所有的节点,而只用配置一个即可,但通常为了配置简单,会统一使用一个种子服务器。如下:

nats-server -D -p 4222 -cluster nats://localhost:6222
nats-server -D -p 4333 -cluster nats://localhost:6333 -routes nats://localhost:6222
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6222
// 使用下面的命令启动也是可行的
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6333 

可以参见服务启动 / 集群的流程。

7.3   Zero allocation byte parser

https://www.youtube.com/watch?v=ylRKac5kSOk&t=646s 零分配 的含义是:nats 在解析协议时,避免不必要的内存分配:

  • 使用局部变量,在栈上分配

  • 使用 slice 复用底层数组

  • 采用状态机来解析协议的各个部分,而不是构建临时对象来存储中间状态。如: 一般情况下解析如下协议时,最常规的思路就是,先把 Command(PUB) 读出来,然后根据这个command 决定要后面的操作,这里再读去两个参数 subject, payload length, 有了 length 之后再读去 payload,这样我们就创建了_4个临时变量_在表示下面的命令。

    PUB foo.bar 7
    goodbye
    

    在 nats 中则是通过状态机,一步一步的解析中间没有使用临时变量来存储命令中的数据。

  • 避免使用字符串,网络IO场景中,操作的对象几乎都是字节,因此 nats parser 也避免使用字符串以减少内存分配和拷贝。

7.4  Nats CLI

https://docs.nats.io/using-nats/nats-tools/nats_cli 能方便快捷的与 nats 交互 / 模拟 / 测试。