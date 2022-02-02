bellboy

Highly performant JavaScript data stream ETL engine.

How does it work?

Bellboy streams input data row by row. Each row, in turn, goes through a user-defined function where it can be transformed. When enough data has been collected in a batch, the batch is loaded into the destination.
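The flow above can be modelled in a few lines of plain JavaScript. This is a conceptual sketch, not bellboy's internal code: `runPipeline`, its parameters and the sample data are hypothetical names chosen for illustration.

```javascript
// Conceptual model of the flow described above (NOT bellboy's source code):
// rows are read one at a time, each row goes through a user-defined async
// generator, and the results are collected into batches that are loaded
// once `batchSize` is reached. `runPipeline` is a hypothetical name.
async function runPipeline(rows, transform, batchSize, loadBatch) {
  let batch = [];
  for await (const row of rows) {
    // The transform may yield zero, one or several records for each row.
    for await (const record of transform(row)) {
      batch.push(record);
      if (batchSize > 0 && batch.length >= batchSize) {
        await loadBatch(batch); // batch is full: hand it to the destination
        batch = [];
      }
    }
  }
  // Load whatever is left once the input is exhausted
  // (with batchSize 0 this is the only batch).
  if (batch.length > 0) {
    await loadBatch(batch);
  }
}

// Usage: double every number and load the results in batches of two.
async function* numbers() {
  yield* [1, 2, 3, 4, 5];
}
const loaded = [];
runPipeline(numbers(), async function* (n) { yield n * 2; }, 2, async (batch) => {
  loaded.push(batch);
}).then(() => console.log(loaded)); // loaded is [[2, 4], [6, 8], [10]]
```

Loading only when a batch fills up is what lets large inputs be processed without holding every row in memory.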

Installation

Before installing, make sure you are using the latest version of Node.js.

npm install bellboy

Example

This example shows how bellboy can extract rows from Excel files, modify them on the fly, load them into a Postgres database, move each processed file to another folder and then process the remaining files.

All in five simple steps.

const bellboy = require("bellboy");
const fs = require("fs");
const path = require("path");

(async () => {
  const srcPath = `C:/source`;
  // 1. Create a processor which reads the Excel files in srcPath one by one.
  const processor = new bellboy.ExcelProcessor({
    path: srcPath,
    hasHeader: true,
  });
  // 2. Create a destination which adds a "status" field to each row
  //    and loads the rows into the "stats" table in Postgres.
  const destination = new bellboy.PostgresDestination({
    connection: {
      user: "user",
      password: "password",
      host: "localhost",
      database: "bellboy",
    },
    table: "stats",
    recordGenerator: async function* (record) {
      yield {
        ...record.raw.obj,
        status: "done",
      };
    },
  });
  // 3. Create a job which links the processor and the destination.
  const job = new bellboy.Job(processor, [destination]);
  // 4. Move each file to ./destination once it has been processed,
  //    and log the arguments of every error event.
  job.on("endProcessingStream", async (file) => {
    const filePath = path.join(srcPath, file);
    const newFilePath = path.join(`./destination`, file);
    await fs.promises.rename(filePath, newFilePath);
  });
  job.onAny(async (eventName, ...args) => {
    if (eventName.includes("Error")) {
      console.log(args);
    }
  });
  // 5. Run the job.
  await job.run();
})();

Jobs

A job in bellboy links a processor to its destinations. When the job is run, data processing and loading start.

Initialization

To initialize a Job instance, pass a processor and one or more destinations.

const job = new bellboy.Job(processor_instance, [destination_instance], job_options /* optional, {} by default */);

Options

reporters Reporter[] \ Array of reporters.

jobName string \ Optional user-defined name of the job. It can be handy in combination with extended events, to distinguish events coming from different jobs.

Instance methods

run async function() \ Starts processing data.

on function(event, async function listener) \ Adds a listener for a specific event.

onAny function(async function listener) \ Adds a listener for any event.

stop function(errorMessage?) \ Stops job execution. If errorMessage is passed, the job will throw an error with this message.

Events and event listeners

Event listeners, which can be registered with the job.on or job.onAny methods, allow you to listen to specific events in the job lifecycle and to interact with them.

Multiple listeners for one event are executed in the order they were registered.

The job always waits for the code inside a listener to complete.

Any error thrown inside a listener is ignored, and a warning message is printed.

The job.stop() method can be used inside a listener to stop job execution and, if needed, throw an error.
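The listener rules above can be illustrated with a small, self-contained event emitter. This is a hypothetical model: the `TinyEmitter` class is not part of bellboy, and the way `stop(errorMessage)` surfaces its error here (by throwing once the current event's listeners have run) is an assumption made for the sketch.

```javascript
// Hypothetical, simplified model of the listener rules described above.
// It is NOT bellboy's implementation; all names are illustrative.
class TinyEmitter {
  constructor() {
    this.listeners = new Map(); // event name -> array of async listeners
    this.stopped = false;
    this.stopMessage = undefined;
  }

  on(event, listener) {
    if (!this.listeners.has(event)) this.listeners.set(event, []);
    this.listeners.get(event).push(listener);
  }

  // Assumption: stop() only records the request; emit() acts on it.
  stop(errorMessage) {
    this.stopped = true;
    this.stopMessage = errorMessage;
  }

  async emit(event, ...args) {
    for (const listener of this.listeners.get(event) || []) {
      try {
        // Listeners run one after another, in registration order,
        // and each one is awaited before the next one starts.
        await listener(...args);
      } catch (err) {
        // Errors thrown by a listener are swallowed; only a warning is printed.
        console.warn(`Listener for "${event}" failed: ${err.message}`);
      }
    }
    // If a listener requested a stop with a message, surface it as an error.
    if (this.stopped && this.stopMessage) {
      throw new Error(this.stopMessage);
    }
  }
}
```

For example, if three listeners are registered for one event and the second throws, the first still completes before the third starts, and only a warning is printed for the second.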

job.on("startProcessing", async (processor: IProcessor, destinations: IDestination[]) => { });

job.on("startProcessingStream", async (...args: any[]) => { });

job.on("startProcessingRow", async (row: any) => { });

job.on("rowGenerated", async (destinationIndex: number, generatedRow: any) => { });

job.on("rowGenerationError", async (destinationIndex: number, row: any, error: any) => { });

job.on("endProcessingRow", async () => { });

job.on("transformingBatch", async (destinationIndex: number, rows: any[]) => { });

job.on("transformedBatch", async (destinationIndex: number, transformedRows: any) => { });

job.on("transformingBatchError", async (destinationIndex: number, rows: any[], error: any) => { });

job.on("endTransformingBatch", async (destinationIndex: number) => { });

job.on("loadingBatch", async (destinationIndex: number, data: any[]) => { });

job.on("loadedBatch", async (destinationIndex: number, data: any[]) => { });

job.on("loadingBatchError", async (destinationIndex: number, data: any[], error: any) => { });

job.on("endLoadingBatch", async (destinationIndex: number) => { });

job.on("endProcessingStream", async (...args: any[]) => { });

job.on("processingError", async (error: any) => { });

job.on("endProcessing", async () => { });

Listening for any event

A special listener, registered with the job.onAny method, listens for all of the events listed above.

job.onAny(async (eventName: string, ...args: any[]) => { });

Extended information from event

Sometimes more information about an event is needed, especially if you are building a custom reporter to log or trace fired events.

This information can be obtained by passing an async function as the third parameter to job.on, or as the second parameter to job.onAny.

For example,

job.on("rowGenerated", undefined, async (event: IBellboyEvent) => {
  console.log(`${event.jobName} has generated row for #${event.eventArguments.destinationIndex} destination`);
});

or

job.onAny(undefined, async (event: IBellboyEvent) => {
  console.log(`${event.jobName} has fired ${event.eventName}`);
});

Extended event (IBellboyEvent) fields

eventName string \ Name of the event.

eventArguments any \ Arguments of the event.

jobName string? \ User-defined name of the job.

jobId string \ Unique ID of the job.

eventId string \ Unique ID of the event.

timestamp number \ High-resolution timestamp of the event.

jobStopped boolean \ Whether the job is stopped or not.
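As an illustration of how these fields can be combined, the sketch below measures how long each job took by pairing its startProcessing and endProcessing events by jobId. The `jobDurations` helper and the sample events are hypothetical; the result is expressed in whatever unit timestamp uses.

```javascript
// Hypothetical reporter-style helper built on the extended event fields.
// Events could be collected with, e.g.:
//   job.onAny(undefined, async (event) => { events.push(event); });
function jobDurations(events) {
  const starts = new Map(); // jobId -> timestamp of startProcessing
  const durations = {};
  for (const event of events) {
    if (event.eventName === "startProcessing") {
      starts.set(event.jobId, event.timestamp);
    } else if (event.eventName === "endProcessing" && starts.has(event.jobId)) {
      // Prefer the user-defined jobName as the label, fall back to jobId.
      const label = event.jobName || event.jobId;
      durations[label] = event.timestamp - starts.get(event.jobId);
    }
  }
  return durations;
}

// Made-up sample events (only the field names come from the list above).
console.log(
  jobDurations([
    { eventName: "startProcessing", jobId: "a1", jobName: "nightly", timestamp: 100 },
    { eventName: "endProcessing", jobId: "a1", jobName: "nightly", timestamp: 350 },
  ])
); // { nightly: 250 }
```

Keying by jobId rather than jobName keeps the pairing correct even when several unnamed jobs run at the same time.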

Processors

Each processor in bellboy is a class with a single responsibility: processing data of a specific type.

MqttProcessor processes MQTT protocol messages.

HttpProcessor processes data received from an HTTP call.

ExcelProcessor processes XLSX file data from the file system.

JsonProcessor processes JSON file data from the file system.

DelimitedProcessor processes files with delimited data from the file system.

PostgresProcessor processes data received from a PostgreSQL SELECT.

MssqlProcessor processes data received from an MSSQL SELECT.

DynamicProcessor processes dynamically generated data.

TailProcessor processes new lines added to a file.

Options

rowLimit number \ Number of records to be processed before stopping the processor. If not specified or 0 is passed, all records will be processed.
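The rowLimit rule can be sketched as an async generator that stops pulling from its source once the limit is hit. `limitRows` is a hypothetical helper written for illustration, not a bellboy function.

```javascript
// Hypothetical helper modelling the rowLimit option: pass through at most
// `rowLimit` records, or all of them when rowLimit is undefined or 0.
async function* limitRows(records, rowLimit) {
  let count = 0;
  for await (const record of records) {
    yield record;
    count++;
    if (rowLimit && count >= rowLimit) {
      return; // limit reached: stop pulling records from the source
    }
  }
}

// Usage: only the first two records are processed.
(async () => {
  for await (const record of limitRows(["a", "b", "c"], 2)) {
    console.log(record); // prints "a", then "b"
  }
})();
```

Checking the count after yielding avoids reading one extra record from the source before stopping.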

MqttProcessor


Listens for messages and processes them one by one. It also handles backpressure by queuing messages, so that all messages are eventually processed.
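Queue-based buffering of this kind can be sketched as an async-iterable queue: the producer pushes messages without ever blocking, and a single consumer pulls them one at a time. The `MessageQueue` class is hypothetical and only illustrates the idea; it is not MqttProcessor's implementation.

```javascript
// Hypothetical sketch of queue-based buffering: messages that arrive while a
// previous one is still being processed are queued, so none are lost.
class MessageQueue {
  constructor() {
    this.items = [];
    this.waiting = null; // resolver of a consumer waiting for the next message
    this.closed = false;
  }

  // Called by the producer (e.g. an MQTT "message" handler); never blocks.
  push(message) {
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve({ value: message, done: false });
    } else {
      this.items.push(message);
    }
  }

  // No more messages will arrive; a waiting consumer is released.
  close() {
    this.closed = true;
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve({ value: undefined, done: true });
    }
  }

  // Async iteration lets one consumer process messages sequentially.
  [Symbol.asyncIterator]() {
    return {
      next: () => {
        if (this.items.length > 0) {
          return Promise.resolve({ value: this.items.shift(), done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }
        return new Promise((resolve) => { this.waiting = resolve; });
      },
    };
  }
}
```

A consumer would iterate with `for await (const message of queue) { ... }`; messages pushed while it is busy simply wait in `items` until it asks for the next one.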

Options

Processor options

url string required

topics string[] required

HttpProcessor


Processes data received from an HTTP call. It can process JSON as well as delimited data, and can handle pagination using the nextRequest function.

For delimited data, it produces rows as described in the DelimitedProcessor section.

Options

Processor options

connection object required \ Options from the axios library.

dataFormat delimited | json required

rowSeparator string required for delimited

delimiter string only for delimited \ A symbol separating fields of the row.

hasHeader boolean only for delimited \ If true, the first row will be processed as a header.

qualifier string only for delimited \ Symbol placed around a field to signify that everything between the symbols belongs to the same field.

jsonPath RegExp \ Path to the array to be streamed. This option is described in detail in the JsonProcessor section.

nextRequest async function(header) \ Function which must return the connection for the next request, or null if no further request is needed.

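const processor = new bellboy.HttpProcessor({
  // ...other options (connection, dataFormat, etc.)
  // connection, url, currentPage and pageCount are assumed to be defined elsewhere
  nextRequest: async function () {
    if (currentPage < pageCount) {
      return {
        ...connection,
        url: `${url}&current_page=${currentPage + 1}`,
      };
    }
    return null;
  },
});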
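To see why returning null ends pagination, the loop below models a processor that keeps requesting pages until nextRequest() returns null. Everything here (`paginate`, `fetchPage`, the fake pages and URL) is a hypothetical stand-in for the HTTP client and a real API; it does not call bellboy or axios.

```javascript
// Sketch only: request a page, collect its rows, then ask nextRequest()
// for the next request; null means there are no more pages.
async function paginate(connection, fetchPage, nextRequest) {
  const rows = [];
  let request = connection;
  while (request !== null) {
    const page = await fetchPage(request); // one HTTP call per page
    rows.push(...page);
    request = await nextRequest(); // null means "no more pages"
  }
  return rows;
}

// Fake API: the page number is read from the current_page query parameter.
const fakePages = { 1: ["a", "b"], 2: ["c", "d"], 3: ["e"] };
const url = "https://example.com/api?format=json";
const pageCount = 3;
let currentPage = 1;

async function fetchPage(request) {
  const page = Number(new URL(request.url).searchParams.get("current_page") || 1);
  currentPage = page; // remember which page was fetched last
  return fakePages[page];
}

// Same shape as the nextRequest example above, minus the connection spread.
async function nextRequest() {
  if (currentPage < pageCount) {
    return { url: `${url}&current_page=${currentPage + 1}` };
  }
  return null;
}
```

Calling `paginate({ url }, fetchPage, nextRequest)` resolves to `["a", "b", "c", "d", "e"]`: three requests are made, and the fourth call to nextRequest() returns null.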

Directory processors

Used for streaming text data from files in a directory. There are currently four types of directory processors: ExcelProcessor, JsonProcessor, DelimitedProcessor and TailProcessor. These processors search for files in the source directory and process them one by one.

The file name (file) and full file path (filePath) are passed as parameters to the startProcessingStream event.

Options

Processor options

path string \ Path to the directory where files are located. Current directory by default.

filePattern RegExp \ Regex pattern for the files to be processed. If not specified, all files in the directory will be matched.

files string[] \ Array of file names. If not specified, all files in the directory will be matched against the filePattern regex and processed in alphabetical order.
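The selection rules above can be sketched as a pure function over a list of directory entries. `selectFiles` is hypothetical, and two details are assumptions not stated in the option descriptions: an explicit `files` list is used as given (not filtered by filePattern), and "alphabetical" means JavaScript's default string sort.

```javascript
// Sketch of the file-selection rules described above; an assumption about
// the behaviour, not bellboy's code. `selectFiles` is a hypothetical helper.
function selectFiles(directoryEntries, { filePattern, files } = {}) {
  // Assumption: an explicit list of file names is used as given.
  if (files && files.length > 0) {
    return [...files];
  }
  // Otherwise keep every entry matching filePattern (or all entries)
  // and process them in alphabetical order.
  return directoryEntries
    .filter((name) => !filePattern || filePattern.test(name))
    .sort();
}

// Usage with a real directory (the path is illustrative):
// const names = require("fs").readdirSync("./source");
// console.log(selectFiles(names, { filePattern: /\.xlsx$/ }));
console.log(selectFiles(["b.xlsx", "notes.txt", "a.xlsx"], { filePattern: /\.xlsx$/ }));
// [ 'a.xlsx', 'b.xlsx' ]
```

Avoid the `g` flag on filePattern in code like this: `RegExp.prototype.test` is stateful with `g`, so consecutive file names could be skipped.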

ExcelProcessor


Processes XLSX files in the directory.

Options

Directory processor options

hasHeader boolean | number \ Whether the worksheet has a header or not, false by default. A 0-based row index can be passed to this option if the header is not located on the first row.

fillMergedCells boolean \ If true, merged cells will all have the same value (by default, only the first cell of merged cells is filled with a value). \ Warning! Enabling this feature may increase streaming time, because the file must be processed to detect merged cells before the actual stream starts. false by default.

ignoreEmpty boolean \ Whether to ignore empty rows or not, true by default.

sheets (string | number)[] | async function(sheets) \ Array of sheet names and/or sheet indexes, or an async function which accepts an array of all sheets and must return an array of the names of the sheets to be processed. If not specified, the first sheet will be processed.

encoding string \ XLSX file encoding.

const processor = new bellboy.ExcelProcessor({
  // process only the last sheet of each workbook
  sheets: async (sheets) => {
    const sheet = sheets[sheets.length - 1];
    return [sheet.name];
  },
});

Produced row

To see what a processed row looks like, refer to the documentation of the xlstream library, which is used for Excel processing.

JsonProcessor

Processes JSON files in the directory.

Options

Directory processor options

jsonPath RegExp \ Path to the array to be streamed. Internally, when JSON is streamed, the current path is joined together using . as a separator and then tested against the provided regular expression. If not specified, a root array will be streamed. For example, given this JSON object:

{ "animals": { "dogs": [ "pug", "bulldog", "poodle" ] } }

to stream the dogs array, the path you need to use is /animals.dogs.(\d+)/ . (\d+) is used here because each index of the array is a number.
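The path-matching rule can be checked without streaming by walking a parsed object, joining keys and indexes with ".", and testing each path against the RegExp. `matchingValues` is a hypothetical helper; unlike a real streaming parser, it works on an object already in memory and stops descending once a path matches.

```javascript
// Illustrates the jsonPath rule: each value's path is built by joining
// keys/indexes with "." and tested against the RegExp. Hypothetical helper.
function matchingValues(value, jsonPath, path = []) {
  const joined = path.join(".");
  if (path.length > 0 && jsonPath.test(joined)) {
    return [value]; // this value is one element of the streamed array
  }
  if (value !== null && typeof value === "object") {
    // Object.entries also works on arrays, yielding "0", "1", ... as keys.
    return Object.entries(value).flatMap(([key, child]) =>
      matchingValues(child, jsonPath, [...path, key])
    );
  }
  return [];
}

const doc = { animals: { dogs: ["pug", "bulldog", "poodle"] } };
console.log(matchingValues(doc, /animals.dogs.(\d+)/));
// → [ 'pug', 'bulldog', 'poodle' ]
```

Note that an unescaped . in a regular expression matches any character; /^animals\.dogs\.\d+$/ is a stricter pattern for the same path.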

DelimitedProcessor


Processes files with delimited data in the directory.

Options

Directory processor options

rowSeparator string required

delimiter string \ A symbol separating fields of the row.

hasHeader boolean \ If true, the first row will be processed as a header.

qualifier string \ Symbol placed around a field to signify that everything between the symbols belongs to the same field.

Produced row

header string[] \ If hasHeader is true, the first row will appear here.

arr string[] \ Row split by delimiter and qualifier.

obj object \ If hasHeader is true, an object with header elements as keys will appear here.

row string \ Received raw row.
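The shape of a produced row can be sketched as follows. The splitting logic is a simplified assumption (it honours a single-character qualifier around fields but not escaped qualifiers inside a field), the default delimiter and qualifier are chosen for illustration only, and `splitRow` and `toProducedRow` are hypothetical helpers.

```javascript
// Sketch of the produced row shape described above (header, arr, obj, row).
// Simplified splitter: a qualifier toggles "inside a field" mode, so the
// delimiter is ignored between qualifiers. Escaped qualifiers are not handled.
function splitRow(row, delimiter, qualifier) {
  const fields = [];
  let current = "";
  let quoted = false;
  for (const ch of row) {
    if (qualifier && ch === qualifier) {
      quoted = !quoted; // enter or leave a qualified field
    } else if (ch === delimiter && !quoted) {
      fields.push(current);
      current = "";
    } else {
      current += ch;
    }
  }
  fields.push(current);
  return fields;
}

// Hypothetical defaults: comma delimiter, double-quote qualifier.
function toProducedRow(row, header, { delimiter = ",", qualifier = '"' } = {}) {
  const arr = splitRow(row, delimiter, qualifier);
  const produced = { arr, row };
  if (header) {
    produced.header = header;
    produced.obj = Object.fromEntries(header.map((key, i) => [key, arr[i]]));
  }
  return produced;
}

const header = splitRow("name,city", ",", '"');
console.log(toProducedRow('Anna,"Riga, Latvia"', header));
```

Here the qualifier keeps "Riga, Latvia" as one field, so obj becomes { name: "Anna", city: "Riga, Latvia" }.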

TailProcessor


Watches for file changes and outputs the last part of a file as soon as new lines are added to it.

Options

Directory processor options

fromBeginning boolean \ In addition to emitting new lines, emits the lines from the beginning of the file, false by default.

Produced row

file string \ Name of the file the data came from.

data string
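Tailing can be modelled by remembering how much of the file has already been read and emitting only complete lines appended since then, as { file, data } rows. This is an illustration under simplifying assumptions (the whole file content is passed in on each change, and data is one line without its newline); `createTail` is hypothetical and not TailProcessor's implementation.

```javascript
// Simplified model of tailing: remember how much of the file was already
// read and emit only complete lines appended since then.
function createTail(file, fromBeginning = false) {
  let offset = null; // characters already consumed; null until the first read
  let pending = ""; // trailing partial line waiting for its newline
  return function onContent(content) {
    if (offset === null) {
      // First read: either start from the top or skip existing content.
      offset = fromBeginning ? 0 : content.length;
    }
    pending += content.slice(offset);
    offset = content.length;
    const lines = pending.split("\n");
    pending = lines.pop(); // last element is an incomplete line (or "")
    return lines.map((data) => ({ file, data }));
  };
}

const tail = createTail("app.log");
tail("old line\n"); // [] (existing content is skipped)
console.log(tail("old line\nnew line\n")); // [ { file: 'app.log', data: 'new line' } ]
```

Keeping the partial last line in `pending` ensures a line written in two chunks is emitted once, complete.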

Database processors

Processes the result of a SELECT query row by row. There are two database processors: PostgresProcessor and MssqlProcessor. Both have the same options.

Options

Processor options

query string required \ Query to execute.

connection object required
  user
  password
  server \ Used with MssqlProcessor.
  host \ Used with PostgresProcessor.
  port
  database
  schema \ Currently available only for PostgresProcessor.
  driver \ Available only for MssqlProcessor. Defines which driver to use: tedious (used by default) or msnodesqlv8.



DynamicProcessor

A processor which generates records on the fly. It can be used to define custom data processors.

Options

Processor options

generator async generator function required \ Generator function which must yield records to process.

const processor = new bellboy.DynamicProcessor({
  // yields the numbers 0 to 9 as records
  generator: async function* () {
    for (let i = 0; i < 10; i++) {
      yield i;
    }
  },
});

Destinations

Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log this data to stdout and post it by HTTP simultaneously.

StdoutDestination logs data to the console.

HttpDestination executes HTTP requests.

PostgresDestination inserts/upserts data into a PostgreSQL database.

MssqlDestination inserts data into an MSSQL database.

Options

disableLoad boolean \ If true, no data will be loaded to the destination. In combination with reporters, this option can be handy during testing.

batchSize number \ Number of records to be processed before loading them to the destination. If not specified or 0 is passed, all records will be processed before loading.

recordGenerator async generator function(row) \ Function which receives a row produced by the processor and can apply transformations to it.

batchTransformer async function(rows) \ Function which receives the whole batch of rows. It is called once the row count reaches batchSize, and the data is loaded to the destination immediately after it has been executed.
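The difference between the two hooks can be sketched with a tiny stand-in runner. `applyDestinationHooks` is hypothetical, and the rows are simplified plain objects (in the example at the top of this document the row exposes its data as record.raw.obj); in bellboy, the hooks are passed as destination options and invoked by the job.

```javascript
// Stand-in runner: every row goes through recordGenerator, then the whole
// batch goes through batchTransformer. Hypothetical, for illustration only.
async function applyDestinationHooks(rows, { recordGenerator, batchTransformer }) {
  const generated = [];
  for (const row of rows) {
    for await (const record of recordGenerator(row)) {
      generated.push(record);
    }
  }
  return batchTransformer ? batchTransformer(generated) : generated;
}

// recordGenerator: may yield nothing (filter), one record, or several records.
async function* recordGenerator(row) {
  if (row.qty === 0) return; // drop rows with nothing to load
  for (let i = 0; i < row.qty; i++) {
    yield { sku: row.sku, unit: i + 1 };
  }
}

// batchTransformer: sees the whole batch at once, e.g. to sort it.
async function batchTransformer(rows) {
  return [...rows].sort((a, b) => a.sku.localeCompare(b.sku));
}

applyDestinationHooks(
  [{ sku: "b", qty: 1 }, { sku: "x", qty: 0 }, { sku: "a", qty: 2 }],
  { recordGenerator, batchTransformer }
).then((batch) => console.log(batch));
```

Per-row work such as filtering or fanning out belongs in recordGenerator; anything that needs to see several rows together, such as sorting or deduplication, belongs in batchTransformer.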

StdoutDestination

Logs all data to stdout (the console).

Options

General destination options

asTable boolean \ If set to true, data will be printed as a table.

HttpDestination


Puts each processed record, one by one, into the request body and executes the specified HTTP request.

Options

General destination options

request required \ Options from the axios library.

PostgresDestination


Inserts data into PostgreSQL.

Options

General destination options

table string required \ Table name.

upsertConstraints string[] \ If specified, an UPSERT command will be executed based on the provided constraints.

connection object required
  user
  password
  host
  database
  schema



MssqlDestination


Inserts data into MSSQL.

Options

General destination options

table string required \ Table name.

connection object required
  user
  password
  server
  database



Extendability

New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.

Creating a new processor


To create a new processor, you must extend the Processor class and implement the async process function. This function accepts one parameter:

processStream async function(readStream, ...args) required \ Callback function which accepts a Readable stream. After this function is called, the job instance handles the passed stream internally. The passed parameters (args) will be emitted with the startProcessingStream event during job execution.

class CustomProcessor extends bellboy.Processor {
  async process(processStream) {
    // create a Readable stream here and pass it to processStream
  }
}
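The process/processStream contract can be exercised without bellboy by supplying a stand-in for the callback the job would pass in. In this sketch the class does not extend bellboy.Processor, so that it runs without the package; `InMemoryProcessor` and `fakeProcessStream` are hypothetical names, and how bellboy turns stream chunks into rows is not modelled.

```javascript
// Self-contained sketch of the process(processStream) contract. A real
// processor would extend bellboy.Processor instead.
const { Readable } = require("stream");

class InMemoryProcessor {
  constructor(lines) {
    this.lines = lines;
  }

  async process(processStream) {
    // Hand one Readable stream to the job. The extra argument ("in-memory")
    // is what would be emitted with the startProcessingStream event.
    await processStream(Readable.from(this.lines), "in-memory");
  }
}

// Stand-in for the job side: read the stream chunk by chunk.
async function fakeProcessStream(readStream, ...args) {
  console.log("startProcessingStream args:", args);
  for await (const chunk of readStream) {
    console.log("chunk:", chunk);
  }
}

new InMemoryProcessor(["row 1", "row 2"]).process(fakeProcessStream);
```

Readable.from with an array emits each element as a separate chunk, which keeps the example free of any file or network I/O.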

Creating a new destination


To create a new destination, you must extend the Destination class and implement the async loadBatch function. This function accepts one parameter:

data any[] required \ Array of some processed data that needs to be loaded.

class CustomDestination extends bellboy.Destination {
  async loadBatch(data) {
    // "load" the batch by printing it
    console.log(data);
  }
}

Creating a new reporter

Official stdout reporter

A reporter is a job wrapper which can operate on a job instance (for example, listen to events using the job's on method). To create a new reporter, you must extend the Reporter class and implement the report function, which will be executed during job instance initialization. This function accepts one parameter:

job Job required \ Job instance

class CustomReporter extends bellboy.Reporter {
  report(job) {
    job.on("startProcessing", undefined, async ({ jobName }) => {
      console.log(`Job ${jobName} has been started.`);
    });
  }
}

Testing