==========
Map-Reduce
==========

.. default-domain:: mongodb

Map-reduce operations can handle complex aggregation
tasks. [#simple-aggregation-use-framework]_ To perform map-reduce operations,
MongoDB provides the :dbcommand:`mapReduce` command and, in the
:program:`mongo` shell, the wrapper :method:`db.collection.mapReduce()`
method.

This overview will cover:

- :ref:`map-reduce-method`

- :ref:`map-reduce-examples`

- :ref:`map-reduce-incremental`

- :ref:`map-reduce-sharded-cluster`

- :ref:`map-reduce-additional-references`

.. _map-reduce-method:

mapReduce()
-----------

.. include:: /reference/method/db.collection.mapReduce.txt
   :start-after: mongodb
   :end-before: mapReduce-syntax-end

.. _map-reduce-examples:

Map-Reduce Examples
-------------------

.. include:: /includes/examples-map-reduce.rst
   :start-after: map-reduce-examples-begin
   :end-before: map-reduce-sum-price-wrapper-end

.. include:: /includes/examples-map-reduce.rst
   :start-after: map-reduce-sum-price-cmd-end
   :end-before: map-reduce-item-counts-avg-wrapper-end

.. _map-reduce-incremental:

Incremental Map-Reduce
----------------------

If the map-reduce dataset is constantly growing, you may want to
perform an incremental map-reduce rather than running the map-reduce
operation over the entire dataset each time.

To perform incremental map-reduce:

#. Run a map-reduce job over the current collection and output the
   result to a separate collection.

#. When you have more data to process, run subsequent map-reduce jobs
   with:

   - the ``<query>`` parameter that specifies conditions that match
     *only* the new documents.

   - the ``<out>`` parameter that specifies the ``reduce`` action to
     merge the new results into the existing output collection.

Consider the following example where you schedule a map-reduce
operation on a ``sessions`` collection to run at the end of each day.

**Data Setup**

The ``sessions`` collection contains documents that log users' sessions
each day and can be simulated as follows:

.. code-block:: javascript

   db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
   db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
   db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
   db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

   db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
   db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
   db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
   db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

**Initial Map-Reduce of Current Collection**

#. Define the ``<map>`` function that maps the ``userid`` to an
   object that contains the fields ``userid``, ``total_time``, ``count``,
   and ``avg_time``:

   .. code-block:: javascript

      var mapFunction = function() {
          var key = this.userid;
          var value = {
              userid: this.userid,
              total_time: this.length,
              count: 1,
              avg_time: 0
          };

          emit( key, value );
      };

#. Define the corresponding ``<reduce>`` function with two arguments
   ``key`` and ``values`` to calculate the total time and the count.
   The ``key`` corresponds to the ``userid``, and ``values`` is an
   array whose elements correspond to the individual objects mapped to
   the ``userid`` in the ``mapFunction``:

   .. code-block:: javascript

      var reduceFunction = function(key, values) {

          var reducedObject = {
              userid: key,
              total_time: 0,
              count: 0,
              avg_time: 0
          };

          values.forEach( function(value) {
              reducedObject.total_time += value.total_time;
              reducedObject.count += value.count;
          } );

          return reducedObject;
      };

#. Define the ``<finalize>`` function with two arguments ``key`` and
   ``reducedValue``. The function computes the ``avg_time`` field of
   the ``reducedValue`` document and returns the modified document:

   .. code-block:: javascript

      var finalizeFunction = function(key, reducedValue) {

          if (reducedValue.count > 0)
              reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

          return reducedValue;
      };

#. Perform map-reduce on the ``sessions`` collection using the
   ``mapFunction``, the ``reduceFunction``, and the
   ``finalizeFunction`` functions, and output the results to the
   collection ``session_stat``. Because the ``<out>`` parameter
   specifies the ``reduce`` action, if the ``session_stat`` collection
   already exists, the operation merges the new results with the
   existing contents:

   .. code-block:: javascript

      db.runCommand(
          {
            mapreduce: "sessions",
            map: mapFunction,
            reduce: reduceFunction,
            out: { reduce: "session_stat" },
            finalize: finalizeFunction
          }
      );
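
Outside the shell, the interaction of the three functions can be
sketched as a small in-memory simulation in plain JavaScript. The
``mapReduce`` helper and the inline ``sessions`` array below are
illustrative assumptions, not part of MongoDB's API:

```javascript
// In-memory sketch of the map/reduce/finalize pipeline (an illustration
// only; the server runs these functions itself).
var emitted;
function emit(key, value) {        // collects values per key, like the shell's emit()
    (emitted[key] = emitted[key] || []).push(value);
}

var mapFunction = function() {
    emit(this.userid, { userid: this.userid, total_time: this.length,
                        count: 1, avg_time: 0 });
};

var reduceFunction = function(key, values) {
    var reducedObject = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach(function(value) {
        reducedObject.total_time += value.total_time;
        reducedObject.count += value.count;
    });
    return reducedObject;
};

var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
        reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
};

function mapReduce(docs) {         // hypothetical helper, not a MongoDB call
    emitted = {};
    docs.forEach(function(doc) { mapFunction.call(doc); });  // map: `this` is the doc
    var out = {};
    Object.keys(emitted).forEach(function(key) {
        out[key] = finalizeFunction(key, reduceFunction(key, emitted[key]));
    });
    return out;
}

// The eight sample session documents from the data setup above:
var sessions = [
    { userid: "a", length: 95 },  { userid: "b", length: 110 },
    { userid: "c", length: 120 }, { userid: "d", length: 45 },
    { userid: "a", length: 105 }, { userid: "b", length: 120 },
    { userid: "c", length: 130 }, { userid: "d", length: 65 }
];

var stats = mapReduce(sessions);
console.log(stats.a);  // { userid: 'a', total_time: 200, count: 2, avg_time: 100 }
```

Note one simplification: this sketch always calls the reduce function,
whereas the server may skip it for a key with a single emitted value.
That is one reason a real reduce function must return a value with the
same shape as the values it receives.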

**Subsequent Incremental Map-Reduce**

Assume that the next day, the ``sessions`` collection grows by the
following documents:

.. code-block:: javascript

   db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
   db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
   db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
   db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

5. At the end of the day, perform an incremental map-reduce on the
   ``sessions`` collection, but use the ``query`` field to select only
   the new documents. Output the results to the collection
   ``session_stat``, where the ``reduce`` action merges them with the
   existing contents:

   .. code-block:: javascript

      db.runCommand( {
          mapreduce: "sessions",
          map: mapFunction,
          reduce: reduceFunction,
          query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
          out: { reduce: "session_stat" },
          finalize: finalizeFunction
      } );
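
The incremental approach is correct only because the reduce function
can consume its own prior output. The following plain-JavaScript check
(with illustrative numbers for user ``a``, not shell code) confirms
that folding the new day's result into the stored value matches a full
recompute:

```javascript
// The reduce function from the example, repeated so the sketch is
// self-contained:
var reduceFunction = function(key, values) {
    var r = { userid: key, total_time: 0, count: 0, avg_time: 0 };
    values.forEach(function(v) {
        r.total_time += v.total_time;
        r.count += v.count;
    });
    return r;
};

// Mapped values for user "a": two sessions on earlier days, one new.
var earlierDays = [ { total_time: 95, count: 1 }, { total_time: 105, count: 1 } ];
var newDay      = [ { total_time: 100, count: 1 } ];

// Full recompute over every document:
var full = reduceFunction("a", earlierDays.concat(newDay));

// Incremental: the value already stored in session_stat, then the
// per-key merge that out: { reduce: "session_stat" } performs:
var stored      = reduceFunction("a", earlierDays);
var incremental = reduceFunction("a", [ stored, reduceFunction("a", newDay) ]);

console.log(full.total_time, incremental.total_time);  // 300 300
console.log(full.count, incremental.count);            // 3 3
```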

.. _map-reduce-temporary-collection:

Temporary Collection
--------------------

The map-reduce operation uses a temporary collection during processing.
At completion, the temporary collection is atomically renamed to the
permanent name. Thus, you can perform a map-reduce operation
periodically with the same target collection name without ever exposing
a temporary state of incomplete data. This is very useful when
generating statistical output collections on a regular basis.

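The same build-then-swap idea can be sketched in plain JavaScript (a
pattern illustration under assumed names, not the server's
implementation): results are computed into a private object and
published with a single assignment, so readers never observe a
half-written result:

```javascript
// Hypothetical sketch of the build-then-swap pattern behind the
// temporary collection: compute everything into `temp`, then make it
// visible in one step (the equivalent of the atomic rename).
var published = { a: { avg_time: 100 } };   // last run's complete output

function recompute(newStats) {
    var temp = {};                          // the "temporary collection"
    Object.keys(newStats).forEach(function(key) {
        temp[key] = newStats[key];          // ... full rebuild happens here ...
    });
    published = temp;                       // single-step swap: the "rename"
}

recompute({ a: { avg_time: 102.5 }, b: { avg_time: 115 } });
console.log(published.b.avg_time);          // 115
```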
.. _map-reduce-sharded-cluster:

Sharded Cluster
---------------

Sharded Input
~~~~~~~~~~~~~

If the input collection is sharded, :program:`mongos` will
automatically dispatch the map-reduce job to each shard to be executed
in parallel. There is no special option required. :program:`mongos`
will wait for jobs on all shards to finish.

Sharded Output
~~~~~~~~~~~~~~

By default the output collection will not be sharded. The process is:

- :program:`mongos` dispatches a map-reduce finish job to the shard
  that will store the target collection.

- The target shard will pull results from all other shards, run a final
  reduce/finalize, and write to the output.

- If using the ``sharded`` option in the ``<out>`` parameter, the
  output will be sharded using ``_id`` as the shard key.

.. versionchanged:: 2.2

- If the output collection does not exist, the collection is created
  and sharded on the ``_id`` field. Even if empty, its initial chunks
  are created based on the result of the first step of the map-reduce
  operation.

- :program:`mongos` dispatches, in parallel, a map-reduce finish job
  to every shard that owns a chunk.

- Each shard will pull the results it owns from all other shards, run a
  final reduce/finalize, and write to the output collection.

.. note::

   - During additional map-reduce jobs, chunk splitting will be done as
     needed.

   - Balancing of chunks for the output collection is automatically
     prevented during post-processing to avoid concurrency issues.

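The finish phase above can be simulated in a few lines of plain
JavaScript (illustrative partial results and key ownership, not
:program:`mongos` internals): each output shard pulls the partial
values for the keys it owns from every shard and runs one final reduce:

```javascript
// Simulation of the sharded finish phase: per-key partial results exist
// on each input shard; each output shard final-reduces the keys it owns.
var reduceFunction = function(key, values) {
    var r = { total_time: 0, count: 0 };
    values.forEach(function(v) {
        r.total_time += v.total_time;
        r.count += v.count;
    });
    return r;
};

// Partial results produced independently on two input shards:
var shardPartials = [
    { a: { total_time: 200, count: 2 }, b: { total_time: 110, count: 1 } },
    { a: { total_time: 100, count: 1 }, b: { total_time: 120, count: 1 } }
];

// Assume the output chunks assign key "a" to one shard and "b" to another:
function finishOnShard(ownedKeys) {
    var out = {};
    ownedKeys.forEach(function(key) {
        var pulled = shardPartials                    // pull from every shard ...
            .filter(function(p) { return key in p; })
            .map(function(p) { return p[key]; });
        out[key] = reduceFunction(key, pulled);       // ... then final reduce
    });
    return out;
}

var shard0 = finishOnShard([ "a" ]);   // runs in parallel with ...
var shard1 = finishOnShard([ "b" ]);   // ... the other owning shard

console.log(shard0.a.total_time, shard1.b.count);  // 300 2
```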
Prior to version 2.1:

- :program:`mongos` retrieves the results from each shard, performs a
  merge sort to order the results, and runs a reduce/finalize as
  needed. :program:`mongos` then writes the result to the output
  collection in sharded mode.

- Only a small amount of memory is required even for large datasets.

- Shard chunks do not get automatically split and migrated during
  insertion. Manual intervention is required until the chunks are
  granular and balanced.

.. warning::

   Sharded output for map-reduce was overhauled in version 2.2. Its
   use in earlier versions is not recommended.

.. _map-reduce-additional-references:

Additional References
---------------------

.. seealso::

   - :wiki:`Map-Reduce Concurrency
     <How+does+concurrency+work#Howdoesconcurrencywork-MapReduce>`

   - `MapReduce, Geospatial Indexes, and Other Cool Features <http://www.slideshare.net/mongosf/mapreduce-geospatial-indexing-and-other-cool-features-kristina-chodorow>`_ - Kristina Chodorow at MongoSF (April 2010)

   - :wiki:`Troubleshooting MapReduce`

.. [#simple-aggregation-use-framework] For many simple aggregation
   tasks, see the :doc:`aggregation framework
   </applications/aggregation>`.