兄弟连区块链教程以太坊源码分析chain-indexer区块链索引二

栏目: 编程工具 · 发布时间: 6年前

Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。

    eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)

    // Start creates a goroutine to feed chain head events into the indexer for
    // cascading background processing. Children do not need to be started, they
    // are notified about new events by their parents.

    // 子链不需要被启动。 以为他们的父节点会通知他们。
    func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
        go c.eventLoop(currentHeader, chainEventer)
    }

    // eventLoop is a secondary - optional - event loop of the indexer which is only
    // started for the outermost indexer to push chain head events into a processing
    // queue.

    // eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。

    func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
        // Mark the chain indexer as active, requiring an additional teardown
        atomic.StoreUint32(&c.active, 1)
    
        events := make(chan ChainEvent, 10)
        sub := chainEventer(events)
        defer sub.Unsubscribe()
    
        // Fire the initial new head event to start any outstanding processing
        // 设置我们的其实的区块高度,用来触发之前未完成的操作。
        c.newHead(currentHeader.Number.Uint64(), false)
    
        var (
            prevHeader = currentHeader
            prevHash = currentHeader.Hash()
        )
        for {
            select {
            case errc := <-c.quit:
                // Chain indexer terminating, report no failure and abort
                errc <- nil
                return
    
            case ev, ok := <-events:
                // Received a new event, ensure it's not nil (closing) and update
                if !ok {
                    errc := <-c.quit
                    errc <- nil
                    return
                }
                header := ev.Block.Header()
                if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先
                    //找到公共祖先, 从公共祖先之后的索引需要重建。
                    c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
                }
                // 设置新的head
                c.newHead(header.Number.Uint64(), false)
    
                prevHeader, prevHash = header, header.Hash()
            }
        }
    }


newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发

    
    // newHead notifies the indexer about new chain heads and/or reorgs.
    func (c *ChainIndexer) newHead(head uint64, reorg bool) {
        c.lock.Lock()
        defer c.lock.Unlock()
    
        // If a reorg happened, invalidate all sections until that point
        if reorg { // 需要重建索引 从head开始的所有section都需要重建。
            // Revert the known section number to the reorg point
            changed := head / c.sectionSize
            if changed < c.knownSections {
                c.knownSections = changed
            }
            // Revert the stored sections from the database to the reorg point
            // 将存储的部分从数据库恢复到索引重建点
            if changed < c.storedSections {
                c.setValidSections(changed)
            }
            // Update the new head number to te finalized section end and notify children
            // 生成新的head 并通知所有的子索引
            head = changed * c.sectionSize
    
            if head < c.cascadedHead {
                c.cascadedHead = head
                for _, child := range c.children {
                    child.newHead(c.cascadedHead, true)
                }
            }
            return
        }
        // No reorg, calculate the number of newly known sections and update if high enough
        var sections uint64
        if head >= c.confirmsReq {
            sections = (head + 1 - c.confirmsReq) / c.sectionSize
            if sections > c.knownSections {
                c.knownSections = sections
    
                select {
                case c.update <- struct{}{}:
                default:
                }
            }
        }
    }


父子索引数据的关系
父Indexer负载事件的监听然后把结果通过newHead传递给子Indexer的updateLoop来处理。


setValidSections方法,写入当前已经存储的sections的数量。 如果传入的值小于已经存储的数量,那么从数据库里面删除对应的section

    // setValidSections writes the number of valid sections to the index database
    func (c *ChainIndexer) setValidSections(sections uint64) {
        // Set the current number of valid sections in the database
        var data [8]byte
        binary.BigEndian.PutUint64(data[:], sections)
        c.indexDb.Put([]byte("count"), data[:])
    
        // Remove any reorged sections, caching the valids in the mean time
        for c.storedSections > sections {
            c.storedSections--
            c.removeSectionHead(c.storedSections)
        }
        c.storedSections = sections // needed if new > old
    }


processSection
    
    // processSection processes an entire section by calling backend functions while
    // ensuring the continuity of the passed headers. Since the chain mutex is not
    // held while processing, the continuity can be broken by a long reorg, in which
    // case the function returns with an error.

    //processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。
    func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
        c.log.Trace("Processing new chain section", "section", section)
    
        // Reset and partial processing
        c.backend.Reset(section)
    
        for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
            hash := GetCanonicalHash(c.chainDb, number)
            if hash == (common.Hash{}) {
                return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
            }
            header := GetHeader(c.chainDb, hash, number)
            if header == nil {
                return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
            } else if header.ParentHash != lastHead {
                return common.Hash{}, fmt.Errorf("chain reorged during section processing")
            }
            c.backend.Process(header)
            lastHead = header.Hash()
        }
        if err := c.backend.Commit(); err != nil {
            c.log.Error("Section commit failed", "error", err)
            return common.Hash{}, err
        }
        return lastHead, nil
    }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Introduction to Linear Optimization

Introduction to Linear Optimization

Dimitris Bertsimas、John N. Tsitsiklis / Athena Scientific / 1997-02-01 / USD 89.00

"The true merit of this book, however, lies in its pedagogical qualities which are so impressive..." "Throughout the book, the authors make serious efforts to give geometric and intuitive explanations......一起来看看 《Introduction to Linear Optimization》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具