饿了么:分布式时序数据库 - LinDB

栏目: 数据库 · 发布时间: 6年前

饿了么对时序数据库的需求主要来自各监控系统,主要用于存储监控指标。原来使用graphite,后来慢慢有对指标有多维的需求(主要体现在对一个指标加多个Tag, 来组成Series,然后对Tag进行Filter和Group进行计算),这时graphite基本很难满足需求。 业界现在用的比较多的主要有如下几类TSDB: 免费获取学习 Java 高架构、分布式架构、高可扩展、高性能、高并发、性能优化、Spring boot、 Redis 、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战学习架构师视频免费获取 架构群:863621962 InfluxDB:很多公司都在用,包括饿了么有部分监控系统也是用InfluxDB。优点,支持多维和多字段,存储也根据TSDB的特点做了优化。但开源的部分不支持,很多公司自己做集群化, 但大多基于指标名来,这样会有单指的热点问题。现在饿了么也是类似的做法,但热点问题很严重,大的指标已经用了最好的服务器,但是查询性能还是不够理想, 如果做成按Series Sharding那成本还是有一点高; Graphite:根据指标写入及查询,计算函数很多,但很难支持多维,包括机房或多集群的查询,原来饿了么把业务层的监控指标存储在Graphite中,并工作的很好, 但是多活之后基本已经很难满足一些需求了,由于其存储结构的特点,很占IO,根据目前线上的数据写放大差不多几十倍以上; OpenTSDB: 基于HBase,优点存储层不用自己考虑,做好查询聚合就可以,也会存在HBase的热点问题等,在以前公司也弄基于HBase实现的TSDB,来解决OpenTSDB的一些问题, 如热点,部分查询聚合下放到HBase等,目的是优化其查询性能,但依赖HBase/HDFS还是很点重; HiTSDB: 阿里提供的TSDB,存储也是用HBase,在数据结构及Index上面做了很多优化,具体没有研究,有兴趣的同学可以在阿里云上试一下; Druid: Druid其实是一个OLAP系统,但也可以用来存储时间序列数据,但看到它的架构图时已经放弃了; ES: 也有公司直接用ES来存储,没有实际测试,但总觉得ES不是一个真正的TSDB; atlas: Netflix出品,全内存TSDB,最近几小时数据全在内存中,历史数据需要外部存储,具体没有详细研究; beringei:facebook出品,全内存TSDB,跟atlas一样最近的数据在内存,目前应该还在孵化期; 3. 最终我们还是决定自己实现一套分布式时序数据库,具体需要解决如下问题: 轻量,目前只依赖于Zookeeper; 基于Series进行Sharding,解决热点,可以真正水平扩展; 实时写入,实时查询,由于大多用于监控系统,所以查询性能要好; 由于饿了么目前是多活,监控系统也是多活,所以要支持单机房写入,多机房聚合查询等; 自动的Rollup功能,如用户可以写10s的精度,系统自动Rollup到分钟,小时,天级别,以支持大时间范围的查询,如报表等; 支持类 SQL 的查询方式; 支持多副本,以提高整个系统的可靠性,正常只要还有一个副本存活就可以正常提供服务,副本数指定; 整体设计 采用计算和存储分离的架构,分为计算层LinProxy和存储层LinStorage。 说明: LinProxy主要做一些SQL的解析,及一些中间结合的再聚合计算,如果不是跨集群,LinProxy可以不需要,对于单集群的每个节点都内嵌了一个LinProxy来提供查询服务; LinDB Client主要用于数据的写入,也有一些查询的API; LinStorage的每个节点组成一个集群,节点之进行复制,并有副本的Leader节点提供读写服务,这点设计主要是参考Kafka的设计,可以把LinDB理解成类Kafka的数据写入复制+底层时间序列的存储层; LinMaster主要负责database、shard、replica的分配,所以LinStorage存储的调度,及MetaData(目前存储Zookeeper中)的管理; 由于LinStorage Node都是对等的,所以我们基于Zookeeper在集群的节点的选一个节点成为Master,每个Node把自身的状态以心跳的方式上报到Master上,Master根据这些状态进行调度, 如果Master挂了,自动再选一个Master出来,这个过程基本对整个服务是无损的,所以用户基本无感知。 写入 整个写过程分为如下2部分组成: WAL复制,这部分设计上参考了Kafka,用户的写入只要写入WAL成功,就认为成功(由于主要用于监控系统,所以对数据的一致性没有做太多的保证),这样就可以提供系统的写入吞吐; 本地写入,这个过程是把WAL的数据解析写入到自己的存储结构中,只有写入本地存储的数据才可以查到; 整个过程不像一些系统在每次写的过程中完成,我们是把这个过程分2步,并异步化了; WAL复制 目前LinDB的replica复制协议采用多通道复制协议,主要基于WAL在多节点之间的复制,WAL在每个节点上的写入,有独立的写操作完成, 所以对于Client写入对应Leader的WAL成功就认为本次写操作是成功的,Leader所在的节点负责把相应的WAL复制到对应的follower, 同理写WAL成功认为复制成功,如下所示: 多通道复制协议 写入Leader副本成功就算成功以提高了写入速率,也带来了以下问题: 数据一致性的问题 数据的丢失问题 以上图Server1为Leader,3个Replication来复制1-WAL为举例来说: 当前Server1是该shard的Leader接受Client的写入,Server2和Server3都是Follower接受Server1的复制请求,此时1-wal通道作为当前的数据写入通道, Server2和Server3此时可能落后于Server1。 说明: 整个过程需要注意以下几个Index; Client写入时的Append Index,表示当前Client写入到哪里; 对应每个Follower都会有一个Replica Index,表示对应Follower消费Leader上面同步到哪里; Follower的Ack Index,表示Follower已经成功复制到本地的WAL; 对于Follower的复制请求,其实相当于一个特殊Client的写入,所以也有一个对应的Append Index; 只有被Ack过的Index,才标示为已经处理完成,对于Leader来说,小于最小的Ack Index的WAL数据是可以被删除; 在这个过程中,如果Server2或者Server3中有一台出问题,这时对应的Consume Index不会移动,只有等到相应服务恢复之后,继续处理; 在整个过程中可能出现如下情况的可能; Leader Replica Index > Follower Append Index,这时需要根据Follower Append Index重置Leader Replica Index,可能存在2种情况,具体情况在复制顺序性中描述; Leader Replica Index < Follower Append Index,也同样存在2种情况,具体情况在复制顺序性中描述; 假如此时Server1挂了,从Server2和Server3中选出新的Leader,如此时选为Server2为Leader。 Server2就会开启2-wal复制通道,向server1和server3复制,由于当前server1挂了,所以暂时只往Server3复制,此时数据的写入通道为2-wal。 Server1启动恢复后,Server2会开启向Server1的2-wal复制通道,同时server1会将1-wal中剩余的还未向Server2和Server3复制的数据复制给他们。 对于异常情况,WAL中的数据不能正常由于ACK之后删除,导致WAL占用过多磁盘,所以对WAL需要有一个SIZE和TTL的清理过程,一旦因为WAL因为SIZE和TTL清理之后,会导致几个Index错乱,具体错乱情况如上所述。 多通道复制协议带来的问题: 每个通道都有对应的index序列,保存每个通道的last index。而单通道复制只需要保存1个last index即可。这个代价其实还好。 本地写入 背景 做到Shard级别的写入隔离,即每个Shard都会有独立的线程来负责写入,不会因为某个数据库或者某个Shard写入量具增而导致别的数据库的写入, 但可能会因为单机承载的Shard数过多,导致线程数过多,如果遇到这种情况,应该通过扩机器来解决,或者在新建数据库的时候,合理分配Shard数。 由于是单线程的写操作,所以在很多情况下,不需要考虑多线程写带来的锁竞争问题。 数据存储结构 说明,以单个数据库在单节点上的数据结构如例: 一个数据库在单节点上会存在多个Shard,所有Shard共享一个索引数据; 所有的数据根据数据库的Interval来计算按时间片来存储具体的数据包括数据文件和索引文件。 这样的设计主要为了方便处理TTL,数据如果过期,直接删除相应的目录就可以; 每个shard下面会存在segment,segment根据interval来存储相应时间片的数据; 为什么每个segment下面又按interval存储很多个data family?这个主要由于LinDB主要解决的问题是存储海量的监控数据,一般的监控数据基本是最新时间写入, 基本不会写历史数据,而整个LinDB的数据存储类似LSM方式,所以为了减少数据文件之间的合并操作,导致写放大,所以最终衡量下来,再对segment时间片进行分片。 下面以interval为10s为例说明: segment按天来存储; 每个segment按小时来分data family,每个小时一个family,每个family中的文件再按列存储具体的数据。 写入流程 说明: 系统会为每一个Shard启一个写线程,该线程负责这个Shard的所有写操作。 首先把measurement, tags, fields对应的数据写入数据库的索引文件,并生成相应的measurement id, time series id及field id,主要完成string->int的转换。 这样的好处是所有的数据存储都以数据类型来存储,从而可以减少整个存储大小,因为对于每个数据点,measurement/tags/field这样元数据占用,如cpu{host=1.1.1.1} load=1 1514214168614, 其实转换成id之后,cpu => 1(measurement id), host=1.1.1.1 => 1(time series id), load => 1(field id),所以最终的数据存储为1 1 1514214168614=>1,这个考虑OpenTSDB的设计。 如果写索引失败,认为本次写入失败,失败分为2种,一种是数据写入格式有问题,这类失败直接标示失败,另外一种由于内部问题,这时写入失败需要重试。 使用根据索引得到的ID,再结合写入时间和数据库Interval计算得到需要写入到哪个segment下的哪个family,写family的过程,直接写内存以达到高吞吐量的要求, 内存数据到达内存限制之后,会触发Flush操作。 整个写过程先写内存,再由Flusher线程把内存中的数据dump到相应的文件中,这样就做到了对一批数据顺序写入,同时对于最近的数据根据Field Type进行Rollup操作,从而进一步减少磁盘IO操作。 查询引擎 LinDB查询需要解决如下问题: 解决多个机房之间的查询; 高效的流式查询计算; 说明: 由于需要支持多机房或者多集群的查询,所以引入了LinProxy,LinProxy主要负责面向用户的查询请求; SQL Plan负责具体SQL的解析,生成最终的执行计划及需要计算的中间结果的函数; 通过Zookeeper中的Metadata,把请求路由给具体的LinDB集群中对应的服务; 每个LinConnect负责与一个LinDB集群之间的通信,每个LinConnect内部保存了一份对应集群的Metadata,该Metadata信息在每个Metadata变更的时候有Server端推送给LinConnect, 这样LinConnect基本做到近实时的更新Metadata; Aggregator Stream主要负责把各个LinConnect的中间结果进行最终的合并计算操作; 整个LinProxy处理过程都是异步化,这样可以利用线程在IO等待的时候可以做计算; 每个Node接收LinConnect过来的请求,在内部查询计算成中间结果返回给LinConnect,详细的过程后面要介绍; Node查询 说明: 如果所示,Client过来的一个查询请求,会产生很多小的查询任务,每个任务所承担的职责很单一,只做它所自己的任务,然后把结果给下一个任务, 所以需要所有的查询计算任务都是异常无阻塞处理,IO/CPU任务分离; 整个服务端查询使用Actor模式来简化整个Pipeline的处理; 任何一个任务执行完成,如果没有结果产生,则不会生产下游的任务,所有下游的任务都是根据上游任务是否有结果来决定; 最终把底层结果,通过Reduce Aggregate聚合成最终的结果; 存储结构 倒排索引 倒排索引,分两部分,目前索引相关的数据还是存储在RocksDB中。 根据Time Series的Measurement+Tags生成对应的唯一ID(类似luence里面的doc id)。 根据Tags倒排索引,指向一个ID列表。TSID列表以BitMap的方式存储,以方便查询的时候通过BitMap操作来过滤出想要的数据。BitMap使用RoaringBitMap。 每一类数据都存储在独立的RocksDB Family中。 内存结构 为了提高写入性能,把当前一段时间的数据写入到内存中,内存到达一定限制或者时间后把内存中的数据Dump到文件中。 内存存储分为当前可写和不可写,当前可写用于接入正常的数据写入,不可写用入Dump到文件中,如果Dump成功,则清空不可写部分。 如果可写部分也到在写入限制,但不可写部分还没有完成Dump,这时写入会被Block住,直到有可用的内存供数据写入,目的是为了不会因为占用过多内存而导致OOM。 MemoryTable内部通过一个Map来存储Measurement ID->Measurement Store关系,即每个Measurement都存储在一个独立的Store中。 在Measurement Store内存储对应Measurement下面每个TSID的数据,每个TSID对应的数据用一个Memory Block来存储,每个Memory Block按TSID的顺序存储在Array List中,把TSID存储在一个BitMap中,通过TSID在Bitmap中位置来定位Memory Block在Array List中的具体位置,这里说明一下为什么不直接使用Map来存储,因为整个系统是用Java实现的,Java中的Map结构,不适合存储小对象的数据,存在内存放多倍的存储。 由于每个TSID都会对应一个时间线,每个时间线可能会存在多个数据点的情况,如count时只有一个count值,timer时会有count/sum/min/max等多个值。每个数据类型以Chunk的方式存储。Chunk内部又以堆内和堆外2部分内存来存储,最近一段时间的数据放在堆内,历史数据压缩之后放在堆外,在内存中尽量多放一些最近的数据,因为LinDB的目的主要是存储一些监控类的数据,而监控类的数据主要关心最近一段时间的数据。 文件存储结构 文件存储跟内存存储类似,同一个Measurement的数据以Block的方式存储在一起,查询时通过Measurement ID定位到该Measurement的数据存储在哪个Block中。 Measurement Block后存储一个Offset Block,即存储每个Measurement Block所在的Offset,每个Offset以4 bytes存储。 Offset Block存储一个Measurement Index Block,按顺序存储每个Measurement ID,以Bitmap的方式存储。 文件的尾存储一个Footer Block,主要存储Version(2 bytes) + Measurement Index Offset(4 bytes) + Measurement Index Length(4 bytes)。 Data数据块都是数值,所以使用xor压缩,参考facebook的gorilla论文; Measurement Block: 每个Measurement Block类似Measurement的方式存储,只是把Measurement ID换成Measurement内的TSID。 TS Entry存储该TSID对应每一列的数据,一列数据对应存储一段时间的数据点。 查询逻辑: DataFile在第一次加载的时候会把Measurement Index放在内存中,查询输入Measurement ID通过Measurement Index中的第几个位置,然后通过这个位置N,在Offset Block查询具体的Measurement Block的Offset,由于每个Offset都是4 bytes,所以offset position = (N-1) * 4,再读取4 bytes得到真正的Offset。 同样的道理可以通过TSID,找到具体的TS Entry,再根据条件过滤具体的列数据,最终得到需要读取的数据。 总结 LinDB从2年前正式慢慢服务于公司的监控系统,从1.0到2.0,已经稳定运行2年多,除了一次rocksdb的问题,几乎没出过什么问题,到现在的3.0性能的大幅提升,我们基本都是站在业界一些成熟方案的基础上,慢慢演进而来。 也有人问,LinDB为什么这么快,其实我们是参考了很多TSDB的作法,然后取其好的设计,再结果时序的特征再做一些优化。 时序一般都是最新写入,但也是一种随机写,我们会先成内存中把随写变成循序写,最终到写文件都是顺序写,所有数据都是有序,这样查询的时候也是顺序读,这一点很关键; 把写入的measurement/tags/fields都转化成Int,再生成倒排索引,最终生成一个TSID(类似Luence的doc id),这样就大大减少了最终的数据量,毕竟指标这样字符串是占绝对的大头, 这点很想OpenTSDB,虽然InfluxDB已经把一段时间的按Block来存储,但还是在block的头放这些数据,这些都是成本,特别是在compact的时候; 不像别的TSDB会把timestamp直接存下来,一般timestamp到毫秒基别占8个节点,虽然根据时间有序的优势再用delta-encoded,压缩也是很好,但我们想做到极致,我们是用一个bit来表示时间, 具体的做法就是根据上面的描述,把时间的高位和存储Interval,把高位的时间放在目录上,再结合高位算一个delta,把delta以1bit的格式存储,来表示有没有数据,因为监控数据绝大部分都是连续的数据, 所以这样做也是合理的,因此在时间这个数据上的存储也大大减少空间; 我们发现对一个指标的多个Field的数据,每个Field的数据相邻的一些点基本是很相近的,LinDB 2.0存储直接是用RocksDB,多个Field放在一起存储,再把相邻的点进行压缩,这样其实压缩率不会很高, 而且每取查询取Field的时候都要把所有的数据都读出来,这也是LinDB 3.0我们考虑自己实现列式存储,相同列存在一块,以提高压缩率,查询的时候只读需要的数据。 整个压缩我们也没有用gzip/snappy/zlib,因为这些不大适合用于数值类型,我们是直接参考了facebook的gorilla论文的xor的方式来的,这个现在已经被很多TSDB采用; 基于上面这些基本的顺序读已经不成问题,基于TSID查询的更不是问题,因为整个设计都是基于TSID->data来设计的,所以还要解决一个根据倒排查出一组TSID对数据的随机读,如上我们是把TSID放在Bitmap, 然后通过Bitmap计算出Offset,直接找到数据,通过存储时的优化,做到TSID查询精准查找,然不是通过二分查找; 还有一点就是LinDB在新建数据库时指定完Interval之后,系统会自己Rollup,不像InfluxDB要写很多Continue Query,LinDB所有的这一切都是自动化的; 查询计算并行流式处理; 所以用一句话来总结的话就是一个高效的索引外加一堆数值,然后怎么玩好这堆数值。 自身监控 LinDB也自带了自身的一些监控功能 Overview Dashboard 未来的展望 丰富查询函数; 优化内存使用率; 自身监控的提升; 如果有可能,计划开源; 对比测试 下面是与InfluxDB和LinDB2.0的一些查询性能对比。 由于InfluxDB集群化要商业版,所以都是单机默认配置下,无Cache的测试。 服务器配置阿里云机器:8 Core 16G Memory 大维度 Tags: host(40000),disk(4),partition(20),模拟服务器磁盘的监控,总的Series数为320W,每个Series写一个数据点 小维度的1天内的聚合测试 Tags: host(400),disk(2),partition(10),模拟服务器磁盘的监控,总的Series数为8K,每个Series写一天的数据 每个维度每2s写入1个点,每个维度一天内总共43200个点,所有维度总共43200 * 8000个点,共3 4560 0000即3亿多数据 小维度的7天内的聚合测试 Tags: host(400),disk(2),partition(10),模拟服务器磁盘的监控,总的Series数为8K,每个Series写7天的数据 每个维度每5s写入1个点,每个维度一天内总共17280个点,所有天数所有维度总共17280 8000 7 个点,即9 6768 0000,9亿多个点 这个测试要说明一下,得利于LinDB自动的Rollup,如果InfluxDB开Continue Query的话相信应该也还好。


以上所述就是小编给大家介绍的《饿了么:分布式时序数据库 - LinDB》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

走进搜索引擎

走进搜索引擎

梁斌 / 电子工业出版社 / 2007-1 / 49.80元

《走进搜索引擎》由搜索引擎开发研究领域年轻而有活力的科学家精心编写,作者将自己对搜索引擎的深刻理解和实际应用巧妙地结合,使得从未接触过搜索引擎原理的读者也能够轻松地在搜索引擎的大厦中邀游一番。《走进搜索引擎》作为搜索引擎原理与技术的入门书籍,面向那些有志从事搜索引擎行业的青年学生、需要完整理解并优化搜索引擎的专业技术人员、搜索引擎的营销人员,以及网站的负责人等。《走进搜索引擎》是从事搜索引擎开发的......一起来看看 《走进搜索引擎》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具