Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。 eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent) // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. // 子链不需要被启动。 以为他们的父节点会通知他们。 func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { go c.eventLoop(currentHeader, chainEventer) } // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. // eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。 func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) events := make(chan ChainEvent, 10) sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing // 设置我们的其实的区块高度,用来触发之前未完成的操作。 c.newHead(currentHeader.Number.Uint64(), false) var ( prevHeader = currentHeader prevHash = currentHeader.Hash() ) for { select { case errc := <-c.quit: // Chain indexer terminating, report no failure and abort errc <- nil return case ev, ok := <-events: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit errc <- nil return } header := ev.Block.Header() if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先 //找到公共祖先, 从公共祖先之后的索引需要重建。 c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) } // 设置新的head c.newHead(header.Number.Uint64(), false) prevHeader, prevHash = header, header.Hash() } } } newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发 // newHead notifies the indexer about new chain heads and/or reorgs. func (c *ChainIndexer) newHead(head uint64, reorg bool) { c.lock.Lock() defer c.lock.Unlock() // If a reorg happened, invalidate all sections until that point if reorg { // 需要重建索引 从head开始的所有section都需要重建。 // Revert the known section number to the reorg point changed := head / c.sectionSize if changed < c.knownSections { c.knownSections = changed } // Revert the stored sections from the database to the reorg point // 将存储的部分从数据库恢复到索引重建点 if changed < c.storedSections { c.setValidSections(changed) } // Update the new head number to te finalized section end and notify children // 生成新的head 并通知所有的子索引 head = changed * c.sectionSize if head < c.cascadedHead { c.cascadedHead = head for _, child := range c.children { child.newHead(c.cascadedHead, true) } } return } // No reorg, calculate the number of newly known sections and update if high enough var sections uint64 if head >= c.confirmsReq { sections = (head + 1 - c.confirmsReq) / c.sectionSize if sections > c.knownSections { c.knownSections = sections select { case c.update <- struct{}{}: default: } } } } 父子索引数据的关系 父Indexer负载事件的监听然后把结果通过newHead传递给子Indexer的updateLoop来处理。 setValidSections方法,写入当前已经存储的sections的数量。 如果传入的值小于已经存储的数量,那么从数据库里面删除对应的section // setValidSections writes the number of valid sections to the index database func (c *ChainIndexer) setValidSections(sections uint64) { // Set the current number of valid sections in the database var data [8]byte binary.BigEndian.PutUint64(data[:], sections) c.indexDb.Put([]byte("count"), data[:]) // Remove any reorged sections, caching the valids in the mean time for c.storedSections > sections { c.storedSections-- c.removeSectionHead(c.storedSections) } c.storedSections = sections // needed if new > old } processSection // processSection processes an entire section by calling backend functions while // ensuring the continuity of the passed headers. Since the chain mutex is not // held while processing, the continuity can be broken by a long reorg, in which // case the function returns with an error. //processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。 func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing c.backend.Reset(section) for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) if hash == (common.Hash{}) { return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) } header := GetHeader(c.chainDb, hash, number) if header == nil { return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) } else if header.ParentHash != lastHead { return common.Hash{}, fmt.Errorf("chain reorged during section processing") } c.backend.Process(header) lastHead = header.Hash() } if err := c.backend.Commit(); err != nil { c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 区块链技术+区块链怎么赚钱?
- 区块链技术入门:区块链是什么
- 阿里申请可“行政干预”区块链专利,区块链变味?
- 中国区块链商学院:区块链基础知识
- 从Java到区块链:如何成为区块链开发人员
- 通过python构建一个区块链来学习区块链
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Redis开发与运维
付磊、张益军 / 机械工业出版社 / 2017-3-1 / 89.00
本书全面讲解Redis基本功能及其应用,并结合线上开发与运维监控中的实际使用案例,深入分析并总结了实际开发运维中遇到的“陷阱”,以及背后的原因, 包含大规模集群开发与管理的场景、应用案例与开发技巧,为高效开发运维提供了大量实际经验和建议。本书不要求读者有任何Redis使用经验,对入门与进阶DevOps的开发者提供有价值的帮助。主要内容包括:Redis的安装配置、API、各种高效功能、客户端、持久化......一起来看看 《Redis开发与运维》 这本书的介绍吧!