Start方法。 这个方法在eth协议启动的时候被调用,这个方法接收两个参数,一个是当前的区块头,一个是事件订阅器,通过这个订阅器可以获取区块链的改变信息。 eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent) // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. // 子链不需要被启动。 以为他们的父节点会通知他们。 func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { go c.eventLoop(currentHeader, chainEventer) } // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. // eventLoop 循环只会在最外面的索引节点被调用。 所有的Child indexer不会被启动这个方法。 func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) events := make(chan ChainEvent, 10) sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing // 设置我们的其实的区块高度,用来触发之前未完成的操作。 c.newHead(currentHeader.Number.Uint64(), false) var ( prevHeader = currentHeader prevHash = currentHeader.Hash() ) for { select { case errc := <-c.quit: // Chain indexer terminating, report no failure and abort errc <- nil return case ev, ok := <-events: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit errc <- nil return } header := ev.Block.Header() if header.ParentHash != prevHash { //如果出现了分叉,那么我们首先 //找到公共祖先, 从公共祖先之后的索引需要重建。 c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) } // 设置新的head c.newHead(header.Number.Uint64(), false) prevHeader, prevHash = header, header.Hash() } } } newHead方法,通知indexer新的区块链头,或者是需要重建索引,newHead方法会触发 // newHead notifies the indexer about new chain heads and/or reorgs. func (c *ChainIndexer) newHead(head uint64, reorg bool) { c.lock.Lock() defer c.lock.Unlock() // If a reorg happened, invalidate all sections until that point if reorg { // 需要重建索引 从head开始的所有section都需要重建。 // Revert the known section number to the reorg point changed := head / c.sectionSize if changed < c.knownSections { c.knownSections = changed } // Revert the stored sections from the database to the reorg point // 将存储的部分从数据库恢复到索引重建点 if changed < c.storedSections { c.setValidSections(changed) } // Update the new head number to te finalized section end and notify children // 生成新的head 并通知所有的子索引 head = changed * c.sectionSize if head < c.cascadedHead { c.cascadedHead = head for _, child := range c.children { child.newHead(c.cascadedHead, true) } } return } // No reorg, calculate the number of newly known sections and update if high enough var sections uint64 if head >= c.confirmsReq { sections = (head + 1 - c.confirmsReq) / c.sectionSize if sections > c.knownSections { c.knownSections = sections select { case c.update <- struct{}{}: default: } } } } 父子索引数据的关系 父Indexer负载事件的监听然后把结果通过newHead传递给子Indexer的updateLoop来处理。 setValidSections方法,写入当前已经存储的sections的数量。 如果传入的值小于已经存储的数量,那么从数据库里面删除对应的section // setValidSections writes the number of valid sections to the index database func (c *ChainIndexer) setValidSections(sections uint64) { // Set the current number of valid sections in the database var data [8]byte binary.BigEndian.PutUint64(data[:], sections) c.indexDb.Put([]byte("count"), data[:]) // Remove any reorged sections, caching the valids in the mean time for c.storedSections > sections { c.storedSections-- c.removeSectionHead(c.storedSections) } c.storedSections = sections // needed if new > old } processSection // processSection processes an entire section by calling backend functions while // ensuring the continuity of the passed headers. Since the chain mutex is not // held while processing, the continuity can be broken by a long reorg, in which // case the function returns with an error. //processSection通过调用后端函数来处理整个部分,同时确保传递的头文件的连续性。 由于链接互斥锁在处理过程中没有保持,连续性可能会被重新打断,在这种情况下,函数返回一个错误。 func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing c.backend.Reset(section) for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) if hash == (common.Hash{}) { return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) } header := GetHeader(c.chainDb, hash, number) if header == nil { return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) } else if header.ParentHash != lastHead { return common.Hash{}, fmt.Errorf("chain reorged during section processing") } c.backend.Process(header) lastHead = header.Hash() } if err := c.backend.Commit(); err != nil { c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 区块链技术+区块链怎么赚钱?
- 区块链技术入门:区块链是什么
- 阿里申请可“行政干预”区块链专利,区块链变味?
- 中国区块链商学院:区块链基础知识
- 从Java到区块链:如何成为区块链开发人员
- 通过python构建一个区块链来学习区块链
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Introduction to Linear Optimization
Dimitris Bertsimas、John N. Tsitsiklis / Athena Scientific / 1997-02-01 / USD 89.00
"The true merit of this book, however, lies in its pedagogical qualities which are so impressive..." "Throughout the book, the authors make serious efforts to give geometric and intuitive explanations......一起来看看 《Introduction to Linear Optimization》 这本书的介绍吧!