CSV file parsing in batches using Streams

Created on: Aug 16, 2024

Suppose we have a CSV file in the following format:

Name,Age,Email
Alice,30,alice@example.com
Bob,20,bob@example.com
Charlie,25,charlie@example.com

The CSV file has 30 rows, and we have to process the records in batches of size 5, converting each name from lowercase to uppercase.
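Before reaching for streams, the batching logic itself can be sketched as plain functions. The `chunkArray` and `uppercaseNames` helpers below are illustrative only, not part of the final streaming code:

```typescript
interface User {
  Name: string;
  Age: number;
  Email: string;
}

// Split an array into consecutive batches of at most `size` elements.
function chunkArray<T>(arr: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < arr.length; i += size) {
    batches.push(arr.slice(i, i + size));
  }
  return batches;
}

// Return a copy of the batch with every Name uppercased (input is untouched).
function uppercaseNames(batch: User[]): User[] {
  return batch.map((user) => ({ ...user, Name: user.Name.toUpperCase() }));
}
```

With 30 rows and a batch size of 5, `chunkArray` yields 6 batches. The streaming version later in the article applies the same per-batch transformation without loading all rows into memory at once.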

First, we create a read stream from the input file:

const readStream = createReadStream(inputFilePath);

Then we create a class extending Transform. A Transform stream is a type of Duplex stream where the output is computed from the input. We can implement custom transformation logic by overriding _transform(). The _flush() method is called just before the stream finishes, giving us an opportunity to process and push any remaining data that wasn't handled in _transform().
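As a minimal sketch of this pattern (not the article's final implementation), here is an object-mode Transform that groups incoming values into batches of three, with _flush() emitting the leftover partial batch:

```typescript
import { Transform } from "stream";

// Groups incoming objects into arrays of 3; _flush pushes the remainder.
class BatchOfThree extends Transform {
  private batch: any[] = [];

  constructor() {
    // objectMode lets the stream carry arbitrary objects instead of Buffers.
    super({ objectMode: true });
  }

  _transform(chunk: any, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    this.batch.push(chunk);
    if (this.batch.length >= 3) {
      this.push(this.batch); // emit a full batch downstream
      this.batch = [];
    }
    callback();
  }

  _flush(callback: (error?: Error | null) => void) {
    if (this.batch.length > 0) {
      this.push(this.batch); // emit whatever is left as a partial batch
    }
    callback();
  }
}
```

Writing the values 1 through 4 and then ending the stream produces two batches downstream: a full batch of three followed by a partial batch containing the remainder.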

Below is the full code for the batch transform.

import { createReadStream, createWriteStream } from "fs";
import { Transform } from "stream";
import csvParser from "csv-parser";

interface User {
  Name: string;
  Age: number;
  Email: string;
}

const inputFilePath = "./data/sample-users.csv";
const outputFilePath = "./data/transformed-users.csv";
const chunkSize = 5;

class BatchTransform extends Transform {
  private currentBatch: User[] = [];
  // "a" appends, so re-running adds to the existing output file.
  private writeStream = createWriteStream(outputFilePath, { flags: "a" });

  constructor() {
    super({ objectMode: true, highWaterMark: chunkSize });
    this.writeStream.write("Name,Age,Email\n");
  }

  _transform(chunk: any, encoding: BufferEncoding, callback: Function) {
    const user: User = {
      Name: chunk.Name,
      Age: parseInt(chunk.Age, 10),
      Email: chunk.Email,
    };
    this.currentBatch.push(user);
    // Once a full batch accumulates, uppercase the names and write it out.
    if (this.currentBatch.length >= chunkSize) {
      const processedBatch = this.currentBatch.map((user) => {
        return {
          ...user,
          Name: user.Name.toUpperCase(),
        };
      });
      this._writeBatch(processedBatch);
      this.currentBatch = [];
    }
    callback();
  }

  _flush(callback: Function) {
    // Handle the final partial batch, if any, before the stream ends.
    if (this.currentBatch.length > 0) {
      const processedBatch = this.currentBatch.map((user) => {
        return {
          ...user,
          Name: user.Name.toUpperCase(),
        };
      });
      this._writeBatch(processedBatch);
    }
    this.writeStream.end(() => {
      console.log("Finished writing all batches to the CSV file.");
      callback();
    });
  }

  private _writeBatch(batch: User[]) {
    console.log(`Processing and writing batch of ${batch.length} records`);
    batch.forEach((user) => {
      const csvLine = `${user.Name},${user.Age},${user.Email}\n`;
      this.writeStream.write(csvLine);
    });
  }
}

const readStream = createReadStream(inputFilePath);
const transformStream = new BatchTransform();

readStream
  .pipe(csvParser())
  .pipe(transformStream)
  .on("error", (err) => {
    console.error("Error during processing:", err);
  });
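One caveat with chained .pipe() calls is that an "error" listener on the last stream does not catch errors from earlier stages such as the read stream or the parser. Node's built-in stream.pipeline() wires error handling and cleanup across every stage. Here is a hedged sketch using only built-in streams; the `upper` transform and `results` sink are stand-ins for the article's csv-parser and BatchTransform stages, which would slot in the same way:

```typescript
import { Readable, Transform, Writable, pipeline } from "stream";

// Illustrative stage standing in for BatchTransform: uppercases each value.
const upper = new Transform({
  objectMode: true,
  transform(chunk: any, _encoding, callback) {
    callback(null, String(chunk).toUpperCase());
  },
});

// Collects results in memory so the example is self-contained;
// a file write stream would work the same way as the destination.
const results: string[] = [];
const sink = new Writable({
  objectMode: true,
  write(chunk: any, _encoding, callback) {
    results.push(chunk);
    callback();
  },
});

// pipeline() destroys all streams on failure and reports the first
// error from any stage through a single callback.
pipeline(Readable.from(["alice", "bob"]), upper, sink, (err) => {
  if (err) {
    console.error("Pipeline failed:", err);
  } else {
    console.log("Pipeline succeeded:", results);
  }
});
```

Compared with chained .pipe() calls, this gives one place to handle failures from any stage, and failed stages are torn down instead of leaking file descriptors.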

You can find the full code on my GitHub.