[关闭]
@a5635268 2015-09-14T18:56:22.000000Z 字数 4592 阅读 1577

【mongoDB高级篇②】大数据聚集运算之mapReduce(映射化简)

mongoDB


简述

mapReduce从字面上来理解就是两个过程:map映射以及reduce化简。是一种比较先进的大数据处理方法,其难度不高,从性能上来说属于比较暴力的(通过N台服务器同时来计算),但相较于group以及aggregate来说,功能更强大,并更加灵活。

  1. 映射过程:先把某一类数据分组归类,这里的映射过程是支持分布式的,一边遍历每一台服务器,一边进行分类。
  2. 化简过程:然后再在分组中进行运算,这里的化简过程也是支持分布式的,在分类的过程中直接运算了。也就是说如果是一个求和的过程,先在a服务器分组求和,然后再在b服务器分组求和····最后再把化简以后的数据进行最终处理。在映射化简的过程都是每台服务器自己的CPU在运算,大量的服务器同时来进行运算工作,这就是大数据基本理念。

map-reduce.png-99.4kB

在这个映射化简操作中,MongoDB对每个输入文档(例如集合中满足查询条件的文档)执行了map操作。映射操作输出了键值对结果。对那些有多个值的关键字,MongoDB执reduce操作,收集并压缩了最终的聚合结果。然后MongoDB把结果保存到一个集合中。化简函数还可以把结果输出到finalize函数,进一步对聚合结果做处理,当然这步是可选的。

在MongoDB中,所有的映射化简函数都是使用JavaScript编写,并且运行在 mongod 进程中。映射化简操作使用一个集合中文档作为输入,并且可以在映射阶段之前执行任意的排序和限定操作。 mapReduce 命令可以把结果作为一个文档来返回,也可以把结果写入集合。输入集合和输出集合可以是分片的。

语法参数

更多参考: http://docs.mongodb.org/manual/reference/command/mapReduce/

  1. map: function() {emit(this.cat_id,this.goods_number); }, # 函数内部要调用内置的emit函数,cat_id代表根据cat_id来进行分组,goods_number代表把文档中的goods_number字段映射到cat_id分组上的数据,其中this是指向向前的文档的。
  2. reduce: function(cat_id,all_goods_number) {return Array.sum(all_goods_number)}, # cat_id代表着cat_id当前的这一组,all_goods_number代表当前这一组的goods_number集合
  3. out: <output>, # 输出到某一个集合中,注意本属性来还支持如果输出的集合如果已经存在了,那是替换,合并还是继续reduce? 另外还支持输出到其他db的分片中,具体用到时查阅文档
  4. query: <document>, # 一个查询表达式,是先查询出来,再进行mapReduce的
  5. sort: <document>, # 发往map函数前先给文档排序
  6. limit: <number>, # 发往map函数的文档数量上限,该参数貌似不能用在分片模式下的mapreduce
  7. finalize: function(key, reducedValue) {return modifiedObject; }, # 从reduce函数中接受的参数key与reducedValue,并且可以访问scope中设定的变量
  8. scope: <document>, # 指定一个全局变量,能应用于finalize和reduce函数
  9. jsMode: <boolean>, # 布尔值,是否减少执行过程中BSON和JS的转换,默认true,true时BSON-->js-->map-->reduce-->BSON,false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce。
  10. verbose: <boolean> # 是否产生更加详细的服务器日志,默认true

实例

简单应用实例

  1. # 求每组的库存总量
  2. var map = function(){
  3. emit(this.cat_id,this.goods_number);
  4. }
  5. var reduce = function(cat_id,numbers){
  6. return Array.sum(numbers);
  7. }
  8. db.goods.mapReduce(map,reduce,{out:'res'})
  9. # 查看Array支持的方法
  10. for(var i in Array){
  11. printjson(i);
  12. }
  13. "contains"
  14. "unique"
  15. "shuffle"
  16. "tojson"
  17. "fetchRefs"
  18. "sum"
  19. "avg"
  20. "stdDev"
  21. # 求每个栏目的平均价格
  22. var map = function(){
  23. emit(this.cat_id,this.shop_price);
  24. }
  25. var reduce = function(cat_id,prices){
  26. var avgprice = Array.avg(prices);
  27. return Math.round(avgprice,2);
  28. }
  29. db.goods.mapReduce(map,reduce,{out:'res'});
  30. # 求出每组的最大价格
  31. var map = function(){
  32. emit(this.cat_id,this.shop_price);
  33. }
  34. //错误操作 ↓↓ 应该在finalize函数中做处理
  35. var reduce = function(cat_id,prices){
  36. var max = 0;
  37. for(var i in prices){
  38. if(i > max)
  39. max = i;
  40. }
  41. return max;
  42. }
  43. var reduce = function(cat_id,prices){
  44. return {cat_id:cat_id,prices:prices};
  45. }
  46. var finalize = function(cat_id, prices) {
  47. var max = 0;
  48. if(prices.prices !== null){
  49. var obj = prices.prices;
  50. for(var i in obj){
  51. if(obj[i] > max)
  52. max = obj[i]
  53. }
  54. }
  55. return max == 0 ? prices : max;
  56. }
  57. db.goods.mapReduce(map,reduce,{out:'res1',finalize:finalize,query:{'shop_price':{$gt:0}}});
  58. # 获得每组的商品集合
  59. var map = function(){
  60. emit(this.cat_id,this.goods_name);
  61. }
  62. var reduce = function(cat_id,goods_names){
  63. return {cat_id:cat_id,goods_names:goods_names}
  64. }
  65. var finalize = function(key, reducedValue) {
  66. return reducedValue == null ? 'none value' : reducedValue; //对reduce的值进行二次处理
  67. }
  68. db.runCommand({
  69. mapReduce:'goods',
  70. map:map,
  71. reduce:reduce,
  72. finalize:finalize,
  73. out:'res2'
  74. })
  75. # 对于price大于100的才进行分组映射
  76. ## 方法1:
  77. var map = function(){
  78. if(this.shop_price > 100){
  79. emit(this.cat_id,{name:this.goods_name,price:this.shop_price});
  80. }
  81. }
  82. var reduce = function(cat_id,goods_names){
  83. return {cat_id:cat_id,goods_names:goods_names}
  84. }
  85. db.runCommand({
  86. mapReduce:'goods',
  87. map:map,
  88. reduce:reduce,
  89. out:'res2'
  90. })
  91. ## 方法2 首推此方法
  92. var map = function(){
  93. emit(this.cat_id,{name:this.goods_name,price:this.shop_price});
  94. }
  95. var reduce = function(cat_id,goods_names){
  96. return {cat_id:cat_id,goods_names:goods_names}
  97. }
  98. db.runCommand({
  99. mapReduce:'goods',
  100. map:map,
  101. reduce:reduce,
  102. query:{'shop_price':{$gt:100}},
  103. out:'res2'
  104. })

官网实例

  1. # 数据结构
  2. {
  3. _id: ObjectId("50a8240b927d5d8b5891743c"),
  4. cust_id: "abc123",
  5. ord_date: new Date("Oct 04, 2012"),
  6. status: 'A',
  7. price: 25,
  8. items: [ { sku: "mmm", qty: 5, price: 2.5 },
  9. { sku: "nnn", qty: 5, price: 2.5 } ]
  10. }
  11. # 计算每个顾客的总金额
  12. var mapFunction1 = function() {
  13. emit(this.cust_id, this.price);
  14. };
  15. var reduceFunction1 = function(keyCustId, valuesPrices) {
  16. return Array.sum(valuesPrices);
  17. };
  18. db.orders.mapReduce(
  19. mapFunction1,
  20. reduceFunction1,
  21. { out: "map_reduce_example" }
  22. )
  23. # 计算订单总量和每种 sku 订购量的平均值
  24. var mapFunction2 = function() {
  25. for (var idx = 0; idx < this.items.length; idx++) {
  26. var key = this.items[idx].sku;
  27. var value = {
  28. count: 1,
  29. qty: this.items[idx].qty
  30. };
  31. emit(key, value);
  32. }
  33. };
  34. var reduceFunction2 = function(keySKU, countObjVals) {
  35. reducedVal = { count: 0, qty: 0 };
  36. for (var idx = 0; idx < countObjVals.length; idx++) {
  37. reducedVal.count += countObjVals[idx].count;
  38. reducedVal.qty += countObjVals[idx].qty;
  39. }
  40. return reducedVal;
  41. };
  42. var finalizeFunction2 = function (key, reducedVal) {
  43. reducedVal.avg = reducedVal.qty/reducedVal.count;
  44. return reducedVal;
  45. };
  46. db.orders.mapReduce(
  47. mapFunction2,
  48. reduceFunction2,
  49. {
  50. out: { merge: "map_reduce_example" },
  51. query: { ord_date:
  52. { $gt: new Date('01/01/2012') }
  53. },
  54. finalize: finalizeFunction2
  55. }
  56. )
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注