内容简介:在上一篇 文章中,我们介绍了frp中的一些 概念和基础知识,这一篇中,我们在此前的基础之上,来看看frp是怎么实现TCP内网穿透的。我们知道,要使用frp,必须有个服务端,然后要有个客户端。因此,我们从这里开始入手。可以参考 《如何阅读源代码》:
在上一篇 文章中,我们介绍了frp中的一些 概念和基础知识,这一篇中,我们在此前的基础之上,来看看frp是怎么实现TCP内网穿透的。
我们知道,要使用frp,必须有个服务端,然后要有个客户端。因此,我们从这里开始入手。
可以参考 《如何阅读源代码》: https://jiajunhuang.com/articles/2018_08_04-how_to_read_source_code.md.html
frps
cmd/frps/main.go
是frps的入口处,我们从这里开始, main
函数的主体:
func main() { crypto.DefaultSalt = "frp" Execute() }
因此我们需要看到 Execute()
函数的内容,其实它是使用了 cobra
这个库,所以实际的入口在
var rootCmd = &cobra.Command{ Use: "frps", Short: "frps is the server of frp (https://github.com/fatedier/frp)", RunE: func(cmd *cobra.Command, args []string) error { if showVersion { fmt.Println(version.Full()) return nil } var err error if cfgFile != "" { var content string content, err = config.GetRenderedConfFromFile(cfgFile) if err != nil { return err } g.GlbServerCfg.CfgFile = cfgFile err = parseServerCommonCfg(CfgFileTypeIni, content) } else { err = parseServerCommonCfg(CfgFileTypeCmd, "") } if err != nil { return err } err = runServer() if err != nil { fmt.Println(err) os.Exit(1) } return nil }, }
最终,也就是 runServer()
这个函数:
func runServer() (err error) { log.InitLog(g.GlbServerCfg.LogWay, g.GlbServerCfg.LogFile, g.GlbServerCfg.LogLevel, g.GlbServerCfg.LogMaxDays) svr, err := server.NewService() if err != nil { return err } log.Info("Start frps success") server.ServerService = svr svr.Run() return }
svr.Run()
,其中的 svr
是来自 server.NewService()
,仔细看一下, server.NewService()
其实就是初始化了一大堆东西。
我们直接看 svr.Run()
做了什么:
func (svr *Service) Run() { if svr.rc.NatHoleController != nil { go svr.rc.NatHoleController.Run() } if g.GlbServerCfg.KcpBindPort > 0 { go svr.HandleListener(svr.kcpListener) } go svr.HandleListener(svr.websocketListener) go svr.HandleListener(svr.tlsListener) svr.HandleListener(svr.listener) }
可以看到,最后frps会执行到 svr.HandleListener(svr.listener)
,前面的都是什么 nat hole punching, kcp, websocket, tls等等,我们不看。直接看tcp。
func (svr *Service) HandleListener(l frpNet.Listener) { // Listen for incoming connections from client. for { c, err := l.Accept() if err != nil { log.Warn("Listener for incoming connections from client closed") return } c = frpNet.CheckAndEnableTLSServerConn(c, svr.tlsConfig) // Start a new goroutine for dealing connections. go func(frpConn frpNet.Conn) { ... } } }
这里就是监听之后,每来一个新的连接,就起一个goroutine去处理,也就是 go func()...
这一段,然后我们看看内容:
switch m := rawMsg.(type) { case *msg.Login: err = svr.RegisterControl(conn, m) // If login failed, send error message there. // Otherwise send success message in control's work goroutine. if err != nil { conn.Warn("%v", err) msg.WriteMsg(conn, &msg.LoginResp{ Version: version.Full(), Error: err.Error(), }) conn.Close() } case *msg.NewWorkConn: svr.RegisterWorkConn(conn, m) case *msg.NewVisitorConn: if err = svr.RegisterVisitorConn(conn, m); err != nil { conn.Warn("%v", err) msg.WriteMsg(conn, &msg.NewVisitorConnResp{ ProxyName: m.ProxyName, Error: err.Error(), }) conn.Close() } else { msg.WriteMsg(conn, &msg.NewVisitorConnResp{ ProxyName: m.ProxyName, Error: "", }) } default: log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String()) conn.Close() }
这就是服务端启动之后,卡住的地方了。客户端建立连接之后,会发送一个消息,它的类型可能是 msg.Login
, msg.NewWorkConn
, msg.NewVisitorConn
。上一篇我们说了,visitor
是用于stcp也就是端对端加密通信的,我们不看。workConn就是用于转发流量的,Login就是新的客户端连上去之后进行启动。
frpc
同样,我们从 cmd/frpc/main.go
看起:
func main() { crypto.DefaultSalt = "frp" sub.Execute() }
跳转到 sub.Execute()
:
var rootCmd = &cobra.Command{ Use: "frpc", Short: "frpc is the client of frp (https://github.com/fatedier/frp)", RunE: func(cmd *cobra.Command, args []string) error { if showVersion { fmt.Println(version.Full()) return nil } // Do not show command usage here. err := runClient(cfgFile) if err != nil { fmt.Println(err) os.Exit(1) } return nil }, } func Execute() { if err := rootCmd.Execute(); err != nil { os.Exit(1) } }
然后我们看 runClient
函数:
func runClient(cfgFilePath string) (err error) { var content string content, err = config.GetRenderedConfFromFile(cfgFilePath) if err != nil { return } g.GlbClientCfg.CfgFile = cfgFilePath err = parseClientCommonCfg(CfgFileTypeIni, content) if err != nil { return } pxyCfgs, visitorCfgs, err := config.LoadAllConfFromIni(g.GlbClientCfg.User, content, g.GlbClientCfg.Start) if err != nil { return err } err = startService(pxyCfgs, visitorCfgs) return }
基本上就是解析配置文件(因为frpc启动的时候要一个配置文件),然后执行 startService
:
func startService(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) (err error) { log.InitLog(g.GlbClientCfg.LogWay, g.GlbClientCfg.LogFile, g.GlbClientCfg.LogLevel, g.GlbClientCfg.LogMaxDays) if g.GlbClientCfg.DnsServer != "" { s := g.GlbClientCfg.DnsServer if !strings.Contains(s, ":") { s += ":53" } // Change default dns server for frpc net.DefaultResolver = &net.Resolver{ PreferGo: true, Dial: func(ctx context.Context, network, address string) (net.Conn, error) { return net.Dial("udp", s) }, } } svr, errRet := client.NewService(pxyCfgs, visitorCfgs) if errRet != nil { err = errRet return } // Capture the exit signal if we use kcp. if g.GlbClientCfg.Protocol == "kcp" { go handleSignal(svr) } err = svr.Run() if g.GlbClientCfg.Protocol == "kcp" { <-kcpDoneCh } return }
同样的,执行 client.NewService
之后执行 svr.Run()
,我们看看 svr.Run()
是什么:
func (svr *Service) Run() error { // first login for { conn, session, err := svr.login() if err != nil { log.Warn("login to server failed: %v", err) // if login_fail_exit is true, just exit this program // otherwise sleep a while and try again to connect to server if g.GlbClientCfg.LoginFailExit { return err } else { time.Sleep(10 * time.Second) } } else { // login success ctl := NewControl(svr.runId, conn, session, svr.pxyCfgs, svr.visitorCfgs) ctl.Run() svr.ctlMu.Lock() svr.ctl = ctl svr.ctlMu.Unlock() break } } go svr.keepControllerWorking() if g.GlbClientCfg.AdminPort != 0 { err := svr.RunAdminServer(g.GlbClientCfg.AdminAddr, g.GlbClientCfg.AdminPort) if err != nil { log.Warn("run admin server error: %v", err) } log.Info("admin server listen on %s:%d", g.GlbClientCfg.AdminAddr, g.GlbClientCfg.AdminPort) } <-svr.closedCh return nil }
可以看到,客户端启动之后,就是一个 for
循环,写入 login
信息,也就是刚才 frps
里的 msg.loginMsg
,
然后起一个 goroutine
执行 keepControllerWorking()
,之后主 goroutine
就阻塞在 <-svr.closedCh
。
看看 keepControllerWorking()
的内容:
func (svr *Service) keepControllerWorking() { maxDelayTime := 20 * time.Second delayTime := time.Second for { <-svr.ctl.ClosedDoneCh() if atomic.LoadUint32(&svr.exit) != 0 { return } for { log.Info("try to reconnect to server...") conn, session, err := svr.login() if err != nil { log.Warn("reconnect to server error: %v", err) time.Sleep(delayTime) delayTime = delayTime * 2 if delayTime > maxDelayTime { delayTime = maxDelayTime } continue } // reconnect success, init delayTime delayTime = time.Second ctl := NewControl(svr.runId, conn, session, svr.pxyCfgs, svr.visitorCfgs) ctl.Run() svr.ctlMu.Lock() svr.ctl = ctl svr.ctlMu.Unlock() break } } }
基本上就是一个循环,里面最终是为了成功连接然后执行 ctl.Run()
:
func (ctl *Control) Run() { go ctl.worker() // start all proxies ctl.pm.Reload(ctl.pxyCfgs) // start all visitors go ctl.vm.Run() return } // If controler is notified by closedCh, reader and writer and handler will exit func (ctl *Control) worker() { go ctl.msgHandler() go ctl.reader() go ctl.writer() select { case <-ctl.closedCh: // close related channels and wait until other goroutines done close(ctl.readCh) ctl.readerShutdown.WaitDone() ctl.msgHandlerShutdown.WaitDone() close(ctl.sendCh) ctl.writerShutdown.WaitDone() ctl.pm.Close() ctl.vm.Close() close(ctl.closedDoneCh) if ctl.session != nil { ctl.session.Close() } return } } // msgHandler handles all channel events and do corresponding operations. func (ctl *Control) msgHandler() { defer func() { if err := recover(); err != nil { ctl.Error("panic error: %v", err) ctl.Error(string(debug.Stack())) } }() defer ctl.msgHandlerShutdown.Done() hbSend := time.NewTicker(time.Duration(g.GlbClientCfg.HeartBeatInterval) * time.Second) defer hbSend.Stop() hbCheck := time.NewTicker(time.Second) defer hbCheck.Stop() ctl.lastPong = time.Now() for { select { case <-hbSend.C: // send heartbeat to server ctl.Debug("send heartbeat to server") ctl.sendCh <- &msg.Ping{} case <-hbCheck.C: if time.Since(ctl.lastPong) > time.Duration(g.GlbClientCfg.HeartBeatTimeout)*time.Second { ctl.Warn("heartbeat timeout") // let reader() stop ctl.conn.Close() return } case rawMsg, ok := <-ctl.readCh: if !ok { return } switch m := rawMsg.(type) { case *msg.ReqWorkConn: go ctl.HandleReqWorkConn(m) case *msg.NewProxyResp: ctl.HandleNewProxyResp(m) case *msg.Pong: ctl.lastPong = time.Now() ctl.Debug("receive heartbeat from server") } } } } // reader read all messages from frps and send to readCh func (ctl *Control) reader() { defer func() { if err := recover(); err != nil { ctl.Error("panic error: %v", err) ctl.Error(string(debug.Stack())) } }() defer ctl.readerShutdown.Done() defer close(ctl.closedCh) encReader := crypto.NewReader(ctl.conn, []byte(g.GlbClientCfg.Token)) for { if m, err := msg.ReadMsg(encReader); err != nil { if err == io.EOF { ctl.Debug("read from control connection EOF") return } else { ctl.Warn("read error: %v", err) ctl.conn.Close() return } } else { ctl.readCh <- m } } } // writer writes messages got from sendCh to frps func (ctl *Control) writer() { defer ctl.writerShutdown.Done() encWriter, err := crypto.NewWriter(ctl.conn, []byte(g.GlbClientCfg.Token)) if err != nil { ctl.conn.Error("crypto new writer error: %v", err) ctl.conn.Close() return } for { if m, ok := <-ctl.sendCh; !ok { ctl.Info("control writer is closing") return } else { if err := msg.WriteMsg(encWriter, m); err != nil { ctl.Warn("write message to control connection error: %v", err) return } } } }
reader
从frps收信息,然后写到 ctl.readCh
这个 channel 里, writer
则相反,
从 ctl.sendCh
收信息,写到 frps,而 msgHandler
则从frpc里读取信息,放到 ctl.sendCh
,
从 ctl.readCh
读取信息,处理之。
之所以这样设计,是为了能够异步处理所有消息。看看 msgHandler
的关键部分:
switch m := rawMsg.(type) { case *msg.ReqWorkConn: go ctl.HandleReqWorkConn(m) case *msg.NewProxyResp: ctl.HandleNewProxyResp(m) case *msg.Pong: ctl.lastPong = time.Now() ctl.Debug("receive heartbeat from server") }
我们说过了,frps 每次收到一个请求之后,然后下发一个指令给frpc,要求frpc建立连接,
然后frps再把新来的连接与请求所在的连接串起来,完成代理, msg.ReqWorkConn
就是这个指令。
那么 frps
是在哪里下发指令的呢?
frps 下发指令
每当公网来一个新的请求的时候,frps就会下发一个指令给frpc,要求建立一个新的连接,代码如下:
func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) { // try all connections from the pool for i := 0; i < pxy.poolCount+1; i++ { if workConn, err = pxy.getWorkConnFn(); err != nil { pxy.Warn("failed to get work connection: %v", err) return } pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String()) workConn.AddLogPrefix(pxy.GetName()) var ( srcAddr string dstAddr string srcPortStr string dstPortStr string srcPort int dstPort int ) if src != nil { srcAddr, srcPortStr, _ = net.SplitHostPort(src.String()) srcPort, _ = strconv.Atoi(srcPortStr) } if dst != nil { dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String()) dstPort, _ = strconv.Atoi(dstPortStr) } message := msg.StartWorkConn{ ProxyName: pxy.GetName(), SrcAddr: srcAddr, SrcPort: uint16(srcPort), DstAddr: dstAddr, DstPort: uint16(dstPort), } err := msg.WriteMsg(workConn, &message) workConn.Warn("===== HERE! HERE! HERE!, message is: %+v", message) if err != nil { workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i) workConn.Close() } else { break } } if err != nil { pxy.Error("try to get work connection failed in the end") return } return }
那个 ===== HERE! HERE! HERE!, message is: %+v
是我加上去的,为了方便看每次服务端下发什么信息。我们看的是TCP的代理,
它继承于 server/proxy/proxy.go
里的 BaseProxy
,实现是在 server/proxy/tcp.go
里的 type TcpProxy struct
。
那么什么时候会初始化 TcpProxy
呢?我们注意到,frps 接收的消息里,就有一种是消息类型是 msg.NewProxy
,这是客户端和
服务端都启动,并且客户端成功login之后,客户端发送给服务端的消息。也就是 frpc
的配置文件里具体的代理,例如:
[common] server_addr = xxxxx server_port = 12345 tls_enable = true [ssh] type = tcp local_ip = 127.0.0.1 local_port = 22 remote_port = 12346
中的 ssh
就是一个TCP代理。收到 msg.NewProxy
之后,服务端会起一个新的监听器监听在对应的端口,然后开始处理请求。
总结
这一篇中,我们在第一篇的基础之上看了frpc和frps的交互流程,了解了frp是如何进行TCP代理的。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
引爆社群:移动互联网时代的新4C法则(第2版)
唐兴通 / 机械工业出版社 / 69.00元
社群已经被公认为是这个时代的商业新形态,原有的商业逻辑和方法被颠覆,新的基于社群的商业体系和规则亟待构建,今天几乎所有的企业都在为此而努力,都在摸索中前行。 本书提出的“新4C法则”为社群时代的商业践行提供了一套科学的、有效的、闭环的方法论,第1版上市后获得了大量企业和读者的追捧,“新4C法则”在各行各业被大量解读和应用,积累了越来越多的成功案例,被公认为是社群时代通用的方法论。也因此,第1......一起来看看 《引爆社群:移动互联网时代的新4C法则(第2版)》 这本书的介绍吧!