Node.js async in practice: When to use what?
Updated May 2016.
When I started out using Node.js and Async.js I didn’t find any good/thorough resources on how to really use the Async module. That’s why I decided to make a little cookbook about it.
What is Async solving? An antipattern
Async and other similar Node.js control flow modules seek to simplify code such as this:
//DON'T DO THIS AT HOME!
app.get('/user/:userId', function(req, res, next) {
var locals = {};
var userId = req.params.userId;
var callbackCounter = 0;
var gotError = false;
db.get('users', userId, function(err, user) {
if (gotError) {
return;
}
if (err) {
gotError = true;
return next(err);
}
locals.user = {
name: user.name,
email: user.email,
bio: user.bio
};
callbackCounter++;
if (callbackCounter == 2) {
res.render('user-profile', locals);
}
});
db.query('posts', {userId: userId}, function(err, posts) {
if (gotError) {
return;
}
if (err) {
gotError = true;
return next(err);
}
locals.posts = posts;
callbackCounter++;
if (callbackCounter == 2) {
res.render('user-profile', locals);
}
});
});
Since the two db calls are asynchronous we don’t know which one of them is going to finish first. So we have to use callbackCounter to keep track of how many tasks have finished. If an error occurs we also have to handle it separately in each task. And we have code duplication.
And what happens when we need to add another asynchronous task? Then we have to change if (callbackCounter == 2) to if (callbackCounter == 3), which won’t be fun to maintain in the long run.
This is where Async comes to our aid and makes the code sane to look at and easy to maintain. In this post I’ll give you some pointers on how to use Async in real life.
Important note about callbacks and errors
One thing that wasn’t obvious to me when I first looked at Async was the way callbacks are used.
Generally all the Async functions take a set of tasks to perform as an argument. These tasks can, for example, be an array of functions or a collection to iterate over. Each task is given a callback function; let’s call this the task callback. This callback must be called when the task is completed, e.g. after an asynchronous call to the database has completed.
Besides the set of tasks, the Async functions also take a callback function as an argument themselves. Let’s call this the final callback. The final callback is called when all tasks have completed, i.e. have called their respective task callbacks.
Example:
async.parallel([
function(callback) { //This is the first task, and `callback` is its task callback
db.save('xxx', 'a', function(err) {
//Now we have saved to the DB, so let's tell Async that this task is done
callback();
});
},
function(callback) { //This is the second task, and `callback` is its task callback
db.save('xxx', 'b', callback); //Since we don't do anything interesting in db.save()'s callback, we might as well just pass in the task callback
}
], function(err) { //This is the final callback
console.log('Both a and b are saved now');
});
If a task encounters an error, the best thing is to call the task callback with the error object as the first argument.
When a task calls back with an error, the final callback will be called immediately with the error object, and no more outstanding tasks will be initiated.
Example:
async.parallel([
function(callback) {
db.save('xxx', 'a', function(err) {
if (err) {
callback(err);
//It's important to return so that `callback` isn't called twice
return;
}
callback();
});
},
function(callback) {
//If we just pass in the task callback, it will automatically be called with an error, if the db.save() call fails
db.save('xxx', 'b', callback);
}
], function(err) {
if (err) {
//Handle the error in some way. Here we simply throw it
//Other options: pass it on to an outer callback, log it etc.
throw err;
}
console.log('Both a and b are saved now');
});
These four lines of error handling get pretty tedious:
if (err) {
callback(err);
return;
}
So I prefer to put those lines on one line, as in:
if (err) return callback(err);
Note about modules used implicitly in this post
In this post’s examples I’m using some Node modules implicitly.
- async (of course). In your own script you should use var async = require('async'); to include it.
- Express.js is used as an HTTP server.
- db is a fictional database module. It has db.get(bucket, key, callback) and db.query(bucket, properties, callback) methods that are supposed to work like any normal NoSQL database.
- Lodash is used as _.
Now let’s get on to the interesting stuff!
I need to run multiple tasks that don’t depend on each other and when they all finish do something else
Then you should use async.parallel. The signature is async.parallel(tasks, callback), where tasks is an array of functions. It will immediately run all the functions in parallel, wait for all of them to call their task callback, and finally, when all tasks are complete, run callback (the final callback).
An example could be to load a forum user’s profile with their details and a list of all their posts.
As input we get the user’s ID, so we can easily get both user details and posts independently of each other (in parallel).
app.get('/user/:userId', function(req, res, next) {
var locals = {};
var userId = req.params.userId;
async.parallel([
//Load user
function(callback) {
db.get('users', userId, function(err, user) {
if (err) return callback(err);
locals.user = {
name: user.name,
email: user.email,
bio: user.bio
};
callback();
});
},
//Load posts
function(callback) {
db.query('posts', {userId: userId}, function(err, posts) {
if (err) return callback(err);
locals.posts = posts;
callback();
});
}
], function(err) { //This function gets called after the two tasks have called their "task callbacks"
if (err) return next(err); //If an error occurred, we let express handle it by calling the `next` function
//Here `locals` will be an object with `user` and `posts` keys
//Example: `locals = {user: ..., posts: [...]}`
res.render('user-profile', locals);
});
});
If you have more than two tasks to run, you just add them to the tasks array (the first argument to async.parallel).
Note that in a real-life application you would probably want to modularize your functions a bit more. Instead of using two anonymous functions to get a user and load a user’s posts respectively, you could import two modules called getUser and getUserPosts. That would tidy up the code a lot. I’ll expand on this in a future post; this post focuses on explaining the workings of async.js with as few distractions as possible.
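As a rough sketch of how that could look (getUser and getUserPosts are hypothetical helpers that simply wrap the same db calls used above), the parallel example might end up like this:
//Hypothetical helpers wrapping the db calls from the example above
function getUser(userId, callback) {
  db.get('users', userId, function(err, user) {
    if (err) return callback(err);
    callback(null, {name: user.name, email: user.email, bio: user.bio});
  });
}

function getUserPosts(userId, callback) {
  db.query('posts', {userId: userId}, callback);
}

app.get('/user/:userId', function(req, res, next) {
  var locals = {};
  var userId = req.params.userId;
  async.parallel([
    //Load user
    function(callback) {
      getUser(userId, function(err, user) {
        if (err) return callback(err);
        locals.user = user;
        callback();
      });
    },
    //Load posts
    function(callback) {
      getUserPosts(userId, function(err, posts) {
        if (err) return callback(err);
        locals.posts = posts;
        callback();
      });
    }
  ], function(err) {
    if (err) return next(err);
    res.render('user-profile', locals);
  });
});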
I need to run multiple tasks that depend on each other and when they all finish do something else
Then you should use async.series. The signature is async.series(tasks, callback), where tasks is an array of functions. It will run one function at a time, wait for it to call its task callback, and finally, when all tasks are complete, run callback (the final callback).
Again we will use the forum user example.
This time we get the user’s name as input, but our data model is the same as before. That means we need to find the user’s id based on their name before we can load the posts, so we can’t run the two queries in parallel as we did before.
app.get('/user/:name', function(req, res, next) {
var locals = {};
var name = req.params.name;
var userId; //Define `userId` out here, so both tasks can access the variable
async.series([
//Load user to get `userId` first
function(callback) {
db.query('users', {name: name}, function(err, users) {
if (err) return callback(err);
//Check that a user was found
if (users.length == 0) {
return callback(new Error('No user with name '+name+' found.'));
}
var user = users[0];
userId = user.id; //Set the `userId` here, so the next task can access it
locals.user = {
name: user.name,
email: user.email,
bio: user.bio
};
callback();
});
},
//Load posts (won't be called before task 1's "task callback" has been called)
function(callback) {
db.query('posts', {userId: userId}, function(err, posts) {
if (err) return callback(err);
locals.posts = posts;
callback();
});
}
], function(err) { //This function gets called after the two tasks have called their "task callbacks"
if (err) return next(err);
//Here locals will be populated with `user` and `posts`
//Just like in the previous example
res.render('user-profile', locals);
});
});
In this example you don’t gain that much from using async.series, since you only have two tasks to run. The above example could (depending on your taste) be simplified to the following using nesting:
app.get('/user/:name', function(req, res, next) {
var name = req.params.name;
//Get user by name
db.query('users', {name: name}, function(err, users) {
if (err) return next(err);
if (users.length == 0) {
return next(new Error('No user with name '+name+' found.'));
}
var user = users[0];
//Load user's posts
db.query('posts', {userId: user.id}, function(err, posts) {
if (err) return next(err);
//We're done and can render the template to the client
res.render('user-profile', {
user: {
name: user.name,
email: user.email,
bio: user.bio
},
posts: posts
});
});
});
});
But what happens when you suddenly need to run 3 different tasks? Or even more? Then you end up cooking callback spaghetti (bad).
IMO when you have two levels you can use either solution. If you have more, always go with async.series.
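For example, suppose we also had to load the number of comments the user has written. With async.series the route just grows by one entry in the tasks array, whereas the nested version would gain yet another level of callbacks. Here is a sketch (the db.count(bucket, properties, callback) method is hypothetical; everything else is as before):
app.get('/user/:name', function(req, res, next) {
  var locals = {};
  var name = req.params.name;
  var userId;
  async.series([
    //Load user to get `userId` first (same as before)
    function(callback) {
      db.query('users', {name: name}, function(err, users) {
        if (err) return callback(err);
        if (users.length == 0) {
          return callback(new Error('No user with name '+name+' found.'));
        }
        var user = users[0];
        userId = user.id;
        locals.user = {name: user.name, email: user.email, bio: user.bio};
        callback();
      });
    },
    //Load posts
    function(callback) {
      db.query('posts', {userId: userId}, function(err, posts) {
        if (err) return callback(err);
        locals.posts = posts;
        callback();
      });
    },
    //Load comment count (the third task we just added, using a hypothetical `db.count` method)
    function(callback) {
      db.count('comments', {userId: userId}, function(err, count) {
        if (err) return callback(err);
        locals.commentCount = count;
        callback();
      });
    }
  ], function(err) {
    if (err) return next(err);
    res.render('user-profile', locals);
  });
});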
I need to iterate over a collection, perform an asynchronous task for each item, and when they’re all done do something else
Then you use async.forEach. The signature is async.forEach(items, task, callback). items is the collection you want to iterate over and task is the function to call for each item in items. Async will immediately call task with each item in items as the first argument. All tasks are run in parallel. Example: task(items[0]), task(items[1]) … task(items[n]). Once all tasks complete, the final callback will be called.
An example could be a webservice where you support deleting multiple messages in one request. You get the message IDs as a comma-separated string in the URL. Each deletion requires a separate call to the database. When all deletions have completed you want to send the user a response.
app.delete('/messages/:messageIds', function(req, res, next) {
var messageIds = req.params.messageIds.split(',');
async.forEach(messageIds, function(messageId, callback) { //The second argument, `callback`, is the "task callback" for a specific `messageId`
//When the db has deleted the item it will call the "task callback"
//This way async knows which items in the collection have finished
db.delete('messages', messageId, callback);
}, function(err) {
if (err) return next(err);
//Tell the user about the great success
res.json({
success: true,
message: messageIds.length+' message(s) was deleted.'
});
});
});
Bonus tip: If you ever need to iterate over an object, there is an easy way too. You just use Object.keys(o) (or _.keys(o) if you prefer Lodash or Underscore.js) on the object, and iterate over the keys. Example:
var trafficLightActions = {
red: 'Stop',
yellow: 'Wait',
green: 'Go'
}
async.forEach(Object.keys(trafficLightActions), function(color, callback) {
var action = trafficLightActions[color];
//Play around with the color and action
callback(); //Remember to call the task callback, otherwise the final callback never fires
}, function(err) {
//When done
});
I need to iterate over a collection, perform an asynchronous task for each item, but only let x tasks run at the same time, and when they’re all done do something else
But what if your database only allows a limited number of connections at a time, and your user might delete thousands of messages in a single request? Then you use async.forEach’s sibling async.forEachLimit.
The signature is async.forEachLimit(items, concurrency, task, callback). It works almost like async.forEach except that it doesn’t run task for all items immediately in parallel. The concurrency value is an integer that tells Async how many tasks are allowed to run simultaneously. Let’s say that our database only allows 5 connections at a time; then we simply change our code to:
app.delete('/messages/:messageIds', function(req, res, next) {
var messageIds = req.params.messageIds.split(',');
//`5` is the `concurrency` argument here
// ----------------------------↴
async.forEachLimit(messageIds, 5, function(messageId, callback) {
db.delete('messages', messageId, callback);
}, function(err) {
if (err) return next(err);
res.json({
success: true,
message: messageIds.length+' message(s) was deleted.'
});
});
});
If you are working with large collections it’s normally a good idea to use async.forEachLimit instead of async.forEach to throttle I/O usage.
I need to iterate over a collection, perform an asynchronous task for one item at a time, and when they’re all done do something else
The third async.forEach sibling is async.forEachSeries, which does the same as async.forEachLimit with a concurrency of 1. The signature is async.forEachSeries(items, task, callback), and it simply handles each item in items serially, one at a time.
You can use this if it’s important that the task of one item finishes before the task of the next one is started. One example could be if each task depends on some result from the previous task. Or if the tasks share some state or external service that does not handle multiple clients.
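As a minimal sketch, the message deletion example from before could be made to delete one message at a time simply by swapping in async.forEachSeries:
app.delete('/messages/:messageIds', function(req, res, next) {
  var messageIds = req.params.messageIds.split(',');
  //Each deletion starts only after the previous one has called its task callback
  async.forEachSeries(messageIds, function(messageId, callback) {
    db.delete('messages', messageId, callback);
  }, function(err) {
    if (err) return next(err);
    res.json({
      success: true,
      message: messageIds.length+' message(s) was deleted.'
    });
  });
});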
I need to perform an arbitrary set of asynchronous tasks
Then you should use async.queue.
The syntax of async.queue is a little different from the other functions. The signature is async.queue(task, concurrency).
The task function itself should take two arguments. The first is the task to be performed; this can be anything that the function can use to perform its work. The second argument is a callback (the task callback, in the terminology used earlier) that should be called when the task is done.
The concurrency value is just like the one from async.forEachLimit, i.e. it limits how many tasks can be executed at a time.
async.queue returns an object you can push tasks to, using queue.push(task). Read about the other properties of the object on the GitHub page. The most useful property is drain. If you set this to a function, it will be called every time the queue’s last task has been processed, which is very useful for performing an action when queue processing is done. Think of it as async.queue’s final callback.
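Here is a minimal sketch of those mechanics before we look at a real use case. It reuses the fictional db.save call from earlier and allows at most 2 saves to run at a time:
//The worker function: `value` is whatever was pushed, `callback` is the task callback
var queue = async.queue(function(value, callback) {
  db.save('xxx', value, function(err) {
    if (err) console.error('Could not save ' + value, err);
    callback(); //Tell the queue this task is done
  });
}, 2); //At most 2 saves run at the same time

//Called every time the queue runs out of tasks
queue.drain = function() {
  console.log('All queued values have been saved');
};

//Push tasks whenever you like, even while the queue is already working
queue.push('a');
queue.push('b');
queue.push('c');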
A good example of using a queue is when your input is streamed from another source, which makes it difficult to use async.forEach. An example could be copying all objects from one AWS S3 bucket to another. Since AWS only lets you list 1000 objects at a time, you can’t get a single array with all object names from the source bucket at once. You have to list 1000 objects at a time, and supply the last object name from the previous response as the marker in the next request (just like pagination). You could choose to load all object names into a single array first, but then you couldn’t start copying until every object had been listed - a terrible waste of valuable time. Nor would it scale if you had billions of S3 files.
A smarter way is to set up an async.queue, and add object names to the queue as we get them from the list. As I said, a queue task can be anything. In this case an S3 object name is a task.
Let’s get some code on the table. In this example I’m using the API of Apps Attic’s awssum module for AWS services (awesome name by the way).
//Prepare S3 access and bucket names
var awssum = require('awssum');
var s3 = new awssum.load('amazon/s3').S3({
accessKeyId: '...',
secretAccessKey: '..',
});
var sourceBucket = 'old-bucket';
var destinationBucket = 'new-bucket';
var listObjectsDone = false;
//Set up our queue
var queue = async.queue(function(objectName, callback) {
//This is the queue's `task` function
//It copies `objectName` from `sourceBucket` to `destinationBucket`
var options = {
BucketName: destinationBucket,
ObjectName: objectName,
SourceBucket: sourceBucket,
SourceObject: objectName
};
s3.CopyObject(options, function(err) {
if (err) throw err;
callback(); //Tell async that this queue item has been processed
});
}, 20); //Only allow 20 copy requests at a time so we don't flood the network
//When the queue is emptied we want to check if we're done
queue.drain = function() {
checkDone();
};
//Define the function that lists objects from the source bucket
//It gets the current `marker` as its argument
function listObjects(marker) {
var options = {
BucketName: sourceBucket,
Marker: marker,
MaxKeys: 1000
};
s3.ListObjects(options, function(err, data) {
if (err) throw err;
var result = data.Body.ListBucketResult;
var contents = _.isArray(result.Contents) ? result.Contents : [result.Contents]; //AWS sends an array if multiple, and a single object if there was only one result
_.each(contents, function(item) {
var objectName = item.Key;
marker = objectName; //Save the marker
queue.push(objectName); //Push the object to our queue
});
if (result.IsTruncated == 'true') {
//The result is truncated, i.e. we have to list once more, starting from the new marker
listObjects(marker);
} else {
//Tell our routine that we don't need to wait for more objects from S3
listObjectsDone = true;
//Check if we're done (is the queue empty?)
checkDone();
}
});
}
/*
This function gets called when:
a) `listObjects` didn't return a truncated result (because we were at the end of the bucket)
b) when the last task of the queue is finished
*/
function checkDone() {
if (queue.length() == 0 && listObjectsDone) {
console.log('Tada! All objects have been copied :)');
}
}
//Start the routine by calling `listObjects` with `null` as the `marker`
listObjects(null);
Note that the queue can be drained multiple times, and thereby call queue.drain() multiple times. This would for example happen if our copy requests finished much faster than each list operation. That’s why we have the listObjectsDone boolean. Even if the queue is empty, we’re not done until this variable has been set to true.
One missing feature of async.queue that you should be aware of is that the task callbacks do not support being passed an error as their first argument. Check out this example:
var counter = 0;
var queue = async.queue(function(shouldFail, callback) {
counter++;
console.log(counter);
if (shouldFail) {
callback(new Error('An error just for fun.')); //Nobody will handle this error
} else {
callback();
}
}, 1);
queue.push(false); //Should not fail
queue.push(true); //Should fail
queue.push(false); //Should not fail
I would expect this to print out 1 and 2, and then I would see the error somewhere. But it will print out 1, 2 and 3.
This is a good feature when you think about it though. What would we want to have happen in our S3 copy example above if a single copy operation failed? Would we want the queue to stop automatically and halt our job midway? No, not really. Depending on your use case you should handle the error accordingly. If an S3 copy operation fails, for example, you could retry it a maximum of 5 times, then give up on that object but let the job continue anyway.
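As a sketch of that idea, the queue worker from the S3 example above could be changed to retry a failed copy a few times before moving on (assuming the same s3, sourceBucket and destinationBucket setup as before):
var queue = async.queue(function(objectName, callback) {
  var attempts = 0;
  function copy() {
    attempts++;
    var options = {
      BucketName: destinationBucket,
      ObjectName: objectName,
      SourceBucket: sourceBucket,
      SourceObject: objectName
    };
    s3.CopyObject(options, function(err) {
      if (err && attempts < 5) {
        return copy(); //Try again, up to 5 attempts in total
      }
      if (err) {
        //Give up on this object, but let the rest of the job continue
        console.error('Could not copy ' + objectName + ' after ' + attempts + ' attempts:', err);
      }
      callback(); //Never pass the error on, so the queue keeps processing
    });
  }
  copy();
}, 20);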
Combination: I need to perform some parallel tasks, some serial tasks and iterate over a collection performing an asynchronous task for each item
Then you use a combination of async.parallel, async.series, and async.forEach.
An example could be to load a forum user by name, their posts, and their photos. If we have the same data model as before, we need to look up the user’s id based on their name before we can load their posts and photos (which are both stored using the userId) in parallel. Moreover, we also have to check that each of the photos exists on disk.
app.get('/user/:name', function(req, res, next) {
var locals = {};
var name = req.params.name;
var userId;
async.series([
//Load user to get `userId` first
function(callback) {
db.query('users', {name: name}, function(err, users) {
if (err) return callback(err);
//Check that a user was found
if (users.length == 0) {
return callback(new Error('No user with name '+name+' found.'));
}
var user = users[0];
userId = user.id; //Set the `userId` here, so the next tasks can access it
locals.user = {
name: user.name,
email: user.email,
bio: user.bio
};
callback();
});
},
//Load posts and photos in parallel (won't be called before task 1's "task callback" has been called)
function(callback) {
async.parallel([
//Load posts
function(callback) {
db.query('posts', {userId: userId}, function(err, posts) {
if (err) return callback(err);
locals.posts = posts;
callback();
});
},
//Load photos
function(callback) {
db.query('photos', {userId: userId}, function(err, photos) {
if (err) return callback(err);
locals.photos = [];
//Iterate over each photo
async.forEach(photos, function(photo, callback) {
fs.exists(photo.path, function(exists) {
//Only add the photo to locals.photos if it exists on disk
if (exists) {
locals.photos.push(photo);
}
callback();
});
}, callback);
});
}
], callback); //Remember to put in the second series task's "task callback" as the "final callback" for the async.parallel operation
}
], function(err) { //This function gets called after the two series tasks have called their "task callbacks"
if (err) return next(err);
//Here `locals` will be populated with `user`, `posts` and `photos`
res.render('user-profile', locals);
});
});
You can nest and combine async.parallel and async.series as crazy as you want. A good trick when you find yourself nesting too deep is to divide the code into multiple functions (as mentioned earlier). The above example could be changed to this:
app.get('/user/:name', function(req, res, next) {
var locals = {};
var name = req.params.name;
var userId;
async.series([
//Load user
function(callback) {
loadUserByName(name, function(err, user) {
if (err) return callback(err);
userId = user.id;
locals.user = user;
callback();
});
},
function(callback) {
async.parallel([
//Load posts
function(callback) {
loadPostsByUserId(userId, function(err, posts) {
if (err) return callback(err);
locals.posts = posts;
callback();
});
},
//Load photos
function(callback) {
loadPhotosByUserId(userId, function(err, photos) {
if (err) return callback(err);
locals.photos = photos;
callback();
});
}
], callback);
}
], function(err) {
if (err) return next(err);
res.render('user-profile', locals);
});
});
function loadUserByName(name, callback) {
db.query('users', {name: name}, function(err, users) {
if (err) return callback(err);
//Check that a user was found
if (users.length == 0) {
return callback(new Error('No user with name '+name+' found.'));
}
callback(null, users[0]);
});
}
function loadPostsByUserId(userId, callback) {
db.query('posts', {userId: userId}, function(err, posts) {
if (err) return callback(err);
callback(null, posts);
});
}
function loadPhotosByUserId(userId, callback) {
db.query('photos', {userId: userId}, function(err, photos) {
if (err) return callback(err);
var existingPhotos = []; //Use a separate array so we don't shadow the `photos` returned by the db
async.forEach(photos, function(photo, callback) {
fs.exists(photo.path, function(exists) {
if (exists) {
existingPhotos.push(photo);
}
callback();
});
}, function(err) {
if (err) return callback(err);
callback(null, existingPhotos);
});
});
}
This makes your code look more “flat” and less nested. The main logic in the app.get('/user/:name'... part looks much more readable, since each of the functions nicely describes what it’s supposed to do. There are more tricks to improve your code even further, which I hope to elaborate on in future posts.
That’s all I had to say about that. Big thanks to Caolan McMahon for making this fantastic Node.js module.
2017 Bonus Section: Nice looking callbacks using ES6
When writing async JavaScript code you have to deal with a lot of callbacks, resulting in code full of function keywords all over. Using the new ES6 arrow function syntax (=>) you can make your code look even cleaner and more concise. In ES6, function(a) { doSomething(a) } can be written as a => doSomething(a). Take this example:
async.parallel([
function(callback) {
db.save('xxx', 'a', callback)
},
function(callback) {
db.save('xxx', 'b', callback)
}
], function(err) {
if (err) throw err
console.log('Both a and b are saved now')
})
With ES6, much shorter:
async.parallel([
callback => db.save('xxx', 'a', callback),
callback => db.save('xxx', 'b', callback)
], err => {
if (err) throw err
console.log('Both a and b are saved now')
})
If you need to brush up on the latest JavaScript syntax, I recommend the popular ES6 for Everyone course (with videos) by Wes Bos. It takes you through arrow functions, default arguments, destructuring, template strings and much more.