内容简介:先介绍管道的概念,在POSIX多线程的使用方式中,定义了一种重要的pipeline方式,成为“流水线”或“管道”,这种方式使得数据被一组线程顺序执行,其流程如下:以面向对象的思想去理解,整个流水线,可以理解为一个数据传输的管道;该管道中的每一个工作线程,可以理解为一个整个流水线的一个工作阶段stage,这些工作线程之间的合作是一环扣一环的。靠输入口越近的工作线程,是时序较早的工作阶段stage,它的工作成果会影响下一个工作线程阶段(stage)的工作结果,即下个阶段依赖于上一个阶段的输出,上一个阶段的输出
先介绍管道的概念,在POSIX多线程的使用方式中,定义了一种重要的pipeline方式,成为“流水线”或“管道”,这种方式使得数据被一组线程顺序执行,其流程如下:
以面向对象的思想去理解,整个流水线,可以理解为一个数据传输的管道;该管道中的每一个工作线程,可以理解为一个整个流水线的一个工作阶段stage,这些工作线程之间的合作是一环扣一环的。靠输入口越近的工作线程,是时序较早的工作阶段stage,它的工作成果会影响下一个工作线程阶段(stage)的工作结果,即下个阶段依赖于上一个阶段的输出,上一个阶段的输出成为本阶段的输入。这也是pipeline的一个共有特点。
mongodb中的管道
mongodb在2.2版本中引入了聚合框架(aggregate framework)的新功能,它是聚合的新框架,其概念类似于数据处理的管道,每个文档经过一个由多个节点组成的管道,每个节点相当于流水线中的一个stage,有自己的功能(分组,过滤等),文档经过管道处理后,最后输出相应的结果,管道的基本功能有两个:
- 对文档进行“过滤”,筛选出合适的文档
- 对文档进行“变换”,改变文档的输出形式
其他的一些功能还包括按照某个指定的字段分组和 排序 等。而且在每个阶段还可以使用表达式操作符计算平均值和拼接字符串等相关操作。管道提供了一个MapReduce 的替代方案,MapReduce使用相对来说比较复杂,而管道的拥有固定的接口(操作符表达),使用比较简单,对于大多数的聚合任务管道一般来说是首选方法。
mongodb中的聚合(aggregate)主要用于简单的数据处理(平均值,求和等),并返回计算后的数据结果,类似于 sql 中的内嵌函数(count()等) 在 mongodb 官网给出了聚合框架的应用实例:
可以看到,一个聚合管道中包含多个stage,每个stage都是对数据的一次处理,mongodb中的聚合采用aggregate()方法,语法如下:>db.COLLECTION_NAME.aggregate(pipeline,options)
其中pipeline为一个array,语法为:
[{<stage1>,<stage2>,<stage3>,...}]
其中每个的语法为:
{$管道操作符:{ 管道表达式 } }
那么下面列举一些较常见的管道操作符以及他们的作用,后文还会继续给出实例:
操作符 | 描述 | 语法 |
---|---|---|
$project | 数据投影,主要用于重命名,增加,删除字段 | db.article.aggregate({ $project : {title : 1 ,author : 1 ,}}); |
$match | 过滤,筛选符合条件的文档,作为下一阶段输入 | db.articles.aggregate( [{ $match : { score : { $gt : 70, $lte : 90 } } },{ $group: { _id: null, count: { $sum: 1 } } }] ); |
$limit | 限制经过管道的文档数量 | db.article.aggregate({ $limit : 5 }); |
$skip | 待操作集合处理前跳过部分文档 | db.article.aggregate({ $skip : 5 }); |
$unwind | 将数组拆分成独立字段 | db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tags"}) |
$group | 对数据进行分组 | db.article.aggregate({ $group : {_id : "$author",docsPerAuthor : { $sum : 1 },viewsPerAuthor : { $sum : "$pageViews" }}}); |
$sort | 对文档按照指定字段排序 | db.users.aggregate( { $sort : { age : -1, posts: 1 } }); |
$sample | 随机选择从其输入指定数量的文档。 | { $sample: { size: <positive integer> } } |
$out | 必须为pipeline最后一个阶段管道,因为是将最后计算结果写入到指定的collection中 | |
$indexStats | 返回数据集合的每个索引的使用情况 | { $indexStats: { } } |
更多的管道操作符参考mongodb官方文档,官方文档写的更为详细,但是对语法的描写较少 docs.mongoing.com/manual-zh/m…
管道操作符示例
下面针对常用管道操作符举一些例子:
先加载数据:
use test1 db.mycol.remove({}) document1=({name:'dogOne',age:1,tags:['animal','dog'],type:'dog',money:[{min:100},{norm:200},{big:300}]}); document2=({name:'catOne',age:3,tags:['animal','cat'],type:'cat',money:[{min:50},{norm:100},{big:200}]}); document3=({name:'catTwo',age:2,tags:['animal','cat'],type:'cat',money:[{min:20},{norm:50},{big:100}]}); document4=({name:'dogTwo',age:5,tags:['animal','dog'],type:'dog',money:[{min:300},{norm:500},{big:700}]}); document5=({name:'appleOne',age:0,tags:['fruit','apple'],type:'apple',money:[{min:10},{norm:12},{big:13}]}); document6=({name:'appleTwo',age:0,tags:['fruit','apple'],type:'apple',money:[{min:10},{norm:12},{big:13}]}); document7=({name:'pineapple',age:0,tags:['fruit','pineapple'],type:'pineapple',money:[{min:8},{norm:9},{big:10}]}); db.mycol.insert(document1) db.mycol.insert(document2) db.mycol.insert(document3) db.mycol.insert(document4) db.mycol.insert(document5) db.mycol.insert(document6) db.mycol.insert(document7) 复制代码
下面是执行结果:
/* 1 */ { "_id" : ObjectId("59187984f322c585a98664e2"), "name" : "dogOne", "age" : 1.0, "tags" : [ "animal", "dog" ], "type" : "dog", "money" : [ { "min" : 100.0 }, { "norm" : 200.0 }, { "big" : 300.0 } ] } /* 2 */ { "_id" : ObjectId("59187984f322c585a98664e3"), "name" : "catOne", "age" : 3.0, "tags" : [ "animal", "cat" ], "type" : "cat", "money" : [ { "min" : 50.0 }, { "norm" : 100.0 }, { "big" : 200.0 } ] } /* 3 */ { "_id" : ObjectId("59187984f322c585a98664e4"), "name" : "catTwo", "age" : 2.0, "tags" : [ "animal", "cat" ], "type" : "cat", "money" : [ { "min" : 20.0 }, { "norm" : 50.0 }, { "big" : 100.0 } ] } /* 4 */ { "_id" : ObjectId("59187984f322c585a98664e5"), "name" : "dogTwo", "age" : 5.0, "tags" : [ "animal", "dog" ], "type" : "dog", "money" : [ { "min" : 300.0 }, { "norm" : 500.0 }, { "big" : 700.0 } ] } /* 5 */ { "_id" : ObjectId("59187984f322c585a98664e6"), "name" : "appleOne", "age" : 0.0, "tags" : [ "fruit", "apple" ], "type" : "apple", "money" : [ { "min" : 10.0 }, { "norm" : 12.0 }, { "big" : 13.0 } ] } /* 6 */ { "_id" : ObjectId("59187984f322c585a98664e7"), "name" : "appleTwo", "age" : 0.0, "tags" : [ "fruit", "apple" ], "type" : "apple", "money" : [ { "min" : 10.0 }, { "norm" : 12.0 }, { "big" : 13.0 } ] } /* 7 */ { "_id" : ObjectId("59187984f322c585a98664e8"), "name" : "pineapple", "age" : 0.0, "tags" : [ "fruit", "pineapple" ], "type" : "pineapple", "money" : [ { "min" : 8.0 }, { "norm" : 9.0 }, { "big" : 10.0 } ] } 复制代码
1. $project
操作符与 $match
操作符
$project
管道操作符用于修改流中的文档, $match
管道操作符用于对流中的文档进行过滤,仅允许符合条件的文档进入下一个阶段,过滤操作不会修改文档。 $match
操作使用mongodb标准的查询条件,对于每一个输入文档,如果符合条件,则输出这个文档,否则丢弃该文档。由于aggregate管道对于内存的限制,在处理大文件的时候,最好先用match操作符进行筛选,减少内存占用。
假定我们想提取money中min为100的文档,并且只输出名称和money数组中的min那一项,用 $project
与 $match
操作符可以很好的实现
use test1 db.mycol.aggregate( {$match:{'money.min':100}}, {$project:{_id:0,name:'$name',minprice:'$money.min'}} ) 复制代码
输出结果为:
/* 1 */ { "name" : "dogOne", "minprice" : [ 100.0 ] } 复制代码
可以发现,在project操作符后,文档中的字段被改变了。 也要注意到,对于数组中对象的引用,需要采用 '$money.min'
形式 注意:
- 不能在
$match
操作符中使用$where
表达式操作符。 -
$match
尽量出现在管道的前面,这样可以提早过滤文档,加快聚合速度。 - 如果
$match
出现在最前面的话,可以使用索引来加快查询。
2. $limit
$skip
操作符
$limit
与 $skip
操作符是用于限制与跳过相应文档,与find中的limit与skip方法效果相同。 假定我们想提取money中min小于100的文档,并且限制3个文档,跳过一个文档再显示 脚本为
use test1 db.mycol.aggregate( {$match:{'money.min':{$lt:100}}}, {$limit:3}, {$skip:1}, {$project:{_id:0,name:'$name',minprice:'$money.min'}} ) 复制代码
结果为:
/* 1 */ { "name" : "catTwo", "minprice" : [ 20.0 ] } /* 2 */ { "name" : "appleOne", "minprice" : [ 10.0 ] } 复制代码
可以发现结果满足我们的需求
3. $group
操作符
$group
操作符用来对数据进行分组。 $group
的时候必须要指定一个_id域,同时也可以包含一些算术类型的表达式操作符 比如我们要通过type类型来对数据进行分类,并且同时统计他们的年龄age总和, 脚本为:
use test1 db.mycol.aggregate( {$group:{_id:'$type',sumage:{$sum:'$age'}}} ) 复制代码
结果为:
/* 1 */ { "_id" : "pineapple", "sumage" : 0.0 } /* 2 */ { "_id" : "cat", "sumage" : 5.0 } /* 3 */ { "_id" : "apple", "sumage" : 0.0 } /* 4 */ { "_id" : "dog", "sumage" : 6.0 } 复制代码
可以看到数据按照 猫,狗,苹果,菠萝进行了分类,并且年龄相加了。 注意:
$group $group
4. $sort
操作符
sort操作符用来对数据进行排序,同样1代表升序,-1代表降序 假定我们按照年龄对数据进行排序,为了减少输出行数,我们用上分组与skip 脚本为:
use test1 db.mycol.aggregate( {$group:{_id:'$type',sumage:{$sum:'$age'}}}, {$skip:1}, {$sort:{sumage:1}} ) 复制代码
结果为:
/* 1 */ { "_id" : "apple", "sumage" : 0.0 } /* 2 */ { "_id" : "cat", "sumage" : 5.0 } /* 3 */ { "_id" : "dog", "sumage" : 6.0 } 复制代码
注意:
- 如果将
$sort
放到管道前面的话可以利用索引,提高效率 - MongoDB 对内存做了优化,在管道中如果
$sort
出现在$limit
之前的话,$sort
只会对前$limit
个文档进行操作,这样在内存中也只会保留前$limit
个文档,从而可以极大的节省内存 -
$sort
操作是在内存中进行的,如果其占有的内存超过物理内存的10%,程序会产生错误
以上所述就是小编给大家介绍的《mongodb 聚合管道》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 速度不够,管道来凑——Redis管道技术
- Golang pipline泛型管道和类型管道的性能差距
- Linux 管道那些事儿
- Redis管道
- Clojure 集合管道函数练习
- Redis学习之管道机制
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。