Eda Eren

May 5, 2023

MongoDB Aggregation Pipeline Stages 101

In database management, aggregation is defined as (to quote Wikipedia):

a function where the values of multiple rows are grouped together to form a single summary value.

Simply, it is the collection and summary of data.

In MongoDB, an aggregation pipeline is a series of stages applied to the data in order. Each stage is a built-in operation that transforms the documents passing through it, and apart from stages that explicitly write their results somewhere (such as $out, covered below), it does not alter the stored data. An aggregation pipeline can be used for many tasks, like filtering or grouping data.

To run an aggregation, we call the .aggregate() method on the collection, like below:

db.collection.aggregate([
  {
    $match: { size: "small" }
  },
  ...
])

In this example, .aggregate() is the aggregation method, $match is the aggregation stage, and { size: "small" } is the expression we use to query the collection where the size field is "small". After $match, we can continue listing stages in the aggregation pipeline. Of course, since this is a pipeline, and each stage forwards the values on to the next stage, the ordering of stages matters.
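To picture how stages hand documents to one another, here is a rough sketch in plain JavaScript. It is not how MongoDB is implemented; the shirts array and the matchSmall, sortByPriceDesc, and runPipeline helpers are all invented for illustration, and they work on an in-memory array rather than a collection.

```javascript
// Hypothetical in-memory documents; a real pipeline runs inside MongoDB.
const shirts = [
  { size: "small", price: 10 },
  { size: "large", price: 20 },
  { size: "small", price: 15 },
];

// Each "stage" is modelled as a function from documents to documents.
const matchSmall = (docs) => docs.filter((doc) => doc.size === "small");
const sortByPriceDesc = (docs) => [...docs].sort((a, b) => b.price - a.price);

// Run the stages in order, feeding each stage's output into the next one.
const runPipeline = (docs, stages) =>
  stages.reduce((current, stage) => stage(current), docs);

const result = runPipeline(shirts, [matchSmall, sortByPriceDesc]);
console.log(result); // the two small shirts, most expensive first
console.log(shirts.length); // 3: the source array is left untouched
```

Swapping the two functions in the array would give the same result here, but with stages that drop or reshape documents the order usually changes the outcome.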

Let's take a look at some of the aggregation pipeline stages we can use in MongoDB.

Using $match

$match filters the documents that match a given expression and passes them to the next stage in the pipeline. Since it filters the documents, it reduces the number of documents later stages have to operate on, and therefore lessens the amount of processing required. So, it is better to use it as early in the pipeline as we can.

Using $group

$group groups the documents by a given group key and outputs exactly one document for each unique value of that key.

Let's look at some example data first. To see a sample document in a given collection, we can use the .findOne() method.

Let's say we're inside a bird_data database and want to see what a document looks like in the sightings collection:

db.sightings.findOne()

A sample document is returned:

{
  _id: ObjectId("62cf32bdcfe5bbb25ee815fc"),
  species_common: 'Eastern Bluebird',
  species_scientific: 'Sialia sialis',
  date: ISODate("2022-01-18T18:24:00.000Z"),
  location: { type: 'Point', coordinates: [ 40, -73 ] }
}

Let's now use an aggregation pipeline:

db.sightings.aggregate([
  {
    $match: {
      "species_common": "Eastern Bluebird"
    }
  },
  {
    $group: {
      _id: "$location.coordinates",
      number_of_sightings: { $count: { } }
    }
  }
])

In this example, we first filter for a specific bird species, Eastern Bluebird, with $match. Then a $group stage groups the documents by their location coordinates. Within each group, we create a field called number_of_sightings that holds the number of documents in that group. (The $count accumulator is available from MongoDB 5.0; on older versions, { $sum: 1 } gives the same result.)

The output looks like this:

[
  { _id: [ 40, -74 ], number_of_sightings: 3 },
  { _id: [ 41, -74 ], number_of_sightings: 1 },
  { _id: [ 40, -73 ], number_of_sightings: 1 }
]
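If it helps to see the counting spelled out, here is a plain-JavaScript sketch of what grouping by coordinates amounts to. The sightings array is made-up data chosen to reproduce the counts above, and the Map-based loop is only an illustration, not MongoDB's actual implementation.

```javascript
// Hypothetical sightings, already filtered to one species (the $match step).
const sightings = [
  { location: { coordinates: [40, -74] } },
  { location: { coordinates: [41, -74] } },
  { location: { coordinates: [40, -74] } },
  { location: { coordinates: [40, -73] } },
  { location: { coordinates: [40, -74] } },
];

// Count documents per group key. Arrays are compared by reference in
// JavaScript, so the key is serialized to a string for the Map lookup.
const counts = new Map();
for (const doc of sightings) {
  const key = JSON.stringify(doc.location.coordinates);
  counts.set(key, (counts.get(key) ?? 0) + 1);
}

// Emit one output document per unique key, like $group does.
const grouped = [...counts].map(([key, n]) => ({
  _id: JSON.parse(key),
  number_of_sightings: n,
}));

console.log(grouped);
```

The sketch keeps the groups in the order they were first seen. MongoDB makes no such promise for $group, so add a $sort stage afterwards if the order of the groups matters.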

Using $sort

$sort does what you think it does: it sorts the documents. If the value given for a field is 1, it sorts in ascending order; if the value is -1, you guessed it, it sorts in descending order.
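As a small sketch, here is a $sort stage written as a plain array, so it can be looked at on its own before being passed to .aggregate(). The variable name newestFirst is made up for this example, and the two fields are taken from the sample sightings document shown earlier.

```javascript
// Newest sightings first; documents with the same date are then
// ordered alphabetically by common species name.
const newestFirst = [
  { $sort: { date: -1, species_common: 1 } },
];

// In mongosh, assuming the sightings collection from earlier:
// db.sightings.aggregate(newestFirst)
```

When more than one field is listed, the documents are sorted by the first field, and the following fields only break ties.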

Using $limit

$limit is also self-explanatory: it limits the number of documents passed on to the next stage. We know that the order of stages matters, and here is a good reminder of that. Say we are going to use both the $sort and $limit stages. If we sort before limiting, the output is the first <number-of-limit> documents of the sorted data. If we limit before sorting, the output is the sorted version of the first <number-of-limit> documents that reach the $limit stage.

Let's see an example of them together:

db.sightings.aggregate([
  {
    $sort: { "location.latitude": 1 }
  },
  {
    $limit: 4
  }
])

Here, we have a $sort stage that sorts the data by the location.latitude field in ascending order. Then, we limit the number of documents to 4. (This assumes the documents carry a location.latitude field, which the sample document above does not show.)
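To see why the order of $sort and $limit matters, here is a plain-JavaScript sketch that imitates both orderings on an in-memory array. The docs array and the sortByLatitude and limit helpers are hypothetical stand-ins for the stages, not MongoDB code.

```javascript
// Hypothetical in-memory stand-in for a collection; not real MongoDB data.
const docs = [
  { name: "c", latitude: 42 },
  { name: "a", latitude: 40 },
  { name: "d", latitude: 43 },
  { name: "b", latitude: 41 },
];

// Plain-JS imitations of the two stages (ascending sort, take the first n).
const sortByLatitude = (input) =>
  [...input].sort((x, y) => x.latitude - y.latitude);
const limit = (input, n) => input.slice(0, n);

// Sort first, then limit: the two lowest latitudes overall.
const sortThenLimit = limit(sortByLatitude(docs), 2);

// Limit first, then sort: the first two documents in input order, sorted.
const limitThenSort = sortByLatitude(limit(docs, 2));

console.log(sortThenLimit.map((d) => d.name)); // [ 'a', 'b' ]
console.log(limitThenSort.map((d) => d.name)); // [ 'a', 'c' ]
```

Both results are sorted, but only the first one contains the lowest latitudes of the whole data set.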

Using $set

To create new fields, or change the value of existing fields, we can use the $set stage.

db.birds.aggregate([
  {
    $set: { "class": "bird" }
  }
])

This simply adds a class field with the value "bird" to each returned document. The documents stored in the collection are not changed.

Using $count

$count outputs a single document with one field, named as we specify, whose value is the number of documents that reached that stage of the aggregation pipeline.

An example usage after the $match stage:

db.sightings.aggregate([
  {
    $match: {
      date: {
        // $gte so a sighting at exactly midnight on January 1 is counted
        $gte: ISODate('2022-01-01T00:00:00.000Z'),
        $lt: ISODate('2023-01-01T00:00:00.000Z')
      },
      species_common: 'Eastern Bluebird'
    }
  },
  {
    $count: 'bluebird_sightings_2022'
  }
])

Using $project

We can choose to include or exclude fields with the $project stage. It works like the projection we pass as the second argument to the .find() method: the value 1 indicates that we want a field to be included, and 0 indicates that we want it to be excluded.

The $project stage usually belongs at the end of the aggregation pipeline, as it only shapes the output and any field it drops is no longer available to later stages.

Here is an example where we only want the date, species_common, and _id fields to be shown:

db.sightings.aggregate([
  {
    $project: { "date": 1, "species_common": 1, _id: 1 }
  }
])
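One detail worth knowing: _id is included by default, so it only has to be mentioned when we want to leave it out. Here is a minimal sketch of that case, with the pipeline written as a plain array. The name withoutId is made up for this example.

```javascript
// _id is included by default, so excluding it is the one case where
// 0 can be mixed with 1 in the same $project.
const withoutId = [
  { $project: { date: 1, species_common: 1, _id: 0 } },
];

// In mongosh, assuming the sightings collection from earlier:
// db.sightings.aggregate(withoutId)
```

Each returned document would then carry only the date and species_common fields.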

Using $out

$out can only be used as the last stage of a pipeline. It writes the documents the pipeline produces into a collection. If that collection already exists, its contents are replaced.

db.sightings.aggregate([
  {
    $match: {
      date: {
        $gte: ISODate("2022-01-01T00:00:00.000Z")
      }
    }
  },
  {
    $out: "sightings_2022"
  }
])

There are a lot of aggregation pipeline stages to use, and as always, the best place to look for and learn about them is the official documentation.

Happy coding.