兄弟连区块链教程以太坊源码分析chain-indexer区块链索引二

栏目: 编程工具 · 发布时间: 6年前

Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。

    eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)

    // Start creates a goroutine to feed chain head events into the indexer for
    // cascading background processing. Children do not need to be started, they
    // are notified about new events by their parents.

    // 子链不需要被启动。 以为他们的父节点会通知他们。
    func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
        go c.eventLoop(currentHeader, chainEventer)
    }

    // eventLoop is a secondary - optional - event loop of the indexer which is only
    // started for the outermost indexer to push chain head events into a processing
    // queue.

    // eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。

    func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
        // Mark the chain indexer as active, requiring an additional teardown
        atomic.StoreUint32(&c.active, 1)
    
        events := make(chan ChainEvent, 10)
        sub := chainEventer(events)
        defer sub.Unsubscribe()
    
        // Fire the initial new head event to start any outstanding processing
        // 设置我们的其实的区块高度,用来触发之前未完成的操作。
        c.newHead(currentHeader.Number.Uint64(), false)
    
        var (
            prevHeader = currentHeader
            prevHash = currentHeader.Hash()
        )
        for {
            select {
            case errc := <-c.quit:
                // Chain indexer terminating, report no failure and abort
                errc <- nil
                return
    
            case ev, ok := <-events:
                // Received a new event, ensure it's not nil (closing) and update
                if !ok {
                    errc := <-c.quit
                    errc <- nil
                    return
                }
                header := ev.Block.Header()
                if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先
                    //找到公共祖先, 从公共祖先之后的索引需要重建。
                    c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
                }
                // 设置新的head
                c.newHead(header.Number.Uint64(), false)
    
                prevHeader, prevHash = header, header.Hash()
            }
        }
    }


newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发

    
    // newHead notifies the indexer about new chain heads and/or reorgs.
    func (c *ChainIndexer) newHead(head uint64, reorg bool) {
        c.lock.Lock()
        defer c.lock.Unlock()
    
        // If a reorg happened, invalidate all sections until that point
        if reorg { // 需要重建索引 从head开始的所有section都需要重建。
            // Revert the known section number to the reorg point
            changed := head / c.sectionSize
            if changed < c.knownSections {
                c.knownSections = changed
            }
            // Revert the stored sections from the database to the reorg point
            // 将存储的部分从数据库恢复到索引重建点
            if changed < c.storedSections {
                c.setValidSections(changed)
            }
            // Update the new head number to te finalized section end and notify children
            // 生成新的head 并通知所有的子索引
            head = changed * c.sectionSize
    
            if head < c.cascadedHead {
                c.cascadedHead = head
                for _, child := range c.children {
                    child.newHead(c.cascadedHead, true)
                }
            }
            return
        }
        // No reorg, calculate the number of newly known sections and update if high enough
        var sections uint64
        if head >= c.confirmsReq {
            sections = (head + 1 - c.confirmsReq) / c.sectionSize
            if sections > c.knownSections {
                c.knownSections = sections
    
                select {
                case c.update <- struct{}{}:
                default:
                }
            }
        }
    }


父子索引数据的关系
父Indexer负载事件的监听然后把结果通过newHead传递给子Indexer的updateLoop来处理。


setValidSections方法,写入当前已经存储的sections的数量。 如果传入的值小于已经存储的数量,那么从数据库里面删除对应的section

    // setValidSections writes the number of valid sections to the index database
    func (c *ChainIndexer) setValidSections(sections uint64) {
        // Set the current number of valid sections in the database
        var data [8]byte
        binary.BigEndian.PutUint64(data[:], sections)
        c.indexDb.Put([]byte("count"), data[:])
    
        // Remove any reorged sections, caching the valids in the mean time
        for c.storedSections > sections {
            c.storedSections--
            c.removeSectionHead(c.storedSections)
        }
        c.storedSections = sections // needed if new > old
    }


processSection
    
    // processSection processes an entire section by calling backend functions while
    // ensuring the continuity of the passed headers. Since the chain mutex is not
    // held while processing, the continuity can be broken by a long reorg, in which
    // case the function returns with an error.

    //processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。
    func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
        c.log.Trace("Processing new chain section", "section", section)
    
        // Reset and partial processing
        c.backend.Reset(section)
    
        for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
            hash := GetCanonicalHash(c.chainDb, number)
            if hash == (common.Hash{}) {
                return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
            }
            header := GetHeader(c.chainDb, hash, number)
            if header == nil {
                return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
            } else if header.ParentHash != lastHead {
                return common.Hash{}, fmt.Errorf("chain reorged during section processing")
            }
            c.backend.Process(header)
            lastHead = header.Hash()
        }
        if err := c.backend.Commit(); err != nil {
            c.log.Error("Section commit failed", "error", err)
            return common.Hash{}, err
        }
        return lastHead, nil
    }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Scrum敏捷软件开发

Scrum敏捷软件开发

Mike Cohn / 廖靖斌、吕梁岳、陈争云、阳陆育 / 清华大学出版社 / 2010-11 / 69.00元

《Scrum敏捷软件开发》是敏捷联盟及Scrum联盟创始人之一、敏捷估算及计划的鼻祖Mike Cohn三大经典著作中影响最为深厚的扛鼎之作,也是全球敏捷社区中获得广泛肯定的企业敏捷转型权威参考。作者花四年时间,把自己近十五年的敏捷实践经验,特别是近四年中针对各种敏捷转型企业的咨询和指导工作,并结合旁征博引的方式,从更高的思想层次对敏捷与Scrum多年来的经验和教训进行深入而前面的梳理和总结,最终集......一起来看看 《Scrum敏捷软件开发》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具