MapReduce

Based on v111 (2011-05-15更新) - オリジナル

MongoDBでのmap/reduceは、バッチでのデータ処理や集計処理で便利に使えます。すべての入力はコレクションから、すべての出力はコレクションへ、という方針はHadoopに少しにています。SQLでGROUP BYを使いたいようなとき、MongoDBではmap/reduceを使うのが正しいです。

MongoDBでは、インデックスと通常のクエリはmap/reduceとは別です。もし、CouchDBの使ったことがあるなら、これはとても大きな違いになります。MongoDBはでは、クエリーとインデックスはMySQLに近いです。 これらの操作については、 クエリーインデックス を参照してください。

概要

map/reduce はデータベース command を経由して呼び出され、基本的には処理結果を格納するためのコレクションが作成されます。 mapreduce 関数は JavaScript で記述され、サーバー上で実行されます。

コマンドの文法:

db.runCommand(
{ mapReduce : <collection>,
map : <map ファンクション名>,
reduce : <reduce ファンクション名>
[, query : <クエリーフィルタオブジェクト>]
[, sort : <入力オブジェクトに対して、ここで指定されたキーに対してソートを行います。reduceへ渡すキーを少なくするための最適化として有用です>]
[, limit : <最大何件のオブジェクトを返すか>]
[, out : <出力先のコレクション名>]
[, keeptemp: <true|false>]
[, outType : ("normal"|"merge"|"reduce")] -- 1.7.3 以上
[, finalize : <finalize ファンクション>]
[, scope : <object where fields go into javascript global scope >]
[, verbose : true]
 }
);
  • keeptemp - true を指定した場合、作成されたコレクションは「一時用」として扱われません。デフォルトはfalseです。out が指定された場合、コレクションは自動的に永続的に保存されるコレクションになります。(MongoDB <=1.6)
  • finalize - 各shardでのmap/reduceが全て終了時した後の全ての結果に対し実行されるファンクションです。
  • scope - map/reduce/finalize から呼び出すことのできる変数を渡します example mr5
  • verbose - 実行時間の統計を出します

Output options

MongoDB v1.7.3 以下: もし out オプションを指定しなかった場合、結果はコマンド出力が自動的に付与する名前の一時的なコレクションに返されます(下記参照)。そうでなければ、out で指定した名前のコレクションに結果が格納されます。

MongoDB 1.7.4+: output オプションは変更になり、map-reduce は一時コレクションに出力しなくなりました。 (よって keepTemp は削除されました)。現在は必ず out オプションを使用しなければなりません。 out オプションは以下の様に指定します:

  • "collectionName" - 結果を出力するコレクション名。デフォルトでは出力のタイプは "replace"となっています。
  • { replace : "collectionName" } - 出力は指定した名前と同じ名前の既存のコレクションに出力を置き換えます。言い換えれば、既存の同名コレクションの全内容を削除して今回の出力を格納します。
  • { merge : "collectionName" } - 以前の出力コレクションに今回の結果をマージします。言い換えれば、既存のコレクション内にある同名のキーは新出力の値で上書きされ、新出力に存在しないキーは以前の値を保持したままになります。
  • { reduce : "collectionName" } - もし以前の出力コレクションに同名のキーがあった場合、reduce ファンクションで指定されたオペレーションをそのキーに対して実行します。つまり新出力のキーの値と以前の出力コレクションのキーの値の2つに対してreduceが行われます。finalize ファンクションはこの操作の後に行われます。
  • { inline : 1} - このオプションではコレクションは作成されず、map-reduceオペレーションはRAM内で行われます。また、処理結果は result object として返ります。このオプションは出力セットが(1ドキュメントのサイズ上限である)16MB以内でないと使えないことに注意して下さい。

For example:

db.users.mapReduce(map, reduce, {out: { inline : 1}});

Additional options within out objects are:

  • "db" - the db name to output to.
     out : {replace : "collectionName", db : "otherDB"} 

Result object

{
[results : <document_array>,]
[result : <collection_name>,]
[db : <out-db>,]
timeMillis : <job_time>,
counts : {
input : <number of objects scanned>,
emit : <number of times emit was called>,
output : <number of items in output collection>
} ,
ok : <1_if_ok>
[, err : <errmsg_if_error>]
}

resultresults は出力タイプに依存します。 results は inline オプションを指定した時のみ該当します。 results の要素は mapreduce の出力を含む Embedded ドキュメントです。それ以外のオプションを選んだ場合は result に出力結果を保持しているコレクション名が入ります。

map ファンクション

map ファンクションは、 評価中の現在のオブジェクトを this 変数で指します。 mapファンクションは、 最低一回は emit(key,value) を呼ぶ必要があります。また何回でも呼ぶこともできます。各 emit で扱えるサイズはドキュメントサイズの上限の50%になります。(v1.6.x では4MB, v1.8.x では8MB)

function map(void) -> void

reduce ファンクション

map/reduceを実行すると map 処理の後、 reduce ファンクションは emit された値の配列を受け取り、それを1つの値に reduce します。reduce ファンクションは1つ以上の同名キーを呼び出すために、reduce ファンクションによって返されるオブジェクトの構造は map ファンクションで emit された値と同じ構造をもっていなければなりません。以下の簡単な例でそれを確認してみます:

user のコメントが格納されたドキュメントのコレクションのイテレーションを考えます。サンプルドキュメントは以下の様なものです:

 { username: "jones",
likes: 20,
text: "Hello world!"
}

ここでのゴールはユーザーごとのコメント数をカウントし、また全てのユーザーの "likes" の値の合計を求めることにします。これを行うために、 map function は以下の様に記述します:

 function() {
emit( this.username, {count: 1, likes: this.likes} );
}

この処理は本質的には username に対して group by し、countlikes のオブジェクトフィールドに対して集約関数を適用していることになります。

map.reduce が実際に実行されたら、 username ごとの 値の配列が reduce ファンクションに渡されて行きます。 reduce ファンクションはこの値の配列を処理するための記述を行います、以下が今回の例に対する適切な reduce function です:

 function(key, values) {
var result = {count: 0, likes: 0};

values.forEach(function(value) {
result.count += value.count;
result.likes += values.likes;
});

return result;
}

result ドキュメントが map ファンクションで emit されたドキュメントと同じ構造を持っていることに気づくことでしょう。これはとても重要なことです、なぜなら reduce ファンクションが与えられたキーに対して実行を行うとき、そのキー(または username)に対する値がいつも single value である保証は無いからです。実際、 reduce ファンクションは1回以上実行されます。例えばコメントのコレクションに対して処理を行うとき、 map ファンクションは "jones" ユーザーからのコメントに対しては10回処理を行います。これらのコメントデータは reduce 処理に回され、このキーに対する処理結果は以下ようななアグリゲーションオブジェクトになります:

 { count: 10, likes: 247 }

この後に "jones" によるドキュメントが map ファンクションに入ってくる事があります。これが起こったとき、後から入ってきたコメントは既にある集約結果の値と reduce されなければなりません。もし新しく以下の様な emit されたドキュメントが返ってきたとします:

 { count: 1, likes: 5 }

このとき reduce ファンクションは次の様に呼び出されます:

 reduce("jones", [ {count: 10, likes: 247}, { count: 1, likes: 5} ] )

そして結果のドキュメントはこれらの値が合わさったシンプルなものになります:

 { count: 11, likes: 252 }

reduce ファンクションが同名キーに対して1回以上呼び出されることを知っていれば、なぜ reduce ファンクションが map ファンクションで emit された値と同じ形式でないかは容易に理解できるはずです。

よりテクニカルな説明

function reduce(key, array_of_value) -> value

または

function reduce(key_obj, [value_obj, value_obj, ...]) -> value_obj

map/reduce エンジンは reduce ファンクションを繰り返し呼び出します。よってこれらのファンクションは等冪でなければなりません。つまり以下の式があなたの reduce ファンクションにも成立していなければなりません:

for all k,vals : reduce( k, [reduce(k,vals)] ) == reduce(k,vals)

また次も真でなければなりません:

reduce( k, [A, B] ) == reduce( k, [B, A] )

もしあなたが一度だけ処理したいオペレーションがあるなら、finalize ファンクションを使います。

mapファンクションのemitの2番目の引数の値(オブジェクト)とreduceで返す値(オブジェクト)の形式は同じでなければなりません。それはreduceが繰り返し呼び出されることを可能にするためです。もしそうしなければバグを引き起こし、このことを知らなければデバッグも大変です。
今のところ、reduceファンクションから返される値は配列であってはいけません。オブジェクトか数値です。

finalize ファンクション

finalize ファンクションは、reduceの後に走ります。 このファンクションはオプションで、多くのmap/reduce処理では必要ではありません。   finalize ファンクションはキーと値を受け取り、finalizeされた結果を返します。

function finalize(key, value) -> final_value 

reduce ファンクションは一つのオブジェクトに対して複数呼ばれることがあります。最後に一度だけ何かを実行したい場合に finalize を使ってください。たとえば、 何かの平均を計算する、と言った場合です。

shard環境

shardな環境では、map/reduceでの操作はすべてのshardで並行に処理されます。

シェルでの例 1

次の例では、 events コレクションが次の構造のオブジェクトを持つことを仮定しています。

{ time : <time>, user_id : <userid>, type : <type>, ... }

mapreduceを使い、"sale"というタイプのイベントを1つ以上持つユーザを抽出します。

> m = function() { emit(this.user_id, 1); }
> r = function(k,vals) { return 1; }
> res = db.events.mapReduce(m, r, { query : {type:'sale'} });
> db[res.result].find().limit(2)
{ "_id" : 8321073716060 , "value" : 1 }
{ "_id" : 7921232311289 , "value" : 1 }

もし、ユーザが何回のeventを経験したかという情報も出力したい場合には、reduceファンクションを以下のように書き換えます。

> r = function(k,vals) {
... var sum=0;
... for(var i in vals) sum += vals[i];
... return sum;
... }

注意。ここで、単純に vals.length を返すことはできません。reduceファンクションは複数回呼ばれることがあるからです。

シェルでの例 2

$ ./mongo
> db.things.insert( { _id : 1, tags : ['dog', 'cat'] } );
> db.things.insert( { _id : 2, tags : ['cat'] } );
> db.things.insert( { _id : 3, tags : ['mouse', 'cat', 'dog'] } );
> db.things.insert( { _id : 4, tags : [] } );

> // map function
> m = function(){
... this.tags.forEach(
... function(z){
... emit( z , { count : 1 } );
... }
... );
...};

> // reduce function
> r = function( key , values ){
... var total = 0;
... for ( var i=0; i<values.length; i++ )
... total += values[i].count;
... return { count : total };
...};

> res = db.things.mapReduce(m,r);
> res
{"timeMillis.emit" : 9 , "result" : "mr.things.1254430454.3" ,
"numObjects" : 4 , "timeMillis" : 9 , "errmsg" : "" , "ok" : 0}

> db[res.result].find()
{"_id" : "cat" , "value" : {"count" : 3}}
{"_id" : "dog" , "value" : {"count" : 2}}
{"_id" : "mouse" , "value" : {"count" : 1}}

> db[res.result].drop()

その他の例

永続的なコレクションについての注意

たとえ永続的なコレクション名が指定されたときでも、一時的なコレクション名が処理中に使われます。map/reduce が終わったとき、一時的なコレクションが永続的なコレクション名にリネームされます。このため、一時的な不完全なデータについて心配することなく、同じコレクション名を使い定期的にmap/reduce処理を実行することができます。これは、定期的に統計情報をコレクションに出力するときにとても便利です。

並列性

現在のところ、一つのmongodプロセスでのMapReduce処理は、シングルスレッドです。これは、現在のJavaScriptエンジンの制限のためです。この問題を解決するための方法を探していますが、今MapReduce処理を並列化したい場合には、shardingを使うか、クライアントサイドの自分のコードで行ってください。

プレゼンテーション

Map/reduce, geospatial indexing, and other cool features - Kristina Chodorow at MongoSF (April 2010)

Troubleshooting

参照


Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.

IF YOU HAVE A QUESTION, POST IT TO THE USER GROUP.

These pages are fine for comments, but for questions, your best bet will always be the MongoDB User Group.

blog comments powered by Disqus