TiDB源码阅读(一) TiDB的入口

栏目: IT技术 · 发布时间: 5年前

内容简介:写这个东西也只是因为想简单掌握下 TiDB 的源码,同事给了一些阅读思路,很赞。有些地方如果理解的有问题还请批评教育,对 Go 语言理解的比较有限。如果不小心误导了读者,请见谅

写这个东西也只是因为想简单掌握下 TiDB 的源码,同事给了一些阅读思路,很赞。

有些地方如果理解的有问题还请批评教育,对 Go 语言理解的比较有限。

如果不小心误导了读者,请见谅

TiDB 模块是使用 Go 语言开发的,使用 GoLand 编译器就可以了。

TiDB源码阅读(一) TiDB的入口

JetBrains出品

阅读源码,要寻找好的切入点,我们选择 main.go [1] 作为阅读源码的入口。

tidb-server/main.go

这里的 main 函数可以 debug ,也是 TiDB 启动的开头。

TiDB源码阅读(一) TiDB的入口

稍微简化一下

func main() {
   registerStores()
   registerMetrics()
   config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig)
   if config.GetGlobalConfig().OOMUseTmpStorage {
      config.GetGlobalConfig().UpdateTempStoragePath()
      initializeTempDir()
   }
   setCPUAffinity()
   setupLog()
   setupTracing() 
   setupBinlogClient()
   setupMetrics()
   createStoreAndDomain()
   createServer()
   runServer()
}

可以看出,启动流程做的每个步骤都按照函数封装好了,大致了解一下都做什么

// 注册store
func registerStores() {
    err := kvstore.Register("tikv", tikv.Driver{}) //注册TiKV
    tikv.NewGCHandlerFunc = gcworker.NewGCWorker //为TiKV生成GCworker
    err = kvstore.Register("mocktikv", mockstore.MockDriver{}) //注册默认存储引擎MockTiKV
}
//共注册100+ prometheus监控项,这里只表一项
func RegisterMetrics() { 
    prometheus.MustRegister(AutoAnalyzeCounter)
}
// get全局config
func InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) {
    cfg := GetGlobalConfig() 
    StoreGlobalConfig(cfg)
}

这个判断

if config.GetGlobalConfig().OOMUseTmpStorage {
        config.GetGlobalConfig().UpdateTempStoragePath()
        initializeTempDir()
}

设置是否在单条 SQL 语句的内存使用超出 mem-quota-query [2] 限制时为某些算子启用临时磁盘。故若为 true ,则初始化 TempStoragePath 。

// 设置CPU亲和性
func setCPUAffinity() { /
    err := linux.SetAffinity(cpu)
    runtime.GOMAXPROCS(len(cpu))
}
//配置系统log
func setupLog() { 
    err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 这里配置格式、文件名、slowlog等
}
//注册分布式系统追踪链 jaeger
func setupTracing() {
    tracingCfg := cfg.OpenTracing.ToTracingConfig()
    tracingCfg.ServiceName = "TiDB"
    tracer, _, err := tracingCfg.NewTracer()
    opentracing.SetGlobalTracer(tracer)
}
// 设置binlog信息
func setupBinlogClient() {
    if !cfg.Binlog.Enable { //若binlog.enable=false,则不开启binlog
        return
    }
    if cfg.Binlog.IgnoreError { //若为true,则忽略binlog报错
        binloginfo.SetIgnoreError(true)
    }
    if len(cfg.Binlog.BinlogSocket) == 0 { //配置binlog输出网络地址
        ...
    }
    binloginfo.SetPumpsClient(client) //配置binlog信息到pump客户端
}
// 配置监控
func setupMetrics() {
    runtime.SetMutexProfileFraction(10)// 对锁调用的跟踪
    systimeErrHandler := func() {
                // 表示TiDB的进程是否仍然存在。
                // 若10分钟内tidb_monitor_keep_alive_total               
                // 次数<100,TiDB可能退出,此时会报警
        metrics.TimeJumpBackCounter.Inc() 
    }
    callBackCount := 0
    sucessCallBack := func() {
        callBackCount++
        if callBackCount >= 5 {
            callBackCount = 0
            metrics.KeepAliveCounter.Inc() // KeepAlive监控 
        }
    }
}
// 启动了一些重要的后台进程
func createStoreAndDomain() {
    fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
    storage, err = kvstore.New(fullPath)
    dom, err = session.BootstrapSession(storage)}
}
// 创建TiDB server
func createServer() {
    driver := server.NewTiDBDriver(storage)
    svr, err = server.NewServer(cfg, driver)
    svr.SetDomain(dom)
}
//启动服务
runServer()

可以看到, runServer() 是启动TiDB流程中的的最后一步。

所以我们跳转到 server.Run() 中,

这里有很多接受请求时的异常处理逻辑,在此不表。简化一下大概有四步,如下:

if s.cfg.Status.ReportStatus {
    s.startStatusHTTP() //配置路由信息
}
for{
    conn, err := s.listener.Accept()// 监听客户端请求
    clientConn := s.newConn(conn)// 创建connection
    go s.onConn(clientConn)// 使用connection处理请求
}

首先配置了关于 TiDB 组件的很多路由信息,有兴趣的可以看看。

server 不断监听网络请求,出现新的客户端请求就创建一个新的 connection ,使用一个新 goroutine 来持续为它提供服务。

后续就是通过 go s.onConn(clientConn) 处理客户端请求,我们来一探究竟接下来的流程,简化下代码,大致如下

func (s *Server) onConn(conn *clientConn) {
    ctx := logutil.WithConnID(context.Background(), conn.connectionID)
    if err := conn.handshake(ctx); err != nil {
        if plugin.IsEnable(plugin.Audit) && conn.ctx != nil {
            conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo()
            })
        }
    }
    connectedTime := time.Now()
    conn.Run(ctx)
}

将建链 info 写入到 session 中,统计一下链路建立成功的时间,成功后,通过 conn.Run(ctx) 处理客户端请求。

func (cc *clientConn) Run(ctx context.Context) {
    for {
        waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
        cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)
        data, err := cc.readPacket()
        if err = cc.dispatch(ctx, data); err != nil {
            ...
        }
    }
}

简化后,处理逻辑大致是,

不停的通过 cc.readPacket() 读取客户端发来的网络包。如果这个期间发生了等待网络包超时的现象,则 close connection 。

读取 data 成功后,将它传入 cc.dispatch(ctx,data) ,这也是处理 SQL 请求的入口了。

当 SQL 来到 很合理 ,顾名思义,是对不同的 MySQL 协议进行调度,

server/conn.go

Then.

func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { 
    cc.lastPacket = data
    cmd := data[0]
    data = data[1:]
    dataStr := string(hack.String(data))
}

客户端请求MySQL协议报文格式

TiDB 也要根据这个格式进行解析, data[0] 就是 data 的第一个 byte ,其余的是命令。

MySQL 请求报文的命令列表

0x00 COM_SLEEP 内部线程状态
0x01 COM_QUIT 关闭连接
0x02 COM_INIT_DB 切换数据库
0x03 COM_QUERY SQL查询请求
0x04 COM_FIELD_LIST 获取数据表字段信息
0x05 COM_CREATE_DB 创建数据库
0x06 COM_DROP_DB 删除数据库
0x07 COM_REFRESH 清除缓存
0x08 COM_SHUTDOWN 停止服务器
0x09 COM_STATISTICS 获取服务器统计信息
0x0A COM_PROCESS_INFO 获取当前连接的列表
0x0B COM_CONNECT 内部线程状态
0x0C COM_PROCESS_KILL 中断某个连接
0x0D COM_DEBUG 保存服务器调试信息
0x0E COM_PING 测试连通性
0x0F COM_TIME 内部线程状态
0x10 COM_DELAYED_INSERT 内部线程状态
0x11 COM_CHANGE_USER 重新登陆
0x12 COM_BINLOG_DUMP 获取二进制日志信息
0x13 COM_TABLE_DUMP 获取数据表结构信息
0x14 COM_CONNECT_OUT 内部线程状态
0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册
0x16 COM_STMT_PREPARE 预处理SQL语句
0x17 COM_STMT_EXECUTE 执行预处理语句
0x18 COM_STMT_SEND_LONG_DATA 发送BLOB类型的数据
0x19 COM_STMT_CLOSE 销毁预处理语句
0x1A COM_STMT_RESET 清除预处理语句参数缓存
0x1B COM_SET_OPTION 设置语句选项
0x1C COM_STMT_FETCH 获取预处理语句的执行结果

我们看一下 dispatch 接下来的部分,也就是目前 TiDB 实现的部分 MySQL 协议了

switch cmd {
    case mysql.ComPing, 
             mysql.ComStmtClose, 
             mysql.ComStmtSendLongData, 
             mysql.ComStmtReset,
         mysql.ComSetOption, 
             mysql.ComChangeUser:
        cc.ctx.SetProcessInfo("", t, cmd, 0)
    case mysql.ComInitDB:
        cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0)
    }
switch cmd {
    case mysql.ComSleep:
    case mysql.ComQuit:
    case mysql.ComQuery:
    case mysql.ComPing:
    case mysql.ComInitDB:
    case mysql.ComFieldList:
    case mysql.ComStmtPrepare:
    case mysql.ComStmtExecute:
    case mysql.ComStmtFetch:
    case mysql.ComStmtClose:
    case mysql.ComStmtSendLongData:
    case mysql.ComStmtReset:
    case mysql.ComSetOption:
    case mysql.ComChangeUser
    default: // other not support

目前 TiDB 的绝大部分 SQL 语句都是走的 ComQuery ,感兴趣的同学可以自行 debug 一下。

case mysql.ComQuery: 
    if len(data) > 0 && data[len(data)-1] == 0 {
        data = data[:len(data)-1]
        dataStr = string(hack.String(data))
    }
    return cc.handleQuery(ctx, dataStr)

于是我们仔细看看这个 case , 要去看下 cc.handleQuery(ctx , dataStr) 。

func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
     stmts, err := cc.ctx.Execute(ctx, sql)
}

之后就要进入解析、优化 SQL 的部分 ,我们还是要先简单理解下 Lex & Yacc。

有疑问加站长微信联系

TiDB源码阅读(一) TiDB的入口

以上所述就是小编给大家介绍的《TiDB源码阅读(一) TiDB的入口》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ajax for Web Application Developers

Ajax for Web Application Developers

Kris Hadlock / Sams / 2006-10-30 / GBP 32.99

Book Description Reusable components and patterns for Ajax-driven applications Ajax is one of the latest and greatest ways to improve users’ online experience and create new and innovative web f......一起来看看 《Ajax for Web Application Developers》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具