百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟

栏目: Go · 发布时间: 5年前

内容简介:tl;dr上一篇这几种服务器的实现包括:相关代码已发布到github上:

tl;dr上一篇 epoll方式减少资源占用 介绍了测试环境以及epoll方式实现百万连接的TCP服务器。这篇文章介绍百万连接服务器的几种实现方式,以及它们的吞吐率和延迟。

这几种服务器的实现包括: epollmultiple epollerpreforkworkerpool

相关代码已发布到github上: 1m-go-tcp-server

三、 epoll服务器加上吞吐率指标

上一篇已经介绍了epoll方式的实现,为了测试吞吐率,我们需要通过传递特殊的数据来计算。

客户端将它发送数据时的时间戳传给服务器,这个时间戳只需要8个字节,服务器不需要任何改动,只需要原封不动的将数据回传给客户端:

......
var (
	opsRate = metrics.NewRegisteredMeter("ops", nil)
)

func start() {
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		for _, conn := range connections {
			if conn == nil {
				break
			}

			// 将消息(时间戳)原封不动的写回
			_, err = io.CopyN(conn, conn,8)
			if err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
			}

			opsRate.Mark(1)
		}
	}
}

这里epoll我们并没有注册为边缘触发的方式,默认是水平触发的方式。

每次读取8个字节(时间戳),然后返回给客户端。同时metric记录一次。

metric库使用的是 rcrowley/go-metrics

四、客户端也修改为epoll方式

客户端不再发送 hello world 数据,而是当前的时间戳,收到服务器的返回后,就可以计算出一次请求的总共的花费(延迟,latency),然后发送下一个请求。

所以客户端的测试并不是pipeline的方式,以下所有的测试都不是pipeline的方式,而是收到返回再发下一个请求。

客户端也需要改成epoll的方式,原先一个goroutine轮训所有的连接的方式性能比较底下,所以改成epoll的方式:

package main

import (
	"encoding/binary"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"syscall"
	"time"

	"github.com/rcrowley/go-metrics"
)

var (
	ip          = flag.String("ip", "127.0.0.1", "server IP")
	connections = flag.Int("conn",1, "number of tcp connections")
	startMetric = flag.String("sm", time.Now().Format("2006-01-02T15:04:05 -0700"), "start time point of all clients")
)

var (
	opsRate = metrics.NewRegisteredTimer("ops", nil)
)
var epoller *epoll

// client改造成epoll方式, 处理epoll消息是单线程的
func main() {
	flag.Parse()

	go func() {
		startPoint, err := time.Parse("2006-01-02T15:04:05 -0700", *startMetric)
		if err != nil {
			panic(err)
		}
		time.Sleep(startPoint.Sub(time.Now()))

		metrics.Log(metrics.DefaultRegistry,5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
	}()

	var err error
	epoller, err = MkEpoll()
	if err != nil {
		panic(err)
	}

	addr := *ip + ":8972"
	log.Printf("连接到 %s", addr)
	var conns []net.Conn
	for i :=0; i < *connections; i++ {
		c, err := net.DialTimeout("tcp", addr,10*time.Second)
		if err != nil {
			fmt.Println("failed to connect", i, err)
			i--
			continue
		}
		if err := epoller.Add(c); err != nil {
			log.Printf("failed to add connection %v", err)
			c.Close()
		}
		conns = append(conns, c)
	}

	log.Printf("完成初始化 %d 连接", len(conns))

	tts := time.Second
	if *connections >100 {
		tts = time.Millisecond *5
	}

	go start()

	for i :=0; i < len(conns); i++ {
		time.Sleep(tts)
		conn := conns[i]
		err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
		if err != nil {
			log.Printf("failed to write timestamp %v", err)
			if err := epoller.Remove(conn); err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
			}
		}
	}

	select {}
}

func start() {
	var nano int64
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		for _, conn := range connections {
			if conn == nil {
				break
			}

			if err := binary.Read(conn, binary.BigEndian, &nano); err != nil {
				log.Printf("failed to read %v", err)
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}

				conn.Close()
				continue
			} else {
				opsRate.Update(time.Duration(time.Now().UnixNano() - nano))
			}

			err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
			if err != nil {
				log.Printf("failed to write %v", err)
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
			}
		}
	}
}

使用的epoll实现代码和服务器端是一样的。

客户端的统计会遇到一个问题,因为我们会启动50个 docker 容器,计算客户端的吞吐率的时候我们需要统计同一个时间段内这50个容器所有的请求和延迟。这里我们用了一个小小的技巧,让metrics库再同一个时间打印出它们的统计数据,基本可以保证统计的是这50个容器的同一个时间段内的指标。

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42495 , 延迟(latency)为 23秒

五、客户端改为多个epoller

在上面的实现中,我们的客户端使用一个epoller处理所有的请求, 在事件监听的处理中,使用一个goroutine处理接收的所有的事件,如果处理事件比较慢,这个单一的goroutine将会是严重的瓶颈。

所以我们要把它改成多goroutine的方式去处理。一种方式是启动一个线程池,采用多event loop的方式处理事件,另外一种方式是使用多个epoller, 每个epoller处理一批连接,每个epoller独自占用一个goroutine。 我们的客户端采用第二种方式,实现起来比较简单。

Linux的Accept和epoller都曾有惊群的现象,也就是一个一个事件到来后会唤醒所有的监听的线程,目前这个问题应该已经不存在了。

client.go
func main() {
 flag.Parse()

 setLimit()
 go func() {
 startPoint, _ := time.Parse("2006-01-02T15:04:05 -0700", *startMetric)
 time.Sleep(startPoint.Sub(time.Now()))

 metrics.Log(metrics.DefaultRegistry,5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))
 }()

 addr := *ip + ":8972"
 log.Printf("连接到 %s", addr)

 for i :=0; i < *c; i++ {
 go mkClient(addr, *connections/(*c))
 }

 select {}
}

func mkClient(addr string, connections int) {
 epoller, err := MkEpoll()
 if err != nil {
 panic(err)
 }

 var conns []net.Conn
 for i :=0; i < connections; i++ {
 c, err := net.DialTimeout("tcp", addr,10*time.Second)
 if err != nil {
 fmt.Println("failed to connect", i, err)
 i--
 continue
 }
 if err := epoller.Add(c); err != nil {
 log.Printf("failed to add connection %v", err)
 c.Close()
 }
 conns = append(conns, c)
 }

 log.Printf("完成初始化 %d 连接", len(conns))

 go start(epoller)

 tts := time.Second
 if *c >100 {
 tts = time.Millisecond *5
 }

 for i :=0; i < len(conns); i++ {
 time.Sleep(tts)
 conn := conns[i]
 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())
 if err != nil {
 log.Printf("failed to write timestamp %v", err)
 if err := epoller.Remove(conn); err != nil {
 if err := epoller.Remove(conn); err != nil {
 log.Printf("failed to remove %v", err)
 }
 }
 }
 }

 select {}
}

func start(epoller *epoll) {
 ...... //同上
}

测试脚本稍微一下,增加一个epoller数量的控制:

CONNECTIONS=$1
REPLICAS=$2
IP=$3
CONCURRENCY=$4

DATE=`date -d "+2 minutes" +"%FT%T %z"`

for (( c=0; c<${REPLICAS}; c++ ))
do
    docker run -v $(pwd)/mclient:/client --name 1mclient_$c -d alpine /client \
    -conn=${CONNECTIONS} -ip=${IP} -c=${CONCURRENCY} -sm "${DATE}"
done

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42402 , 延迟(latency)为 0.8秒

吞吐率并没有增加,但是得益于我们客户端可以并发的处理消息,可以大大减小事务的延迟,将相关的延迟可以降低到一秒以下。

六、服务器改为多个epoller

基于我们上面客户端使用多个epoller的启发,我们可以修改服务器端也采用多个epoller的方式,看看是否能增加吞吐率或者降低延迟。

server.go
package main

import (
 "flag"
 "io"
 "log"
 "net"
 "net/http"
 _ "net/http/pprof"
 "os"
 "syscall"
 "time"

 "github.com/libp2p/go-reuseport"
 "github.com/rcrowley/go-metrics"
)

var (
 c = flag.Int("c",10, "concurrency")
)

var (
 opsRate = metrics.NewRegisteredMeter("ops", nil)
)

func main() {
 flag.Parse()

 go metrics.Log(metrics.DefaultRegistry,5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))

 go func() {
 if err := http.ListenAndServe(":6060", nil); err != nil {
 log.Fatalf("pprof failed: %v", err)
 }
 }()

 for i :=0; i < *c; i++ {
 go startEpoll()
 }

 select {}
}

func startEpoll() {
 ln, err := reuseport.Listen("tcp", ":8972")
 if err != nil {
 panic(err)
 }

 epoller, err := MkEpoll()
 if err != nil {
 panic(err)
 }

 go start(epoller)

 for {
 conn, e := ln.Accept()
 if e != nil {
 if ne, ok := e.(net.Error); ok && ne.Temporary() {
 log.Printf("accept temp err: %v", ne)
 continue
 }

 log.Printf("accept err: %v", e)
 return
 }

 if err := epoller.Add(conn); err != nil {
 log.Printf("failed to add connection %v", err)
 conn.Close()
 }
 }
}

func start(epoller *epoll) {
 for {
 connections, err := epoller.Wait()
 if err != nil {
 log.Printf("failed to epoll wait %v", err)
 continue
 }
 for _, conn := range connections {
 if conn == nil {
 break
 }
 io.CopyN(conn, conn,8)
 if err != nil {
 if err := epoller.Remove(conn); err != nil {
 log.Printf("failed to remove %v", err)
 }
 conn.Close()
 }

 opsRate.Mark(1)
 }
 }
}

和客户端的类似,我们启动了多个epoller。这里我们使用 reuseport 库启动多个goroutine监听同一个端口,这个特性应该在较新的 Linux 内核上已经支持, 内核会负责负载均衡。

当然我们也可以启动一个goroutine进行监听,接收到客户端的请求后在交给某个epoller进行处理(随机或者轮询),我们就负责连接的负载均衡。

再或者,多个goroutine可以同时调用同一个 listener.Accept 方法,对 Accept 进行竞争。

后面的处理逻辑和单个的epoller的方式是一样的,只不过我们使用多个goroutine进行处理。

数据分析

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 197814 , 延迟(latency)为 0.9秒

以下所有的测试都使用多epoller的客户端,下面的比较也是针对多epoller的客户端的测试:

和单poller的服务器实现相比较,多epoller的服务器客户端吞吐率大幅增加,而延迟略微增加。

七、 prefork实现服务器

Prefork 是Apache实现的一种服务方式。一个单一的控制进程启动的时候负责启动多个 子进程 ,每个子进程都是独立的,使用单一的goroutine处理消息事件。

这是一个有趣的实现方式,子进程可以共享父进程打开的文件,这样我们就可以把net.Listener传给子进程,让所有的子进程共同监听这个端口。

传递给子进程的文件是通过 exec.Cmd.ExtraFiles 字段进行传递的:

type Cmd struct {
    ......
    // ExtraFiles specifies additional open files to be inherited by the
    // new process. It does not include standard input, standard output, or
    // standard error. If non-nil, entry i becomes file descriptor 3+i.
    //
    // ExtraFiles is not supported on Windows.
    ExtraFiles []*os.File
    ......
}

正如注释中所指出的,传递的第 i 个文件在子进程中的文件描述符为 3+i ,所以如果父进程中启动子进程的命令如下的话:

a_file_descriptor, _ := tcplistener.File()

children[i] = exec.Command(os.Args[0], "-prefork", "-child")
children[i].Stdout = os.Stdout
children[i].Stderr = os.Stderr
children[i].ExtraFiles = []*os.File{a_file_descriptor}

子进程你可以这样得到这个父进程的文件:

listener, err = net.FileListener(os.NewFile(3, ""))

我们实现的是父进程和子进程共享同一个listener的方式, 如果你使用reuseport在每个子进程打开同一个端口应该也是可以的,这样就父子之间不需要共享同一个文件了。

完整的服务器实现如下:

package main

import (
	"flag"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"syscall"
)

var (
	c       = flag.Int("c",10, "concurrency")
	prefork = flag.Bool("prefork", false, "use prefork")
	child   = flag.Bool("child", false, "is child proc")
)

func main() {
    flag.Parse()
    
	var ln net.Listener
	var err error

	if *prefork {
		ln = doPrefork(*c)
	} else {
		ln, err = net.Listen("tcp", ":8972")
		if err != nil {
			panic(err)
		}
	}

	startEpoll(ln)

	select {}
}

func startEpoll(ln net.Listener) {
	epoller, err := MkEpoll()
	if err != nil {
		panic(err)
	}

	go start(epoller)

	for {
		conn, e := ln.Accept()
		if e != nil {
			if ne, ok := e.(net.Error); ok && ne.Temporary() {
				log.Printf("accept temp err: %v", ne)
				continue
			}

			log.Printf("accept err: %v", e)
			return
		}

		if err := epoller.Add(conn); err != nil {
			log.Printf("failed to add connection %v", err)
			conn.Close()
		}
	}
}

func doPrefork(c int) net.Listener {
	var listener net.Listener
	if !*child {
		addr, err := net.ResolveTCPAddr("tcp", ":8972")
		if err != nil {
			log.Fatal(err)
		}
		tcplistener, err := net.ListenTCP("tcp", addr)
		if err != nil {
			log.Fatal(err)
		}
		fl, err := tcplistener.File()
		if err != nil {
			log.Fatal(err)
		}
		children := make([]*exec.Cmd, c)
		for i := range children {
			children[i] = exec.Command(os.Args[0], "-prefork", "-child")
			children[i].Stdout = os.Stdout
			children[i].Stderr = os.Stderr
			children[i].ExtraFiles = []*os.File{fl}
			err = children[i].Start()
			if err != nil {
				log.Fatalf("failed to start child: %v", err)
			}
		}
		for _, ch := range children {
			if err := ch.Wait(); err != nil {
				log.Printf("failed to wait child's starting: %v", err)
			}
		}
		os.Exit(0)
	} else {
		var err error
		listener, err = net.FileListener(os.NewFile(3, ""))
		if err != nil {
			log.Fatal(err)
		}
	}
	return listener
}

func start(epoller *epoll) {
	for {
		connections, err := epoller.Wait()
		if err != nil {
			log.Printf("failed to epoll wait %v", err)
			continue
		}
		for _, conn := range connections {
			if conn == nil {
				break
			}
			io.CopyN(conn, conn,8)
			if err != nil {
				if err := epoller.Remove(conn); err != nil {
					log.Printf("failed to remove %v", err)
				}
				conn.Close()
			}
		}
	}
}

数据分析

服务器启动50个子进程: ./server -c 50 -prefork

客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 444415 , 延迟(latency)为 1.5秒

和多poller的服务器实现相比较,prefork的服务器客户端吞吐率又大大幅增加,而延迟相对长一些了,比多poller的实现延迟翻倍。

八、 服务器实现workerpool

从单个poller的代码分析可知,单goroutine处理消息到来的事件可能会有瓶颈,尤其是并发量比较大的情况下,无法使用多核的优势,因为我们采用多poller、prefork的方式可以并发地处理到来的消息,这里还有一种Reactor的方式,将I/O goroutine和业务goroutine分离, I/O goroutine采用单goroutine的方式,监听的消息交给一个goroutine池 (workerpool)去处理,这样可以并行的处理业务消息,而不会阻塞I/O goroutine。

这里实现的消息读取也是在 workerpool 中实现的, 一般更通用的方式是I/O goroutine解析出消息, 将解析好的消息再交给workerpool去处理。我们这里的例子比较简单,所以读取消息也在workerpool中实现。

worker pool的实现如下:

workerpool.go
package main

import (
 "io"
 "log"
 "net"
 "sync"
)

type pool struct {
 workers int
 maxTasks int
 taskQueue chan net.Conn

 mu sync.Mutex
 closed bool
 done chan struct{}
}

func newPool(w int, t int) *pool {
 return &pool{
 workers: w,
 maxTasks: t,
 taskQueue: make(chan net.Conn, t),
 done: make(chan struct{}),
 }
}

func (p *pool) Close() {
 p.mu.Lock()
 p.closed = true
 close(p.done)
 close(p.taskQueue)
 p.mu.Unlock()
}

func (p *pool) addTask(conn net.Conn) {
 p.mu.Lock()
 if p.closed {
 p.mu.Unlock()
 return
 }
 p.mu.Unlock()

 p.taskQueue <- conn
}

func (p *pool) start() {
 for i :=0; i < p.workers; i++ {
 go p.startWorker()
 }
}

func (p *pool) startWorker() {
 for {
 select {
 case <-p.done:
 return
 case conn := <-p.taskQueue:
 if conn != nil {
 handleConn(conn)
 }
 }
 }
}

func handleConn(conn net.Conn) {
 _, err := io.CopyN(conn, conn,8)
 if err != nil {
 if err := epoller.Remove(conn); err != nil {
 log.Printf("failed to remove %v", err)
 }
 conn.Close()
 }
 opsRate.Mark(1)
}

服务器端代码改造:

server.go
var epoller *epoll
var workerPool *pool

func main() {
 flag.Parse()

 go metrics.Log(metrics.DefaultRegistry,5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))

 ln, err := net.Listen("tcp", ":8972")
 if err != nil {
 panic(err)
 }

 go func() {
 if err := http.ListenAndServe(":6060", nil); err != nil {
 log.Fatalf("pprof failed: %v", err)
 }
 }()

 workerPool = newPool(*c,1000000)
 workerPool.start()

 epoller, err = MkEpoll()
 if err != nil {
 panic(err)
 }

 go start()

 for {
 conn, e := ln.Accept()
 if e != nil {
 if ne, ok := e.(net.Error); ok && ne.Temporary() {
 log.Printf("accept temp err: %v", ne)
 continue
 }

 log.Printf("accept err: %v", e)
 return
 }

 if err := epoller.Add(conn); err != nil {
 log.Printf("failed to add connection %v", err)
 conn.Close()
 }
 }

 workerPool.Close()
}

func start() {
 for {
 connections, err := epoller.Wait()
 if err != nil {
 log.Printf("failed to epoll wait %v", err)
 continue
 }
 for _, conn := range connections {
 if conn == nil {
 break
 }

 workerPool.addTask(conn)
 }
 }
}

数据分析

服务器启动50个子进程: ./server -c 50 -prefork

客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10

这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 190022 , 延迟(latency)为 0.3秒

总结

吞吐率 (tps) 延迟 (latency)
单epoller(单epoller client) 42495 23s
单epoller 42402 0.8s
多epoller 197814 0.9s
prefork 444415 1.5s
workerpool 190022 0.3s

从上表可以看出,客户端的实现对测试结果影响也是巨大的,不过实际我们的客户端分布在不同的节点上,而不像我们的测试不得不使用同一台机器启动百万个节点,所以下面的测试都是通过多epoller client进行测试的,尽量让客户端能并发的处理消息。

从测试结果来看, 在百万并发的情况下, workerpool的实现还是不错的, 既能达到很高的吞吐率(19万), 还能取得 0.3秒的延迟, 而且使用小量的goroutine的worker pool也不会占用太多的系统资源。prefork可以大幅提高吞吐率,但是延迟要稍微长一些。

以上是在巨量连接情况下的各种实现的吞吐率和延迟的测试,这是一类的应用场景, 还有一类很大的应用场景, 比如企业内的服务通讯, 连接数并不会很多,我们将介绍这类场景下几种实现方案的吞吐率和延迟。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Adobe Flex 大师之路

Adobe Flex 大师之路

2009-5 / 69.80元

《Adobe Flex大师之路》以Flex 3.0为基础,涵盖了Flex技术的核心内容。《Adobe Flex 大师之路》能够帮助您你学习并掌握使用Flex所需的牢靠和全面的知识基础,告诉您你如何把这些知识将之转化为代码,并在实际项目中变通应用。通过学习《Adobe Flex 大师之路》,您你可以利用Flex来构建企业级应用的表现层、改善应用的用户体验、集成企业端的复杂服务。这本书是为所有希望学习......一起来看看 《Adobe Flex 大师之路》 这本书的介绍吧!

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具