Mongodb作为目前最流行的nosql数据库之一,它给我们提供了海量数据的管理能力。而现在正处于大数据最为火热的时代,如何将海量的数据转化为可用的信息,这一步至关重要,面向应用而设计的mongodb在基础的数据分析上提供了aggregation这一功能,足以满足很多日常需求。

管道(Aggregation Pipeline)

在mongodb中,Aggregation的操作方法如下:

1
db.collection.aggregate( [ { <stage> }, ... ] )

而其中的stage则是构成管道的每个部分,可以由一个或者多个stage来构成。管道是mongodb数据聚合的一个新框架,类似于数据处理的管道,或者说是像*nix系统中的管道。对于collection,由多个stage构成的管道能够实现分组、过滤等功能,文档每经过一个stage就过进行对应的操作并把操作结果输出给下一个stage。因此,构成管道的stage则至关重要,它能直接决定最终的处理结果。 每个stage由Stage Operators(管道操作符)和Expression Operators(管道表达式)构成。

管道操作符

管道操作符,可以理解为每个stage你要对目标数据进行的操作,目前主要包括以下几种操作符:

$project

对于$project操作符来说,官方的解释是这样的:

Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document.

也就是说,$project操作符能够完成增加、删除、重命名字段,控制流向下一个stage的数据组织形式。 以我抓去的拉勾网招聘数据为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 原始数据
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Java/",
	"tag" : "后端开发",
	"name" : "Java"
}
# 聚合方法
db.Category.aggregate({$project:{category:1,链接:"$url",分类:{tag:"$tag",name:"$name"}}})
# 输出结果
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"链接" : "https://www.lagou.com/zhaopin/Java/",
	"分类" : {
		"tag" : "后端开发",
		"name" : "Java"
	}
}

$match

Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. $match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).

$match运算符,用于筛选出符合目标条件的数据传给下一个stage。它的用法和普通的find()类似,以下例来说:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# find()查询
db.Category.find({category:"技术"}).pretty()
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Java/",
	"tag" : "后端开发",
	"name" : "Java"
}
# 聚合方法
db.Category.aggregate({$match:{category:"技术"}})
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Java/",
	"tag" : "后端开发",
	"name" : "Java"
}

$limit

Passes the first n documents unmodified to the pipeline where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).

与普通查询类似,limit用以限制输出文档数量,在aggregtion中的使用方式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 常规查询
> db.Category.find({category:"技术"}).limit(3).pretty()
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Java/",
	"tag" : "后端开发",
	"name" : "Java"
}
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b4"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Python/",
	"tag" : "后端开发",
	"name" : "Python"
}
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b5"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/PHP/",
	"tag" : "后端开发",
	"name" : "PHP"
}
# aggregation
> db.Category.aggregate({$limit:3}).pretty()
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Java/",
	"tag" : "后端开发",
	"name" : "Java"
}
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b4"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/Python/",
	"tag" : "后端开发",
	"name" : "Python"
}
{
	"_id" : ObjectId("582ee2a0b7f3d15aaebf35b5"),
	"category" : "技术",
	"url" : "https://www.lagou.com/zhaopin/PHP/",
	"tag" : "后端开发",
	"name" : "PHP"
}

$skip

Skips the first n documents where n is the specified skip number and passes the remaining documents unmodified to the pipeline. For each input document, outputs either zero documents (for the first n documents) or one document (if after the first n documents).

skip也是一个很常用的操作,用以跳过输入文档前n个文档之后进行下一步操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 常规全表扫描
> db.Category.find().skip(1).limit(1)
{ "_id" : ObjectId("582ee2a0b7f3d15aaebf35b4"), "category" : "技术", "url" : "https://www.lagou.com/zhaopin/Python/", "tag" : "后端开发", "name" : "Python" }
> db.Category.find().limit(1)
{ "_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"), "category" : "技术", "url" : "https://www.lagou.com/zhaopin/Java/", "tag" : "后端开发", "name" : "Java" }
# aggregation操作
> db.Category.aggregate({$limit:1})
{ "_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"), "category" : "技术", "url" : "https://www.lagou.com/zhaopin/Java/", "tag" : "后端开发", "name" : "Java" }
> db.Category.aggregate({$skip:1},{$limit:1})
{ "_id" : ObjectId("582ee2a0b7f3d15aaebf35b4"), "category" : "技术", "url" : "https://www.lagou.com/zhaopin/Python/", "tag" : "后端开发", "name" : "Python" }

可以看到,同样是limit(1),在增加了skip操作之后,返回的结果是不相同的。

$lookup

Performs a left outer join to another collection in the same database to filter in documents from the “joined” collection for processing.

可以看到,$lookup的作用就是进行一个类似sql中left join的操作,而每个lookup操作需要有四个参数传入:

  • from 确定要进行lookup的集合名,该集合必须与当前进行aggregate操作的集合在同一个库中,并且不能分片
  • localField 制定要进行操作的列名,用以和下边通过from确定的集合中foreignField指定的列名进行匹配
  • foreignField 指定由from确定的集合要进行lookup操作的列名
  • as 两个集合匹配完成之后,在当前结合中添加的一列

通过下例来说:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
> db.Position.aggregate({$lookup:{from:"Category",localField:"name",foreignField:"name",as:"jobInfo"}},{$skip:1},{$limit:1}).pretty()
{
	"_id" : ObjectId("582ee2b6b7f3d15abcf7174c"),
	"positionid" : 2342819,
	"salary" : "8k-15k",
	"name" : "Java",
	"detailUR" : "https://www.lagou.com/jobs/2342819.html",
	"companyid" : 25439,
	"company" : "Udesk-企业级智能客服平台",
	"experience" : "经验1-3年",
	"tag" : "后端开发",
	"address" : "西直门南大街2号成铭大厦C座16层(西直门地铁站C口)",
	"category" : "技术",
	"loc" : {
		"type" : "Point",
		"coordinates" : [
			116.35735297,
			39.93921106
		]
	},
	"jobInfo" : [
		{
			"_id" : ObjectId("582ee2a0b7f3d15aaebf35b3"),
			"category" : "技术",
			"url" : "https://www.lagou.com/zhaopin/Java/",
			"tag" : "后端开发",
			"name" : "Java"
		}
	]
}

可以看到,通过name字段将Category和Position两个集合关联起来,并且Category中对应的数据在输出中以jobInfo表示。

$group

Groups input documents by a specified identifier expression and applies the accumulator expression(s), if specified, to each group. Consumes all input documents and outputs one document per each distinct group. The output documents only contain the identifier field and, if specified, accumulated fields.

$group也是一个sql中常用的操作符,它可以很方便的按照字段来进行一些统计工作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
> db.Position.aggregate({$group:{_id:"$name",count:{$sum:1}}})
{ "_id" : "游戏制作人", "count" : 343 }
{ "_id" : "产品总监", "count" : 90 }
{ "_id" : "产品部经理", "count" : 53 }
{ "_id" : "网页产品设计师", "count" : 4 }
{ "_id" : "产品实习生", "count" : 13 }
{ "_id" : "游戏策划", "count" : 291 }
{ "_id" : "电商产品经理", "count" : 301 }
{ "_id" : "数据产品经理", "count" : 450 }
{ "_id" : "产品助理", "count" : 65 }
{ "_id" : "移动产品经理", "count" : 18 }
{ "_id" : "BI工程师", "count" : 9 }
{ "_id" : "售前工程师", "count" : 9 }
{ "_id" : "硬件开发其它", "count" : 450 }
{ "_id" : "射频工程师", "count" : 60 }
{ "_id" : "热传导", "count" : 2 }
{ "_id" : "模具设计", "count" : 11 }
{ "_id" : "DSP开发", "count" : 6 }
{ "_id" : "企业软件其它", "count" : 21 }
{ "_id" : "系统集成", "count" : 110 }
{ "_id" : "驱动开发", "count" : 129 }

需要注意的是,在进行$group操作的时候,必须要指定一个_id的字段名,并且这个操作都是在内存中进行,如果数据量很大的时候需要额外的参数(将会在以后的文章中提到)。

其他的操作符

上述几个是用的比较多的几个。其他的操作符还有$unwind、$sample、$sort、$indexStats、$out等。在以后使用中如果遇到的话在来这更新。

管道表达式

上边的几个例子已经有所体现,就是说,在进行aggreagtion的时候,每个stage都是由一个管道表达式作为动词,按照对应的限制进行操作,而管道表达式则可以构成丰富的操作限制,从而得到想要的结果,其中$group操作符中的$sum则是众多管道表达式中的一个。 而跟多的管道表达式可以参考这里了解。

mongodb campass

通过mongodb自身的aggregation操作,合理使用管道表达式和管道操作符可以完成大量的分析统计工作,这里不得不得说一下mongodb官方提供的数据库可视化工具mongodb compass。有了这个工具(有windows和mac版本),感觉其他的可视化工具都弱爆了,强烈推荐!