Golang实现平滑重启(优雅重启)

栏目: Go · 发布时间: 5年前

内容简介:最近在看traefik的源代码, 看到其中有一个功能是平滑重启, 不过他是通过一个叫做上面说的graceful这个库,如它在Github的简介所说: "Graceful is a Go package enabling graceful shutdown of an http.Handler server." 只提供了优雅关闭, 不提供优雅重启. 那么什么叫做优雅关闭呢? 意思就是服务器要关闭了, 会拒绝新的连接,但是老的连接不会被强制关闭,而是 会等待一定时间, 等待客户端主动关闭, 除非客户端一直没有

最近在看traefik的源代码, 看到其中有一个功能是平滑重启, 不过他是通过一个叫做 graceful 的库来做到的, 是在HTTP Server的层级. 于是我探索了一下方案, 在TCP层级做了一个demo出来.

先看traefik的实现方案

上面说的graceful这个库,如它在Github的简介所说: "Graceful is a Go package enabling graceful shutdown of an http.Handler server." 只提供了优雅关闭, 不提供优雅重启. 那么什么叫做优雅关闭呢? 意思就是服务器要关闭了, 会拒绝新的连接,但是老的连接不会被强制关闭,而是 会等待一定时间, 等待客户端主动关闭, 除非客户端一直没有关闭, 到了预设的超时时间才进行服务器端关闭.

我们来看看traefik是怎么做的:

func main() { // goroutine 0
    goAway := false
    go func() {  // goroutine 1
        sig := <-sigs
        fmt.Println("I have to go...", sig)
        goAway = true
        srv.Stop(10 * time.Second)
    }()

    for{
        if (goAway){
            break
        }
        fmt.Println("Started")
        srv = &graceful.Server{
            Timeout: 10 * time.Second,
            NoSignalHandling: true,

            ConnState: func(conn net.Conn, state http.ConnState) {
                fmt.Println( "Connection ", state)
            },

            Server: &http.Server{
                Addr: ":8001",
                Handler: userRouter,
            },
        }

        go srv.ListenAndServe()  // goroutine 2
        <- srv.StopChan() // goroutine 0
        fmt.Println("Stopped")
    }
}

可以看到, 我们用 goroutine + 数字 来表示所注释的代码会在哪个goroutine里执行, main函数我们假设是在goroutine 0里执行.

sigs

而追踪进去就会发现, srv.StopChan() 是一个确保 graceful Server正确初始化 srv.stopChan 的函数. 搜索stopChan, 我们可以看到

func (srv *Server) shutdown(shutdown chan chan struct{}, kill chan struct{}) {
    // Request done notification
    done := make(chan struct{})
    shutdown <- done

    srv.stopLock.Lock()
    defer srv.stopLock.Unlock()
    if srv.Timeout > 0 {
        select {
        case <-done:
        case <-time.After(srv.Timeout):
            close(kill)
        }
    } else {
        <-done
    }
    // Close the stopChan to wake up any blocked goroutines.
    srv.chanLock.Lock()
    if srv.stopChan != nil {
        close(srv.stopChan)
    }
    srv.chanLock.Unlock()
}

在调用shutdown之后, 就会关闭这个channel, 然后上面所说的for循环就会重新初始化. 于是似乎就实现了一次 "平滑重启". 为什么打 引号呢? 因为在关闭服务器端的监听和下一次for循环重新执行到 srv.ListenAndServe() 之间的这一段时间间隙, 很有可能会有新的 连接到来却因为服务器端没有监听而连接失败. 所以这个实现和我们直接执行 sudo systemctl restart nginx 是类似的.

更详细的traefik源码分析我会另外再写一篇博客来分析, 这里就此打住. 接下来来看一下简单的在TCP层面实现平滑重启的服务器.

TCP平滑重启

首先我们来看看怎么起一个TCP服务器:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println("new conn...")
            go handleConnection(conn)
        }
    }
}

为了测试,我们写一个 Python 脚本(Python果然还是更加简洁):

import socket


def foo():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", 8080))
    s.close()


if __name__ == "__main__":
    while True:
        foo()

执行之后就可以看到输出.

我们知道多个进程是不可以监听在同一个(IP地址,端口号)对上的, 即,不能对同一对(IP地址,端口号) 执行多次listen函数,我们可以做个实验,把ListenAndServe抽出来,起另外一个goroutine去执行,为了方便 区分,我们加入一个参数,就是goroutine的名字:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func ListenAndServe(name string) {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }

    fmt.Println(name)
    for {
        if conn, err := ln.Accept(); err == nil {
            go handleConnection(conn)
        }
    }
}

func main() {
    go ListenAndServe("server1")
    ListenAndServe("server2")
}

执行一下:

$ go run t.go
[::]:8080
server2
listen tcp :8080: bind: address already in use
server1
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x4dc9a4]

但是我们可以在同一个socket对上, 共享同一个监听套接字地址, 然后在多个goroutine中执行accept函数:

package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func ListenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }

    go ListenAndServe(ln, "server1")
    ListenAndServe(ln, "server2")
}

但是这还远远不是平滑重启, 只是证明了平滑重启是可行的, 毕竟平滑重启的前提就是在父子进程中能够 共享同一个套接字, 而且在不同的地方可以进行 accept 操作. 接下来我们来看一下怎么fork, 然后带上 socket套接字的文件描述符, 然后再在子进程中重新把套接字描述符还原成 tcp.Listener :

  • 先来看怎么把套接字转换成文件描述符, 传递给另外一个goroutine, 然后这个goroutine还原成listener:
package main

import (
    "fmt"
    "net"
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func main() {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln.Addr())
    }
    l := ln.(*net.TCPListener)
    newFile, _ := l.File()
    fmt.Println(newFile.Fd())

    anotherListener, _ := net.FileListener(newFile)

    go listenAndServe(anotherListener, "listener 1")
    listenAndServe(ln, "listener 2")
}

接下来我们要在 go 中进行fork并且传递文件描述符, 查看了文档, 可以通过 exec.Cmd 里的 ExtraFiles []*os.File 来传递:

package main

import (
    "flag"
    "fmt"
    "net"
    "os"
    "os/exec"
)

var (
    graceful = flag.Bool("graceful", false, "-graceful")
)

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, name string) {
    for {
        if conn, err := ln.Accept(); err == nil {
            fmt.Println(name)
            go handleConnection(conn)
        }
    }
}

func gracefulRestart() {
    ln, err := net.FileListener(os.NewFile(3, "graceful server"))
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ln)
    }

    listenAndServe(ln, "graceful server")
}

func main() {
    flag.Parse()
    fmt.Printf("given args: %t\n", *graceful)

    if *graceful {
        gracefulRestart()
    } else {
        ln, err := net.Listen("tcp", ":8080")
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(ln.Addr())
        }
        l := ln.(*net.TCPListener)
        newFD, _ := l.File()
        fmt.Println(newFD.Fd())

        cmd := exec.Command(os.Args[0], "-graceful")
        cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
        cmd.ExtraFiles = []*os.File{newFD}
        cmd.Run()
    }
}

当然我们还可以做的更好, 例如让graceful server支持再次graceful restart, 于是代码变成了这样:

ckage main

import (
    "flag"
    "fmt"
    "net"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
)

var (
    graceful = flag.Bool("graceful", false, "-graceful")
)

// Accepted accepted connection
type Accepted struct {
    conn net.Conn
    err  error
}

func handleConnection(conn net.Conn) {
    conn.Write([]byte("hello"))
    conn.Close()
}

func listenAndServe(ln net.Listener, sig chan os.Signal) {
    accepted := make(chan Accepted, 1)
    go func() {
        for {
            conn, err := ln.Accept()
            accepted <- Accepted{conn, err}
        }
    }()

    for {
        select {
        case a := <-accepted:
            if a.err == nil {
                fmt.Println("handle connection")
                go handleConnection(a.conn)
            }
        case _ = <-sig:
            fmt.Println("gonna fork and run")
            forkAndRun(ln)
            break
        }
    }
}

func gracefulListener() net.Listener {
    ln, err := net.FileListener(os.NewFile(3, "graceful server"))
    if err != nil {
        fmt.Println(err)
    }

    return ln
}

func firstBootListener() net.Listener {
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
        fmt.Println(err)
    }

    return ln
}

func forkAndRun(ln net.Listener) {
    l := ln.(*net.TCPListener)
    newFile, _ := l.File()
    fmt.Println(newFile.Fd())

    cmd := exec.Command(os.Args[0], "-graceful")
    cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
    cmd.ExtraFiles = []*os.File{newFile}
    cmd.Run()
}

func main() {
    flag.Parse()
    fmt.Printf("given args: %t, pid: %d\n", *graceful, os.Getpid())
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGUSR1)

    var ln net.Listener
    if *graceful {
        ln = gracefulListener()
    } else {
        ln = firstBootListener()
    }

    listenAndServe(ln, c)
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Persuasive Technology

Persuasive Technology

B.J. Fogg / Morgan Kaufmann / 2002-12 / USD 39.95

Can computers change what you think and do? Can they motivate you to stop smoking, persuade you to buy insurance, or convince you to join the Army? "Yes, they can," says Dr. B.J. Fogg, directo......一起来看看 《Persuasive Technology》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器