go-ethereum 源码分析 - ethdb(3)

栏目: 数据库 · 发布时间: 7年前

内容简介:go-ethereum 中的所有区块数据都存储在源码目录如下,主要就下面 4 个源码文件:我们主要分析下

go-ethereum 中的所有区块数据都存储在 leveldb 中,并且 go-ethereum 又基于 leveldb 进行了一层简单的封装。

leveldb 是一个由 Google 开源(BSD)的 KV(Key/Value Pair)非关系型数据库,是基于 LSM(Log-Structured-Merge tree) 的典型实现。

主要有如下几个 特性

  • Keys 和 Values 均为任意长度的字节数组;
  • Data(KV) 默认以 Key 字典序 排序 存储,也可以提供自定义的 排序算法 来重载排序;
  • 基本操作包括:Put(k,v),Get(k),Delete(k);
  • 支持原子级的批量(Batch)操作;
  • 可以创建数据全景(transient)的 snapshot,并支持在 snapshot 中查找数据;
  • 支持前向和后向迭代遍历数据;
  • 数据自动采用 Snappy 压缩算法进行压缩;
  • 可移植性;

源码目录如下,主要就下面 4 个源码文件:

➜  ethdb :
.
├── database.go        // leveldb 的封装代码
├── database_test.go   // 测试用例
├── interface.go       // 定义了 Database 的一些操作接口
└── memory_database.go // 供测试环境使用的基于内存的数据库

我们主要分析下 database.gointerface.go 两个源码文件。

database.go

这里主要封装了 leveldb 的接口,从如下 import 可以看出主要使用了 goleveldb 的封装。

import (
  "strconv"
  "strings"
  "sync"
  "time"

  "github.com/ethereum/go-ethereum/log"
  "github.com/ethereum/go-ethereum/metrics"
  "github.com/syndtr/goleveldb/leveldb"
  "github.com/syndtr/goleveldb/leveldb/errors"
  "github.com/syndtr/goleveldb/leveldb/filter"
  "github.com/syndtr/goleveldb/leveldb/iterator"
  "github.com/syndtr/goleveldb/leveldb/opt"
  "github.com/syndtr/goleveldb/leveldb/util"
)

下面看下在 goleveldb 基础上进一步封装的 LDBDatabase 结构,如下,主要增加了很多维度的 Metrics 用于统计使用情况(几个 metrics 的注释很清晰):

type LDBDatabase struct {
  fn string      // filename for reporting
  db *leveldb.DB // LevelDB instance

  compTimeMeter  metrics.Meter // Meter for measuring the total time spent in database compaction
  compReadMeter  metrics.Meter // Meter for measuring the data read during compaction
  compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
  diskReadMeter  metrics.Meter // Meter for measuring the effective amount of data read
  diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written

  quitLock sync.Mutex      // Mutex protecting the quit channel access
  quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

  log log.Logger // Contextual logger tracking the database path
}

通过 NewLDBDatabase 方法来打开(或创建)并返回封装后的 LDBDatabase 实例,如下:

// NewLDBDatabase returns a LevelDB wrapped object.
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
  // 跟踪对应 leveldb 文件的 logger
  logger := log.New("database", file)

  // Ensure we have some minimal caching and file guarantees
  if cache < 16 {
    cache = 16
  }
  if handles < 16 {
    handles = 16
  }
  logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)

  // 打开 leveldb 文件,并恢复潜在的冲突
  db, err := leveldb.OpenFile(file, &opt.Options{
    OpenFilesCacheCapacity: handles,
    BlockCacheCapacity:     cache / 2 * opt.MiB,
    WriteBuffer:            cache / 4 * opt.MiB, // Two of these are used internally
    Filter:                 filter.NewBloomFilter(10),
  })
  if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
    // 如果有冲突,则需要修复文件
    db, err = leveldb.RecoverFile(file, nil)
  }

  // (Re)check for errors and abort if opening of the db failed
  if err != nil {
    return nil, err
  }

  // 返回 LDBDatabase 实例
  return &LDBDatabase{
    fn:  file,
    db:  db,
    log: logger,
  }, nil
}

此外,提供了一些基本操作方法,如 Put,Path,Get,Delete 等等用于操作 leveldb,这些基本操作都是直接调用 leveldb 的封装,如下为 Put 操作:

// Put puts the given key / value to the queue
func (db *LDBDatabase) Put(key []byte, value []byte) error {
  // Generate the data to write to disk, update the meter and write
  //value = rle.Compress(value)

  return db.db.Put(key, value, nil)
}

创建本地 blockchain 数据库

下面看何时以及如何创建本地区块链DB:

// source: eth/backend.go
// 
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
  if config.SyncMode == downloader.LightSync {
    return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
  }
  if !config.SyncMode.IsValid() {
    return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
  }

  // 创建本地 blockchain 数据库
  // 返回的 chainDb
  chainDb, err := CreateDB(ctx, config, "chaindata")
  if err != nil {
    return nil, err
  }
  stopDbUpgrade := upgradeDeduplicateData(chainDb)

  // 设置创世区块,并写入 chainDb 中
  chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
  if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
    return nil, genesisErr
  }
  log.Info("Initialised chain configuration", "config", chainConfig)

  eth := &Ethereum{
    config:         config,
    chainDb:        chainDb,
    chainConfig:    chainConfig,
    eventMux:       ctx.EventMux,
    accountManager: ctx.AccountManager,
    engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
    shutdownChan:   make(chan bool),
    stopDbUpgrade:  stopDbUpgrade,
    networkId:      config.NetworkId,
    gasPrice:       config.GasPrice,
    etherbase:      config.Etherbase,
    bloomRequests:  make(chan chan *bloombits.Retrieval),
    bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
  }

  // ...
}

可以看到在创建和初始化 Ethereum 对象的时候会同时通过调用 CreateDB 方法来创建本地区块链数据库,并将 ethdb.LDBDatabase 实例 chainDb 作为 eth 实例的成员,在后续区块数据写入本地库时,都是通过该 chainDb 实例来操作写入的,如下代码:

// source: eth/backend.go
// 
// CreateDB creates the chain database.
func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) {
  // 打开 name 数据库
  db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles)
  if err != nil {
    return nil, err
  }
  if db, ok := db.(*ethdb.LDBDatabase); ok {
    db.Meter("eth/db/chaindata/")
  }
  return db, nil
}

这里打开 “chaindata” 数据库,并且启动 Metric(见下面),在 ctx.OpenDatabase 中创建 LDBDatabase,如下:

// source: node/node.go
// 
// OpenDatabase opens an existing database with the given name (or creates one if no
// previous can be found) from within the node's instance directory. If the node is
// ephemeral, a memory database is returned.
func (n *Node) OpenDatabase(name string, cache, handles int) (ethdb.Database, error) {
  if n.config.DataDir == "" {
    // 没有指定 DataDir,则启动为内存数据库
    return ethdb.NewMemDatabase()
  }
  return ethdb.NewLDBDatabase(n.config.resolvePath(name), cache, handles)
}

将 block 写入到 leveldb

在 core/database_util.go 中,封装了 WriteHeader, WriteBody, WriteBlock 等方法用于将区块 header, body, block 写入 leveldb 中。

// source: core/database_util.go
// 
// WriteHeader serializes a block header into the database.
func WriteHeader(db ethdb.Putter, header *types.Header) error {
  data, err := rlp.EncodeToBytes(header)
  if err != nil {
    return err
  }
  hash := header.Hash().Bytes()
  num := header.Number.Uint64()
  encNum := encodeBlockNumber(num)
  key := append(blockHashPrefix, hash...)
  if err := db.Put(key, encNum); err != nil {
    log.Crit("Failed to store hash to number mapping", "err", err)
  }
  key = append(append(headerPrefix, encNum...), hash...)
  if err := db.Put(key, data); err != nil {
    log.Crit("Failed to store header", "err", err)
  }
  return nil
}

// WriteBody serializes the body of a block into the database.
func WriteBody(db ethdb.Putter, hash common.Hash, number uint64, body *types.Body) error {
  data, err := rlp.EncodeToBytes(body)
  if err != nil {
    return err
  }
  return WriteBodyRLP(db, hash, number, data)
}

// WriteBodyRLP writes a serialized body of a block into the database.
func WriteBodyRLP(db ethdb.Putter, hash common.Hash, number uint64, rlp rlp.RawValue) error {
  key := append(append(bodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
  if err := db.Put(key, rlp); err != nil {
    log.Crit("Failed to store block body", "err", err)
  }
  return nil
}

// WriteTd serializes the total difficulty of a block into the database.
func WriteTd(db ethdb.Putter, hash common.Hash, number uint64, td *big.Int) error {
  data, err := rlp.EncodeToBytes(td)
  if err != nil {
    return err
  }
  key := append(append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...), tdSuffix...)
  if err := db.Put(key, data); err != nil {
    log.Crit("Failed to store block total difficulty", "err", err)
  }
  return nil
}

// WriteBlock serializes a block into the database, header and body separately.
func WriteBlock(db ethdb.Putter, block *types.Block) error {
  // Store the body first to retain database consistency
  if err := WriteBody(db, block.Hash(), block.NumberU64(), block.Body()); err != nil {
    return err
  }
  // Store the header too, signaling full block ownership
  if err := WriteHeader(db, block.Header()); err != nil {
    return err
  }
  return nil
}

其中都通过 db.Put 方法将区块信息写入 leveldb 中。

Metrics

Meter 方法中创建了各种 Metrics 采集器,然后创建了 quitChan,最后启动一个协程调用了 db.meter 方法每隔 3 秒收集一次。

// Meter configures the database metrics collectors and
func (db *LDBDatabase) Meter(prefix string) {
  // Short circuit metering if the metrics system is disabled
  if !metrics.Enabled {
    return
  }
  // Initialize all the metrics collector at the requested prefix
  db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
  db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
  db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
  db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
  db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)

  // Create a quit channel for the periodic collector and run it
  db.quitLock.Lock()
  db.quitChan = make(chan chan error)
  db.quitLock.Unlock()

  go db.meter(3 * time.Second)
}

metrics.Enabled开关默认为关闭(false),通过命令行选项参数 metrics 来设置。

// go-ethereum/metrics.go

// Enabled is checked by the constructor functions for all of the
// standard metrics.  If it is true, the metric returned is a stub.
//
// This global kill-switch helps quantify the observer effect and makes
// for less cluttered pprof profiles.
var Enabled bool = false

// MetricsEnabledFlag is the CLI flag name to use to enable metrics collections.
const MetricsEnabledFlag = "metrics"
const DashboardEnabledFlag = "dashboard"

// Init enables or disables the metrics system. Since we need this to run before
// any other code gets to create meters and timers, we'll actually do an ugly hack
// and peek into the command line args for the metrics flag.
func init() {
  for _, arg := range os.Args {
    if flag := strings.TrimLeft(arg, "-"); flag == MetricsEnabledFlag || flag == DashboardEnabledFlag {
      log.Info("Enabling metrics collection")
      Enabled = true
    }
  }
}

在启动 metrics 开关后,每隔 3 秒会从 leveldb 内部获取计数器,然后公布到 metrics 子系统,这里是一个无限循环,直到 quitChan 收到一个退出信号。

// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
//
// This is how a stats table look like (currently):
//   Compactions
//    Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)
//   -------+------------+---------------+---------------+---------------+---------------
//      0   |          0 |       0.00000 |       1.27969 |       0.00000 |      12.31098
//      1   |         85 |     109.27913 |      28.09293 |     213.92493 |     214.26294
//      2   |        523 |    1000.37159 |       7.26059 |      66.86342 |      66.77884
//      3   |        570 |    1113.18458 |       0.00000 |       0.00000 |       0.00000
//
// This is how the iostats look like (currently):
// Read(MB):3895.04860 Write(MB):3654.64712
func (db *LDBDatabase) meter(refresh time.Duration) {
  // Create the counters to store current and previous compaction values
  // 创建一个计数器,用于存储当前和上一次的压缩值
  // 一个二维数组,第 1 维表示 当前 和 上一次,第 2 维表示统计值
  compactions := make([][]float64, 2)
  for i := 0; i < 2; i++ {
    compactions[i] = make([]float64, 3)
  }
  // Create storage for iostats.
  var iostats [2]float64
  // Iterate ad infinitum and collect the stats
  for i := 1; ; i++ {
    // Retrieve the database stats
    stats, err := db.db.GetProperty("leveldb.stats")
    if err != nil {
      db.log.Error("Failed to read database stats", "err", err)
      return
    }
    // Find the compaction table, skip the header
    lines := strings.Split(stats, "\n")
    for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
      lines = lines[1:]
    }
    if len(lines) <= 3 {
      db.log.Error("Compaction table not found")
      return
    }
    lines = lines[3:]

    // Iterate over all the table rows, and accumulate the entries
    for j := 0; j < len(compactions[i%2]); j++ {
      compactions[i%2][j] = 0
    }
    for _, line := range lines {
      parts := strings.Split(line, "|")
      if len(parts) != 6 {
        break
      }
      for idx, counter := range parts[3:] {
        value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
        if err != nil {
          db.log.Error("Compaction entry parsing failed", "err", err)
          return
        }
        compactions[i%2][idx] += value
      }
    }
    // Update all the requested meters
    if db.compTimeMeter != nil {
      db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
    }
    if db.compReadMeter != nil {
      db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
    }
    if db.compWriteMeter != nil {
      db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
    }

    // Retrieve the database iostats.
    ioStats, err := db.db.GetProperty("leveldb.iostats")
    if err != nil {
      db.log.Error("Failed to read database iostats", "err", err)
      return
    }
    parts := strings.Split(ioStats, " ")
    if len(parts) < 2 {
      db.log.Error("Bad syntax of ioStats", "ioStats", ioStats)
      return
    }
    r := strings.Split(parts[0], ":")
    if len(r) < 2 {
      db.log.Error("Bad syntax of read entry", "entry", parts[0])
      return
    }
    read, err := strconv.ParseFloat(r[1], 64)
    if err != nil {
      db.log.Error("Read entry parsing failed", "err", err)
      return
    }
    w := strings.Split(parts[1], ":")
    if len(w) < 2 {
      db.log.Error("Bad syntax of write entry", "entry", parts[1])
      return
    }
    write, err := strconv.ParseFloat(w[1], 64)
    if err != nil {
      db.log.Error("Write entry parsing failed", "err", err)
      return
    }
    if db.diskReadMeter != nil {
      db.diskReadMeter.Mark(int64((read - iostats[0]) * 1024 * 1024))
    }
    if db.diskWriteMeter != nil {
      db.diskWriteMeter.Mark(int64((write - iostats[1]) * 1024 * 1024))
    }
    iostats[0] = read
    iostats[1] = write

    // Sleep a bit, then repeat the stats collection
    select {
    case errc := <-db.quitChan:
      // Quit requesting, stop hammering the database
      errc <- nil
      return

    case <-time.After(refresh):
      // Timeout, gather a new set of stats
    }
  }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Visual Thinking

Visual Thinking

Colin Ware / Morgan Kaufmann / 2008-4-18 / USD 49.95

Increasingly, designers need to present information in ways that aid their audiences thinking process. Fortunately, results from the relatively new science of human visual perception provide valuable ......一起来看看 《Visual Thinking》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码