Influxdb中基于磁盘的倒排索引文件TSI结构解析

栏目: 数据库 · 发布时间: 5年前

内容简介:从上面的定义我们可以得到两点收获:基础上是按照其在文件中的结构定义的,记录了measurement包括的tagset和series id信息;简单讲就是:

TSI文件结构概览

tsdb/index/tsi1/index_file.go
IndexFileTrailer
IndexFileTrailerSize = IndexFileVersionSize +
        8 + 8 + // measurement block offset + size
        8 + 8 + // series id set offset + size
        8 + 8 + // tombstone series id set offset + size
        8 + 8 + // series sketch offset + size
        8 + 8 + // tombstone series sketch offset + size
        0

从上面的定义我们可以得到两点收获:

  1. 这个IndexFileTrailerSize在TSI文件结尾处有固定大小(82bytes),我们在解析TSI文件时,很容易读到并解析这个Trailer;
  2. 我们可以知道这个TSI文件都包含哪些Section, 下图是TSI文件结构
    2.1 Trailer部分
    2.2 series id set block
    2.3 tombstone series id set block
    2.4 series sketch block
    2.5 tombstone series sketch block
  • 下面我们就分别来看一下各组成部分

Measurement block

tsdb/index/tsi1/measurement_block.go
Trailer
type MeasurementBlock struct {
    data     []byte
    hashData []byte

    // Measurement sketch and tombstone sketch for cardinality estimation.
    sketchData, tSketchData []byte

    version int // block version
}

基础上是按照其在文件中的结构定义的,记录了measurement包括的tagset和series id信息;

  • 我们来看一张完整的结构图
Influxdb中基于磁盘的倒排索引文件TSI结构解析

influxdb_measurement_block_in_tsi.png

  • 一图抵千言
    1. Trailer 部分是整个MeasuermentBlock的索引,存储着其他部分的offset和size
    2. Data block set 部分是所有 MeasurementBlockElement 的集合,
      2.1 measurement 基本属性,比如name等;
      2.2 对应的tag set在文件中的offset和size;
      2.3 包括的所有series id信息, 这个series id有两种表示方式:roaring bitmap和 数组,flag指示了用哪种表示方法
    3. hash index部分 :以hash索引的方式存储了 MeasurementBlockElement 在文件中的offset, 可以在不用读取整体的tsi文件的前提下,快速定位对某个measurementblockElement的文件位置,然后读取并解析
    4. tombstome sketchmesurement sketch 是使用 HyperLogLog++ 算法来作基数统计用。
  • 代码里还提供了很多的序列化和反序列化,遍历等方法,这里不再累述。

Tag Block

  • 定义在 tsdb/index/tsil/tag_block.go

  • 它由 trailer , hash index , tag key block , tag value block 四个部分组成

  • 我们来看一张完整的结构图

    Influxdb中基于磁盘的倒排索引文件TSI结构解析

    influxdb_tag_block_detail_in_tsi.png

  • 一图抵千言

    1. Trailer 部分相当于这个tag block的meta 信息,主要保存其它各组成部分的offset和大小。
    2. Hash index 部分,可以通过 tag key快速定位到tag key block的offsset;
    3. tag key block 部分的 hash indxe 部分,可以通过tag value快速定位到tag value block部分, Data offset, Data size 部分指向了当前tag key对应的所有的tag value block文件区域;
    4. 简言之,这就是个多级索引表,一级找一级;
  • 代码里还提供了很多的序列化和反序列化,遍历等方法,这里不再累述;

  • 我们来看一下tabblock的encode过程,我们用伪码来写一下:

//遍历每个tag key
for tagkey in tagkeys {
   每个tag key对应多个tag value,遍历
   每个tag key都生成一个tag key entry对象,记录下tag key entry的offset,然后将这个tag key entry放入数组TagKeyEntrys备用
   for tagValue in tagValuesBytagKey {
      buildTagValueBlock
      写这个tagValueBlock的offset和tag value入hash表
   }
   写入这个tag value到tagValueBlockOffset的hash表
}

 for tagkeyEntry in TagKeyEntrys {
   构建tagKeyBlock
   写这个tagKeyBlock的offset和tag key入hash表
}
写入这个tag key到tagKeyBlock Offset的hash表

简单讲就是:

  1. 构建一系列tag value block, 同时准备好TagKeyEntry组数;
  2. 根据 1 中的TabKeyEntry构建一系列tab key block, 同时准备好[tag key] -> [tag key block offset]的map;
  3. 根据 2 中的 map 建hash index。

IndexFile

  • 定义在 tsdb/index/tsil/index_file.go
type IndexFile struct {
    wg   sync.WaitGroup // ref count
    data []byte

    // Components
    sfile *tsdb.SeriesFile // 对应的Seriesile文件
    tblks map[string]*TagBlock //包含的所有TagBlock
    mblk  MeasurementBlock //MeasurementBlock

    // Raw series set data.
    seriesIDSetData          []byte
    tombstoneSeriesIDSetData []byte

    // Series sketch data.
    sketchData, tSketchData []byte

    // Sortable identifier & filepath to the log file.
    level int
    id    int

    mu sync.RWMutex
    // Compaction tracking.
    compacting bool

    // Path to data file.
    path string
}
  • 我们看一下结构图:
Influxdb中基于磁盘的倒排索引文件TSI结构解析

influxd_series_index.png

  • Trailer 部分是其meta信息,里面是其他block的offset和size;
  • Trailer 部分的信息可以定位到 Measurement block 部分;
  • Measurement block 部分有hash index,根据measurement name可以快速定位到其 tab block 部分
  • tag block 中可以根据tag key定位到 tab key block 部分;
  • tag key block 部分又按hash indexd快速定位到 tag value;

IndexFiles

  • 代表了一个layer上所有的index file,定义如下:
type IndexFiles []*IndexFile
  • 提供了一系列的iterator操作,按measurement name来汇集了所有index文件中的measurement, tagkey, tagvalue, series id set等,且作了排序
func (p IndexFiles) MeasurementIterator() MeasurementIterator
func (p *IndexFiles) TagKeyIterator(name []byte) (TagKeyIterator, error)
func (p IndexFiles) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator
func (p IndexFiles) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error)
  • 最主要的是提供了CompactTo方法,将其包含的所有index文件合并成一个

    利用上面的一系列iterator方法,依次写入tag block, measurement block等,这里不累述。

FileSet

  • 是File类型的集合,这个 File类型可能是LogFile,也可能是 IndexFile,功能是上面的 IndexFiles 类似,定义如下:
type FileSet struct {
    levels       []CompactionLevel
    sfile        *tsdb.SeriesFile
    files        []File // 按最后更改时间从小到大排列
    manifestSize int64 // Size of the manifest file in bytes.
}
  • 提供了一系列的iterator操作,按measurement name来汇集了所有index文件中的measurement, tagkey, tagvalue, series id set等,且作了排序
  • 文件替换操作, 参数中 oldFiles 是fs.files的一部分,即当前正在被compat的文件列表,这个方法的目的是将这oldFiles列表从fs.files中删除,然后在 oldFiles 原来开始的位置插入这个 newFile , 这个 newFile 就是compact之后新生成的文件。
func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
    assert(len(oldFiles) > 0, "cannot replace empty files")

    // Find index of first old file.
    var i int
    for ; i < len(fs.files); i++ {
        if fs.files[i] == oldFiles[0] {
            break
        } else if i == len(fs.files)-1 {
            panic("first replacement file not found")
        }
    }

    // Ensure all old files are contiguous.
    for j := range oldFiles {
        if fs.files[i+j] != oldFiles[j] {
            panic(fmt.Sprintf("cannot replace non-contiguous files: subset=%+v, fileset=%+v", Files(oldFiles).IDs(), Files(fs.files).IDs()))
        }
    }

    // Copy to new fileset.
    other := make([]File, len(fs.files)-len(oldFiles)+1)
    copy(other[:i], fs.files[:i])
    other[i] = newFile
    copy(other[i+1:], fs.files[i+len(oldFiles):])

    // Build new fileset and rebuild changed filters.
    return &FileSet{
        levels: fs.levels,
        files:  other,
    }
}

Partition

  • 它包含层级结构的index files和一个LogFile,其中这个层级结构的index files就是L1-l7的tsi,这个LogFile就是tsl(它算作是L0),在磁盘上的目录结构上,它位于每个shard目录下。一个partiton下包含有一个tsl文件,若干tsi文件和一个MANIFEST文件。
  1. tsl:就是WAL,前面已经介绍过,新写入的index信息除了在内存里缓存外,还会以LogEntry的形式写入这个tsl,作故障恢复时用。
  2. L0层LogFile会定期compact到L1, L1-L6会定期向高层作compact, compact的过程其实就是将相同measurement的tagbock作在一起,相同measurement的相同tagkey对应的所有tagvalue放在一起, 相同measurement的相同tagkey又相同tagvalue的不同series id作合并后放在一起。
  • 我们重点看一下compact方法:
func (p *Partition) compact() {
    if p.isClosing() {
        return
    } else if !p.compactionsEnabled() {
        return
    }
    interrupt := p.compactionInterrupt

    fs := p.retainFileSet()
    defer fs.Release()

    //influxdb的每个partition中tsi的层次是固定的L0-L7,其中L0是wal,这个方法不涉及它的compact
    //L7为最高层,它也不会再被compact了
    //所以这个compact方法需要处理的是L1-L6层
    minLevel, maxLevel := 1, len(p.levels)-2
    for level := minLevel; level <= maxLevel; level++ {
        //如果正在被compact则跳过
        if p.levelCompacting[level] {
            continue
        }

        // 获取当前level中的相邻的index文件列表,按文件更改时间由新到旧排,每次最多compact两个文件,少于两个的不作compact
        files := fs.LastContiguousIndexFilesByLevel(level)
        if len(files) < 2 {
            continue
        } else if len(files) > MaxIndexMergeCount {
            files = files[len(files)-MaxIndexMergeCount:]
        }

        // Retain files during compaction.
        IndexFiles(files).Retain()

        // Mark the level as compacting.
        p.levelCompacting[level] = true

        // 开goroutine作compact
        func(files []*IndexFile, level int) {
            // Start compacting in a separate goroutine.
            p.wg.Add(1)
            go func() {

                // compact到高一级.
                p.compactToLevel(files, level+1, interrupt)

                // Ensure compaction lock for the level is released.
                p.mu.Lock()
                p.levelCompacting[level] = false
                p.mu.Unlock()
                p.wg.Done()

                // Check for new compactions
                p.Compact()
            }()
        }(files, level)
    }

TagValueSeriesIDCache

  • series id set 在内存中的缓存,实际上就是一个LRU缓存,一般用双向链表+map实现;
  • 使用map来记录缓存内容: cache map[string]map[string]map[string]*list.Element ,这是个嵌套map结构
    seasurement name -> tag key -> tag value -> list.Element ,最后的这个 list.Element 里包括 seriesIDSet
type seriesIDCacheElement struct {
    name        []byte
    key         []byte
    value       []byte
    SeriesIDSet *tsdb.SeriesIDSet
}

利用这个map来加速cache的查找过程;

  • 使用 golang list.List 来记录所有的 list.Eement 对象,实现缓存的LRU淘汰机制。新加入的和刚刚Get过的element被移动到链表的头部,如果缓存大小到达上限,则直接删除链表尾部的元素,同时也要清理map中相应的元素。

完整结构图

  • 最后我们来放一张完整的tsi结构图,每个Shard都对应有这样的一个tsi结构

    Influxdb中基于磁盘的倒排索引文件TSI结构解析

    influxdb_index_arch.png


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编程卓越之道

编程卓越之道

Hyde R / 韩东海 / 电子工业出版社 / 2006-4-1 / 49.80

各位程序员一定希望自己编写的代码是能让老板赞赏、满意的代码;是能让客户乐意掏钱购买的代码;是能让使用者顺利使用的代码;是能让同行欣赏赞誉的代码;是能让自己引以为豪的卓越代码。本书作者为希望能编写出卓越代码的人提供了自己积累的关于卓越编程的真知灼见。它弥补了计算机科学和工程课程中被忽略的一个部分——底层细节,而这正是构建卓越代码的基石。具体内容包括:计算机数据表示法,二进制数学运算与位运算,内存组织......一起来看看 《编程卓越之道》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器