Date: 2022-02-02
Highly performant JavaScript data stream ETL engine.
Bellboy streams input data row by row. Every row, in turn, goes through a user-defined function where it can be transformed. When enough rows have been collected in a batch, the batch is loaded into the destination.
Before installing, make sure you are using the latest version of Node.js.
npm install bellboy
This example shows how bellboy can extract rows from an Excel file, modify them on the fly, load them into a Postgres database, move the processed file to another folder and process the remaining files.
All in six simple steps.
const bellboy = require("bellboy");
const fs = require("fs");
const path = require("path");
(async () => {
const srcPath = `C:/source`;
// 1. create a processor which will process
// Excel files in the folder one by one
const processor = new bellboy.ExcelProcessor({
path: srcPath,
hasHeader: true,
});
// 2. create a destination which will add a new 'status'
// field to each row and load processed data into a Postgres database
const destination = new bellboy.PostgresDestination({
connection: {
user: "user",
password: "password",
host: "localhost",
database: "bellboy",
},
table: "stats",
recordGenerator: async function* (record) {
yield {
...record.raw.obj,
status: "done",
};
},
});
// 3. create a job which will glue the processor and the destination together
const job = new bellboy.Job(processor, [destination]);
// 4. tell bellboy to move the file away as soon as it has been processed
job.on("endProcessingStream", async (file) => {
const filePath = path.join(srcPath, file);
const newFilePath = path.join(`./destination`, file);
// renameSync returns nothing to await, so use the promise-based API instead
await fs.promises.rename(filePath, newFilePath);
});
// 5. Log all error events
job.onAny(async (eventName, ...args) => {
if (eventName.includes("Error")) {
console.log(args);
}
});
// 6. run your job
await job.run();
})();
A job in bellboy links a processor to one or more destinations. Running the job starts the data processing and loading mechanism.
To initialize a Job instance, pass a processor and one or more destinations.
const job = new bellboy.Job(
processor_instance,
[destination_instance],
(job_options = {})
);
Reporter[]\
Array of reporters.
string\
Optional user-defined name of the job. Can come in handy when combined with extended events to distinguish events from different jobs.
async function()\
Starts processing data.
function(event, async function listener)\
Add specific event listener.
function(async function listener)\
Add any event listener.
function(errorMessage?)\
Stops job execution. If
errorMessage is passed, the job will throw an error with this message.
Event listeners, which can be registered with the
job.on or
job.onAny methods, allow you to listen to specific events in the job lifecycle and to interact with them.
The job.stop() method can be used inside a listener to stop job execution and throw an error if needed.
job.on(
"startProcessing",
async (processor: IProcessor, destinations: IDestination[]) => {
// Job has started execution.
}
);
job.on("startProcessingStream", async (...args: any) => {
// Stream processing has been started.
// Passed parameters may vary based on specific processor.
});
job.on("startProcessingRow", async (row: any) => {
// Row has been received and is about to be processed inside `recordGenerator` method.
});
job.on("rowGenerated", async (destinationIndex: number, generatedRow: any) => {
// Row has been generated using `recordGenerator` method.
});
job.on("rowGenerationError", async (destinationIndex: number, row: any, error: any) => {
// Record generation (`recordGenerator` method) has thrown an error.
});
job.on("endProcessingRow", async () => {
// Row has been processed.
});
job.on("transformingBatch", async (destinationIndex: number, rows: any[]) => {
// Batch is about to be transformed inside `batchTransformer` method.
});
job.on(
"transformedBatch",
async (destinationIndex: number, transformedRows: any) => {
// Batch has been transformed using `batchTransformer` method.
}
);
job.on(
"transformingBatchError",
async (destinationIndex: number, rows: any[], error: any) => {
// Batch transformation (`batchTransformer` method) has thrown an error.
}
);
job.on("endTransformingBatch", async (destinationIndex: number) => {
// Batch has been transformed.
});
job.on("loadingBatch", async (destinationIndex: number, data: any[]) => {
// Batch is about to be loaded into destination.
});
job.on("loadedBatch", async (destinationIndex: number, data: any[]) => {
// Batch has been loaded into destination.
});
job.on(
"loadingBatchError",
async (destinationIndex: number, data: any[], error: any) => {
// Batch load has failed.
}
);
job.on("endLoadingBatch", async (destinationIndex: number) => {
// Batch load has finished.
});
job.on("endProcessingStream", async (...args: any) => {
// Stream processing has finished.
// Passed parameters may vary based on specific processor.
});
job.on("processingError", async (error: any) => {
// Unexpected error has occurred.
});
job.on("endProcessing", async () => {
// Job has finished execution.
});
A special listener can be registered using the
job.onAny method; it listens for every event mentioned above.
job.onAny(async (eventName: string, ...args: any) => {
// An event has been fired.
});
Sometimes more information about an event is needed, especially if you are building a custom reporter to log or trace fired events.
This information can be obtained by registering an async function as the third parameter of the
job.on method or as the second parameter of the
job.onAny method.
For example,
job.on("rowGenerated", undefined, async (event: IBellboyEvent) => {
// Row has been generated using `recordGenerator` method.
console.log(
`${event.jobName} has generated row for #${event.eventArguments.destinationIndex} destination`
);
});
or
job.onAny(undefined, async (event: IBellboyEvent) => {
console.log(`${event.jobName} has fired ${event.jobEvent}`);
});
string\
Name of the event.
any\
Arguments of the event.
string?\
User-defined name of the job.
string\
Unique ID of the job.
string\
Unique ID of the event.
number\
High resolution timestamp of the event.
boolean\
Whether the job is stopped or not.
Each processor in
bellboy is a class which has a single responsibility of processing data of specific type -
number\
Number of records to be processed before stopping processor. If not specified or
0 is passed, all records will be processed.
Listens for messages and processes them one by one. It also handles backpressure by queuing messages, so all messages can be eventually processed.
string
required
string[]
required
Processes data received from an HTTP call. Can process
JSON as well as
delimited data. Can handle pagination by using
nextRequest function.
For delimited data produces rows described here.
object
required\
Options from axios library.
delimited | json
required
string
required for delimited
string
only for delimited\
A symbol separating fields of the row.
boolean
only for delimited\
If
true, first row will be processed as a header.
string
only for delimited\
Symbol placed around a field to signify that it is the same field.
RegExp\
Path to the array to be streamed. This option is described in detail inside JsonProcessor section.
async function(header)\
Function which must return
connection for the next request or
null if the next request is not needed.
const processor = new bellboy.HttpProcessor({
nextRequest: async function () {
if (currentPage < pageCount) {
return {
...connection,
url: `${url}&current_page=${currentPage + 1}`,
};
}
return null;
},
// ...
});
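The pagination loop that nextRequest drives can be pictured with a small, library-free sketch. It is only an illustration under stated assumptions: `fetchPage`, `streamAllPages`, the endpoint URL and the page count are hypothetical stand-ins, and the loop is a simplified model rather than bellboy's actual implementation. Unlike the snippet above, this version advances `currentPage` inside `nextRequest` itself.

```javascript
// Hypothetical stand-in for an HTTP call: returns the rows of one page.
async function fetchPage(connection) {
  const page = Number(new URL(connection.url).searchParams.get("current_page"));
  return [`row ${page}a`, `row ${page}b`];
}

// Assumed behaviour: keep requesting pages until `nextRequest` returns null.
async function streamAllPages(connection, nextRequest, onRow) {
  let current = connection;
  while (current !== null) {
    for (const row of await fetchPage(current)) {
      await onRow(row);
    }
    current = await nextRequest();
  }
}

const url = "http://localhost/items?per_page=2"; // hypothetical endpoint
const pageCount = 3; // hypothetical total number of pages
let currentPage = 1;
const connection = { method: "GET", url: `${url}&current_page=${currentPage}` };

// Returns the connection for the next page, or null when no page is left.
const nextRequest = async function () {
  if (currentPage < pageCount) {
    currentPage += 1;
    return { ...connection, url: `${url}&current_page=${currentPage}` };
  }
  return null;
};

const rows = [];
const done = streamAllPages(connection, nextRequest, async (row) => {
  rows.push(row);
});
```

Returning `null` is what ends the loop, so a `nextRequest` that never returns `null` would keep requesting pages indefinitely.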
Used for streaming text data from files in directory. There are currently four types of directory processors -
ExcelProcessor,
JsonProcessor,
DelimitedProcessor and
TailProcessor. Such processors search for the files in the source directory and process them one by one.
File name (
file) and full file path (
filePath) parameters will be passed to
startProcessingStream event.
string\
Path to the directory where files are located. Current directory by default.
RegExp\
Regex pattern for the files to be processed. If not specified, all files in the directory will be matched.
string[]\
Array of file names. If not specified, all files in the directory will be matched against
filePattern regex and processed in alphabetical order.
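The file selection rules described above can be sketched without the library. `selectFiles` is a hypothetical helper written for this illustration, and the option name `files` is an assumption; only `filePattern` is named in the text above.

```javascript
// Hypothetical helper mirroring the documented rules; not bellboy's own code.
function selectFiles(directoryListing, { filePattern, files } = {}) {
  // An explicit list of file names is used as given.
  if (files) {
    return files;
  }
  // Otherwise match every file against the pattern (or accept all files)
  // and process the matches in alphabetical order.
  return directoryListing
    .filter((name) => (filePattern ? filePattern.test(name) : true))
    .sort();
}

const listing = ["b.xlsx", "notes.txt", "a.xlsx"];
const byPattern = selectFiles(listing, { filePattern: /\.xlsx$/ });
const explicit = selectFiles(listing, { files: ["b.xlsx"] });
const everything = selectFiles(listing);
```

Note that the default `sort()` compares UTF-16 code units, so upper-case names sort before lower-case ones in this sketch.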
Processes
XLSX files in the directory.
boolean |
number\
Whether the worksheet has a header or not,
false by default. 0-based row location can be passed to this option if header is not located on the first row.
boolean\
If
true, merged cells will have the same value (by default, only the first cell of merged cells is filled with value). \
Warning! Enabling this feature may increase streaming time because file must be processed to detect merged cells before actual stream.
false by default.
boolean\
Whether to ignore empty rows or not,
true by default.
(string | number)[] | async function(sheets)\
Array of sheet names and/or sheet indexes, or an async function which accepts an array of all sheets and must return another array of sheet names that need to be processed. If not specified, the first sheet will be processed.
string\
XLSX file encoding.
const processor = new bellboy.ExcelProcessor({
// process last sheet
sheets: async (sheets) => {
const sheet = sheets[sheets.length - 1];
return [sheet.name];
},
// ...
});
To see what a processed row looks like, refer to the documentation of the xlstream library, which is used for Excel processing.
Processes
JSON files in the directory.
RegExp\
Path to the array to be streamed. Internally when JSON is streamed, current path is joined together using
. as separator and then tested against provided regular expression. If not specified, a root array will be streamed. As an example, if you have this JSON object:\
{ "animals": { "dogs": [ "pug", "bulldog", "poodle" ] } }\
And want to stream
dogs array, path you will need to use is
/animals.dogs.(\d+)/.\
(\d+) is used here because each index of the array is a number.
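The matching rule described above can be tried in isolation. This is a minimal sketch assuming the path segments are joined with `.` exactly as stated; `matchesJsonPath` is a hypothetical helper, not part of bellboy.

```javascript
// Hypothetical helper: joins the current path with "." and tests it
// against the provided regular expression, as described above.
function matchesJsonPath(pathSegments, jsonPath) {
  return jsonPath.test(pathSegments.join("."));
}

const jsonPath = /animals.dogs.(\d+)/;

// "animals.dogs.0" is an element of the dogs array, so it is streamed.
const dogElement = matchesJsonPath(["animals", "dogs", 0], jsonPath);
// "animals.dogs" is the array itself, not one of its elements.
const dogsArray = matchesJsonPath(["animals", "dogs"], jsonPath);
// "animals.cats.0" belongs to a different array.
const catElement = matchesJsonPath(["animals", "cats", 0], jsonPath);
```

Because an unescaped `.` matches any character and the pattern is not anchored, a stricter expression such as `/^animals\.dogs\.(\d+)$/` may be preferable when similarly named keys exist.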
Processes files with delimited data in the directory.
string
required
string\
A symbol separating fields of the row.
boolean\
If
true, first row will be processed as a header.
string \
Symbol placed around a field to signify that it is the same field.
string[]\
If
hasHeader is
true, first row will appear here.
string\
Row split by
delimiter and
qualifier.
string\
If
hasHeader is
true, object with header elements as keys will appear here.
string\
Received raw row.
Watches for file changes and outputs last part of file as soon as new lines are added to the file.
boolean\
In addition to emitting new lines, emits lines from the beginning of file,
false by default.
string\
Name of the file the data came from.
string
Processes
SELECT query row by row. There are two database processors -
PostgresProcessor (usage examples) and
MssqlProcessor (usage examples). Both have the same options.
string
required\
Query to execute.
object
required
MssqlProcessor.
PostgresProcessor.
PostgresProcessor.
MssqlProcessor. Defines which driver to use -
tedious (used by default) or
msnodesqlv8.
Processor which generates records on the fly. Can be used to define custom data processors.
async generator function
required\
Generator function which must yield records to process.
// processor which generates 10 records dynamically
const processor = new bellboy.DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 10; i++) {
yield i;
}
},
});
Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log this data to stdout and post it by HTTP simultaneously.
boolean\
If
true, no data will be loaded to the destination. In combination with reporters, this option can become handy during testing process.
number\
Number of records to be processed before loading them to the destination. If not specified or
0 is passed, all records will be processed.
async generator function(row)\
Function which receives a row produced by the processor and can apply transformations to it.
async function(rows)\
Function which receives a whole batch of rows. This function is called once the row count reaches
batchSize. Data is loaded to the destination immediately after this function has been executed.
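How these options interact can be illustrated with a simplified, library-free model of a single destination. It is a sketch under assumptions, not bellboy's implementation: `loadInBatches` and `loadBatch` here are hypothetical names, and a `batchSize` of `0` is assumed to mean that everything is loaded as one batch at the end.

```javascript
// Simplified model of one destination. Only batchSize, recordGenerator and
// batchTransformer come from the text above; other names are hypothetical.
async function loadInBatches(rows, options) {
  const { batchSize = 0, recordGenerator, batchTransformer, loadBatch } = options;
  let batch = [];

  // Transform the collected batch (if a transformer is given), then load it.
  const flush = async () => {
    if (batch.length === 0) return;
    const ready = batchTransformer ? await batchTransformer(batch) : batch;
    batch = [];
    await loadBatch(ready);
  };

  for (const row of rows) {
    // recordGenerator may yield zero, one or many records per row.
    for await (const record of recordGenerator(row)) {
      batch.push(record);
      if (batchSize > 0 && batch.length >= batchSize) {
        await flush();
      }
    }
  }
  await flush(); // load whatever is left over
}

const loaded = [];
const finished = loadInBatches([1, 2, 3, 4, 5, 6], {
  batchSize: 2,
  recordGenerator: async function* (row) {
    if (row !== 3) yield { value: row, status: "done" }; // row 3 is skipped
  },
  batchTransformer: async (batch) => batch.map((record) => record.value),
  loadBatch: async (batch) => {
    loaded.push(batch);
  },
});
```

Since the generator can skip rows or yield several records per row, the batch boundary in this model depends on generated records rather than on input rows.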
Logs out all data to stdout (console).
boolean\
If set to
true, data will be printed as table.
Puts processed data one by one in
body and executes specified HTTP request.
required\
Options from axios library.
Inserts data to PostgreSQL.
string
required\
Table name.
string[]\
If specified,
UPSERT command will be executed based on provided constraints.
object
required
Inserts data to MSSQL.
string
required\
Table name.
object
required
New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.
To create a new processor, you must extend
Processor class and implement async
process function. This function accepts one parameter:
async function(readStream, ...args)
required\
Callback function which accepts Readable stream. After calling this function,
job instance will handle passed stream internally. Passed parameters (
args) will be emitted with
startProcessingStream event during job execution.
class CustomProcessor extends bellboy.Processor {
async process(processStream) {
// await processStream(readStream, 'hello', 'world');
}
}
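The body of such a `process` function can be sketched as a plain function, so it can be tried without bellboy installed. The assumptions here are that an object-mode stream created with Node's `Readable.from` is acceptable as the Readable stream, and that `processRecords` and `fakeProcessStream` are hypothetical names; `fakeProcessStream` merely stands in for the callback the job would supply.

```javascript
const { Readable } = require("stream");

// Hypothetical body of a `process` method: wraps in-memory records in a
// Readable stream and hands it over to the supplied callback.
async function processRecords(processStream, records) {
  const readStream = Readable.from(records); // object-mode Readable
  // Extra arguments are the ones described above as being emitted
  // with the startProcessingStream event.
  await processStream(readStream, "in-memory");
}

// Hypothetical stand-in for the callback; it just records what it receives.
const seen = [];
async function fakeProcessStream(readStream, ...args) {
  for await (const record of readStream) {
    seen.push({ record, args });
  }
}

const completed = processRecords(fakeProcessStream, [{ id: 1 }, { id: 2 }]);
```

Inside a class extending Processor, the same body would presumably sit in `async process(processStream)`, with the records taken from the instance.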
To create a new destination, you must extend
Destination class and implement async
loadBatch function. This function accepts one parameter:
any[]
required\
Array of some processed data that needs to be loaded.
class CustomDestination extends bellboy.Destination {
async loadBatch(data) {
console.log(data);
}
}
A reporter is a job wrapper which can operate on the job instance (for example, listen to events using the job's
on method). To create a new reporter, you must extend the
Reporter class and implement the
report function, which will be executed during job instance initialization. This function accepts one parameter:
Job
required\
Job instance
class CustomReporter extends bellboy.Reporter {
report(job) {
job.on("startProcessing", undefined, async ({ jobName }) => {
console.log(`Job ${jobName} has been started.`);
});
}
}
Tests can be run by using
docker-compose up --abort-on-container-exit --exit-code-from test --build test command.