[以太坊源码分析][p2p网络03]:发起TCP连接请求

栏目: 服务器 · 发布时间: 7年前

内容简介:上一节介绍的是底层p2p网络开启后,监听别的远程节点发送来的TCP连接请求。这一节是个续集,要介绍的是本地节点如何向远程节点这一次,是我们打电话cal别人。但是这一次有点不同,我们是以究竟是怎么回事,请往下看。

上一节介绍的是底层p2p网络开启后,监听别的远程节点发送来的TCP连接请求。这一节是个续集,要介绍的是本地节点如何向远程节点 发起TCP连接请求

这一次,是我们打电话cal别人。但是这一次有点不同,我们是以 做任务 的形式向别人发起拨号,而且每次做 很多个 任务。每一个任务里都包含了 连接类型远程节点信息

究竟是怎么回事,请往下看。

0.索引

01.从 Server 服务开始

02.初始化拨号状态,以及创建任务

03.计划任务和开启任务

04.Do 执行任务

05.总结

1.从 Server 服务开始

[以太坊源码分析][p2p网络03]:发起TCP连接请求

Server服务启动,也开始了拨号

在上图中,主要是看第3个步骤,初始化拨号状态,和第6个步骤,开始拨号。(这里提一下, 监听连接发起连接

是两个单独的协程,所以并不是监听后再发起连接。)

2.初始化拨号状态,以及创建任务

dialstate 拨号状态

dialstatep2p/dial.go 中的核心结构体,管理拨号(发起TCP连接请求)和查找节点的操作。

通过 newDialState 来新建它。关于 dialstate 字段的含义在下方的注释中。

func newDialState(static []*enode.Node, bootnodes []*enode.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
    s := &dialstate{
        maxDynDials: maxdyn,                                    // 最大的拨号任务数量
        ntab:        ntab,                                      // k桶                                        
        netrestrict: netrestrict,                               // ip网络的列表
        static:      make(map[enode.ID]*dialTask),              // 静态节点
        dialing:     make(map[enode.ID]connFlag),               // 拨号中,connFlag有4种拨号类型
        bootnodes:   make([]*enode.Node, len(bootnodes)),       // 初始引导节点
        randomNodes: make([]*enode.Node, maxdyn/2),             // 在k桶种随机查找节点,数量为最大拨号任务数量的二分之一
        hist:        new(dialHistory),                          // 记录最近的拨号
    }
    // 加入初始引导节点
    copy(s.bootnodes, bootnodes)
    // 加入静态节点
    for _, n := range static {
        s.addStatic(n)
    }
    return s
}

其中加入了两种节点, bootnodesstaticbootnodes 是初始引导节点,在节点没有接收到任何节点的连接请求,也没有节点可以给我们邻居节点的时候,就去连接 bootnodes ,它硬编码在了以太坊的源码中。 static 是静态节点,如果我们想和某些节点保持长期的连接,就把它们加入到静态节点的列表中。

newTasks 新建任务

新建任务就是将某一些远程节点打包成任务,(一个任务对应一个远程节点),最终返回一个任务列表。执行任务就是给任务中的远程节点发起TCP连接请求。

以下是新建任务的流程图:

[以太坊源码分析][p2p网络03]:发起TCP连接请求

新建任务

  • 1.设置最大的任务数量,这个是是由节点最大连接数除以拨号比率得出的,即 maxPeers/radio 得到。
    needDynDials := s.maxDynDials
  • 2.判断 peers 里是否有已经建立连接的节点, peers 是向本地节点发来连接请求的远程节点的集合。记录数量,最大任务数量减去这个数。
    for _, p := range peers {
        if p.rw.is(dynDialedConn) {
            needDynDials--
        }
    }
  • 3.判断服务中是否有正在拨号的节点。记录数量,最大任务数量减去这个数。
    for _, flag := range s.dialing {
        if flag&dynDialedConn != 0 {
            needDynDials--
        }
    }
  • 4.向设置的静态节点 s.static 发起连接请求,这个不消耗最大任务数量。
  • 5.如果发来连接请求的远程节点集合 peers 为空,并且经过了设置的时间 fallbackInterval 20s,会随机的连接一个引导节点 bootnode 。最大任务数量减1。
    if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
        // 将第一个bootnode放在列表最后,使每一次取的bootnode都是不一样的。
        bootnode := s.bootnodes[0]
        s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
        s.bootnodes = append(s.bootnodes, bootnode)
    
        if addDial(dynDialedConn, bootnode) {
            needDynDials--
        }
    }
  • 6. ReadRandomNodes 随机的从UDP节点发现中使用Kad算法维护的k桶里,提取 randomCandidates 个节点。 randomCandidates 为最大任务数量的二分之一。(可能会提取不到这么多个,实际提取到的数量为 n 。)
    最大任务数量减去 n
    randomCandidates := needDynDials / 2
    if randomCandidates > 0 {
        n := s.ntab.ReadRandomNodes(s.randomNodes)
        for i := 0; i < randomCandidates && i < n; i++ {
            if addDial(dynDialedConn, s.randomNodes[i]) {
                needDynDials--
            }
        }
    }
  • 7.如果还不满足最大任务数量的话,从 s.lookupBuf 里提取,直到达到最大任务数量。 s.lookupBuf 里的节点也是通过Kad获取节点的方式获取到的,如果 s.lookupBuf 里节点数量不够,则创建发现任务 discoverTask 进行节点发现,填充 s.lookupBuf
    // 从lookupBuf里提取节点。
    i := 0
    for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
        if addDial(dynDialedConn, s.lookupBuf[i]) {
            needDynDials--
        }
    }
    // 去掉被提取出来的节点。
    s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]    
    // 数量不够的话,进行节点发现。
    if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
        s.lookupRunning = true
        newtasks = append(newtasks, &discoverTask{})
    }
  • 8.如果没有需要执行的任务,会执行等待任务 waitExpireTask ,也就是,保持拨号逻辑继续运行。
    if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
          t := &waitExpireTask{s.hist.min().exp.Sub(now)}
          newtasks = append(newtasks, t)
      }

上述过程,即完成了一次任务的创建,结果是得到一个任务列表 newtasks

3.计划任务和开启任务

server.gorun(dialstate dialer)

先来看一下 Server 服务中关于任务的计划和执行的流程图:

[以太坊源码分析][p2p网络03]:发起TCP连接请求

发起TCP连接请求

  • 1.首先是对字段进行初始化,例如,发来请求连接的远程节点列表 peers ,接入的连接数 inboundCount 。最主要的是定义了两种任务列表, runningTasks 运行中的任务列表,和 queuedTasks 排队等待中的任务列表。
    • runningTasks 是指执行一个任务(即发起一个TCP连接请求),就将该任务加入到 runningTasks 列表中。完成任务后移除。
    • queuedTasks 是指新建了任务后,将任务加入到 queuedTasks 列表中, queuedTasks 中的任务被执行时从 queuedTasks 中移除,加入到 runningTasks 中。
  • 2.定义了三种对任务进行的操作: scheduleTasks 计划任务, startTasks 开始任务, delTask 删除任务。
    • delTaskrunningTasks 移除给定的单个任务。
    delTask := func(t task) {
       // 循环查找到该任务,然后移除。
       for i := range runningTasks {
           if runningTasks[i] == t {
               runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
               break
           }
       }
    }
    • startTasks 如果运行中的任务数量没有达到 maxActiveDialTasks 最大活跃的任务数量(默认为16个),则开始执行任务。每一个任务都是一个单独的线程。任务的执行通过调用 t.Do(srv) 进行,任务完成后将任务传入 taskdone 通道。

      执行中的任务加入 runningTasks 列表中。最终返回 ts 列表中未执行的任务。

    startTasks := func(ts []task) (rest []task) {
         i := 0
         for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
             t := ts[i]
             srv.log.Trace("New dial task", "task", t)
             // 分配线程,开始执行任务。
             go func() { t.Do(srv); taskdone <- t }()
             // 运行任务列表中加入该任务。
             runningTasks = append(runningTasks, t)
         }
         return ts[i:]
    }
    • scheduleTasks 先执行 queuedTasks 列表中的任务, queuedTasks 列表中未被执行的任务将被保留。
    • 然后如果运行中的任务数量没有达到最大活跃的任务数量,则调用 dialstate.newTasks 新建任务,接着执行刚刚新建任务。未被执行的任务也加入到 queuedTasks 列表中,等待循环的下一次执行。
    scheduleTasks := func() {
      // 执行queuedTasks列表中的任务。
      queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
      // 如果运行中的任务数量没有达到最大的拨号数量
      if len(runningTasks) < maxActiveDialTasks {
          // 新建拨号任务
          nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
          // 先执行新建的任务,新建的任务中未被执行的任务加入到queuedTasks列表中
          queuedTasks = append(queuedTasks, startTasks(nt)...)
      }
    }
  • 3.主要的处理循环:
    • 开启了计划任务函数,由于开始任务函数包含在计划函数里,所以这里开始了新建任务以及并发的执行任务。 running 是运行与否的标志位。
      running:
        for {
            scheduleTasks()
            // 处理消息
            ...
        }
    • 接下来是一个内容很多的 select case 结构,处理接收到的内容。
    • 比如说, case n := <-srv.addtrusted: 触发后,将某个节点标记为受信任的节点。 case n := <-srv.removetrusted: 触发后,移除某个受信任的节点。
    • 比较重要的是 case c := <-srv.addpeer: ,需要 新建远程节点 的时候触发。也是说这个 case 会在上一篇中介绍的节点协议握手成功之后, srv.addpeer 的通道中加入与远程节点的连接的时候触发。
      case c := <-srv.addpeer:
          // 协议握手检查
          err := srv.protoHandshakeChecks(peers, inboundCount, c)
          if err == nil {
              // 握手完成,通过所有检查。
              p := newPeer(c, srv.Protocols)
              ...
              // 执行远程节点。
              go srv.runPeer(p)
              // 加入连接请求的peers列表。
              peers[c.node.ID()] = p
              // 接入节点数加1。
              if p.Inbound() {
                  inboundCount++
              }
          }
          select {
          case c.cont <- err:
          case <-srv.quit:
              break running
          }
  • 4.最后是循环完毕,关闭节点发现,断开与全部节点的连接,并清空 peers

4.Do 执行任务

dial.goDo(srv *Server)

上述 startTasks 开始任务中执行任务的具体过程。

func (t *dialTask) Do(srv *Server) {
    // 判断节点是否完整,不完整的节点表示没有ip地址。
    if t.dest.Incomplete() {
        // 解析,使用Kad的方法查找到该节点的ip地址。
        if !t.resolve(srv) {
            return
        }
    }
    // 拨号
    err := t.dial(srv, t.dest)
    ...
    }
}
  • 先判断节点的完整性,不完整的话解析获取该节点的ip地址。
  • 然后进行拨号。
func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
    // fd是一个连接
    fd, err := srv.Dialer.Dial(dest)
    ...
    return srv.SetupConn(mfd, t.flags, dest)
}
  • dial ,拨号,调用了 golang 自带的 net.Dialer.Dial 方法建立TCP连接,然后使用 srv.SetupConn 方法进行加密握手和协议握手。
  • 在上一节 监听连接 中, srv.SetupConn 的第三个传入字段是 nil ,因为我们是监听连接的,所以还无该节点公钥。这一次是 发起请求 ,所以我们知道该远程节点的公钥,在加密握手之后,可以将我们拥有的公钥与远程节点发来的公钥进行验证。

5.总结

  • 1.节点发起TCP连接请求是通过创建任务,执行任务实现的,以任务的形式,可以更好的控制建立连接的数量,也方便并发的发起连接请求。
  • 2.监听TCP连接和发起TCP请求相辅相成。监听连接负责接收远程节点的TCP连接,以及建立与远程节点的加密通道;发起请求负责向远程节点发送TCP连接请求,以及执行建立了加密通道后的远程节点(的协议)。

以上所述就是小编给大家介绍的《[以太坊源码分析][p2p网络03]:发起TCP连接请求》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Data Mining

Web Data Mining

Bing Liu / Springer / 2011-6-26 / CAD 61.50

Web mining aims to discover useful information and knowledge from Web hyperlinks, page contents, and usage data. Although Web mining uses many conventional data mining techniques, it is not purely an ......一起来看看 《Web Data Mining》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

html转js在线工具
html转js在线工具

html转js在线工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具