Tuesday, September 5, 2017

MongoDB: Aggregation Pipeline

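In this post we will introduce aggregation pipelines and set up two new demo databases.

Then we will discuss how to use the $project, $sort, $match, $count, $unwind, and $group stage operators, including how accumulators behave in $project and $group stages.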

Note: if you're getting no results when running these pipelines, make sure you have switched into the right database!

Introduction

Sometimes it is necessary to perform calculations on documents stored in MongoDB, for example for data analysis. There are several ways of doing this, one of which is to use "aggregation pipelines". These work much like pipes between Unix commands: documents enter a pipeline that transforms them into aggregated results. Each step in an aggregation pipeline is called a "stage", and the output of each stage is passed as input to the next stage. A pipeline is written as an array of stages, and each stage is a document (a JSON-like object) whose single key is a stage operator such as $match or $group. Depending on the stage operator, a stage may filter out incoming documents, transform them, or generate new documents based on them.
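To make the "output of one stage feeds the next" idea concrete, here is a rough plain-JavaScript analogy that runs under Node.js without MongoDB. It only models the data flow, not how the server actually executes a pipeline; `runPipeline` and the two toy stages are hypothetical names made up for this sketch.

```javascript
// Hypothetical helper: a "pipeline" here is just an array of functions,
// each taking an array of documents and returning a new array of documents.
function runPipeline(docs, stages) {
  // The output of each stage becomes the input of the next one.
  return stages.reduce((current, stage) => stage(current), docs);
}

// Two toy stages, loosely analogous to $match and $project.
const onlyPositive = (docs) => docs.filter((d) => d.qty > 0);
const keepName = (docs) => docs.map((d) => ({ name: d.name }));

const docs = [
  { name: "a", qty: 2 },
  { name: "b", qty: 0 },
  { name: "c", qty: 5 },
];

// Documents "a" and "c" survive the first stage and are reshaped by the second.
const result = runPipeline(docs, [onlyPositive, keepName]);
console.log(result);
```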

Almost all of the operators used inside stages are stateless, meaning they operate only on the current document. The exceptions are the accumulator operators.

Accumulator operators maintain running calculations as documents pass through a $group stage. For example, the $sum operator keeps a running total. Some accumulators, including $sum, $avg, $max, and $min, behave differently in a different type of stage: in a $project stage, $sum applied to an array field adds together all elements of that array within a single document. Examples of both behaviors are provided below.
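The difference between the two behaviors can be sketched in plain JavaScript, using two of the students from the class database introduced below. This is not MongoDB code; it only mimics what each form of $sum computes.

```javascript
// Plain-JavaScript analogy for the two behaviors of $sum (not MongoDB code).
const docs = [
  { name: "Rob", quizzes: [10, 10, 8], midterm: 95 },
  { name: "Sam", quizzes: [9, 10, 10], midterm: 87 },
];

// Like {$sum: "$quizzes"} in a $project stage: adds the elements of an
// array field within ONE document, giving one value per document.
const perDocument = docs.map((d) => ({
  name: d.name,
  quizTotal: d.quizzes.reduce((acc, x) => acc + x, 0),
}));

// Like {$sum: "$midterm"} as a $group accumulator: a running total that is
// updated as each document of the group arrives, giving one value per group.
let midtermTotal = 0;
for (const d of docs) {
  midtermTotal += d.midterm;
}

console.log(perDocument);
console.log(midtermTotal); // 182
```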

The stage operators we will cover are:

$project
includes or excludes existing fields, renames fields, or adds new calculated fields. For each incoming document, it outputs one modified document.
$sort
reorders the documents according to a sort key.
$match
filters the documents, passing on only those that match the specified criteria. Each incoming document either passes through the filter unmodified or is dropped.
$count
counts the documents that reach this stage of the pipeline and outputs a single document containing that count.
$group
groups input documents and then applies an accumulator expression (if any) to each group.
$unwind
creates one document for each element of an array field.

We have already seen some of these operators at work, as part of .find() and .sort().

The syntax of aggregation, if there is only one stage, is:

db.<collection>.aggregate(<A_Single_Stage>);
or
db.<collection>.aggregate([<A_Single_Stage>], <Options>);

If there is more than one stage, the syntax is:

db.<collection>.aggregate([<stage_1>, <stage_2>, ...], <Options>);

The Options argument can only be passed when the stages are given as an array; in the mongo shell, if the first argument is not an array, every argument is treated as a stage. Options is optional, and we omit it throughout this post.

Two New Demo Databases

For this tutorial, we will use not only the "Queer Eye for the Straight Guy" database but also two new databases. These are closely based on the sample databases in the MongoDB documentation.

The bookstore inventory database

Consider how to represent the inventory of a bookstore in MongoDB. In a relational database, we would create at least two tables, one for Books and another for Authors. Authors would have a primary key called authorId, and Books would use it as a foreign key.

In a NoSQL database, we want to denormalize this data, so we store the author's name directly in each book document, like this:

{title:"Atlas Shrugged", author:"Rand", quantity:3, format:"soft", unitPrice:9.99 }
{title:"The Odyssey", author:"Homer", quantity:8, format:"soft", unitPrice:8.01 }
{title:"Ringworld", author:"Niven", quantity:3, format:"audio", unitPrice:23.99 }
{title:"Iliad", author:"Homer", quantity:7, format:"soft", unitPrice:7.50 }
{title:"The World in Shadow", author:"Beale", quantity:2, format:"soft", unitPrice:6.99 }
{title:"SJWs Always Lie", author:"Day", quantity:2, format:"soft", unitPrice:6.99 }
{title:"Dangerous", author:"Yiannopoulos", quantity:2, format:"hard", unitPrice:18.00 }
{title:"The Fountainhead", author:"Rand", quantity:2, format:"soft", unitPrice:8.99 }
{title:"Tales of Known Space", author:"Niven", quantity:6, format:"soft", unitPrice:6.99 }

Using the MongoDB Shell, we create a new database called "bookstore", and inside that we create a collection called "inventory" that contains the above records:

use bookstore
db.inventory.insertMany([
 {title:"Atlas Shrugged", author:"Rand", quantity:3, format:"soft", unitPrice:9.99 },
 {title:"The Odyssey", author:"Homer", quantity:8, format:"soft", unitPrice:8.01 },
 {title:"Ringworld", author:"Niven", quantity:3, format:"audio", unitPrice:23.99 },
 {title:"Iliad", author:"Homer", quantity:7, format:"soft", unitPrice:7.50 },
 {title:"The World in Shadow", author:"Beale", quantity:2, format:"soft", unitPrice:6.99 },
 {title:"SJWs Always Lie", author:"Day", quantity:2, format:"soft", unitPrice:6.99 },
 {title:"Dangerous", author:"Yiannopoulos", quantity:2, format:"hard", unitPrice:18.00 },
 {title:"The Fountainhead", author:"Rand", quantity:2, format:"soft", unitPrice:8.99 },
 {title:"Tales of Known Space", author:"Niven", quantity:6, format:"soft", unitPrice:6.99 }
]);

The class scores database

We will also need another database that represents student scores:

{name: "Rob", quizzes: [10, 10, 8], labs: [10, 7, 9], midterm:95, final:93 }
{name: "Sam", quizzes: [9, 10, 10], labs: [8, 8, 10], midterm:87, final:90 }
{name: "Zoe", quizzes: [5, 4, 5],   labs: [6, 5, 6],  midterm:75, final:79 }
{name: "Rae", quizzes: [7, 10, 9],  labs: [8, 8, 9],  midterm:89, final:85 }

So, switch to a new database called "class" and populate a collection called "scores" with that data:

use class
db.scores.insertMany([
 {name: "Rob", quizzes: [10, 10, 8], labs: [10, 7, 9], midterm:95, final:93 },
 {name: "Sam", quizzes: [9, 10, 10], labs: [8, 8, 10], midterm:87, final:90 },
 {name: "Zoe", quizzes: [5, 4, 5], labs: [6, 5, 6], midterm:75, final:79 },
 {name: "Rae", quizzes: [7, 10, 9], labs: [8, 8, 9], midterm:89, final:85 }
]);


Using the $project Operator to Create Calculated Fields

The $project stage operator lets us include or exclude fields, rename fields, or create calculated fields. It "reshapes" each document: for each incoming document there is exactly one outgoing document.

To demonstrate this, let's get the total value (quantity * unitPrice) for each book in the bookstore database.

First, switch into the bookstore database:

use bookstore

Then run the following:

db.inventory.aggregate(
 {
  $project: 
  {
   title: 1, 
   author: 1,
   _id: 0,
   totalValue: {$multiply: ["$unitPrice", "$quantity"]}
  }
 }
);

This pipeline has exactly one stage, a $project stage, which includes the title and author fields, excludes the _id field, and adds a calculated field. The indentation is not required; it is only there for readability.

The results are as expected:

{ "author" : "Rand", "title" : "Atlas Shrugged", "totalValue" : 29.97 }
{ "author" : "Homer", "title" : "The Odyssey", "totalValue" : 64.08 }
{ "author" : "Niven", "title" : "Ringworld", "totalValue" : 71.97 }
{ "author" : "Homer", "title" : "Iliad", "totalValue" : 52.5 }
{ "author" : "Beale", "title" : "The World in Shadow", "totalValue" : 13.98 }
{ "author" : "Day", "title" : "SJWs Always Lie", "totalValue" : 13.98 }
{ "author" : "Yiannopoulos", "title" : "Dangerous", "totalValue" : 36 }
{ "author" : "Rand", "title" : "The Fountainhead", "totalValue" : 17.98 }
{ "author" : "Niven", "title" : "Tales of Known Space", "totalValue" : 41.94 }
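As a sanity check on the arithmetic, the stage can be mimicked in plain JavaScript: each input document maps to exactly one output document, keeping title and author, dropping _id, and adding the computed product. This is an illustrative sketch only; `projectTotalValue` is a hypothetical name, only three of the nine books are included, and the numeric _id values are placeholders for the real ObjectIds.

```javascript
// Three of the nine books; _id values are placeholders for the real ObjectIds.
const inventory = [
  { _id: 1, title: "Atlas Shrugged", author: "Rand", quantity: 3, format: "soft", unitPrice: 9.99 },
  { _id: 2, title: "The Odyssey", author: "Homer", quantity: 8, format: "soft", unitPrice: 8.01 },
  { _id: 3, title: "Ringworld", author: "Niven", quantity: 3, format: "audio", unitPrice: 23.99 },
];

// Hypothetical helper mimicking the $project stage above:
// one output document per input document.
function projectTotalValue(docs) {
  return docs.map((d) => ({
    title: d.title,   // title: 1
    author: d.author, // author: 1 (_id is simply not copied, like _id: 0)
    // {$multiply: ["$unitPrice", "$quantity"]}
    totalValue: d.unitPrice * d.quantity,
  }));
}

const projected = projectTotalValue(inventory);
console.log(projected);
```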

Next, let's add a boolean field called "inStock". This isn't difficult: all we have to do is check whether quantity > 0:

db.inventory.aggregate(
 {
  $project: 
  {
   title: 1, 
   author: 1,
   _id: 0,
   totalValue: {$multiply: ["$unitPrice", "$quantity"]},
   inStock: {$gt: ["$quantity", 0]}
  }
 }
);

The results are:

{ "title" : "Atlas Shrugged", "author" : "Rand", "totalValue" : 29.97, "inStock" : true }
{ "title" : "The Odyssey", "author" : "Homer", "totalValue" : 64.08, "inStock" : true }
{ "title" : "Ringworld", "author" : "Niven", "totalValue" : 71.97, "inStock" : true }
{ "title" : "Iliad", "author" : "Homer", "totalValue" : 52.5, "inStock" : true }
{ "title" : "The World in Shadow", "author" : "Beale", "totalValue" : 13.98, "inStock" : true }
{ "title" : "SJWs Always Lie", "author" : "Day", "totalValue" : 13.98, "inStock" : true }
{ "title" : "Dangerous", "author" : "Yiannopoulos", "totalValue" : 36, "inStock" : true }
{ "title" : "The Fountainhead", "author" : "Rand", "totalValue" : 17.98, "inStock" : true }
{ "title" : "Tales of Known Space", "author" : "Niven", "totalValue" : 41.94, "inStock" : true }

Now suppose that "SJWs Always Lie" sells out:

db.inventory.updateOne({title: "SJWs Always Lie"}, {$set: {quantity: 0}});

Rerunning the query shows that "SJWs Always Lie" is indeed out of stock:

{ "title" : "Atlas Shrugged", "author" : "Rand", "totalValue" : 29.97, "inStock" : true }
{ "title" : "The Odyssey", "author" : "Homer", "totalValue" : 64.08, "inStock" : true }
{ "title" : "Ringworld", "author" : "Niven", "totalValue" : 71.97, "inStock" : true }
{ "title" : "Iliad", "author" : "Homer", "totalValue" : 52.5, "inStock" : true }
{ "title" : "The World in Shadow", "author" : "Beale", "totalValue" : 13.98, "inStock" : true }
{ "title" : "SJWs Always Lie", "author" : "Day", "totalValue" : 0, "inStock" : false }
{ "title" : "Dangerous", "author" : "Yiannopoulos", "totalValue" : 36, "inStock" : true }
{ "title" : "The Fountainhead", "author" : "Rand", "totalValue" : 17.98, "inStock" : true }
{ "title" : "Tales of Known Space", "author" : "Niven", "totalValue" : 41.94, "inStock" : true }
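The same before-and-after effect can be reproduced in a short plain-JavaScript sketch: once quantity is 0, the product becomes 0 and the $gt comparison becomes false. Only two books are included, and this mimics the results rather than showing how MongoDB evaluates the expressions.

```javascript
// Two of the books; the other seven behave the same way.
const inventory = [
  { title: "The World in Shadow", author: "Beale", quantity: 2, unitPrice: 6.99 },
  { title: "SJWs Always Lie", author: "Day", quantity: 2, unitPrice: 6.99 },
];

// Mimics the $set update: the matching book now has quantity 0.
const soldOut = inventory.find((d) => d.title === "SJWs Always Lie");
if (soldOut) {
  soldOut.quantity = 0;
}

// Mimics the $project stage with the totalValue and inStock fields.
const report = inventory.map((d) => ({
  title: d.title,
  author: d.author,
  totalValue: d.unitPrice * d.quantity, // {$multiply: ["$unitPrice", "$quantity"]}
  inStock: d.quantity > 0,              // {$gt: ["$quantity", 0]}
}));

console.log(report);
```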


The $project Operator and Accumulators

Now let us see how accumulators work when they are part of a $project stage. For this, we will switch to the class database:

use class

We can use the accumulator operators to compute some statistics for each student:

db.scores.aggregate([
 {
  $project: 
  {
   name:1, 
   _id: 0,
   quizTotal: {$sum: "$quizzes"}, 
   quizHigh: {$max: "$quizzes"},
   quizLow: {$min: "$quizzes"},
   quizAverage: {$avg: "$quizzes"},
   examTotal: {$sum: ["$midterm", "$final"]}
  }
 }
]);

This shows how $sum, $max, $min, and $avg work on arrays, as well as how to use $sum to add two fields. Note that each field path must start with "$": a bare "final" would be treated as the literal string "final", which $sum ignores because it is not a number. The results are:

{ "name" : "Rob", "quizTotal" : 28, "quizHigh" : 10, "quizLow" : 8, "quizAverage" : 9.333333333333334, "examTotal" : 188 }
{ "name" : "Sam", "quizTotal" : 29, "quizHigh" : 10, "quizLow" : 9, "quizAverage" : 9.666666666666666, "examTotal" : 177 }
{ "name" : "Zoe", "quizTotal" : 14, "quizHigh" : 5, "quizLow" : 4, "quizAverage" : 4.666666666666667, "examTotal" : 154 }
{ "name" : "Rae", "quizTotal" : 26, "quizHigh" : 10, "quizLow" : 7, "quizAverage" : 8.666666666666666, "examTotal" : 174 }
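The same per-document statistics can be mimicked in plain JavaScript, which makes the numbers easy to check by hand. This is an illustrative sketch, not MongoDB code; it uses two of the four students, and the `sum` helper is a name introduced here.

```javascript
const scores = [
  { name: "Rob", quizzes: [10, 10, 8], midterm: 95, final: 93 },
  { name: "Zoe", quizzes: [5, 4, 5], midterm: 75, final: 79 },
];

// Helper introduced for this sketch: adds up the numbers in an array.
const sum = (xs) => xs.reduce((acc, x) => acc + x, 0);

// One output document per student, like the $project stage.
const stats = scores.map((d) => ({
  name: d.name,
  quizTotal: sum(d.quizzes),                       // {$sum: "$quizzes"}
  quizHigh: Math.max(...d.quizzes),                // {$max: "$quizzes"}
  quizLow: Math.min(...d.quizzes),                 // {$min: "$quizzes"}
  quizAverage: sum(d.quizzes) / d.quizzes.length,  // {$avg: "$quizzes"}
  examTotal: d.midterm + d.final,                  // {$sum: ["$midterm", "$final"]}
}));

console.log(stats);
```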


The $sort Operator

The $sort operator orders documents by the specified fields, using 1 for ascending and -1 for descending order. Let's add it to the previous query to see our first example of a two-stage pipeline:

db.scores.aggregate([
 {
  $project:
  {
   name: 1,
   _id: 0,
   quizTotal: {$sum: "$quizzes"},
   quizHigh: {$max: "$quizzes"},
   quizLow: {$min: "$quizzes"},
   quizAverage: {$avg: "$quizzes"},
   examTotal: {$sum: ["$midterm", "$final"]}
  }
 },
 {
  $sort:
  {
   name: 1
  }
 }
]);

This pipeline has two stages, the $project stage and the $sort stage. The results are as follows:

{ "name" : "Rae", "quizTotal" : 26, "quizHigh" : 10, "quizLow" : 7, "quizAverage" : 8.666666666666666, "examTotal" : 174 }
{ "name" : "Rob", "quizTotal" : 28, "quizHigh" : 10, "quizLow" : 8, "quizAverage" : 9.333333333333334, "examTotal" : 188 }
{ "name" : "Sam", "quizTotal" : 29, "quizHigh" : 10, "quizLow" : 9, "quizAverage" : 9.666666666666666, "examTotal" : 177 }
{ "name" : "Zoe", "quizTotal" : 14, "quizHigh" : 5, "quizLow" : 4, "quizAverage" : 4.666666666666667, "examTotal" : 154 }
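A $sort stage with {name: 1} behaves much like sorting an array of documents by one field. The plain-JavaScript sketch below is an assumption-laden analogy: `sortByName` is a hypothetical helper, and it uses plain `<` and `>` comparison rather than `localeCompare` to approximate a simple binary string ordering, which agrees for these ASCII names.

```javascript
const stats = [
  { name: "Rob", quizTotal: 28 },
  { name: "Sam", quizTotal: 29 },
  { name: "Zoe", quizTotal: 14 },
  { name: "Rae", quizTotal: 26 },
];

// Hypothetical helper: direction 1 sorts ascending (like {$sort: {name: 1}}),
// direction -1 sorts descending (like {$sort: {name: -1}}).
function sortByName(docs, direction = 1) {
  // Copy first so the input array is left untouched.
  return [...docs].sort((a, b) =>
    a.name < b.name ? -direction : a.name > b.name ? direction : 0
  );
}

console.log(sortByName(stats).map((d) => d.name));
console.log(sortByName(stats, -1).map((d) => d.name));
```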

We will see how to use $project to rename fields in a later example.


The $match Operator

The $match operator does what its name suggests: if a document matches the given condition(s), it is passed on to the next stage unchanged; otherwise it is dropped. So, the $match operator outputs either zero or one document for each incoming document.

For the first couple of examples, we switch to the "Queer Eye for the Straight Guy" database:

use qeftsg

Say we want to get the names and ages of all cast members who are over 40. We do it like this:

db.cast.aggregate([
 {$match: {age: {$gt: 40}}}, 
 {$project: {name:1, age:1, _id:0}}
]);

This two-stage pipeline consists of a $match stage and a $project stage. The results are:

{ "name" : "Ted Allen", "age" : 52 }
{ "name" : "Carson Kressley", "age" : 47 }
{ "name" : "Kyan Douglas", "age" : 47 }
{ "name" : "Thom Filicia", "age" : 48 }

Let's make that into a 3-stage pipeline by adding a $sort stage:

db.cast.aggregate([
 {$match: {age: {$gt: 40}}}, 
 {$project: {name:1, age:1, _id:0}}, 
 {$sort: {name: 1}}
]);

This returns:

{ "name" : "Carson Kressley", "age" : 47 }
{ "name" : "Kyan Douglas", "age" : 47 }
{ "name" : "Ted Allen", "age" : 52 }
{ "name" : "Thom Filicia", "age" : 48 }
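The three stages map naturally onto filter, map, and sort in plain JavaScript. In this sketch the four names and ages come from the results above; "Hypothetical Member" (age 35) is an invented placeholder added only so the filter has something to drop, and the full cast documents from the earlier post are not reproduced.

```javascript
// Names and ages from the results above, plus one hypothetical member under 40.
// _id values are placeholders for the real ObjectIds.
const cast = [
  { _id: 1, name: "Ted Allen", age: 52 },
  { _id: 2, name: "Carson Kressley", age: 47 },
  { _id: 3, name: "Kyan Douglas", age: 47 },
  { _id: 4, name: "Thom Filicia", age: 48 },
  { _id: 5, name: "Hypothetical Member", age: 35 },
];

const over40 = cast
  .filter((d) => d.age > 40)                  // {$match: {age: {$gt: 40}}}
  .map((d) => ({ name: d.name, age: d.age })) // {$project: {name: 1, age: 1, _id: 0}}
  .sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0)); // {$sort: {name: 1}}

console.log(over40);
```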


The $count Operator

The $count operator outputs a single document containing the number of documents that reach it; the field name is the string we pass to $count.

For example, to get the number of "Queer Eye" cast members who are over 40, add a fourth stage to the previous pipeline:

db.cast.aggregate([
 {$match: {age: {$gt: 40}}}, 
 {$project: {name:1, age:1, _id:0}}, 
 {$sort: {name: 1}}, 
 {$count: "docCount"}
]);
This returns:
{ "docCount" : 4 }

Of course, this pipeline is more complicated than it needs to be: the $project and $sort stages do not change the number of documents coming from the $match stage. So, the pipeline can be simplified to two stages:

db.cast.aggregate([
 {$match: {age: {$gt: 40}}}, 
 {$count: "docCount"}
]);

And it returns the same thing:

{ "docCount" : 4 }
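Why the shorter pipeline is safe can be checked with a tiny plain-JavaScript analogy: reshaping and reordering an array never changes its length. As in the earlier sketch, the first four ages come from the results above and "Hypothetical Member" is an invented placeholder.

```javascript
// Ages from the results above, plus one hypothetical member under 40.
const cast = [
  { name: "Ted Allen", age: 52 },
  { name: "Carson Kressley", age: 47 },
  { name: "Kyan Douglas", age: 47 },
  { name: "Thom Filicia", age: 48 },
  { name: "Hypothetical Member", age: 35 },
];

const matched = cast.filter((d) => d.age > 40); // {$match: {age: {$gt: 40}}}

// Four-stage version: project and sort before counting.
const fourStage = {
  docCount: matched
    .map((d) => ({ name: d.name, age: d.age }))
    .sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0)).length,
};

// Two-stage version: count straight after the match, like {$count: "docCount"}.
const twoStage = { docCount: matched.length };

console.log(fourStage, twoStage);
```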


The $unwind Operator

The $unwind operator creates one output document for each element in an array field in the incoming document. For example, let's make one document for each of Kyan Douglas' specialties, and only show his name and that specialty:

db.cast.aggregate([
 {$match: {name: "Kyan Douglas"}}, 
 {$unwind: "$specialties"}, 
 {$project: {name:1, specialties:1, _id:0}}
]);

The result is:

{ "name" : "Kyan Douglas", "specialties" : "Hair" }
{ "name" : "Kyan Douglas", "specialties" : "Grooming" }
{ "name" : "Kyan Douglas", "specialties" : "Personal Hygiene" }
{ "name" : "Kyan Douglas", "specialties" : "Makeup" }
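The effect of $unwind resembles a flatMap over the array field. In the sketch below, `unwind` is a hypothetical helper, and Kyan's document is reduced to the fields visible in the output above (with a placeholder _id). Following MongoDB's default behavior as I understand it, a missing, null, or empty-array field produces no output, and a non-array value is treated as a one-element array.

```javascript
// Kyan Douglas' document, reduced to the fields used here; _id is a placeholder.
const kyan = {
  _id: 3,
  name: "Kyan Douglas",
  specialties: ["Hair", "Grooming", "Personal Hygiene", "Makeup"],
};

// Hypothetical helper mimicking {$unwind: "$<field>"}: one output document
// per array element, with the array replaced by that single element.
function unwind(docs, field) {
  return docs.flatMap((d) => {
    const value = d[field];
    // Missing/null -> no output; non-array -> treated as a one-element array.
    const items = Array.isArray(value) ? value : value == null ? [] : [value];
    return items.map((item) => ({ ...d, [field]: item }));
  });
}

// The final map plays the role of {$project: {name: 1, specialties: 1, _id: 0}}.
const rows = unwind([kyan], "specialties").map((d) => ({
  name: d.name,
  specialties: d.specialties,
}));

console.log(rows);
```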


The $group Operator

The $group operator groups the incoming documents by the value of the _id expression we provide: all documents that produce the same _id value fall into the same group. It can also apply accumulator operators to each group. The output is one document per group, with the group key stored in _id.

The next examples will use the bookstore database, so we switch to it:

use bookstore

Here is how to get a list of distinct authors, sorted alphabetically:

db.inventory.aggregate([
 {$group: {_id: "$author"}}, 
 {$sort: {_id: 1}}
]);

This returns:

{ "_id" : "Beale" }
{ "_id" : "Day" }
{ "_id" : "Homer" }
{ "_id" : "Niven" }
{ "_id" : "Rand" }
{ "_id" : "Yiannopoulos" }

The grouping was done in the first stage, and the group names are stored in a new _id field.

Now, let's use $project to produce a new field called "author":

db.inventory.aggregate([
 {$group: {_id: "$author"}}, 
 {$project: {author: "$_id"}}
]);
This returns:
{ "_id" : "Yiannopoulos", "author" : "Yiannopoulos" }
{ "_id" : "Rand", "author" : "Rand" }
{ "_id" : "Day", "author" : "Day" }
{ "_id" : "Beale", "author" : "Beale" }
{ "_id" : "Niven", "author" : "Niven" }
{ "_id" : "Homer", "author" : "Homer" }

Then, we drop the _id field:

db.inventory.aggregate([
 {$group: {_id: "$author"}}, 
 {$project: {author: "$_id", _id: 0}}
]);

and we get:

{ "author" : "Yiannopoulos" }
{ "author" : "Rand" }
{ "author" : "Day" }
{ "author" : "Beale" }
{ "author" : "Niven" }
{ "author" : "Homer" }

Thus we have essentially renamed the _id field.

Finally, we sort by author:

db.inventory.aggregate([
 {$group: {_id: "$author"}}, 
 {$project: {author: "$_id", _id: 0}}, 
 {$sort:{author: 1}}
]);
Results:
{ "author" : "Beale" }
{ "author" : "Day" }
{ "author" : "Homer" }
{ "author" : "Niven" }
{ "author" : "Rand" }
{ "author" : "Yiannopoulos" }
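The group, rename, and sort steps can be mimicked in plain JavaScript with a Map keyed by author. $group does not promise any particular output order, which is why the unsorted results above came back in an arbitrary order and why the final $sort is needed. This is an illustrative sketch, not MongoDB code; only the author field of the nine books is kept.

```javascript
// The author field of the nine inventory documents, in insertion order.
const inventory = [
  { author: "Rand" }, { author: "Homer" }, { author: "Niven" },
  { author: "Homer" }, { author: "Beale" }, { author: "Day" },
  { author: "Yiannopoulos" }, { author: "Rand" }, { author: "Niven" },
];

// {$group: {_id: "$author"}}: one document per distinct author value.
const groups = new Map();
for (const d of inventory) {
  if (!groups.has(d.author)) {
    groups.set(d.author, { _id: d.author });
  }
}

const authors = [...groups.values()]
  .map((g) => ({ author: g._id })) // {$project: {author: "$_id", _id: 0}}
  .sort((a, b) => (a.author < b.author ? -1 : a.author > b.author ? 1 : 0)); // {$sort: {author: 1}}

console.log(authors);
```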


The $group Operator with Accumulators

In a $group stage, we can apply accumulators to each group. Accumulators work differently in a $group stage than in a $project stage: they combine values across all the documents in the group. One accumulator is $push, which creates an array field in the output document and appends a specified value from each document in the group to that array. Let us get the titles of the books written by each author:

db.inventory.aggregate([
 {$group: {_id: "$author", books: {$push:"$title"}}}, 
 {$sort: {_id: 1}}
]);
This returns:
{ "_id" : "Beale", "books" : [ "The World in Shadow" ] }
{ "_id" : "Day", "books" : [ "SJWs Always Lie" ] }
{ "_id" : "Homer", "books" : [ "The Odyssey", "Iliad" ] }
{ "_id" : "Niven", "books" : [ "Ringworld", "Tales of Known Space" ] }
{ "_id" : "Rand", "books" : [ "Atlas Shrugged", "The Fountainhead" ] }
{ "_id" : "Yiannopoulos", "books" : [ "Dangerous" ] }

As mentioned earlier, the $sum operator behaves differently in $group stages than it does in $project stages: in a $group stage it maintains a running total. We will use $sum to count the titles by each author, adding one to a field named numTitles for each document in the group:

db.inventory.aggregate([
 {$group: {_id:"$author", numTitles: {$sum: 1}}}, 
 {$project: {author: "$_id", _id: 0, numTitles: 1}}, 
 {$sort: {author: 1}}
]);
Result:
{ "numTitles" : 1, "author" : "Beale" }
{ "numTitles" : 1, "author" : "Day" }
{ "numTitles" : 2, "author" : "Homer" }
{ "numTitles" : 2, "author" : "Niven" }
{ "numTitles" : 2, "author" : "Rand" }
{ "numTitles" : 1, "author" : "Yiannopoulos" }
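Both accumulators from this section can be sketched in one pass over the books in plain JavaScript: $push appends a value per document, and {$sum: 1} adds one per document. `groupByAuthor` is a hypothetical helper. In this sketch the pushed titles follow the input order, which mirrors the order in which documents reach the $group stage.

```javascript
const inventory = [
  { title: "Atlas Shrugged", author: "Rand" },
  { title: "The Odyssey", author: "Homer" },
  { title: "Ringworld", author: "Niven" },
  { title: "Iliad", author: "Homer" },
  { title: "The World in Shadow", author: "Beale" },
  { title: "SJWs Always Lie", author: "Day" },
  { title: "Dangerous", author: "Yiannopoulos" },
  { title: "The Fountainhead", author: "Rand" },
  { title: "Tales of Known Space", author: "Niven" },
];

// Hypothetical helper, roughly
// {$group: {_id: "$author", books: {$push: "$title"}, numTitles: {$sum: 1}}}
function groupByAuthor(docs) {
  const groups = new Map();
  for (const d of docs) {
    let g = groups.get(d.author);
    if (!g) {
      g = { _id: d.author, books: [], numTitles: 0 };
      groups.set(d.author, g);
    }
    g.books.push(d.title); // $push: append this document's title
    g.numTitles += 1;      // {$sum: 1}: add one per document in the group
  }
  return [...groups.values()];
}

const grouped = groupByAuthor(inventory);
console.log(grouped);
```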

Now let's put it all together...

We want to find the total value of all books in stock, plus the titles, grouped by author. We approach this as follows:

  1. Find the totalValue for each title (unitPrice * quantity)
  2. Group the documents by author, keeping a running total of totalValue and building the books array by pushing each title
  3. Return only author, books, and runningTotal
  4. Sort by author ascending

db.inventory.aggregate([
 {$project: {title: 1, author: 1, _id: 0, totalValue: {$multiply: ["$unitPrice", "$quantity"]}}},
 {$group: {_id:"$author", runningTotal: {$sum: "$totalValue"}, books: {$push:"$title"}}},
 {$project: {author: "$_id", _id: 0, runningTotal: 1, books: 1}},
 {$sort: {author: 1}}
]);
Results:
{ "runningTotal" : 13.98, "books" : [ "The World in Shadow" ], "author" : "Beale" }
{ "runningTotal" : 0, "books" : [ "SJWs Always Lie" ], "author" : "Day" }
{ "runningTotal" : 116.58, "books" : [ "The Odyssey", "Iliad" ], "author" : "Homer" }
{ "runningTotal" : 113.91, "books" : [ "Ringworld", "Tales of Known Space" ], "author" : "Niven" }
{ "runningTotal" : 47.95, "books" : [ "Atlas Shrugged", "The Fountainhead" ], "author" : "Rand" }
{ "runningTotal" : 36, "books" : [ "Dangerous" ], "author" : "Yiannopoulos" }

Note that we don't need the "totalValue" field in the final result. This allows us to drop the first $project stage and compute the product inside the $group stage:

db.inventory.aggregate([
 {$group: {_id: "$author", runningTotal: {$sum: {$multiply: ["$unitPrice", "$quantity"]}}, books: {$push: "$title"}}},
 {$project: {author: "$_id", _id:0, runningTotal: 1, books: 1}},
 {$sort: {author: 1}}
]);
The result is the same.
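The combined pipeline can be checked with a plain-JavaScript sketch that computes the product inside the grouping loop, as the shorter pipeline does. The data is the nine books after the update, so "SJWs Always Lie" has quantity 0; here the Map key plays the role of _id. This is an illustration of the computation, not of MongoDB internals.

```javascript
// The nine books after the update ("SJWs Always Lie" has quantity 0).
const inventory = [
  { title: "Atlas Shrugged", author: "Rand", quantity: 3, unitPrice: 9.99 },
  { title: "The Odyssey", author: "Homer", quantity: 8, unitPrice: 8.01 },
  { title: "Ringworld", author: "Niven", quantity: 3, unitPrice: 23.99 },
  { title: "Iliad", author: "Homer", quantity: 7, unitPrice: 7.5 },
  { title: "The World in Shadow", author: "Beale", quantity: 2, unitPrice: 6.99 },
  { title: "SJWs Always Lie", author: "Day", quantity: 0, unitPrice: 6.99 },
  { title: "Dangerous", author: "Yiannopoulos", quantity: 2, unitPrice: 18.0 },
  { title: "The Fountainhead", author: "Rand", quantity: 2, unitPrice: 8.99 },
  { title: "Tales of Known Space", author: "Niven", quantity: 6, unitPrice: 6.99 },
];

// $group with two accumulators; the Map key plays the role of _id.
const groups = new Map();
for (const d of inventory) {
  const g = groups.get(d.author) || { runningTotal: 0, books: [] };
  g.runningTotal += d.unitPrice * d.quantity; // {$sum: {$multiply: ["$unitPrice", "$quantity"]}}
  g.books.push(d.title);                      // {$push: "$title"}
  groups.set(d.author, g);
}

// $project (rename _id to author) followed by {$sort: {author: 1}}.
const report = [...groups.entries()]
  .map(([author, g]) => ({ runningTotal: g.runningTotal, books: g.books, author }))
  .sort((a, b) => (a.author < b.author ? -1 : a.author > b.author ? 1 : 0));

console.log(report);
```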


Conclusion

So that is MongoDB's aggregation pipeline. As you can imagine, pipelines can be very memory-intensive when applied to large datasets. There are various ways to mitigate this, such as placing $match stages as early as possible so that fewer documents flow through the later stages. As a last resort, the allowDiskUse option lets MongoDB write intermediate results of the pipeline to temporary files, avoiding the per-stage memory limit (100 MB of RAM per stage).
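Why an early $match helps can be illustrated with an instrumented plain-JavaScript sketch that counts how many documents a $project-like stage has to touch. The 1,000 documents and their ages are made-up data, and MongoDB's optimizer can reorder some stages on its own, so treat this as an illustration of the reasoning rather than a benchmark.

```javascript
// Made-up data: 1,000 documents with ages 20..69 repeating.
const docs = Array.from({ length: 1000 }, (_, i) => ({ _id: i, age: 20 + (i % 50) }));

// Instrumented $project-like stage: counts how many documents it processes.
let projectWork = 0;
const project = (ds) =>
  ds.map((d) => {
    projectWork += 1;
    return { age: d.age };
  });
const match = (ds) => ds.filter((d) => d.age > 40); // $match-like stage

// Match placed last: the projection touches every document.
const matchLast = match(project(docs));
const workWhenMatchLast = projectWork;

// Match placed first: the projection only touches the survivors.
projectWork = 0;
const matchFirst = project(match(docs));
const workWhenMatchFirst = projectWork;

console.log(workWhenMatchLast, workWhenMatchFirst); // 1000 580
```

Both orders produce the same documents; only the amount of work done by the later stage changes.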
