Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB.
Overview
map/reduce is invoked via a database command. Typically the database creates a collection to hold output of the operation. map and reduce functions are written in JavaScript and execute on the server.
Command syntax:
db.runCommand(
{ mapreduce : <collection>,
map : <mapfunction>,
reduce : <reducefunction>
[, query : <query filter object>]
[, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
[, limit : <number of objects to return from collection>]
[, out : <see output options below>]
[, keeptemp: <true|false>]
[, finalize : <finalizefunction>]
[, scope : <object where fields go into javascript global scope >]
[, jsMode : true]
[, verbose : true]
}
);
* finalize - function to apply to all the results when finished
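As an illustration of the syntax above, the following sketch fills in a command document. The events collection, the type and user_id fields, and the sales_by_user output collection are assumptions made for this example, not names required by MongoDB. In the mongo shell the object would be passed to db.runCommand().

```javascript
// Hypothetical map function: emits one (user_id, 1) pair per input document.
// emit() is provided by the server when the job runs.
function mapf() {
  emit(this.user_id, 1);
}

// Hypothetical reduce function: sums the emitted values for one key.
function reducef(key, values) {
  var sum = 0;
  values.forEach(function (v) { sum += v; });
  return sum;
}

// Command document following the syntax above. All names are illustrative.
var command = {
  mapreduce: "events",        // input collection (assumed name)
  map: mapf,
  reduce: reducef,
  query: { type: "sale" },    // only documents matching this filter are mapped
  out: "sales_by_user"        // output collection (assumed name)
};

// In the mongo shell this would be run as: db.runCommand(command);
```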
Incremental Map-reduce
If the data set over which you'd like to perform map-reduce aggregations is constantly growing, then you may want to take advantage of incremental map-reduce. This prevents you from having to aggregate over the entire data set each time you want to see your results. To perform incremental map-reduce, take the following steps:
1. First, run a map-reduce job over an existing collection, and output the data to its own output collection.
2. When you have more data to process, run a second map-reduce job, but use the query option to filter the documents to include only new documents.
3. Use the reduce output option. This will use your reduce function to merge the new data into the existing output collection.
Output options
pre-v1.8: If you do not specify a value for out, then the results will be placed into a temporary collection whose name will be given in the command's output (see below). Otherwise, you can specify the name of a collection for the out option and the results will be placed there.
v1.8+: the output options have changed. Map-reduce no longer generates temporary collections (thus, keepTemp has been removed). Now, you must always supply a value for out. The out directives are:
For example: db.users.mapReduce(map, reduce, {out: { inline : 1}});
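The reduce output mode used in step 3 of the incremental procedure can be illustrated with a small in-memory sketch. This is plain JavaScript that only mimics the documented behavior; it is not MongoDB's engine. The helper names (runMapReduce, mergeWithReduce) and the sample documents are hypothetical, and unlike on the server, emit is passed to the map function as an argument.

```javascript
// In-memory stand-in for one map-reduce run. Unlike on the server, emit is
// passed to the map function as an argument instead of being a global.
function runMapReduce(docs, mapFn, reduceFn) {
  var groups = {};
  function emit(key, value) {
    (groups[key] = groups[key] || []).push(value);
  }
  docs.forEach(function (doc) { mapFn.call(doc, emit); });

  var output = {};
  Object.keys(groups).forEach(function (key) {
    output[key] = reduceFn(key, groups[key]);
  });
  return output;
}

// Mimics the reduce output mode: when a key already exists in the output,
// the old and new values are combined with the reduce function.
function mergeWithReduce(existing, fresh, reduceFn) {
  Object.keys(fresh).forEach(function (key) {
    existing[key] = Object.prototype.hasOwnProperty.call(existing, key)
      ? reduceFn(key, [existing[key], fresh[key]])
      : fresh[key];
  });
  return existing;
}

function mapSession(emit) {
  emit(this.userid, { count: 1, total: this.length });
}

function reduceSession(key, values) {
  var r = { count: 0, total: 0 };
  values.forEach(function (v) { r.count += v.count; r.total += v.total; });
  return r;
}

// Step 1: initial run over the existing (hypothetical) documents.
var day1 = [
  { userid: "a", length: 95 },
  { userid: "b", length: 110 },
  { userid: "a", length: 105 }
];
var stats = runMapReduce(day1, mapSession, reduceSession);

// Steps 2 and 3: process only the new documents, then merge via reduce.
var day2 = [
  { userid: "a", length: 100 },
  { userid: "c", length: 55 }
];
mergeWithReduce(stats, runMapReduce(day2, mapSession, reduceSession), reduceSession);
```

After both runs, stats holds a count of 3 and a total of 300 for user "a", the same values a single run over all five documents would produce. This only works because the reduce function returns a value with the same structure as the emitted values.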
Additional options within out objects are:
Note: the order of the objects in the out parameter matters.
Result object
{
[results : <document_array>,]
[result : <collection_name> | {db: <db>, collection: <collection_name>},]
timeMillis : <job_time>,
counts : {
input : <number of objects scanned>,
emit : <number of times emit was called>,
output : <number of items in output collection>
} ,
ok : <1_if_ok>
[, err : <errmsg_if_error>]
}
Either the result or the results field will be present depending on your output type. The results element is only present if the inline output option was used. The value of the results element is an array of embedded documents containing the results. If you chose any other output type the result field will be a string with the name of the collection holding the results, or an embedded document containing the db and collection if you chose to output to another db.
A command helper is available in the MongoDB shell:
db.collection.mapReduce(mapfunction,reducefunction[,options]);
map, reduce, and finalize functions are written in JavaScript.
Map Function
The map function references the variable this to inspect the current object under consideration. A map function calls emit(key,value) any number of times to feed data to the reducer. In most cases you will emit once per input document, but in some cases such as counting tags, a given document may have one, many, or even zero tags. Each emit is limited to 50% of the maximum document size (e.g. 4MB for 1.6.x and 8MB for 1.8.x).
function map(void) -> void
Reduce Function
When you run a map/reduce, the reduce function will receive an array of emitted values and reduce them to a single value. Because the reduce function might be invoked more than once for the same key, the structure of the object returned by the reduce function must be identical to the structure of the map function's emitted value. We can clarify this with a simple example. Suppose we're iterating over a collection of documents that represent user comments. A sample document might look like this:
{ username: "jones",
likes: 20,
text: "Hello world!"
}
We want to use map/reduce to count the total number of comments per user and aggregate the total number of "likes" received across all of a user's comments. To do this, we'd first write a map function like this one:
function() {
emit( this.username, {count: 1, likes: this.likes} );
}
This essentially says that we'll be grouping by username and aggregating using an object with fields for count and likes. When map/reduce is actually run, an array of values for each username will be sent to the reduce function. That's why the reduce function is always written to process an array of values. Here's the appropriate function for this example:
function(key, values) {
var result = {count: 0, likes: 0};
values.forEach(function(value) {
result.count += value.count;
result.likes += value.likes;
});
return result;
}
Notice that the result document has the same structure as the documents emitted by the map function. This is important because, when the reduce function is run against a given key, it's not guaranteed to process every single value for that key (or username). In fact, the reduce function may have to run more than once. For example, while processing the comments collection, the map function might encounter ten comments from the user "jones." It then sends those comments' data to be reduced, and this results in the following aggregate object: { count: 10, likes: 247 }
Later, the map function encounters one more comment document by "jones." When this happens, the values in the extra comment must be reduced against the already existing aggregate value. If the new emitted document looks like this: { count: 1, likes: 5 }
Then the reduce function will be invoked in this way: reduce("jones", [ {count: 10, likes: 247}, { count: 1, likes: 5} ] )
And the resulting document will be a simple combination (or reduction) of those values: { count: 11, likes: 252 }
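The re-reduce step just described can be checked in plain JavaScript. The sketch below reuses the reduce function from this example (named reduceComments here for convenience) and invents ten per-comment like counts that add up to 247; those individual values are an assumption made for illustration.

```javascript
// Reduce function from the example above.
function reduceComments(key, values) {
  var result = { count: 0, likes: 0 };
  values.forEach(function (value) {
    result.count += value.count;
    result.likes += value.likes;
  });
  return result;
}

// Ten hypothetical emitted values for "jones": 7 * 25 + 3 * 24 = 247 likes.
var firstTen = [];
for (var i = 0; i < 10; i++) {
  firstTen.push({ count: 1, likes: i < 7 ? 25 : 24 });
}

// The value emitted for the comment that is encountered later.
var late = { count: 1, likes: 5 };

// First pass over ten values, then a second pass over [aggregate, new value].
var partial = reduceComments("jones", firstTen);
var rereduced = reduceComments("jones", [partial, late]);

// Reducing all eleven values in one call gives the same result.
var allAtOnce = reduceComments("jones", firstTen.concat([late]));
```

Both rereduced and allAtOnce come out as a count of 11 and 252 likes, which is only possible because the aggregate has the same fields as a single emitted value.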
So long as you understand that the reduce function might be invoked more than once for the same key, it's easy to see why this function must return a value whose structure matches the map function's emitted value.
A more technical explanation
function reduce(key, array_of_value) -> value
OR
function reduce(key_obj, [value_obj, value_obj, ...]) -> value_obj
The map/reduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function:
for all k,vals : reduce( k, [reduce(k,vals)] ) == reduce(k,vals)
This also means the order of the values must not affect the result:
reduce( k, [A, B] ) == reduce( k, [B, A] )
If you need to perform an operation only once, use a finalize function.
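The two properties above can be spot-checked for a candidate reduce function before running it on the server. The helpers below (same, isIdempotent, isOrderIndependent) are hypothetical names for this sketch, and they test a single sample only, so passing them does not prove the properties in general.

```javascript
// Compares results via JSON.stringify, which is enough for plain values and
// for objects whose fields are always created in the same order.
function same(a, b) {
  return JSON.stringify(a) === JSON.stringify(b);
}

// reduce(k, [reduce(k, vals)]) == reduce(k, vals)
function isIdempotent(reduceFn, key, values) {
  var once = reduceFn(key, values);
  return same(reduceFn(key, [once]), once);
}

// reduce(k, vals) == reduce(k, vals in reverse order)
function isOrderIndependent(reduceFn, key, values) {
  var reversed = values.slice().reverse();
  return same(reduceFn(key, values), reduceFn(key, reversed));
}

// A reducer that sums its values satisfies both properties.
function sumReduce(key, values) {
  var sum = 0;
  values.forEach(function (v) { sum += v; });
  return sum;
}

// A reducer that counts its values does not: re-reducing [3] gives 1, not 3.
function countReduce(key, values) {
  return values.length;
}

var sample = [1, 1, 1];
var sumOk = isIdempotent(sumReduce, "k", sample) &&
            isOrderIndependent(sumReduce, "k", sample);
var countOk = isIdempotent(countReduce, "k", sample);
```

Here sumOk is true and countOk is false. A reducer that returns the number of values it received breaks as soon as the engine feeds an earlier result back into it.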
Finalize Function
A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
function finalize(key, value) -> final_value
Your reduce function may be called multiple times for the same object. Use finalize when something should only be done a single time at the end; for example calculating an average.
jsMode flag
v2.0+
Thus it requires several translations but it can handle very large datasets during mapping by using a temporary collection.
The execution time may be significantly reduced. Note that this mode is limited by either
Sharded Environments
There are 2 aspects of sharding with Map/Reduce, input and output.
Sharded input
If the input collection is sharded, MongoS will automatically dispatch the map/reduce job to each of the shards, to be executed in parallel. There is no special option required. MongoS will then wait for jobs on all shards to finish.
Sharded output
By default the output collection will not be sharded. The process is:
If using the "sharded" option in the "out" object, the output will be sharded using "_id" as the shard key.
Notes about sharded output:
Examples
Shell Example 1
The following example assumes we have an events collection with objects of the form:
{ time : <time>, user_id : <userid>, type : <type>, ... }
We then use MapReduce to extract all users who have had at least one event of type "sale":
> m = function() { emit(this.user_id, 1); }
> r = function(k,vals) { return 1; }
> res = db.events.mapReduce(m, r, { query : {type:'sale'} });
> // or in v1.8+:
> // res = db.events.mapReduce(m, r, { query : {type:'sale'}, out : 'example1' });
> db[res.result].find().limit(2)
{ "_id" : 8321073716060 , "value" : 1 }
{ "_id" : 7921232311289 , "value" : 1 }
If we also wanted to output the number of times the user had experienced the event in question, we could modify the reduce function like so:
> r = function(k,vals) {
... var sum=0;
... for(var i in vals) sum += vals[i];
... return sum;
... }
Note, here, that we cannot simply return vals.length, as the reduce may be called multiple times.
Shell Example 2
$ ./mongo
> db.things.insert( { _id : 1, tags : ['dog', 'cat'] } );
> db.things.insert( { _id : 2, tags : ['cat'] } );
> db.things.insert( { _id : 3, tags : ['mouse', 'cat', 'dog'] } );
> db.things.insert( { _id : 4, tags : [] } );
> // map function
> m = function(){
... this.tags.forEach(
... function(z){
... emit( z , { count : 1 } );
... }
... );
...};
> // reduce function
> r = function( key , values ){
... var total = 0;
... for ( var i=0; i<values.length; i++ )
... total += values[i].count;
... return { count : total };
...};
> res = db.things.mapReduce(m, r, { out : "myoutput" } );
> res
{
"result" : "myoutput",
"timeMillis" : 12,
"counts" : {
"input" : 4,
"emit" : 6,
"output" : 3
},
"ok" : 1,
}
> db.myoutput.find()
{"_id" : "cat" , "value" : {"count" : 3}}
{"_id" : "dog" , "value" : {"count" : 2}}
{"_id" : "mouse" , "value" : {"count" : 1}}
> db.myoutput.drop()
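The counts reported in Shell Example 2 can be traced by hand with a plain JavaScript walk-through. This is a sketch that imitates the map and reduce phases in memory, not MongoDB's engine. The emit function, the emitted table, and the counts object defined here are stand-ins introduced for the illustration.

```javascript
// The four documents inserted in Shell Example 2.
var things = [
  { _id: 1, tags: ["dog", "cat"] },
  { _id: 2, tags: ["cat"] },
  { _id: 3, tags: ["mouse", "cat", "dog"] },
  { _id: 4, tags: [] }
];

var emitted = {};   // key -> array of emitted values
var emitCount = 0;

// Stand-in for the server-provided emit().
function emit(key, value) {
  emitCount++;
  (emitted[key] = emitted[key] || []).push(value);
}

// Map and reduce functions as written in the shell session.
function m() {
  this.tags.forEach(function (z) {
    emit(z, { count: 1 });
  });
}

function r(key, values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) total += values[i].count;
  return { count: total };
}

// Map phase: call m with each document bound to this.
things.forEach(function (doc) { m.call(doc); });

// Reduce phase: one output document per distinct key.
var output = Object.keys(emitted).sort().map(function (key) {
  return { _id: key, value: r(key, emitted[key]) };
});

var counts = { input: things.length, emit: emitCount, output: output.length };
```

The document with an empty tags array contributes to input but emits nothing, which is why four input documents yield six emits and three output documents.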
* gist
Mongo Shell Script with Incremental Map-Reduce and Finalize
This example is a JavaScript script file. The map-reduce can be run repeatedly on different dates to incrementally augment the result. The finalize option computes averages. The output of commands and the queries themselves are saved to variables so that they can be examined after the sample script is run via the load() command in the shell.
// work in the map-reduce example db
db = db.getSiblingDB("mrex");

// clean out from previous runs of this sample -- you wouldn't do this in production
db.session.drop();
db.session_stat.drop();

// simulate saving records that log the lengths of user sessions in seconds
db.session.save({userid:"a", ts: ISODate('2011-11-03 14:17:00'), length: 95});
db.session.save({userid:"b", ts: ISODate('2011-11-03 14:23:00'), length: 110});
db.session.save({userid:"c", ts: ISODate('2011-11-03 15:02:00'), length: 120});
db.session.save({userid:"d", ts: ISODate('2011-11-03 16:45:00'), length: 45});
db.session.save({userid:"a", ts: ISODate('2011-11-04 11:05:00'), length: 105});
db.session.save({userid:"b", ts: ISODate('2011-11-04 13:14:00'), length: 120});
db.session.save({userid:"c", ts: ISODate('2011-11-04 17:00:00'), length: 130});
db.session.save({userid:"d", ts: ISODate('2011-11-04 15:37:00'), length: 65});

/*
  For each user, count up the number of sessions, and figure out the average
  session length. Note that to be able to find the average session length,
  we need to keep a total of all the session lengths, and then divide at the
  end. We're also going to set this up so that we can repeat the process to
  get incremental results over time.
*/
function mapf() {
    emit(this.userid, {userid:this.userid, total_time:this.length, count:1, avg_time:0});
}

function reducef(key, values) {
    var r = {userid:key, total_time:0, count:0, avg_time:0};
    values.forEach(function(v) {
        r.total_time += v.total_time;
        r.count += v.count;
    });
    return r;
}

function finalizef(key, value) {
    if (value.count > 0)
        value.avg_time = value.total_time / value.count;
    return value;
}

/*
  Here's the initial run. The query isn't technically necessary, but is
  included here to demonstrate how this is the same map-reduce command that
  will be issued later to do incremental adjustment of the computed values.
  The query is assumed to run once a day at midnight.
*/
var mrcom1 = db.runCommand( { mapreduce:"session",
                              map:mapf,
                              reduce:reducef,
                              query: {ts: {$gt:ISODate('2011-11-03 00:00:00')}},
                              out: { reduce: "session_stat" },
                              finalize:finalizef
                            });

function saveresults(a) {
    /* append everything from the cursor to the argument array */
    var statcurs = db.session_stat.find();
    while(statcurs.hasNext())
        a.push(statcurs.next());
}

/* save the results into mrres1 */
var mrres1 = [];
saveresults(mrres1);

/* add more session records (the next day) */
db.session.save({userid:"a", ts: ISODate('2011-11-05 14:17:00'), length: 100});
db.session.save({userid:"b", ts: ISODate('2011-11-05 14:23:00'), length: 115});
db.session.save({userid:"c", ts: ISODate('2011-11-05 15:02:00'), length: 125});
db.session.save({userid:"d", ts: ISODate('2011-11-05 16:45:00'), length: 55});

/*
  Run map reduce again. This time, the query date is the next midnight,
  simulating a daily job that is used to update the values in session_stat.
  This can be repeated daily (or on other periods, with suitable adjustments
  to the time).
*/
var mrcom2 = db.runCommand( { mapreduce:"session",
                              map:mapf,
                              reduce:reducef,
                              query: {ts: {$gt:ISODate('2011-11-05 00:00:00')}},
                              out: { reduce: "session_stat" },
                              finalize:finalizef
                            });

/* save the results into mrres2 */
var mrres2 = [];
saveresults(mrres2);
More Examples
Note on Permanent Collections
Even when a permanent collection name is specified, a temporary collection name will be used during processing. At map/reduce completion, the temporary collection will be renamed to the permanent name atomically. Thus, one can perform a map/reduce job periodically with the same target collection name without worrying about a temporary state of incomplete data. This is very useful when generating statistical output collections on a regular basis.
Parallelism
See info on Concurrency
Presentations
Map/reduce, geospatial indexing, and other cool features - Kristina Chodorow at MongoSF (April 2010)
Troubleshooting
See Also
