内容简介:nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。nat下面有upnp和pmp两种网络协议。upnp的应用场景(pmp是和upnp类似的协议)
nat是网络地址转换的意思。 这部分的源码比较独立而且单一,这里就暂时不分析了。 大家了解基本的功能就行了。
nat下面有upnp和pmp两种网络协议。
upnp的应用场景(pmp是和upnp类似的协议)
如果用户是通过NAT接入Internet的,同时需要使用BC、电骡eMule等P2P这样的软件,这时UPnP功能就会带来很大的便利。利用UPnP能自动的把BC、电骡eMule等侦听的端口号映射到公网上,以便公网上的用户也能对NAT私网侧发起连接。
主要功能就是提供接口可以把内网的IP+端口 映射为 路由器的IP+端口。 这样就等于内网的程序有了外网的IP地址, 这样公网的用户就可以直接对你进行访问了。 不然就需要通过UDP打洞这种方式来进行访问。
p2p中的UDP协议
现在大部分用户运行的环境都是内网环境。 内网环境下监听的端口,其他公网的程序是无法直接访问的。需要经过一个打洞的过程。 双方才能联通。这就是所谓的UDP打洞。
在p2p代码里面。 peer代表了一条创建好的网络链路。在一条链路上可能运行着多个协议。比如以太坊的协议(eth)。 Swarm的协议。 或者是Whisper的协议。
peer的结构
type protoRW struct { Protocol in chan Msg // receices read messages closed <-chan struct{} // receives when peer is shutting down wstart <-chan struct{} // receives when write may start werr chan<- error // for write results offset uint64 w MsgWriter } // Protocol represents a P2P subprotocol implementation. type Protocol struct { // Name should contain the official protocol name, // often a three-letter word. Name string // Version should contain the version number of the protocol. Version uint // Length should contain the number of message codes used // by the protocol. Length uint64 // Run is called in a new groutine when the protocol has been // negotiated with a peer. It should read and write messages from // rw. The Payload for each message must be fully consumed. // // The peer connection is closed when Start returns. It should return // any protocol-level error (such as an I/O error) that is // encountered. Run func(peer *Peer, rw MsgReadWriter) error // NodeInfo is an optional helper method to retrieve protocol specific metadata // about the host node. NodeInfo func() interface{} // PeerInfo is an optional helper method to retrieve protocol specific metadata // about a certain peer in the network. If an info retrieval function is set, // but returns nil, it is assumed that the protocol handshake is still running. PeerInfo func(id discover.NodeID) interface{} } // Peer represents a connected remote node. type Peer struct { rw *conn running map[string]*protoRW //运行的协议 log log.Logger created mclock.AbsTime wg sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason // events receives message send / receive events if set events *event.Feed }
peer的创建,根据匹配找到当前Peer支持的protomap
func newPeer(conn conn, protocols []Protocol) Peer {
protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ rw: conn, running: protomap, created: mclock.Now(), disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), log: log.New("id", conn.id, "conn", conn.flags), } return p
}
peer的启动, 启动了两个goroutine线程。 一个是读取。一个是执行ping操作。
func (p *Peer) run() (remoteRequested bool, err error) {
var ( writeStart = make(chan struct{}, 1) //用来控制什么时候可以写入的管道。 writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason // sent to the peer ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() // Start all protocol handlers. writeStart <- struct{}{} //启动所有的协议。 p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect.
loop:
for { select { case err = <-writeErr: // A write finished. Allow the next write to start if // there was no error. if err != nil { reason = DiscNetworkError break loop } writeStart <- struct{}{} case err = <-readErr: if r, ok := err.(DiscReason); ok { remoteRequested = true reason = r } else { reason = DiscNetworkError } break loop case err = <-p.protoErr: reason = discReasonForError(err) break loop case err = <-p.disc: break loop } } close(p.closed) p.rw.close(reason) p.wg.Wait() return remoteRequested, err
}
startProtocols方法,这个方法遍历所有的协议。
func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr var rw MsgReadWriter = proto if p.events != nil { rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) // 等于这里为每一个协议都开启了一个goroutine。 调用其Run方法。 go func() { // proto.Run(p, rw)这个方法应该是一个死循环。 如果返回就说明遇到了错误。 err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned } else if err != io.EOF { p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err) } p.protoErr <- err p.wg.Done() }() }
}
回过头来再看看readLoop方法。 这个方法也是一个死循环。 调用p.rw来读取一个Msg(这个rw实际是之前提到的frameRLPx的对象,也就是分帧之后的对象。然后根据Msg的类型进行对应的处理,如果Msg的类型是内部运行的协议的类型。那么发送到对应协议的proto.in队列上面。
func (p *Peer) readLoop(errc chan<- error) {
defer p.wg.Done() for { msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return } }
}
func (p *Peer) handle(msg Msg) error {
switch { case msg.Code == pingMsg: msg.Discard() go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() default: // it's a subprotocol message proto, err := p.getProto(msg.Code) if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } select { case proto.in <- msg: return nil case <-p.closed: return io.EOF } } return nil
}
在看看pingLoop。这个方法很简单。就是定时的发送pingMsg消息到对端。
func (p *Peer) pingLoop() {
ping := time.NewTimer(pingInterval) defer p.wg.Done() defer ping.Stop() for { select { case <-ping.C: if err := SendItems(p.rw, pingMsg); err != nil { p.protoErr <- err return } ping.Reset(pingInterval) case <-p.closed: return } }
}
最后再看看protoRW的read和write方法。 可以看到读取和写入都是阻塞式的。
func (rw *protoRW) WriteMsg(msg Msg) (err error) {
if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset select { case <-rw.wstart: //等到可以写入的受在执行写入。 这难道是为了多线程控制么。 err = rw.w.WriteMsg(msg) // Report write status back to Peer.run. It will initiate // shutdown if the error is non-nil and unblock the next write // otherwise. The calling protocol code should exit for errors // as well but we don't want to rely on that. rw.werr <- err case <-rw.closed: err = fmt.Errorf("shutting down") } return err
}
func (rw *protoRW) ReadMsg() (Msg, error) {
select { case msg := <-rw.in: msg.Code -= rw.offset return msg, nil case <-rw.closed: return Msg{}, io.EOF }
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
系统程序员成长计划
李先静 / 人民邮电出版社 / 2010-04 / 45.00
在学习程序开发的过程中,你是否总是为自己遇到的一些问题头疼不已,你是否还在为写不出代码而心急如焚?作为软件开发人员,你是否时时为自己如何成为一名合格的程序员而困惑不已?没关系,本书将为你排忧解难。 这是一本介绍系统程序开发方法的书。书中结合内容详尽的代码细致讲述了不少底层程序开发基础知识,并在逐步深入的过程中介绍了一些简单实用的应用程序,最后还讲述了一些软件工程方面的内容,内容全面,语言生动......一起来看看 《系统程序员成长计划》 这本书的介绍吧!