内容简介:上一节介绍的是底层p2p网络开启后,监听别的远程节点发送来的TCP连接请求。这一节是个续集,要介绍的是本地节点如何向远程节点这一次,是我们打电话cal别人。但是这一次有点不同,我们是以究竟是怎么回事,请往下看。
上一节介绍的是底层p2p网络开启后,监听别的远程节点发送来的TCP连接请求。这一节是个续集,要介绍的是本地节点如何向远程节点 发起TCP连接请求 。
这一次,是我们打电话cal别人。但是这一次有点不同,我们是以 做任务 的形式向别人发起拨号,而且每次做 很多个 任务。每一个任务里都包含了 连接类型 和 远程节点信息 。
究竟是怎么回事,请往下看。
0.索引
01.从 Server
服务开始
02.初始化拨号状态,以及创建任务
03.计划任务和开启任务
04.Do 执行任务
05.总结
1.从 Server
服务开始
Server服务启动,也开始了拨号
是两个单独的协程,所以并不是监听后再发起连接。)
2.初始化拨号状态,以及创建任务
dialstate 拨号状态
dialstate
是 p2p/dial.go
中的核心结构体,管理拨号(发起TCP连接请求)和查找节点的操作。
通过 newDialState
来新建它。关于 dialstate
字段的含义在下方的注释中。
func newDialState(static []*enode.Node, bootnodes []*enode.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate { s := &dialstate{ maxDynDials: maxdyn, // 最大的拨号任务数量 ntab: ntab, // k桶 netrestrict: netrestrict, // ip网络的列表 static: make(map[enode.ID]*dialTask), // 静态节点 dialing: make(map[enode.ID]connFlag), // 拨号中,connFlag有4种拨号类型 bootnodes: make([]*enode.Node, len(bootnodes)), // 初始引导节点 randomNodes: make([]*enode.Node, maxdyn/2), // 在k桶种随机查找节点,数量为最大拨号任务数量的二分之一 hist: new(dialHistory), // 记录最近的拨号 } // 加入初始引导节点 copy(s.bootnodes, bootnodes) // 加入静态节点 for _, n := range static { s.addStatic(n) } return s }
其中加入了两种节点, bootnodes
和 static
。 bootnodes
是初始引导节点,在节点没有接收到任何节点的连接请求,也没有节点可以给我们邻居节点的时候,就去连接 bootnodes
,它硬编码在了以太坊的源码中。 static
是静态节点,如果我们想和某些节点保持长期的连接,就把它们加入到静态节点的列表中。
newTasks 新建任务
新建任务就是将某一些远程节点打包成任务,(一个任务对应一个远程节点),最终返回一个任务列表。执行任务就是给任务中的远程节点发起TCP连接请求。
以下是新建任务的流程图:
新建任务
- 1.设置最大的任务数量,这个是是由节点最大连接数除以拨号比率得出的,即
maxPeers/radio
得到。needDynDials := s.maxDynDials
- 2.判断
peers
里是否有已经建立连接的节点,peers
是向本地节点发来连接请求的远程节点的集合。记录数量,最大任务数量减去这个数。for _, p := range peers { if p.rw.is(dynDialedConn) { needDynDials-- } }
- 3.判断服务中是否有正在拨号的节点。记录数量,最大任务数量减去这个数。
for _, flag := range s.dialing { if flag&dynDialedConn != 0 { needDynDials-- } }
- 4.向设置的静态节点
s.static
发起连接请求,这个不消耗最大任务数量。 - 5.如果发来连接请求的远程节点集合
peers
为空,并且经过了设置的时间fallbackInterval
20s,会随机的连接一个引导节点bootnode
。最大任务数量减1。if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval { // 将第一个bootnode放在列表最后,使每一次取的bootnode都是不一样的。 bootnode := s.bootnodes[0] s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...) s.bootnodes = append(s.bootnodes, bootnode) if addDial(dynDialedConn, bootnode) { needDynDials-- } }
- 6.
ReadRandomNodes
随机的从UDP节点发现中使用Kad算法维护的k桶里,提取randomCandidates
个节点。randomCandidates
为最大任务数量的二分之一。(可能会提取不到这么多个,实际提取到的数量为n
。)
最大任务数量减去n
。randomCandidates := needDynDials / 2 if randomCandidates > 0 { n := s.ntab.ReadRandomNodes(s.randomNodes) for i := 0; i < randomCandidates && i < n; i++ { if addDial(dynDialedConn, s.randomNodes[i]) { needDynDials-- } } }
- 7.如果还不满足最大任务数量的话,从
s.lookupBuf
里提取,直到达到最大任务数量。s.lookupBuf
里的节点也是通过Kad获取节点的方式获取到的,如果s.lookupBuf
里节点数量不够,则创建发现任务discoverTask
进行节点发现,填充s.lookupBuf
。// 从lookupBuf里提取节点。 i := 0 for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { if addDial(dynDialedConn, s.lookupBuf[i]) { needDynDials-- } } // 去掉被提取出来的节点。 s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] // 数量不够的话,进行节点发现。 if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true newtasks = append(newtasks, &discoverTask{}) }
- 8.如果没有需要执行的任务,会执行等待任务
waitExpireTask
,也就是,保持拨号逻辑继续运行。if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { t := &waitExpireTask{s.hist.min().exp.Sub(now)} newtasks = append(newtasks, t) }
上述过程,即完成了一次任务的创建,结果是得到一个任务列表 newtasks
。
3.计划任务和开启任务
( server.go
的 run(dialstate dialer)
)
先来看一下 Server
服务中关于任务的计划和执行的流程图:
发起TCP连接请求
- 1.首先是对字段进行初始化,例如,发来请求连接的远程节点列表
peers
,接入的连接数inboundCount
。最主要的是定义了两种任务列表,runningTasks
运行中的任务列表,和queuedTasks
排队等待中的任务列表。-
runningTasks
是指执行一个任务(即发起一个TCP连接请求),就将该任务加入到runningTasks
列表中。完成任务后移除。 -
queuedTasks
是指新建了任务后,将任务加入到queuedTasks
列表中,queuedTasks
中的任务被执行时从queuedTasks
中移除,加入到runningTasks
中。
-
- 2.定义了三种对任务进行的操作:
scheduleTasks
计划任务,startTasks
开始任务,delTask
删除任务。-
delTask
在runningTasks
移除给定的单个任务。
delTask := func(t task) { // 循环查找到该任务,然后移除。 for i := range runningTasks { if runningTasks[i] == t { runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } }
-
startTasks
如果运行中的任务数量没有达到maxActiveDialTasks
最大活跃的任务数量(默认为16个),则开始执行任务。每一个任务都是一个单独的线程。任务的执行通过调用t.Do(srv)
进行,任务完成后将任务传入taskdone
通道。执行中的任务加入
runningTasks
列表中。最终返回ts
列表中未执行的任务。
startTasks := func(ts []task) (rest []task) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) // 分配线程,开始执行任务。 go func() { t.Do(srv); taskdone <- t }() // 运行任务列表中加入该任务。 runningTasks = append(runningTasks, t) } return ts[i:] }
-
scheduleTasks
先执行queuedTasks
列表中的任务,queuedTasks
列表中未被执行的任务将被保留。 - 然后如果运行中的任务数量没有达到最大活跃的任务数量,则调用
dialstate.newTasks
新建任务,接着执行刚刚新建任务。未被执行的任务也加入到queuedTasks
列表中,等待循环的下一次执行。
scheduleTasks := func() { // 执行queuedTasks列表中的任务。 queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) // 如果运行中的任务数量没有达到最大的拨号数量 if len(runningTasks) < maxActiveDialTasks { // 新建拨号任务 nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) // 先执行新建的任务,新建的任务中未被执行的任务加入到queuedTasks列表中 queuedTasks = append(queuedTasks, startTasks(nt)...) } }
-
- 3.主要的处理循环:
- 开启了计划任务函数,由于开始任务函数包含在计划函数里,所以这里开始了新建任务以及并发的执行任务。
running
是运行与否的标志位。running: for { scheduleTasks() // 处理消息 ... }
- 接下来是一个内容很多的
select case
结构,处理接收到的内容。 - 比如说,
case n := <-srv.addtrusted:
触发后,将某个节点标记为受信任的节点。case n := <-srv.removetrusted:
触发后,移除某个受信任的节点。 - 比较重要的是
case c := <-srv.addpeer:
,需要 新建远程节点 的时候触发。也是说这个case
会在上一篇中介绍的节点协议握手成功之后,srv.addpeer
的通道中加入与远程节点的连接的时候触发。case c := <-srv.addpeer: // 协议握手检查 err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // 握手完成,通过所有检查。 p := newPeer(c, srv.Protocols) ... // 执行远程节点。 go srv.runPeer(p) // 加入连接请求的peers列表。 peers[c.node.ID()] = p // 接入节点数加1。 if p.Inbound() { inboundCount++ } } select { case c.cont <- err: case <-srv.quit: break running }
- 开启了计划任务函数,由于开始任务函数包含在计划函数里,所以这里开始了新建任务以及并发的执行任务。
- 4.最后是循环完毕,关闭节点发现,断开与全部节点的连接,并清空
peers
。
4.Do 执行任务
( dial.go
的 Do(srv *Server)
)
上述 startTasks
开始任务中执行任务的具体过程。
func (t *dialTask) Do(srv *Server) { // 判断节点是否完整,不完整的节点表示没有ip地址。 if t.dest.Incomplete() { // 解析,使用Kad的方法查找到该节点的ip地址。 if !t.resolve(srv) { return } } // 拨号 err := t.dial(srv, t.dest) ... } }
- 先判断节点的完整性,不完整的话解析获取该节点的ip地址。
- 然后进行拨号。
func (t *dialTask) dial(srv *Server, dest *enode.Node) error { // fd是一个连接 fd, err := srv.Dialer.Dial(dest) ... return srv.SetupConn(mfd, t.flags, dest) }
-
dial
,拨号,调用了golang
自带的net.Dialer.Dial
方法建立TCP连接,然后使用srv.SetupConn
方法进行加密握手和协议握手。 - 在上一节 监听连接 中,
srv.SetupConn
的第三个传入字段是nil
,因为我们是监听连接的,所以还无该节点公钥。这一次是 发起请求 ,所以我们知道该远程节点的公钥,在加密握手之后,可以将我们拥有的公钥与远程节点发来的公钥进行验证。
5.总结
- 1.节点发起TCP连接请求是通过创建任务,执行任务实现的,以任务的形式,可以更好的控制建立连接的数量,也方便并发的发起连接请求。
- 2.监听TCP连接和发起TCP请求相辅相成。监听连接负责接收远程节点的TCP连接,以及建立与远程节点的加密通道;发起请求负责向远程节点发送TCP连接请求,以及执行建立了加密通道后的远程节点(的协议)。
以上所述就是小编给大家介绍的《[以太坊源码分析][p2p网络03]:发起TCP连接请求》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Apache Eagle联合发起人陈浩:如何发起和维护开源项目
- 计划发起一个练习算法项目
- 无视HTTPS发起中间人攻击
- Spring Boot 发起 HTTP 请求
- 伊组织对美发起新一轮攻击
- 使用 PowerShell 发起 HTTP REST请求
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python高性能编程
【美】 戈雷利克 (Micha Gorelick)、【美】 欧日沃尔德(Ian Ozsvald) / 人民邮电出版社 / 2017-7-1 / 79
本书共有12章,围绕如何进行代码优化和加快实际应用的运行速度进行详细讲解。本书主要包含以下主题:计算机内部结构的背景知识、列表和元组、字典和集合、迭代器和生成器、矩阵和矢量计算、并发、集群和工作队列等。最后,通过一系列真实案例展现了在应用场景中需要注意的问题。 本书适合初级和中级Python程序员、有一定Python语言基础想要得到进阶和提高的读者阅读。一起来看看 《Python高性能编程》 这本书的介绍吧!
Markdown 在线编辑器
Markdown 在线编辑器
HEX HSV 转换工具
HEX HSV 互换工具