理解 Victoria Logs 设计与实现

理解 Victoria Logs 设计与实现

Victoria Logs 简单易上手,非常适合中小团队使用,同时也适用于成本敏感的场景。

官方建议:如果可以接受在单节点上垂直扩展来满足业务需求,那么就不必使用集群模式。

本文包含大量源码片段,若不关注细节,可只阅读 架构概览存储模型

架构概览 #

Victoria Logs 的集群架构如下:

Victoria Logs 架构图
Victoria Logs 架构图
  • 通过 vlagent 完成类似 Replica 的功能,将日志数据复制到多个 vlstorage 组件中,实现高可用和数据冗余。
  • 通过 vlinsert 执行日志数据的写入策略。
  • 通过 vlselect 来聚合多个 vlstorage 组件中的日志数据,实现查询功能。
  • 通过 vlstorage 可以无缝的实现扩展存储容量,而不用考虑数据迁移(rebalance)。

vlstorage 的缩容稍微复杂一些,这里提供一种方案:

  1. 先将需要缩容的 vlstorage 节点从 vlinsert 的 storageNode 列表中移除,vlselect 中保留;
  2. 利用 Victoria Logs 的数据保留功能,让数据随逐步过期,直到待删除节点上的数据全部过期;
  3. 最后将其从 vlselect 中移除,并释放 vlstorage 节点。

也可以参考 Partitions lifecycle,手动执行数据迁移。

vlagent #

vlagent 是 Victoria Logs 的代理组件(类似 OpenTelemetry 的 Agent),位置更靠近日志源,同时也是一层缓冲区,负责接收日志数据并将其发送到 vlinsert 组件。

它可以将数据写入到不同的 Victoria Logs 实例中,实现数据的多写(复制),是实现高可用的必要组件。

PS:这里就不做过多的展开,有兴趣可以自行翻看源码。

vlinsert #

vlinsert 是 Victoria Logs 的写入组件,为无状态服务。写入时根据“局部性”和“均匀分布”策略选择目标 vlstorage 节点。

代码片段
// app/vlstorage/netinsert/netinsert.go
func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
	idx := s.srt.getNodeIdx(streamHash)
    // sns []*storageNode 代表 vlstorage 节点组
	sn := s.sns[idx]
    // 进一步追踪到 sn.mustSendInsertRequest 代表通过网络写入到 vlstorage 节点
	sn.addRow(r) 
}

// app/vlstorage/netinsert/netinsert.go
func (srt *streamRowsTracker) getNodeIdx(streamHash uint64) uint64 {
	if srt.nodesCount == 1 {
		// 如果只有一个 vlstorage 节点,那么就直接写入到该节点
		return 0
	}

    // streamHash := sid.id.lo ^ sid.id.hi,这里的 sid 就是 streamID
    // sid.id = hash128(bb.B) 就是 stream 的标签和值 hash 得来的
    // 可以参见 LogRows.MustAdd 中的过程
	streamRows := srt.rowsPerStream[streamHash] + 1
	srt.rowsPerStream[streamHash] = streamRows

	// 如果 Stream 中的日志数量小于 1000 条,那么写入到同一个 vlstorage 组件中 
    // 对于只包含少量日志的 Stream,可提高局部性;当某个 Stream 量较大时则
    // 分散到不同的 vlstorage 节点,以提高查询性能。
	if streamRows <= 1000 {
		return streamHash % uint64(srt.nodesCount)
	}

	return uint64(fastrand.Uint32n(uint32(srt.nodesCount)))
}

// app/vlstorage/netinsert/netinsert.go
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) {
    // sn 代表当前选中的写入节点
	err := sn.sendInsertRequest(pendingData)

    // 如果写入成功,那么这里就直接返回了
    // 否则,会尝试其他节点
    // sn.s 代表的是 Storage 
    // sendInsertRequestToAnyNode 方法则会随机选中一个节点尝试(实际调用 storageNode.sendInsertRequest)
	for !sn.s.sendInsertRequestToAnyNode(pendingData) {
		// 省略
	}
}

// app/vlstorage/netinsert/netinsert.go
func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) error {
	// 如果没有禁用压缩,就使用 zstd 压缩数据
	var body io.Reader
	if !sn.s.disableCompression {
		bb := zstdBufPool.Get()
		defer zstdBufPool.Put(bb)

		bb.B = zstd.CompressLevel(bb.B[:0], pendingData.B, 1)
		body = bb.NewReader()
	} else {
		body = pendingData.NewReader()
	}

	// 对 vlstorage 节点请求 internal/insert 接口
	reqURL := sn.getRequestURL("/internal/insert")
	req, err := http.NewRequestWithContext(ctx, "POST", reqURL, body)
	// ...
	resp, err := sn.c.Do(req)
	// ...
}

小结

在集群模式下,vlinsert 负责从 vlstorage 节点列表中选择写入目标,路由策略如下:

  • Stream 中的日志数量小于 1000 时,同一个 Stream 的日志会被写入到同一个 vlstorage 组件中。
  • Stream 中的日志数量大于等于 1000 时,会随机选择一个 vlstorage 节点,将日志写入到该组件中。
  • 如果写入失败,会尝试写入到其他节点。

vlselect #

vlselect 是 Victoria Logs 的查询组件,负责接收查询请求并返回查询结果。这里以 /select/logsql/query 接口为例,介绍查询的过程。

入口函数为:app/vlselect/logsq/logsql.go#ProcessQueryRequest

代码片段
// app/vlselect/logsq/logsql.go
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-logs
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	// 省略参数解析,metrics 采集等等

	// Execute the query
	if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
		// 省略错误处理
		return
	}
}

// app/vlstorage/main.go
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
	// 这部分的目的是用来判断查询是否可以优化为:直接返回最后的 N 条日志,而不用先执行过滤再排序
	// 比如:
	// - 'sort by (_time desc) offset <offset> limit <limit>'
	// - 'first <limit> by (_time desc)'
	// - 'last <limit> by (_time)'
	qOpt, offset, limit := qctx.Query.GetLastNResultsQuery()
	if qOpt != nil {
		qctxOpt := qctx.WithQuery(qOpt)
		return runOptimizedLastNResultsQuery(qctxOpt, offset, limit, writeBlock)
	}

	// localStorage 在 vlstorage 节点中才会被赋值
	if localStorage != nil {
		return localStorage.RunQuery(qctx, writeBlock)
	}

	return netstorageSelect.RunQuery(qctx, writeBlock)
}

可以看出,RunQuery 方法不仅由 vlselect 调用,vlstorage 节点中也会调用。目前先关注 netstorageSelect 这个分支:

代码片段
// app/vlstorage/netselect/netselect.go
func (s *Storage) RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
	// nqr 代表 NetQueryRunner,用于执行分布式查询
	nqr, err := logstorage.NewNetQueryRunner(qctx, s.RunQuery, writeBlock)
	if err != nil {
		return err
	}

	search := func(stopCh <-chan struct{}, q *logstorage.Query, writeBlock logstorage.WriteDataBlockFunc) error {
		qctxLocal := qctx.WithQuery(q)
		return s.runQuery(stopCh, qctxLocal, writeBlock)
	}

	// nqr.Run 实际执行的还是 Storage.runQuery 方法
	// nqr.Run 其中会将 pipe 分成 remote pipe 和 local pipe 
	// 分别执行分布式查询和本地查询
	concurrency := qctx.Query.GetConcurrency()
	return nqr.Run(qctx.Context, concurrency, search)
}

// app/vlstorage/netselect/netselect.go
func (s *Storage) runQuery(stopCh <-chan struct{}, qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {

	// ...

	// 这里可以看出来,runQuery 方法会并发执行所有的 vlstorage 节点的查询
	for i := range s.sns {
		go func(nodeIdx int) {
			err := sn.runQuery(qctxLocal, func(db *logstorage.DataBlock) {
				writeBlock(uint(nodeIdx), db)
			})
		}(i)
	}

	// ...
}

func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func(db *logstorage.DataBlock)) error {
	path := "/internal/select/query"
	responseBody, reqURL, err := sn.getResponseBodyForPathAndArgs(qctx.Context, path, args)
	if err != nil {
		return err
	}
	defer responseBody.Close()

	// 解析响应,省略
}

小结

vlselect 会并发向所有 vlstorage 节点发送查询请求,查询使用的是节点的 /internal/select/query 接口。

从这部分实现也能看出,Victoria Logs 可以实现 vlstorage 的无缝扩展。这种查询方式会向所有 vlstorage 节点发送请求,并将各节点返回的结果进行合并。也存在一些缺点,例如:

  • 木桶效应:查询响应时间受最慢节点影响;
  • 即使某些节点不存在相关数据,仍会参与查询执行;
  • 网络开销不可避免,延迟会增加。

这也解释了为什么 Victoria Logs 更推荐在可接受的情况下使用单节点模式,而非集群。

vlstorage #

vlstorage 是 Victoria Logs 的存储组件,负责管理日志数据的存储,并为 vlselect 提供查询接口(/internal/select/query),为 vlinsert 提供写入接口(/internal/insert)。

查询接口 #

/internal/select/query 映射为 processQueryRequest 方法:

代码片段
// app/vlselect/internalselect/internalselect.go
func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
	// ...

	// 这里就是在跟踪 vlselect 时调用的 RunQuery 方法
	if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
		return err
	}
}

// app/vlstorage/main.go
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
	// ...

	// 这里就要关注 localStorage.RunQuery 方法
	if localStorage != nil {
		return localStorage.RunQuery(qctx, writeBlock)
	}
	return netstorageSelect.RunQuery(qctx, writeBlock)
}

这也进入了 Victoria Logs 的核心存储层,具体细节在后文“存储原理”部分展开。

写入接口 #

/internal/insert 对应的处理方法在 app/vlinsert/internalinsert/internalinsert.go#RequestHandler 中:

代码片段
// app/vlinsert/internalinsert/internalinsert.go
func RequestHandler(w http.ResponseWriter, r *http.Request) {
	// ...

	// CommonParams 包含了写入时的一些公共参数, 如:
	// 	TenantID         logstorage.TenantID
	// TimeFields       []string
	// MsgFields        []string
	// StreamFields     []string
	// IgnoreFields     []string
	// 等等
	cp, err := insertutil.GetCommonParams(r)
	
	// 根据压缩类型解析 body 中的数据, 再通过 parseData 转为 InsertRow 类型
	err = protoparserutil.ReadUncompressedData(r.Body, encoding, maxRequestSize, func(data []byte) error {
		lmp := cp.NewLogMessageProcessor("internalinsert", false)
		irp := lmp.(insertutil.InsertRowProcessor)
		err := parseData(irp, data)
		lmp.MustClose()
		return err
	})
}

// app/vlinsert/internalinsert/internalinsert.go
func parseData(irp insertutil.InsertRowProcessor, data []byte) error {
	// 从对象池中获取 InsertRow 对象
	r := logstorage.GetInsertRow()

	src := data
	i := 0
	for len(src) > 0 {
		tail, err := r.UnmarshalInplace(src)
		
		src = tail
		i++

		// 将解析后的 InsertRow 添加到 InsertRowProcessor 中
		irp.AddInsertRow(r)
	}
}

InsertRowProcessor 的实现是 logMessageProcessor

代码片段
// app/vlinsert/insertutil/common_params.go
func (lmp *logMessageProcessor) AddInsertRow(r *logstorage.InsertRow) {
	// 超过 MaxFieldsPerLine(默认 1000) 个字段的日志行会被丢弃
	if len(r.Fields) > *MaxFieldsPerLine {
		return
	}

	// 调用 logstorage.LogRows 将 InsertRow 添加到 LogRows 中
	lmp.lr.MustAddInsertRow(r)

	// 如果需要 flush,则调用 flushLocked 方法
	if lmp.lr.NeedFlush() {
		lmp.flushLocked()
	}
}

到这里也进入到 Victoria Logs 的核心存储层,同样在介绍存储原理中再展开。

存储原理 #

Victoria Logs - How does victorialogs works 可以得知,Victoria Logs 采用了以下设计:

  • 日志被作为 JSON 条目存储。
  • 日志的字段会保存到不同的数据块中。
  • 不同日志的相同字段会保存在同一个数据块中。
  • 数据块会压缩存储,以减少磁盘空间占用。
  • 较小的数据块会在后台合并成较大的数据块。
  • 查询过程中,每个数据块会被原子的并发的读取。

此外,Victoria Logs 还采用以下优化以提升查询效率:

  • 引入了 Bloom Filter 来跳过没有给定关键字的数据块。
  • 针对不同数据类型的字段采用自定义编码和压缩。
  • 相同的 Stream 被物理的分组(使用 Stream Filter 可以跳过不需要的数据块)。
  • 为日志时间维护了一个稀疏索引(使用 Time Filter 可以提高查询效率)。

仅凭文字说明难以充分理解 Victoria Logs 的存储原理,仍需进一步深入源码来把握细节。

这部分源码在 lib/logstorage/ 目录下。

写入流程 #

前面我们跟踪到 logMessageProcessor 中的 AddInsertRow 方法,其中调用了 flushLocked 方法:

代码片段
// app/vlinsert/insertutil/common_params.go
func (lmp *logMessageProcessor) flushLocked() {
	logRowsStorage.MustAddRows(lmp.lr)
}

// app/vlstorage/main.go
// 这个方法在 vlinsert 中也被调用,只不过就是使用的 netstorageInsert.AddRow 方法
func (*Storage) MustAddRows(lr *logstorage.LogRows) {
	if localStorage != nil {
		localStorage.MustAddRows(lr)
	} else {
		// Store lr across the remote storage nodes.
		lr.ForEachRow(netstorageInsert.AddRow)
	}
}

localStorage(logstorage.Storage) 代表的就是 Victoria Logs 的本地存储层实现:

代码片段
// lib/logstorage/storage.go
type Storage struct {
	path string 			// 存储目录的路径
	retention time.Duration // 数据保留时间
	flockF *os.File 		// 用于确保 Storage 仅被单个进程打开的文件锁

	partitions []*partitionWrapper // 分区列表,按时间排序,例如 partitions[0] 是最早的分区
	ptwHot *partitionWrapper       // 最新的分区,用于写入新的日志行

	// ... 省略其他字段
}

这里新出了 partition 的概念,其实对应的就是日期,每个日期对应一个分区。再回到 MustAddRows 方法:

代码片段
func (s *Storage) MustAddRows(lr *LogRows) {
	// Fast path: 
	// 尝试将 LogRows 全部写入到 ptwHot 中
	ptwHot := s.ptwHot
	if ptwHot != nil {
		if ptwHot.canAddAllRows(lr) {
			ptwHot.pt.mustAddRows(lr)
			return
		}
	}

	// Slow path:
	// 如果 LogRows 不能被 ptwHot 全部写入,那么就需要把 LogRows 拆分成多个 LogRows,写入到不同的分区中
	// PS: 历史的分区可能需要从磁盘加载,会增加写入延迟
	now := time.Now().UnixNano()
	minAllowedDay := s.getMinAllowedDay(now) // 保留策略:过去的时间
	maxAllowedDay := s.getMaxAllowedDay(now) // 保留策略:未来的时间
	minAllowedTimestamp := now - s.maxBackfillAge.Nanoseconds() // 最大能接受的历史日志时间

	// 遍历 LogRows 中的每个日志行,过滤掉不符合要求的日志行,
	// 并根据日志行的时间戳,将其添加到同一分区(日期)的 LogRows 中
	m := make(map[int64]*LogRows)
	for i, ts := range lr.timestamps {
		day := ts / nsecsPerDay		
		// 根据保留策略 和 最大能接受的历史日志时间 过滤掉不符合要求的日志行
		// ...

		lrPart := m[day]
		lrPart.mustAddInternal(lr.streamIDs[i], ts, lr.rows[i], lr.streamTagsCanonicals[i])
	}
	for day, lrPart := range m {
		ptw := s.getPartitionForWriting(day)
		if ptw != nil {
			ptw.pt.mustAddRows(lrPart)
		}
	}
}

这里可以得知 Storage 的写入是分区(day)的,而分区具体的写入由 partitionWrapper.partition 控制。

代码片段
// lib/logstorage/partition.go
type partition struct {
	s *Storage

	// path 是分区的完整目录路径。例如 /data/logstorage/partitions/20230801
	// 前面的 /data/logstorage 可以配置,后面则是 Victoria Logs 自己的目录结构
	path string 
	name string // name 是分区的名称,是目录名。如:20230801

	idb *indexdb // idb 是索引数据库,用于存储日志行的索引信息
	ddb *datadb  // ddb 是数据数据库,用于存储日志行的原始数据

	// ....
}

// lib/logstorage/partition.go
// mustAddRows 也就是把数据写入到索引数据库和数据数据库中,只是其中为了提高性能过滤了已经存在的 Stream
func (pt *partition) mustAddRows(lr *LogRows) {
	// 将 新增 的 Stream 注册到 indexdb 中
	if !pt.idb.hasStreamID(streamID) {
		streamTagsCanonical := streamTagsCanonicals[rowIdx]
		pt.idb.mustRegisterStream(streamID, streamTagsCanonical)
	}

	// 最后把 LogRows 写入到 datadb 中
	pt.ddb.mustAddRows(lr)
}

显然 Victoria Logs 的分区中主要由 索引数据库(indexdb)和 数据数据库(datadb)构成。下面就依次跟踪下 indexdb 和 datadb 的写入流程。

indexdb 写入 #

indexdb 用于存储 Stream 的索引信息,基于 MergeSet 实现。

代码片段
// lib/logstorage/indexdb.go
type indexdb struct {
	path string          // path 是索引数据库的目录路径:如 path/to/partition/indexdb 目录
	partitionName string // partitionName 是索引数据库所属的分区名称(天)
	tb *mergeset.Table   // tb 是索引数据库的存储,用于存储索引信息
	s *Storage 	         // s 是 indexdb 所属的 Storage
}
  1. 索引条目准备

mustRegisterStream 会写入三类索引条目:

  • tenantID:streamID:streamID 到 streamTagsCanonical 的映射
  • tenantID:streamID -> streamTagsCanonical:streamID 到 streamTagsCanonical 的映射
  • tenantID:name:value -> streamID:tag 到 streamID 的映射
代码片段
// lib/logstorage/indexdb.go
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
	// Register tenantID:streamID entry.
	bufLen := len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamID, tenantID)
	buf = streamID.id.marshal(buf)
	items = append(items, buf[bufLen:])

	// Register tenantID:streamID -> streamTagsCanonical entry.
	bufLen = len(buf)
	buf = marshalCommonPrefix(buf, nsPrefixStreamIDToStreamTags, tenantID)
	buf = streamID.id.marshal(buf)
	buf = append(buf, streamTagsCanonical...)
	items = append(items, buf[bufLen:])

	// Register tenantID:name:value -> streamIDs entries.
	tags := st.tags
	for i := range tags {
		bufLen = len(buf)
		buf = marshalCommonPrefix(buf, nsPrefixTagToStreamIDs, tenantID)
		buf = tags[i].indexdbMarshal(buf)
		buf = streamID.id.marshal(buf)
		items = append(items, buf[bufLen:])
	}
	
	// Add items to the storage
	idb.tb.AddItems(items)
}

这里以一个具体的例子来说明:

假设我们有一个 Stream {"tag1":"value1", "tag2":"value2"}, 其 tenantID 为 0, streamID 为 0x12345678(hi) 90abcdef(lo),那么会生成如下的索引条目:

- 0x00(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID)
- 0x01(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) 0x02(tagCount) 0x04(tag1NameLen) "tag1"(tag1Name) 0x06(tag1ValueLen) "value1" 0x04(tag2NameLen) "tag2" 0x06 "value2"
- 0x02(indexType) 0x00000000(tenantID) 0x04(tagNameLen) "tag1"(tagName) 0x06(tagValueLen) "value1"(tagValue) 0x12345678 0x90abcdef (streamID)
- 0x02(indexType) 0x00000000(tenantID) 0x04(tagNameLen) "tag2"(tagName) 0x06(tagValueLen) "value2"(tagValue) 0x12345678 0x90abcdef (streamID)
  1. 写入内存块

这些索引条目会加入到 indexdb 的 mergeset.Table 存储引擎中,此引擎负责将索引条目写入到内存块(inmemoryBlock)中,这里内存块采用了分片管理,分片数量 和 cpu 逻辑核数(GOMAXPROCS) 正相关cpus * multiplier (multiplier = cpus, 最大 16)

并在合适的时机触发合并操作。

代码片段
type rawItemsShard struct {
	ibs []*inmemoryBlock
}

type inmemoryBlock struct {
    commonPrefix []byte // 公共前缀,减少重复存储
    data         []byte
    items        []Item
}

// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/encoding.go
func (ib *inmemoryBlock) Add(x []byte) bool {
	data := ib.data
	// 如果写入当前索引条目后,内存块大小超过了 maxInmemoryBlockSize(64KB),
	// 那么就不能写入当前索引条目,返回 false
	if len(x)+len(data) > maxInmemoryBlockSize {
		return false
	}

	dataLen := len(data)
	data = append(data, x...)
	ib.items = append(ib.items, Item{
		Start: uint32(dataLen),
		End:   uint32(len(data)),
	})
	ib.data = data
	return true
}

inmemoryBlock 合并时,会将前面的索引条目加入 data 和 items 中,后面的索引条目会加入到下一个内存块中,比如:

{
	data([]byte): [
		0x00(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) // 26B
		0x01(indexType) 0x00000000(tenantID) 0x12345678 0x90abcdef(streamID) 0x02(tagCount) 0x04(tag1NameLen) "tag1"(tag1Name) 0x06(tag1ValueLen) "value1" 0x04(tag2NameLen) "tag2" 0x06 "value2" // 56B
	],
	items: [
		Item{Start: 0, End: 26},
		Item{Start: 26, End: 82},
	],
}
  1. 内存块合并

mergeset.Table 会将 inmemoryBlock 合并成一个更大的内存块(inmemoryPart), inmemoryPart 也会进一步合并并在合适的时机将其刷入到持久化磁盘中。

当单个 Shard 中的内存块超过 maxBlocksPerShard(256) 时,分片管理器会将这些内存块加入到自身的 ibsToFlush 中,等待合并操作。

代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (riss *rawItemsShards) addIbsToFlush(tb *Table, ibsToFlush []*inmemoryBlock) {
	if len(ibsToFlush) == 0 {
		return
	}

	var ibsToMerge []*inmemoryBlock

	riss.ibsToFlushLock.Lock()
	if len(riss.ibsToFlush) == 0 {
		riss.updateFlushDeadline()
	}

	// 将当前分片的内存块加入到 ibsToFlush 中
	riss.ibsToFlush = append(riss.ibsToFlush, ibsToFlush...)

	// 如果待合并的内存块数量超过了 maxBlocksPerShard(256) * cpus 这一阈值,
	// 那么就需要进行合并操作,将这些内存块合并成一个更大的内存块(inmemoryPart)
	if len(riss.ibsToFlush) >= maxBlocksPerShard*cgroup.AvailableCPUs() {
		ibsToMerge = riss.ibsToFlush
		riss.ibsToFlush = nil
	}
	riss.ibsToFlushLock.Unlock()

	// 将内存块合并成一个更大的内存块(inmemoryPart)
	tb.flushBlocksToInmemoryParts(ibsToMerge, false)
}

flushBlocksToInmemoryParts 中将 ibsToMerge 按最多 defaultPartsToMerge(16) 大小分成多个 chunk,再把这些 chunk 合并成一个更大的内存块(inmemoryPart)。注意单个 inmemoryPart 大小不能超过 5% 的可用系统内存。

代码片段
func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool) {
	pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)

	// 按最多 defaultPartsToMerge(16) 大小分成多个 chunk,
	// 每个 chunk 合并成一个更大的内存块(inmemoryPart)
	for len(ibs) > 0 {
		n := defaultPartsToMerge
		if n > len(ibs) {
			n = len(ibs)
		}
		
		go func(ibsChunk []*inmemoryBlock) {
			if pw := tb.createInmemoryPart(ibsChunk); pw != nil {
				pws = append(pws, pw)
			}
		}(ibs[:n])
		ibs = ibs[n:]
	}

	// 处理所有的 chunk (partWrapper), 将其合并成一个更大的内存块(inmemoryPart)
	// 除非超过大小限制
	maxPartSize := getMaxInmemoryPartSize()
	for len(pws) > 1 {
		// 合并 inmemoryPart
		pws = tb.mustMergeInmemoryParts(pws)
		pwsRemaining := pws[:0]
		for _, pw := range pws {
			// 如果大小超过 maxPartSize,那么就直接加入到 inmemoryParts 中
			if pw.p.size >= maxPartSize {
				tb.addToInmemoryParts(pw, isFinal)
			} else {
				pwsRemaining = append(pwsRemaining, pw)
			}
		}
		pws = pwsRemaining
	}

	if len(pws) == 1 {
		tb.addToInmemoryParts(pws[0], isFinal)
	}
}

默认的可用内存 = 系统内存 * 60% (可以通过 memory.allowedPercent 配置)

合并不只是上面提到的部分,程序还会在后台对 inmemoryPart/filePart 进行合并。

  1. 持久化

经过合并后的 inmemoryPart 会被写入到磁盘中,如果满足以下条件,则会被刷入到磁盘中:

  • flushInterval 定时触发
  • part(inmemoryPart / filePart) 合并时
代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (tb *Table) nextMergeIdx() uint64 {
	return tb.mergeIdx.Add(1)
}

// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/table.go
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
	mergeIdx := tb.nextMergeIdx()
	dstPartPath := ""
	if dstPartType == partFile {
		// 合并后的 inmemoryPart 会被写入到磁盘中,文件名格式为 %016X,如 18804EBAD6A80650
		dstPartPath = filepath.Join(tb.path, fmt.Sprintf("%016X", mergeIdx))
	}

	// 只有一个 inmemoryPart 时,直接将其写入到磁盘中
	if isFinal && len(pws) == 1 && pws[0].mp != nil {
		mp := pws[0].mp
		mp.MustStoreToDisk(dstPartPath)
		// 打开新创建的 filePart
		pwNew := tb.openCreatedPart(pws, nil, dstPartPath)
		// 将合并后的 inmemoryPart 加入到 inmemoryParts 中
		tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
		return nil
	}
}

可以看到,inmemoryPart 会被写入到磁盘目录,目录格式为 %016X(如 18804EBAD6A80650)。实际写入文件的逻辑在 mp(*inmemoryPart).MustStoreToDisk 方法中。

代码片段
// github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset/inmemory_part.go
func (mp *inmemoryPart) MustStoreToDisk(path string) {
	fs.MustMkdirFailIfExist(path)

	metaindexPath := filepath.Join(path, metaindexFilename)  // metaindex.bin
	indexPath := filepath.Join(path, indexFilename)          // index.bin
	itemsPath := filepath.Join(path, itemsFilename)          // items.bin
	lensPath := filepath.Join(path, lensFilename)            // lens.bin

	var psw filestream.ParallelStreamWriter
	psw.Add(metaindexPath, &mp.metaindexData)
	psw.Add(indexPath, &mp.indexData)
	psw.Add(itemsPath, &mp.itemsData)
	psw.Add(lensPath, &mp.lensData)

	// 并发执行前面添加的刷写任务
	psw.Run()

	mp.ph.MustWriteMetadata(path) // 写入 parts.json 文件

	fs.MustSyncPathAndParentDir(path)
}

小结:

indexdb 用于存储 Stream 索引信息,位于 path/to/partitions/indexdb。索引包括:tenantID → streamIDstreamID → streamTagsCanonical、以及标签(tag)到 streamID 的映射。

indexdb 的存储按照 part 单位进行管理,每个 part 对应一个文件夹(18804EBAD6A80650),每个文件夹中包含如下的文件:

文件名 描述
metadata.json (Part) 索引元数据: {索引数量,块数量, 第一个项的索引, 最后一个项的索引}
metaindex.bin 存储 metaindexRow(用来索引 indexBlock 或者 blockHeaders)
index.bin 存储 blockHeader (commonPrefix、items 数量、索引 items 偏移、索引 lens 偏移等)
items.bin (Block) stream 索引 items 数据(会使用 commonPrefix 手段进行压缩)
lens.bin (Block) lens 索引 items 长度信息(用于支持 commonPrefix 压缩)

datadb 写入 #

datadb 用于存储实际的日志数据。

继续跟踪 pt.ddb.mustAddRows,可以看到 ddb(*datadb).mustFlushLogRows 是实际写入 datadb 的方法。

代码片段
// lib/logstorage/datadb.go
func (ddb *datadb) mustFlushLogRows(lr *logRows) {
	mp.mustInitFromRows(lr)
	p := mustOpenInmemoryPart(ddb.pt, mp)

	// 将 inmemoryPart 加入到 datadb.inmemoryParts 中
	ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
	// 触发 inmemoryParts 合并
	ddb.startInmemoryPartsMergerLocked()
}

其中 mp.mustInitFromRows(lr) 方法会将 logRows 转换为 inmemoryPart:

代码片段
// lib/logstorage/datadb.go
func (mp *inmemoryPart) mustInitFromRows(lr *logRows) {
	sort.Sort(lr)         // 根据 streamID 排序,再按 timestamp 排序
	lr.sortFieldsInRows() // 根据 field 的 Name 进行排序
	
	// 将 inmemoryPart 的各种 buffer 赋值给 blockStreamWriter,
	// 后续 bsw.Finalize 实际上也是将 bsw 的数据写入到 inmemoryPart 中
	bsw.MustInitForInmemoryPart(mp)

	var sidPrev *streamID

	uncompressedBlockSizeBytes := uint64(0)
	timestamps := lr.timestamps
	rows := lr.rows
	streamIDs := lr.streamIDs

	for i := range timestamps {
		streamID := &streamIDs[i]
		if sidPrev == nil {
			sidPrev = streamID
		}
		
		// 注意:将相同流的日志写入同一个 block 中,如果超过 maxUncompressedBlockSize (2MB) 则会写入下一个 block
		// 看 blockStreamWriter.MustWriteRows 方法
		if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || !streamID.equal(sidPrev) {
			bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
			trs.reset()
			sidPrev = streamID
			uncompressedBlockSizeBytes = 0
		}

		fields := rows[i]
		trs.timestamps = append(trs.timestamps, timestamps[i])
		trs.rows = append(trs.rows, fields)
		uncompressedBlockSizeBytes += uint64(EstimatedJSONRowLen(fields))
	}
	bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)

	// 将 bsw 中的数据更新到 inmemoryPart
	bsw.Finalize(&mp.ph)	
}

// lib/logstorage/block_stream_writer.go
func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, rows [][]Field) {
	if len(timestamps) == 0 {
		return
	}

	b := getBlock()

	// !!! 这里把原始日志的字段写入到 block 中
	b.MustInitFromRows(timestamps, rows)
	bsw.MustWriteBlock(sid, b)
	putBlock(b)
}


type block struct {
	timestamps []int64
	columns []column     // 所有 row 会根据按列进行存储
	constColumns []Field // 所有 row 这一列的值相同就可以存储在 constColumns 中,但值不能超过 256 字节
}

小结:

在这个环节,日志数据从行格式(logRows)转换为列格式(inmemoryPart),主要经历以下步骤:

  1. logRows 排序:同一 stream 的数据相邻并按时间排序,同时对 Fields 依据 Field.Name 进行排序,便于生成 block
  2. 将排序后的 logRows 中相同 stream 的数据写入一个或多个 block(每个 block 最大 maxUncompressedBlockSize(2MB));
  3. block 包含常量列(所有行该字段值相同)与普通列(各行值不完全相同);普通列保存所有行的该字段值(缺失则为空);
  4. block 写入 inmemoryPart,合并后刷入磁盘。

每个 part 包含下面几种文件:

文件名 存储内容
metadata.json (Part) part 元信息:记录数、格式版本、时间范围等
metaindex.bin (Part) 索引 indexBlockHeader (一组 block)
index.bin (Part) 索引 blockHeader
column_names.bin (Part) 中所有的列和列ID
column_idxs.bin (Block) column 的 bloom/values 存在的 shardID
columns_header_index.bin (Block) 索引 columnID 到 columnHeader 的偏移量
columns_header.bin (Block) columnHeader 数据
timestamps.bin (Block) 时间戳数据
message_bloom.bin (Block) 消息的布隆过滤器
message_values.bin (Block) 实际的日志消息
bloom.bin{shard} (Block) 普通列的布隆过滤器,用于快速判断是否包含某个消息
values.bin{shard} (Block) 普通列的实际值

其中 {shardIdx} 是分片索引,从 0 开始,代表了 bloom 和 values 在这个 part 中的分片数量。

Log 中的列名会存储在 column_names.bin 中,每个列名会有一个唯一的 ID,这个 ID 会在 column_idxs.bin 中存储。

读取流程 #

经过对写入流程的梳理,我们再回头来看读取流程。从 RunQuery 开始追踪:

Block 过滤 #

代码片段
// lib/logstorage/storage_search.go
func (s *Storage) runQuery(qctx *QueryContext, writeBlock writeBlockResultFunc) error {
	// 从查询条件中解析出查询需要的参数:
	// type storageSearchOptions struct {
	// 	tenantIDs          []TenantID
	// 	streamIDs          []streamID
	// 	minTimestamp       int64
	// 	maxTimestamp       int64
	// 	streamFilter       *StreamFilter
	// 	filter             filter
	// 	fieldsFilter       *prefixfilter.Filter
	// 	hiddenFieldsFilter *prefixfilter.Filter
	// 	timeOffset         int64
	// }
	sso := s.getSearchOptions(qctx.TenantIDs, q, qctx.HiddenFieldsFilters)
	s.searchParallel(workersCount, sso, qctx.QueryStats, stopCh, writeBlockToPipes)
}

// lib/logstorage/storage_search.go
func (s *Storage) searchParallel(workersCount int, sso *storageSearchOptions, qs *QueryStats, stopCh <-chan struct{}, writeBlock writeBlockResultFunc) {
	// ...

	// 启动多个 blockSearch 协程,每个协程从 workCh 中取出一个 blockSearchWorkBatch 进行搜索
	workCh := make(chan *blockSearchWorkBatch, workersCount)
	for workerID := 0; workerID < workersCount; workerID++ {
		go func(workerID uint) {
			for bswb := range workCh {
				for i := range bsws {
					// bs blockSearch 代表对一个 block 进行搜索
					bs.search(qsLocal, bsw, bm)
					if bs.br.rowsLen > 0 {
						writeBlock(workerID, &bs.br)
					}	
				}
			}
		}(uint(workerID))
	}

	// 根据时间范围确定需要查询的 Partition
	ptws, ptwsDecRef := s.getPartitionsForTimeRange(sso.minTimestamp, sso.maxTimestamp)
	defer ptwsDecRef()

	// 并发查询每个 Partition
	for i, ptw := range ptws {
		go func(idx int, pt *partition) {
			psfs[idx] = pt.search(sso, qsLocal, workCh, stopCh)
		}(i, ptw.pt)
	}
}

// lib/logstorage/storage_search.go
func (pt *partition) search(sso *storageSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
	// 如果查询条件中包含了 _streamFilter 那么从 indexdb 中获取到对应的 streamID
	pso := pt.getSearchOptions(sso)
	// 在 datadb 中执行查询
	return pt.ddb.search(pso, qs, workCh, stopCh)
}

// lib/logstorage/datadb_search.go
func (ddb *datadb) search(pso *partitionSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
	// 按照时间范围确定需要查询的 Part
	pws, pwsDecRef := ddb.getPartsForTimeRange(pso.minTimestamp, pso.maxTimestamp)

	// Apply search to matching parts
	for _, pw := range pws {
		pw.p.search(pso, qs, workCh, stopCh)
	}

	return pwsDecRef
}

// lib/logstorage/storage_search.go
func (p *part) search(pso *partitionSearchOptions, qs *QueryStats, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
	bhss := getBlockHeaders()
	if len(pso.tenantIDs) > 0 {
		p.searchByTenantIDs(pso, qs, bhss, workCh, stopCh)
	} else {
		// 这个方法是对 part 中的 index
		p.searchByStreamIDs(pso, qs, bhss, workCh, stopCh)
	}
	putBlockHeaders(bhss)
}

从上述代码可以清晰看出,Victoria Logs 的前三层结构在逐层缩小查询范围以提高效率,并发查询每个 partition 和 part 以加速检索。

其中 part.searchByStreamIDs 的作用是基于 streamIDs 过滤出符合条件的 blockblockHeader):要求 streamID 落在目标集合内且时间范围命中目标区间,并将其加入 workCh

根据 streamFilter 获取 streamIDs 的代码片段如下。以 _stream: {service="test-app"} 为例,对应的等值查询方法为 getStreamIDsForNonEmptyTagValue

代码片段
func (is *indexSearch) getStreamIDsForNonEmptyTagValue(tenantID TenantID, tagName, tagValue string) map[u128]struct{} {
	ids := make(map[u128]struct{})
	
	ts := &is.ts
	kb := &is.kb

	// 构建查询前缀:nsPrefixTagToStreamIDs + tenantID + tagName + tagValue
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToStreamIDs, tenantID)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagName))
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValue))
	prefix := kb.B

	// 找到第一条以 prefix 开头的记录
	ts.Seek(prefix)
	for ts.NextItem() {
		if !bytes.HasPrefix(item, prefix) {
			break
		}

		// 解析 streamID 并更新结果
		tail := item[len(prefix):]
		sp.UpdateStreamIDs(ids, tail)
	}

	return ids
}

这里的细节:

  1. indexBlockHeader(s)metaindex.bin 加载;
  2. 第一次过滤:通过 indexBlockHeader 排除不包含目标 streamID 和时间范围的索引块;
  3. 加载 blockHeader:从 index.bin 读取匹配索引块中的所有 blockHeader
  4. 第二次过滤:依据 blockHeader 再次排除不包含目标 streamID 和时间范围的块。
代码片段
// indexBlockHeader 包含索引是覆盖了多个 block
type indexBlockHeader struct {
	streamID streamID       // 代表的是覆盖的 block 中最小的 streamID
	minTimestamp int64      // 代表的是覆盖的 block 中最小的 timestamp
	maxTimestamp int64      // 代表的是覆盖的 block 中最大的 timestamp
	indexBlockOffset uint64 // 代表的是在 indexFilename(index.bin) 中的指向存储数据位置的偏移量
	indexBlockSize uint64   // 代表的是在 indexFilename(index.bin) 中存储数据的大小
}

// blockHeader 包含了单个 block 的元数据信息
type blockHeader struct {
	streamID streamID       // 代表的是 block 中存储的日志条目的 streamID
	uncompressedSizeBytes uint64 // 代表的是 block 中存储的日志条目的原始(未压缩)大小
	rowsCount uint64 // 代表的是 block 中存储的日志条目的数量
	timestampsHeader timestampsHeader // timestampsHeader 包含了 block 中存储的日志条目的时间戳信息
	columnsHeaderIndexOffset uint64 // 代表的是 columnsHeader 在 columnsHeaderIndexFilename(columns_header_index.bin) 中的偏移量
	columnsHeaderIndexSize uint64 // 代表的是 columnsHeader 在 columnsHeaderIndexFilename(columns_header_index.bin) 中的大小
	columnsHeaderOffset uint64 // 代表的是 columnsHeader 在 columnsHeaderFilename(columns_header.bin) 中的偏移量
	columnsHeaderSize uint64 // 代表的是 columnsHeader 在 columnsHeaderFilename(columns_header.bin) 中的大小
}

type timestampsHeader struct {
	blockOffset uint64 // 代表的是 block 在 timestampsFilename(timestamps.bin) 中的偏移量
	blockSize uint64 // 代表的是 block 在 timestampsFilename(timestamps.bin) 中的大小

	minTimestamp int64 // 代表的是 block 中存储的日志条目的最小时间戳
	maxTimestamp int64 // 代表的是 block 中存储的日志条目的最大时间戳

	marshalType encoding.MarshalType // 代表的是 block 中存储的日志条目的时间戳信息的编码类型
}

经过上面的层层过滤 (按照 时间范围 + streamID 列表),确定了哪些分区的哪些 part,以及 part 中需要进一步筛选的 Block(blockHeader) 列表。这些还需要查找过滤的 Block(blockHeader) 会被打包成 blockSearchWorkBatch 任务,发送到 workCh 中, 让 blockSearch 执行具体的查询。

我们也能得出这样的结论:VictoriaLogs 在存储时,在每一层都为时间 和 streamID 过滤在索引中做了设计,进而在查询时带上适当时间范围 和 streamFilter 可以极大的缩小查询范围,从而提高查询效率。

Block 匹配 #

blockSearch 是在 Storage.searchParallel 启动的工作协程中运行,依次从 workCh 中获取 blockSearchWorkBatch,然后进行进行查询。

代码片段
// lib/logstorage/storage_search.go
func (bs *blockSearch) search(qs *QueryStats, bsw *blockSearchWork, bm *bitmap) {
	bs.reset()

	bs.qs = qs
	bs.bsw = bsw

	// bitmap 用来存储 block 中命中的 log entry 的‘索引’
	bm.init(int(bsw.bh.rowsCount))
	// 将所有的位标记为 1(初始状态,代表所有行都需要检测)
	bm.setBits()

	// filter 是一个接口,用来实现 log entries 的过滤
	// 不仅有 = (filterExact) 这种简单的过滤,还支持逻辑表达: 且(filterAnd) 这样可以组织成一个复杂的逻辑 filter 
	bs.bsw.pso.filter.applyToBlockSearch(bs, bm)

	// 如果没有命中任何 log entry,直接返回
	if bm.isZero() {
		return
	}

	// 将 block 中的
	bs.br.mustInit(bs, bm)

	// 获取需要的列,通过 “ | fields level, streamID, timestamp, message” 这种方式来指定
	bs.br.initColumns(bsw.pso.fieldsFilter)
}

type filter interface {
	String() string // 返回 filter 的字符串表示
	updateNeededFields(pf *prefixfilter.Filter)
	matchRow(fields []Field) bool

	// 即根据 filter 过滤出 bs 中命中的 log entry 的‘索引’,并更新到 bm 中 (标记或者取消标记)
	applyToBlockSearch(bs *blockSearch, bm *bitmap)

	// 即根据 filter 过滤出 br 中命中的 log entry 的‘索引’,并更新到 bm 中(标记或者取消标记)
	applyToBlockResult(br *blockResult, bm *bitmap)
}

这里再进一步深入就是各种 filter 的实现了,因此不再展开,这里只关注下 Block 内是怎么对 列式存储结构 进行检索的?

这里以 filterExact 为例,来展示下 Block 是怎么对 列式存储结构 进行检索比较的。

举例:filterExact 匹配
// filterExact 匹配指定字段的精确值
//
// Example LogsQL: `fieldName:exact("foo bar")` of `fieldName:="foo bar"`
type filterExact struct {
	fieldName string // 代表的是需要匹配的字段名
	value     string // 代表的是需要匹配的字段值

	tokens       []string
	tokensHashes []uint64
}

// lib/logstorage/filter_exact.go
func (fe *filterExact) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
	fieldName := fe.fieldName
	value := fe.value

	// 先判断是否是 const 列 (所有行这一列的值都是相同的)。
	// 如果是,可以直接比较 value 是否相等,且只需要比较一次就能确定是否命中
	v := bs.getConstColumnValue(fieldName)
	if v != "" {
		if value != v {
			bm.resetBits()
		}
		return
	}

	// 如果不是 const 列,检查列是否存在,并获取到 column header:
	// 1. `column_names.bin` 提供 column_name 和 column_id 的映射
	// 2. `columns_header_index.bin` 提供 column_id 到 column header 的偏移量映射
	// 3. `column_idxs.bin` 提供 column_name 到 bloom/values shardId 的映射
	ch := bs.getColumnHeader(fieldName)
	if ch == nil {
		// 列不存在的情况下,如果查询值不是空,那么需要重置 bitmap
		if value != "" {
			bm.resetBits()
		}
		return
	}

	tokens := fe.getTokensHashes()

	switch ch.valueType {
	case valueTypeString:
		matchStringByExactValue(bs, ch, bm, value, tokens)
	case valueTypeDict:
		matchValuesDictByExactValue(bs, ch, bm, value)
	case valueTypeUint8:
		matchUint8ByExactValue(bs, ch, bm, value, tokens)
	// ... 省略其他数值类型
	case valueTypeIPv4:
		matchIPv4ByExactValue(bs, ch, bm, value, tokens)
	case valueTypeTimestampISO8601:
		matchTimestampISO8601ByExactValue(bs, ch, bm, value, tokens)
	default:
		logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
	}
}

// columnHeader 代表列的元数据信息, 它一定只对应一个 Block 中的一列
// 注意如果 name 为空,那么它代表的 message(_msg) 列
type columnHeader struct {
	name string

	valueType valueType // 列中存储的值的类型

	minValue uint64 // 列中存储的最小值, 用于快速判断是否在给定的范围内。适用于 uint*, ipv4, timestamp 和 float64 类型
	maxValue uint64 // 列中存储的最大值, 用于快速判断是否在给定的范围内。适用于 uint*, ipv4, timestamp 和 float64 类型
	valuesDict valuesDict // 列中存储的唯一值字典, 适用于 valueType = valueTypeDict 类型

	valuesOffset uint64 // 列中存储的 values 的偏移量, 用于快速定位到 values.bin 中的对应位置
	valuesSize uint64   // 列中存储的 values 的大小, 用于快速定位到 values.bin 中的对应位置

	bloomFilterOffset uint64 // 列中存储的 bloom filter 的偏移量, 用于快速定位到 bloom.bin 中的对应位置
	bloomFilterSize uint64   // 列中存储的 bloom filter 的大小, 用于快速定位到 bloom.bin 中的对应位置
}

到这里也只是完成了查询 column header 的过程,接下来需要根据 column header 判断具体命中了哪些行:

values 匹配
// lib/logstore/filter_exact.go
func matchStringByExactValue(bs *blockSearch, ch *columnHeader, bm *bitmap, value string, tokens []uint64) {
	// 先判断 bloom filter 是否命中, 如果 bloom filter 不命中, 那么直接返回
	if !matchBloomFilterAllTokens(bs, ch, tokens) {
		bm.resetBits()
		return
	}

	visitValues(bs, ch, bm, func(v string) bool {
		return v == value
	})
}

// lib/logstore/filter_phrase.go
func visitValues(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(value string) bool) {
	if bm.isZero() {
		// Fast path - nothing to visit
		return
	}

	// 根据 列头 来获取 values
	// 1. 列名 确定 再 values.bin 的哪一个分片
	// 2. 再根据 columnHeader 存储的 valuesOffset 和 valuesSize 来读取 values
	values := bs.getValuesForColumn(ch)
	bm.forEachSetBit(func(idx int) bool {
		return f(values[idx])
	})
}

到这里整个查询过程也差不多完成了,剩下的部分就是根据命中的行索引来获取需要返回的 fields 的值了。这里限于篇幅,就不展开了。

小结:

总结一下查询过程:

  1. 根据查询条件,确定需要查询的分区和 part;
  2. 并发查询每个分区的每个 part,查询时根据 indexBlockHeader 过滤出可能命中的 block;
  3. 对每个命中的 block:
    • 先检查匹配的字段是不是常量列,如果是,那么判断一次即可;
    • 如果是普通列,那么先获取列头,检查列是否存在,如果不存在,那么也可以直接返回;
    • 如果列存在,也不直接加载 values,而是先判断 bloom filter 是否未命中,如果未命中,那么代表该 block 中一定不存在该值,直接返回;
    • 最后遍历这一列所有的 values,这里也不会遍历所有的 values,而是只遍历 bitmap 中设置为 1 的位置(这些位置可能是前置 filter 已经标记为命中的位置);

存储模型 #

自己本地启动一个 Victoria Logs,再写入数据,可以直观的看到存储引擎的目录结构如下:

代码片段
/storage/
 ├── partitions/
 │   ├── 20251210/
 │   │   ├── indexdb
 │   │   │   ├── 18804EBAD6A6ECA1
 │   │   │   │    ├── index.bin
 │   │   │   │    ├── items.bin
 │   │   │   │    ├── lens.bin
 │   │   │   │    ├── metadata.json
 │   │   │   │    └── metaindex.bin
 │   │   │   ├── 18804EBAD6A6ECB9
 │   │   │   ├── ...
 │   │   │   ├── 18804EBAD6A6ECB8
 │   │   │   └── parts.json
 │   │   │
 │   │   └── datadb/
 │   │       ├── 18804EBAD6A80650
 │   │       │    ├── bloom.bin0
 │   │       │    ├── bloom.bin1
 │   │       │    ├── bloom.bin2
 │   │       │    ├── bloom.bin3
 │   │       │    ├── bloom.bin4
 │   │       │    ├── column_idxs.bin
 │   │       │    ├── column_names.bin
 │   │       │    ├── columns_header.bin
 │   │       │    ├── columns_header_index.bin
 │   │       │    ├── index.bin
 │   │       │    ├── message_bloom.bin
 │   │       │    ├── message_values.bin
 │   │       │    ├── metadata.json
 │   │       │    ├── metaindex.bin
 │   │       │    ├── timestamps.bin
 │   │       │    ├── values.bin0
 │   │       │    ├── values.bin1
 │   │       │    ├── values.bin2
 │   │       │    ├── values.bin3
 │   │       │    └── values.bin4
 │   │       │
 │   │       ├── 18804EBAD6A80FF6
 │   │       ├── ...
 │   │       ├── 18804EBAD6A81004
 │   │       └── parts.json
 │   │
 │   │── 20251211/
 │   │── ...
 │   └── 20251213/
└── flock.lock

其实从上面的文件目录结构可以看出来,VictoriaLogs 存储上分成了三层:

  • Storage 这是存储引擎的最顶层,所有数据都存储在这个目录下。
  • Partition 这是存储引擎的第二层,每个 Partition 对应一个日期,这个时间范围内的数据按照 Part 进行存储。
  • Part 这是存储引擎的第三层,每个 Part 代表的是合并压缩后的数据,其中存储了具体的数据和索引信息。
  • Block 可以认为是第四层,每个 Block 代表的是同一一个 Stream 的数据(物理上相邻)。多个 Block 组成了一个 Part。

每个 Partition 都包含了 indexdb 和 datadb:

  • indexdb 保存 stream 的索引信息,包括:标签、tenantID、streamID 之间的映射关系。
  • datadb 日志数据,每个 stream 会有一个唯一的 ID,同一个 stream 的数据在物理上相近保存;datadb 存储的数据是列格式的,分为 message, timestamp,columns 三种类型分开存储;使用了 bloom 过滤器来快速判断是否存在某个字段值。

对应的存储模型如下图所示:

VictoriaLogs 存储模型
VictoriaLogs 存储模型

总结 #

通过深入 VictoriaLogs 的写入和查询过程,可以发现它的存储引擎做了针对 日志场景 做了很多优化来提高查询效率和节省存储开销:

  • 按照日期设计分区,如果查询特定时间范围内,可以只查询涉及到的分区;
  • 分区按照 part 进行划分,part 上设计了 最小 streamID 和 覆盖的时间的范围,用于帮助过滤;
  • 分区内提供了 indexBlockHeader(代表多个 blockHeader)和 blockHeader 进一步帮助过滤,减小扫描范围;
  • Block 中将列分成了 const column 和 普通 column,const column 存储时只需要存储一个值,比较时也只需要比较一次就能确定 block 是否命中;
  • 将同一个 Stream 数据相邻存储;
  • 普通列的值存储在一起,读取时顺序读取,速度更快;
  • 使用 bitmap 在 filter 中传递,避免重复检测已经确定未命中的行;
  • 使用 bloom filter 来快速判断未命中,减少不必要的读取 values;
  • 查询时分区 和 part 都是并发查询的;
  • 采用 ZSTD 压缩存储;

除了设计上的优化,还有很多 golang 实现上的优化:

  • 随处可见的 sync.Pool 用来复用对象,避免频繁分配和释放内存;
  • slice 同样也被复用;
  • 充分利用 goroutine 并发查询,提高查询效率;

水平有限,如有错误,欢迎勘误指正 🙏

参考 #

访问量 访客数