内容简介:写这个东西也只是因为想简单掌握下 TiDB 的源码,同事给了一些阅读思路,很赞。有些地方如果理解的有问题还请批评教育,对 Go 语言理解的比较有限。如果不小心误导了读者,请见谅
写这个东西也只是因为想简单掌握下 TiDB 的源码,同事给了一些阅读思路,很赞。
有些地方如果理解的有问题还请批评教育,对 Go 语言理解的比较有限。
如果不小心误导了读者,请见谅
TiDB 模块是使用 Go 语言开发的,使用 GoLand 编译器就可以了。
JetBrains出品
阅读源码,要寻找好的切入点,我们选择 main.go [1] 作为阅读源码的入口。
tidb-server/main.go
这里的 main 函数可以 debug ,也是 TiDB 启动的开头。
稍微简化一下
func main() { registerStores() registerMetrics() config.InitializeConfig(*configPath, *configCheck, *configStrict, reloadConfig, overrideConfig) if config.GetGlobalConfig().OOMUseTmpStorage { config.GetGlobalConfig().UpdateTempStoragePath() initializeTempDir() } setCPUAffinity() setupLog() setupTracing() setupBinlogClient() setupMetrics() createStoreAndDomain() createServer() runServer() }
可以看出,启动流程做的每个步骤都按照函数封装好了,大致了解一下都做什么
// 注册store func registerStores() { err := kvstore.Register("tikv", tikv.Driver{}) //注册TiKV tikv.NewGCHandlerFunc = gcworker.NewGCWorker //为TiKV生成GCworker err = kvstore.Register("mocktikv", mockstore.MockDriver{}) //注册默认存储引擎MockTiKV } //共注册100+ prometheus监控项,这里只表一项 func RegisterMetrics() { prometheus.MustRegister(AutoAnalyzeCounter) } // get全局config func InitializeConfig(confPath string, configCheck, configStrict bool, reloadFunc ConfReloadFunc, enforceCmdArgs func(*Config)) { cfg := GetGlobalConfig() StoreGlobalConfig(cfg) }
这个判断
if config.GetGlobalConfig().OOMUseTmpStorage { config.GetGlobalConfig().UpdateTempStoragePath() initializeTempDir() }
设置是否在单条 SQL 语句的内存使用超出 mem-quota-query [2] 限制时为某些算子启用临时磁盘。故若为 true ,则初始化 TempStoragePath 。
// 设置CPU亲和性 func setCPUAffinity() { / err := linux.SetAffinity(cpu) runtime.GOMAXPROCS(len(cpu)) } //配置系统log func setupLog() { err = logutil.InitLogger(cfg.Log.ToLogConfig()) // 这里配置格式、文件名、slowlog等 } //注册分布式系统追踪链 jaeger func setupTracing() { tracingCfg := cfg.OpenTracing.ToTracingConfig() tracingCfg.ServiceName = "TiDB" tracer, _, err := tracingCfg.NewTracer() opentracing.SetGlobalTracer(tracer) } // 设置binlog信息 func setupBinlogClient() { if !cfg.Binlog.Enable { //若binlog.enable=false,则不开启binlog return } if cfg.Binlog.IgnoreError { //若为true,则忽略binlog报错 binloginfo.SetIgnoreError(true) } if len(cfg.Binlog.BinlogSocket) == 0 { //配置binlog输出网络地址 ... } binloginfo.SetPumpsClient(client) //配置binlog信息到pump客户端 } // 配置监控 func setupMetrics() { runtime.SetMutexProfileFraction(10)// 对锁调用的跟踪 systimeErrHandler := func() { // 表示TiDB的进程是否仍然存在。 // 若10分钟内tidb_monitor_keep_alive_total // 次数<100,TiDB可能退出,此时会报警 metrics.TimeJumpBackCounter.Inc() } callBackCount := 0 sucessCallBack := func() { callBackCount++ if callBackCount >= 5 { callBackCount = 0 metrics.KeepAliveCounter.Inc() // KeepAlive监控 } } } // 启动了一些重要的后台进程 func createStoreAndDomain() { fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) storage, err = kvstore.New(fullPath) dom, err = session.BootstrapSession(storage)} } // 创建TiDB server func createServer() { driver := server.NewTiDBDriver(storage) svr, err = server.NewServer(cfg, driver) svr.SetDomain(dom) } //启动服务 runServer()
可以看到, runServer() 是启动TiDB流程中的的最后一步。
所以我们跳转到 server.Run() 中,
这里有很多接受请求时的异常处理逻辑,在此不表。简化一下大概有四步,如下:
if s.cfg.Status.ReportStatus { s.startStatusHTTP() //配置路由信息 } for{ conn, err := s.listener.Accept()// 监听客户端请求 clientConn := s.newConn(conn)// 创建connection go s.onConn(clientConn)// 使用connection处理请求 }
首先配置了关于 TiDB 组件的很多路由信息,有兴趣的可以看看。
server 不断监听网络请求,出现新的客户端请求就创建一个新的 connection ,使用一个新 goroutine 来持续为它提供服务。
后续就是通过 go s.onConn(clientConn) 处理客户端请求,我们来一探究竟接下来的流程,简化下代码,大致如下
func (s *Server) onConn(conn *clientConn) { ctx := logutil.WithConnID(context.Background(), conn.connectionID) if err := conn.handshake(ctx); err != nil { if plugin.IsEnable(plugin.Audit) && conn.ctx != nil { conn.ctx.GetSessionVars().ConnectionInfo = conn.connectInfo() }) } } connectedTime := time.Now() conn.Run(ctx) }
将建链 info 写入到 session 中,统计一下链路建立成功的时间,成功后,通过 conn.Run(ctx) 处理客户端请求。
func (cc *clientConn) Run(ctx context.Context) { for { waitTimeout := cc.getSessionVarsWaitTimeout(ctx) cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second) data, err := cc.readPacket() if err = cc.dispatch(ctx, data); err != nil { ... } } }
简化后,处理逻辑大致是,
不停的通过 cc.readPacket() 读取客户端发来的网络包。如果这个期间发生了等待网络包超时的现象,则 close connection 。
读取 data 成功后,将它传入 cc.dispatch(ctx,data) ,这也是处理 SQL 请求的入口了。
当 SQL 来到 很合理 ,顾名思义,是对不同的 MySQL 协议进行调度,
server/conn.go
Then.
func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.lastPacket = data cmd := data[0] data = data[1:] dataStr := string(hack.String(data)) }
客户端请求MySQL协议报文格式
TiDB 也要根据这个格式进行解析, data[0] 就是 data 的第一个 byte ,其余的是命令。
MySQL 请求报文的命令列表
0x00 COM_SLEEP 内部线程状态 0x01 COM_QUIT 关闭连接 0x02 COM_INIT_DB 切换数据库 0x03 COM_QUERY SQL查询请求 0x04 COM_FIELD_LIST 获取数据表字段信息 0x05 COM_CREATE_DB 创建数据库 0x06 COM_DROP_DB 删除数据库 0x07 COM_REFRESH 清除缓存 0x08 COM_SHUTDOWN 停止服务器 0x09 COM_STATISTICS 获取服务器统计信息 0x0A COM_PROCESS_INFO 获取当前连接的列表 0x0B COM_CONNECT 内部线程状态 0x0C COM_PROCESS_KILL 中断某个连接 0x0D COM_DEBUG 保存服务器调试信息 0x0E COM_PING 测试连通性 0x0F COM_TIME 内部线程状态 0x10 COM_DELAYED_INSERT 内部线程状态 0x11 COM_CHANGE_USER 重新登陆 0x12 COM_BINLOG_DUMP 获取二进制日志信息 0x13 COM_TABLE_DUMP 获取数据表结构信息 0x14 COM_CONNECT_OUT 内部线程状态 0x15 COM_REGISTER_SLAVE 从服务器向主服务器进行注册 0x16 COM_STMT_PREPARE 预处理SQL语句 0x17 COM_STMT_EXECUTE 执行预处理语句 0x18 COM_STMT_SEND_LONG_DATA 发送BLOB类型的数据 0x19 COM_STMT_CLOSE 销毁预处理语句 0x1A COM_STMT_RESET 清除预处理语句参数缓存 0x1B COM_SET_OPTION 设置语句选项 0x1C COM_STMT_FETCH 获取预处理语句的执行结果
我们看一下 dispatch 接下来的部分,也就是目前 TiDB 实现的部分 MySQL 协议了
switch cmd { case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset, mysql.ComSetOption, mysql.ComChangeUser: cc.ctx.SetProcessInfo("", t, cmd, 0) case mysql.ComInitDB: cc.ctx.SetProcessInfo("use "+dataStr, t, cmd, 0) } switch cmd { case mysql.ComSleep: case mysql.ComQuit: case mysql.ComQuery: case mysql.ComPing: case mysql.ComInitDB: case mysql.ComFieldList: case mysql.ComStmtPrepare: case mysql.ComStmtExecute: case mysql.ComStmtFetch: case mysql.ComStmtClose: case mysql.ComStmtSendLongData: case mysql.ComStmtReset: case mysql.ComSetOption: case mysql.ComChangeUser default: // other not support
目前 TiDB 的绝大部分 SQL 语句都是走的 ComQuery ,感兴趣的同学可以自行 debug 一下。
case mysql.ComQuery: if len(data) > 0 && data[len(data)-1] == 0 { data = data[:len(data)-1] dataStr = string(hack.String(data)) } return cc.handleQuery(ctx, dataStr)
于是我们仔细看看这个 case , 要去看下 cc.handleQuery(ctx , dataStr) 。
func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { stmts, err := cc.ctx.Execute(ctx, sql) }
之后就要进入解析、优化 SQL 的部分 ,我们还是要先简单理解下 Lex & Yacc。
有疑问加站长微信联系
以上所述就是小编给大家介绍的《TiDB源码阅读(一) TiDB的入口》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 源码分析(二)—入口篇
- Vue源码: 构造函数入口
- Spring Boot 2.x 启动全过程源码分析(上):入口类剖析
- 兄弟连区块链教程open-ethereum-pool矿池源码分析main入口
- 链表中环的入口节点
- docker entrypoint入口文件详解
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。