Mongo Spark Connector中的分区器(一)

栏目: IT技术 · 发布时间: 4年前

内容简介:MongoSpark为入口类,调用MongoSpark.load,该方法返回一个MongoRDD类对象,Mongo Spark Connector框架本质上就是一个大号的自定义RDD,加了些自定义配置、适配几种分区器规则、Sql的数据封装等等,个人认为相对核心的也就是分区器的规则实现;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

MongoSpark为入口类,调用MongoSpark.load,该方法返回一个MongoRDD类对象,Mongo Spark Connector框架本质上就是一个大号的自定义RDD,加了些自定义配置、适配几种分区器规则、 Sql 的数据封装等等,个人认为相对核心的也就是分区器的规则实现;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

当前实现的分区器(Partitioner):

MongoPaginateByCountPartitioner 基于总数的分页分区器

MongoPaginateBySizePartitioner 基于大小的分页分区器

MongoSamplePartitioner 基于采样的分区器

MongoShardedPartitioner 基于分片的分区器

MongoSinglePartitioner 单分区分区器

MongoSplitVectorPartitioner

基于分割向量的分区器

这里根据源码简单介绍MongoSinglePartitioner与MongoSamplePartitioner分区器,这或许就是用得最多的两种分区器,他的默认分区器(DefaultMongoPartitioner)就是MongoSamplePartitioner分区器;该分区默认的PartitionKey为_id、默认PartitionSizeMB为64MB、默认每个分区采样为10;

MongoSamplePartitioner

该类的 核心 也是 唯一的方法为:partitions方法 ,下面为该方法的执行流程与核心逻辑;

1、检查执行buildInfo指令检查Mongo版本用于判断是否支持随机采样聚合运算,版本大于3.2。hasSampleAggregateOperator方法。Mongo3.2版本中才新增了数据采样功能。

Mongodb中的语法为:

db.cName.aggregate([
  {$sample:{ size: 10 } }
])

上示例N等于10,如果N大于collection中总数据的5%,那么$sample将会执行collection扫描、sort,然后选择top N条文档;如果N小于5%,对于 wiredTiger 而言则会遍历collection并使用“ 伪随机 ”的方式选取N条文档,对于 MMAPv1 引擎则在_id索引上随机选取N条文档。

2、执行collStats,用于获取集合的存储信息,如行数、大小、存储大小等等信息;

matchQuery: 过滤条件

partitionerOptions: ReadConfig传进去的分析器选项

partitionKey: 分区key,默认为_id

partitionSizeInBytes: 分区大小,默认64MB

samplesPerPartition: 每个分区默认采样数量,默认10

count: 集合总条数

avgObjSizeInBytes: 对象平均字节数

numDocumentsPerPartition: 每个分区文档数, partitionSizeInBytes / avgObjSizeInBytes :分区大小/对象平均大小

numberOfSamples:

采样数量,samplesPerPartition * count / numDocumentsPerPartition,每个分区采样数*集合总数/每个分区文档数

Mongo Spark Connector中的分区器(一)

如每个分区文档数大于集合总文档数 ,则将直接创建单分区,不采取采样数据方式创建分区,因为此时数据量太少单个分区已经可以容得下无需多个分区;

一、创建单分区

MongoSinglePartitioner类中通过PartitionerHelper.createPartitions执行相关逻辑;

_id作为partitionKey,

二、通过采样数据创建分区

Mongo Spark Connector中的分区器(一)

指定采样条件、采样数据量、PartitionKey、 排序 条件等,获取采样数据;

集合拆分:

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1

右侧边界:

val rightHandBoundaries = samples.zipWithIndex.collect {
case (field, i) if collectSplit(i) => field.get(partitionKey)
}

获取 右侧边界 ,使用采样数据数组索引对每个分区采样数求余等于0对采样数据进行过滤取右侧边界(如匹配条件不为空则再取最后一条数据);

如采样得到62条数据,并且没有存在匹配条件,根据上述的采样数据过滤条件最后取得7条数据,分别为数据数组索引为0、索引为10、20、30、40、50、60的7条数据,数据的值为PartitionKey默认就是集合中_id字段的值;

Mongo Spark Connector中的分区器(一)

创建分区(Partitions)

Mongo Spark Connector中的分区器(一)

获取得到PartitionKey、rightHandBoundaries后就可以调用PartitionerHelper.createPartitions创建Partition;下面为创建Partition的具体逻辑;

使用PartitionKey创建查询边界 ,每个分区具有不同的查询边界,有最大、最小边界;此处创建分区Partition并在每个分区中指定了查询边界;

上面获取得到了7条数据,此处将创建8个分区;下面给出了简单数据用于说明该分区边界条件的基本逻辑与实现;

1、创建Min、1、3、5、7、9、11、13、Max的序列

2、创建1、3、5、7、9、11、13、Max序列

3、使用zip将两个序列拉链式的合并:合并后的数据为:

4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max

Partition的边界条件将使用上面的边界条件,8条数据八个Partition一个对应;

0 Partition的边界条件为:小于1

1 Partition的边界条件为:大于等于1 小于 3

2 Partition的边界条件为:大于等于3 小于 5

3 Partition的边界条件为:大于等于5 小于 7

4 Partition的边界条件为:大于等于7小于 9

5 Partition的边界条件为:大于等于9 小于 11

6 Partition的边界条件为:大于等于11 小于 13

7 Partition的边界条件为:大于等于13

上面的8个Partition为8个MongoPartition对象,每个对象的index、查询边界与上面所说的一一对应;

在MongoRDD类的compute方法

中可以看到根据对应的分区与上面创建分区时所建立的边界条件用于计算(从Mongo中获取对应数据);

Mongo Spark Connector中的分区器(一)

分区1分区与边界条件

MongoSinglePartitioner

创建单分区分区器时,直接调用PartitionerHelper.createPartitions方法创建分区,该类并无其他逻辑,并且固定的PartitionKey为_id,右侧边界条件为空集合,然后创建id为0的MongoPartition对象,并无查询边界;


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编码整洁之道

编码整洁之道

罗伯特·C.马丁 / 电子工业出版社 / 2012-8 / 59.00元

忍受各种不确定性及不间断的压力并能够获取成功的程序员有一个共通特征:他们都深度关注软件创建实践。他们都把软件看做一种工艺品。他们都是专家。在“鲍勃大叔”看来“专业”的程序员不仅应该具备专业的技能,更应该具备专业的态度,这也是本书阐述的核心。专业的态度包括如何用带着荣誉感、自尊、自豪来面对进行软件开发,如何做好并做得整洁,如何诚实地进行沟通和估算,如何透明并坦诚地面对困难做抉择,如何理解与专业知识相......一起来看看 《编码整洁之道》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

MD5 加密
MD5 加密

MD5 加密工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具