Splitting Shard Chunks

MongoDB uses two key operations to facilitate sharding: split and migrate. Migrate moves a chunk (the data associated with a key range) to another shard, as needed to rebalance. Split divides a chunk into two ranges so that no single chunk grows unusually large. Split is an inexpensive metadata operation, while migrate is expensive because large amounts of data may move from server to server.
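To make the cost difference concrete, here is a toy in-memory model. It is a sketch, not MongoDB's implementation. A chunk is a half-open key range `[min, max)` tagged with the shard that owns it. Splitting only rewrites range boundaries. Migrating changes ownership, which in a real cluster means copying every document in the range. The function names `splitChunk` and `migrateChunk` are hypothetical.

```javascript
// Toy model of chunk metadata; hypothetical, not MongoDB's internal code.
// A chunk owns the half-open key range [min, max) on a single shard.

// Split: a pure metadata change. One chunk becomes two adjacent chunks on
// the same shard, and no documents move.
function splitChunk(chunk, middle) {
  if (!(chunk.min < middle && middle < chunk.max)) {
    throw new Error("split point must lie strictly inside the chunk");
  }
  return [
    { min: chunk.min, max: middle, shard: chunk.shard },
    { min: middle, max: chunk.max, shard: chunk.shard },
  ];
}

// Migrate: ownership changes. In a real cluster every document in
// [min, max) would be copied to the destination shard (the expensive part).
function migrateChunk(chunk, toShard) {
  return { min: chunk.min, max: chunk.max, shard: toShard };
}

const whole = { min: "a", max: "z", shard: "shard0001" };
const [low, high] = splitChunk(whole, "m");
const moved = migrateChunk(high, "shard0000");
// low stays on shard0001; moved is now owned by shard0000
console.log(low, moved);
```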

Both splits and migrates are performed automatically. MongoDB has a subsystem called the Balancer, which monitors shard loads and moves chunks around if it finds an imbalance. If you add a new shard to the system, some chunks will eventually be moved to that shard to spread out the load.
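The Balancer's actual policy (thresholds, timing, locking) is internal to MongoDB. As a rough illustration only, the sketch below assumes a simple greedy rule: while the shard with the most chunks has at least `threshold` more chunks than the shard with the fewest, move one chunk from the former to the latter. `balance` and `threshold` are hypothetical names. The usage line shows an empty, newly added shard gradually receiving chunks.

```javascript
// Hypothetical greedy balancer over chunk counts; illustrative only and not
// MongoDB's real policy. counts maps shard name -> number of chunks owned.
function balance(counts, threshold = 2) {
  // A threshold below 2 would make one chunk bounce between two shards forever.
  if (threshold < 2) throw new Error("threshold must be at least 2");
  const c = { ...counts };
  const moves = [];
  if (Object.keys(c).length < 2) return { counts: c, moves };
  for (;;) {
    const shards = Object.keys(c).sort((a, b) => c[a] - c[b]);
    const least = shards[0];
    const most = shards[shards.length - 1];
    if (c[most] - c[least] < threshold) break; // spread is small enough
    c[most] -= 1; // one migration from the most- to the least-loaded shard
    c[least] += 1;
    moves.push({ from: most, to: least });
  }
  return { counts: c, moves };
}

// A newly added, empty shard receives chunks until the spread is small.
const result = balance({ shard0000: 10, shard0001: 10, shard0002: 0 });
console.log(result.counts, result.moves.length);
```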

A recently split chunk may be moved immediately to a new shard if the system predicts that future insertions will benefit from that move.

Manually Splitting a Chunk

Typically there is no need to manually split a chunk.

The following command splits the chunk where { _id : 99 } resides (or would reside if present) into two. The key used as the split point is computed internally and is approximately the key that would divide the chunk into two equally sized new chunks.

> use admin
switched to db admin
> db.runCommand( { split : "test.foo" , find : { _id : 99 } } )
...
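The server computes this split point internally. The sketch below only approximates the idea, and the server's actual algorithm may differ: it picks the median of the chunk's sorted shard-key values, which yields two chunks holding roughly equal numbers of documents. `medianSplitPoint` is a hypothetical helper, and it assumes numeric keys.

```javascript
// Illustrative only: pick a split key that divides a chunk's documents into
// two roughly equal halves. keys are the (numeric) shard-key values in the chunk.
function medianSplitPoint(keys) {
  const sorted = [...keys].sort((a, b) => a - b);
  if (sorted.length < 2) return null; // nothing to split
  const mid = sorted[Math.floor(sorted.length / 2)];
  // The split key becomes the lower bound of the upper chunk, so it must be
  // greater than the smallest key or the lower chunk would be empty.
  return mid > sorted[0] ? mid : null;
}

// A chunk containing _id values 0..199: the split lands at 100,
// giving [min, 100) and [100, max) with 100 documents each.
const ids = Array.from({ length: 200 }, (_, i) => i);
console.log(medianSplitPoint(ids)); // 100
```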

The Balancer treats all chunks the same way, regardless of whether they were generated by a manual or an automatic split.

Pre-splitting

There is a second version of the split command that takes the exact key you would like to split on. This is most useful when a collection initially has no data but you want to ensure that data is loaded in a distributed way across multiple shards.

In the example below, the command splits the chunk where _id 99 would reside, using that key as the split point. Again, note that the key need not exist in the collection for a chunk to use it as a range boundary. The chunk may even be empty.

> use admin
switched to db admin
> db.runCommand( { split : "test.foo" , middle : { _id : 99 } } )
...
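Both forms of split first locate the chunk whose half-open range `[min, max)` contains the given key. With `middle`, that key then becomes the new boundary even if no document has it. The sketch below models this over a sorted array of chunks. The representation is hypothetical, with `-Infinity`/`Infinity` standing in for `$MinKey`/`$MaxKey` on a numeric key.

```javascript
// Hypothetical model: chunks are sorted, contiguous [min, max) ranges on a
// numeric key. -Infinity/Infinity stand in for $MinKey/$MaxKey.
function findChunk(chunks, key) {
  return chunks.findIndex((c) => c.min <= key && key < c.max);
}

// Split the chunk containing `middle` exactly at `middle`. The key does not
// need to exist in the collection, and the chunk may be empty.
function splitAt(chunks, middle) {
  const i = findChunk(chunks, middle);
  if (i === -1) throw new Error("no chunk contains " + middle);
  const c = chunks[i];
  // Already a boundary: this toy model simply leaves the chunks unchanged.
  if (c.min === middle) return chunks;
  return [
    ...chunks.slice(0, i),
    { min: c.min, max: middle },
    { min: middle, max: c.max },
    ...chunks.slice(i + 1),
  ];
}

// An empty collection starts as one chunk covering every key.
let chunks = [{ min: -Infinity, max: Infinity }];
// Analogous to { split : "test.foo" , middle : { _id : 99 } }
chunks = splitAt(chunks, 99);
console.log(chunks.map((c) => [c.min, c.max]));
```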

This version of the command allows one to pre-split the data, which is especially useful for a bulk load. If the range and distribution of the keys to be inserted are known in advance, the collection can be split proportionately to the number of servers using the command above, and the (empty) chunks can be migrated upfront using the moveChunk command.
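As a sketch of the procedure just described, assume a numeric shard key whose values are known to fall uniformly in `[lo, hi)`. The helper below computes evenly spaced split points and builds the corresponding `split` and `moveChunk` command documents, which could then be sent with `db.adminCommand()` in the mongo shell. `planPreSplit`, the namespace `"test.foo"`, the field `_id`, and the shard names are placeholders. The function only builds documents; it does not contact a server.

```javascript
// Hypothetical helper: plan a pre-split of a numeric key range [lo, hi)
// into one chunk per shard. It only builds command documents; nothing is
// sent to a server. In the mongo shell each one could be run with
// db.adminCommand(cmd).
function planPreSplit(ns, field, lo, hi, shards) {
  const n = shards.length;
  // Lower bound of chunk i, for i = 0..n-1 (chunk 0 starts at lo).
  const bound = (i) => lo + Math.floor(((hi - lo) * i) / n);
  const splits = [];
  for (let i = 1; i < n; i++) {
    splits.push({ split: ns, middle: { [field]: bound(i) } });
  }
  // Send chunk i to shard i. The server may reject a move to the shard
  // that already owns the chunk; that chunk is then already in place.
  const moves = shards.map((shard, i) => ({
    moveChunk: ns,
    find: { [field]: bound(i) },
    to: shard,
  }));
  return { splits, moves };
}

const plan = planPreSplit("test.foo", "_id", 0, 1000000,
  ["shard0000", "shard0001", "shard0002", "shard0003"]);
plan.splits.forEach((cmd) => console.log(JSON.stringify(cmd)));
plan.moves.forEach((cmd) => console.log(JSON.stringify(cmd)));
```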

Pre-Splitting Example #1

Let's say you have 5 shards and want to insert 100M user profiles sharded by email address. Before inserting, split the collection on two-letter email prefixes:

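// Run in the mongo shell, connected to a mongos.
// Replace with your own "<database>.<collection>" namespace.
var ns = "<database>.<collection>";
for ( var x=97; x<97+26; x++ ){          // first letter: 'a'..'z'
  for( var y=97; y<97+26; y+=6 ) {       // second letter: 'a','g','m','s','y'
    var prefix = String.fromCharCode(x) + String.fromCharCode(y);
    // split is an admin command, so send it to the admin database
    db.adminCommand( { split : ns , middle : { email : prefix } } );
  }
}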
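It can help to check what the loop above produces before running it against a cluster. The plain JavaScript below needs no server and reproduces the prefix generation: 26 first letters times 5 second letters (`a`, `g`, `m`, `s`, `y`) gives 130 split points and therefore 131 chunks, roughly 26 per shard across 5 shards. It assumes email addresses are lowercase. Uppercase letters and digits sort before `a`, so such addresses would all land in the first chunk. `emailPrefixes` is a hypothetical name.

```javascript
// Reproduce the split points generated by the pre-splitting loop,
// without contacting a server.
function emailPrefixes() {
  const prefixes = [];
  for (let x = 97; x < 97 + 26; x++) {      // 'a'..'z'
    for (let y = 97; y < 97 + 26; y += 6) { // 'a', 'g', 'm', 's', 'y'
      prefixes.push(String.fromCharCode(x) + String.fromCharCode(y));
    }
  }
  return prefixes;
}

const points = emailPrefixes();
const chunkCount = points.length + 1; // n split points -> n + 1 chunks
console.log(points.length, chunkCount, (chunkCount / 5).toFixed(1));
console.log(points.slice(0, 6).join(" "));
```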

Then wait for the system to balance (should take about 5 minutes).

Pre-Splitting Example #2

Let's assume you have a sharded database setup that looks like this:

> db.printShardingStatus()
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
      { "_id" : "shard0000", "host" : "localhost:30000" }
      { "_id" : "shard0001", "host" : "localhost:30001" }
  databases:
	{ "_id" : "admin", "partitioned" : false, "primary" : "config" }

and you would like to bulk insert 100M documents into a new sharded collection. Each document has a "hash" field containing an (unrealistically small) 4-character hexadecimal string hash value such as "00aa". Because the range of the hash value is limited, we know that every document will fall between "0000" and "ffff". With two shards, ideally half of the inserts will go to each shard. Using the hash value as the shard key will distribute writes evenly between the shards, assuming the hash values are uniformly distributed and the chunks are also distributed evenly between the shards.
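Because the hash values are fixed-width hex strings, evenly spaced split points for any number of shards can be computed directly. For two shards this gives the single midpoint "8000", half of the 0 to 65535 range. The sketch relies on zero-padded lowercase hex strings of equal length sorting in the same order as the numbers they encode. `hexSplitPoints` is a hypothetical helper.

```javascript
// Hypothetical helper: evenly spaced split points over the fixed-width hex
// range, "0000".."ffff" (0 to 65535) for 4 digits. Zero-padded lowercase hex
// strings of equal length sort in the same order as their numeric values.
function hexSplitPoints(numShards, digits = 4) {
  const size = 16 ** digits; // number of distinct values, 65536 for 4 digits
  const points = [];
  for (let i = 1; i < numShards; i++) {
    const value = Math.floor((size * i) / numShards);
    points.push(value.toString(16).padStart(digits, "0"));
  }
  return points;
}

console.log(hexSplitPoints(2)); // [ '8000' ]
console.log(hexSplitPoints(4)); // [ '4000', '8000', 'c000' ]
```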

Start by creating a sharded collection to hold the data, which takes three steps. First, enable sharding on the "foo" database:

> use admin
> db.runCommand({ enableSharding : "foo" })

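Next, create an index on the shard key field of the collection "foo.bar"; sharding requires an index on the shard key. The index is deliberately not unique: with only 65,536 possible hash values, a unique index would reject most of the 100M inserts.

> use foo
> db.bar.ensureIndex({ hash : 1 })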

Finally, shard the (still empty) collection on the hash value.

> use admin
> db.runCommand({ shardCollection : "foo.bar", key : { hash : 1 } })
> db.printShardingStatus()
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
      { "_id" : "shard0000", "host" : "localhost:30000" }
      { "_id" : "shard0001", "host" : "localhost:30001" }
  databases:
	{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
	{ "_id" : "test", "partitioned" : false, "primary" : "shard0001" }
	{ "_id" : "foo", "partitioned" : true, "primary" : "shard0001" }
		foo.bar chunks:
				shard0001	1
			{ "hash" : { "$MinKey" : true } } -->> { "hash" : { "$MaxKey" : true } } on : shard0001 { "t" : 1000, "i" : 0 }

Note that a single chunk exists on shard0001 and contains all values from $MinKey to $MaxKey. All inserts will initially go to this chunk, which lives on only one shard. To pre-split it so that inserts go to two separate shards, we just need to split it at a midpoint and move one of the resulting chunks to the other shard.

> use admin
> db.runCommand({ split : "foo.bar", middle : { hash : "8000" } })
> db.runCommand({ moveChunk : "foo.bar", find : { hash : "8000" }, to : "shard0000" })
> db.printShardingStatus()
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
      { "_id" : "shard0000", "host" : "localhost:30000" }
      { "_id" : "shard0001", "host" : "localhost:30001" }
  databases:
	{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
	{ "_id" : "test", "partitioned" : false, "primary" : "shard0001" }
	{ "_id" : "foo", "partitioned" : true, "primary" : "shard0001" }
		foo.bar chunks:
				shard0001	1
				shard0000	1
			{ "hash" : { "$MinKey" : true } } -->> { "hash" : "8000" } on : shard0001 { "t" : 2000, "i" : 1 }
			{ "hash" : "8000" } -->> { "hash" : { "$MaxKey" : true } } on : shard0000 { "t" : 2000, "i" : 0 }

Inserts will now be spread roughly equally across both shards, provided the hash values are uniformly distributed.
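To see why the inserts split evenly, the sketch below routes every possible 4-digit hash through the two chunk ranges from the status output above and counts how many land on each shard. The ranges are [$MinKey, "8000") on shard0001 and ["8000", $MaxKey) on shard0000. `routeToShard` is a hypothetical stand-in for mongos routing, and the even result holds only if real hash values are uniform over this range.

```javascript
// Hypothetical stand-in for mongos routing, using the two chunk ranges from
// the sharding status above. Plain string comparison matches numeric order
// for equal-length lowercase hex strings.
function routeToShard(hash) {
  return hash < "8000" ? "shard0001" : "shard0000";
}

// Route every possible value "0000".."ffff" exactly once.
const counts = { shard0000: 0, shard0001: 0 };
for (let v = 0; v < 0x10000; v++) {
  counts[routeToShard(v.toString(16).padStart(4, "0"))] += 1;
}
console.log(counts); // { shard0000: 32768, shard0001: 32768 }
```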

Chunks are not split automatically until the data in a chunk reaches a certain minimum size (hundreds of megabytes), and until that happens no balancing or migration takes place. When the data volume is this small, distributing it across multiple servers is not required anyway. When pre-splitting manually, however, many chunks can exist even if each initially holds very little data.

Pre-Splitting Example #3 – UUIDs

Suppose our shard key has UUIDs for values. To pre-split into 100 chunks, we could predefine the following ranges:

["00", "01")
["01", "02")
...
["98","99")
["99",":")
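Writing out all 100 ranges by hand is tedious. The sketch below generates the same list of half-open string ranges, using ":" (the ASCII character right after "9") as the upper bound of the last one. This scheme has a limitation if the UUIDs are hexadecimal strings. They can also begin with the letters a through f, which sort after ":", so such keys fall outside all 100 ranges (into whatever chunk covers [":", $MaxKey)). Prefixes such as "0a" through "0f" also all share the range ["09", "10"). `uuidPrefixRanges` is a hypothetical name.

```javascript
// Generate the 100 half-open ranges ["00","01"), ..., ["99",":").
function uuidPrefixRanges() {
  const ranges = [];
  for (let i = 0; i < 100; i++) {
    const lo = String(i).padStart(2, "0");
    // ":" is the character after "9" in ASCII, so it bounds every "99..." key.
    const hi = i < 99 ? String(i + 1).padStart(2, "0") : ":";
    ranges.push([lo, hi]);
  }
  return ranges;
}

const ranges = uuidPrefixRanges();
console.log(ranges.length, ranges[0], ranges[98], ranges[99]);
```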

The above example shows UUIDs as strings; storing them as BinData is more efficient. For example, to generate the BinData value corresponding to "98" above, one can invoke:

> b = UUID("98000000000000000000000000000000")
> b.hex()
98000000000000000000000000000000
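The string passed to `UUID()` above is the two-digit prefix padded with zeros to 32 hex characters (16 bytes). A small helper, hypothetically named `uuidHexForPrefix`, can build these strings for each split point. It is plain JavaScript, so it also runs outside the mongo shell; only the final `UUID(...)` call needs the shell.

```javascript
// Pad a split-point prefix with zeros to the 32 hex characters that the
// mongo shell's UUID() helper expects (16 bytes).
function uuidHexForPrefix(prefix) {
  if (!/^[0-9a-f]{1,32}$/i.test(prefix)) {
    throw new Error("prefix must be 1 to 32 hex characters");
  }
  return prefix.toLowerCase().padEnd(32, "0");
}

console.log(uuidHexForPrefix("98")); // 98000000000000000000000000000000
// In the mongo shell: UUID(uuidHexForPrefix("98"))
```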
