Node.js Streams and Promises

Node.js Streams and Promises

Tags
Javascript
Published
May 28, 2022
Related Tools

I have been working on a project that requires reading large .csv files from the local file system and then working with the data. Node.js has some great tools for working with this, namely streams, event emitters, and the readline native modules. However, all of the example code/tutorials fell into one of three categories:

  • Print the data to the console (not useful)
  • Write the data to a file
  • Push the incoming data to an outside array
  • Use an external library

I started by using the external library csv-parser. However, since it is basically a wrapper around the base Node.js technologies I listed above I had the same problems working with my data that I will list below. I eventually uninstalled it and wrote my own light-weight version.

Background

The readdline module provides an interface for reading data from a Readable stream...one line at a time.

from Node.js Documentation

All streams are instances of EventEmitter.

from Node.js Documentation

Basically, working with streams means listening for events with your data. And since the .on method of an EventEmitter expects a callback, everything you want to do next needs to happen in that callback. The readline module gives you the line event to listen for.

Solution #1

At first I tried the "push the incoming data to an outside array" approach.

const incomingData = [];

rl.on('line', data => [
  incomingData.push(data);
])
  .on('close', () => {
    // do something with incomingData
  });

This solution will actually work – if you are only reading one file. Unfortunately, I need to loop through a directory of files and read each one, and then do something with the data. I tried all sorts of things with counters and what not, but kept running into race conditions with the loops and what needed to happen next. So this solution has limited usefulness for my purposes.

Solution #2

This solution actually came from a member of my local code mentoring meetup (shout out to Austin Code Mentorship!) and uses Promises.

First, I created a JavaScript class for my various .csv needs.

const fs = require('fs');
const readline = require('readline');
const path = require('path');

class CSVHelpers {
  constructor () {
    super();
  }

  /**
   * @param  {string} filePath
   * @return {promise} Array of row objects. Key: header, value: field value
   */
  read (filePath) {
    return new Promise ((resolve, reject) => {
      try {
        const reader = this._createReadStream(filePath);
        let rows = [];
        let headers = null;

        reader.on('line', row => {
          if (headers === null) {
            headers = row.split(',');
          } else {
            const rowArray = row.split(',');
            const rowObject = {};
            rowArray.forEach((item, index) => {
              rowObject[headers[index]] = item;
            });

            rows.push(rowObject);
          }
        })
          .on('close', () => {
            resolve({
              rows,
              file: filePath
            });
          });
      } catch (error) {
        reject(error);
      }
    });
  }

  /**
   * @param  {type} filePath
   * @return {type} Readline event emitter
   */
  _createReadStream (filePath) {
    const fd = fs.openSync(path.resolve(filePath));
    const fileStream = fs.createReadStream(path.resolve(filePath), {fd});
    return readline.createInterface({
      input: fileStream
    });
  }
}

module.exports = CSVHelpers;

Then in my code:

const csv = new CSVHelpers();
const dataFiles = fs.readdirSync(<pathToDirectory);

const filePromises = dataFiles.map(file => {
  return csv.read(<pathToFile>);
});

Promise.all(filePromises)
  .then(values => {
    // do something with the values.
  });

This Promise approach means I don't need to bother to try next loops or callbacks.

Conclusion

I do not know if this is the best solution but it works for my use case, and solves the race conditions I was having. If you have a better way to solve the problem, please reach out and let me know!