Empujar. Empujarlo Bueno.

When you need to push data around, you push it. Push it real good.

An ETL and Operations tool.

What

Empujar is a tool that moves stuff around. It's built in Node.js, so lots of work can happen asynchronously. You can move data (as an ETL tool), files (as a backup tool), and more!

Empujar's top-level object is a "book", which contains "chapters", which in turn contain "pages". Chapters are executed one at a time, in order. The pages within a chapter can run in parallel, up to a thread limit you specify.
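The ordering rules can be illustrated with a small, self-contained sketch. It is not Empujar's implementation. runBook and runChapter are hypothetical helpers. Each chapter is modeled as { threads, pages }, where a page is a function that receives a next callback, the same shape the page functions in Empujar's examples use.

```javascript
// Hypothetical sketch of the book -> chapter -> page ordering (not Empujar's code).
// A chapter is modeled as { threads: n, pages: [function (next) { ... }, ...] }.

// Run one chapter's pages with at most `threads` in flight; stop at the first error.
function runChapter(chapter, callback) {
  var pending = chapter.pages.slice();
  var total = pending.length;
  var threads = chapter.threads || 1;
  var running = 0;
  var finished = 0;
  var failed = false;

  if (total === 0) { return callback(null); }

  function startMore() {
    while (!failed && running < threads && pending.length > 0) {
      var page = pending.shift();
      running++;
      page(function (error) {
        running--;
        if (failed) { return; }
        if (error) {
          failed = true;
          return callback(error);
        }
        finished++;
        if (finished === total) { return callback(null); }
        startMore(); // a slot opened up, so launch the next waiting page
      });
    }
  }

  startMore();
}

// Run chapters strictly one after another, in order.
function runBook(chapters, callback) {
  var index = 0;
  function nextChapter(error) {
    if (error) { return callback(error); }
    if (index >= chapters.length) { return callback(null); }
    runChapter(chapters[index++], nextChapter);
  }
  nextChapter(null);
}
```

With chapters like [{ threads: 10, pages: [...] }, { threads: 1, pages: [...] }], the first chapter's pages run ten at a time. The second chapter starts only after every page of the first has called next(). Stopping the whole run at the first page error is a choice made for this sketch.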

See an example project here.

For example, a chapter that extracts all data from a MySQL database might look like this:

```javascript
var dateformat = require('dateformat');

exports.chapterLoader = function (book) {
  // Pages in this chapter may run up to 5 at a time
  var chapter = book.addChapter(1, 'EXTRACT & LOAD', { threads: 5 });
  var source = book.connections.source.connection;
  var destination = book.connections.destination.connection;
  var queryLimit = 1000;

  var extractTable = function (table, callback) {
    // Find the newest row already in the destination, so only new or updated rows are copied
    destination.getMax(table, 'updatedAt', function (error, max) {
      if (error) { return callback(error); }

      var query = 'SELECT * FROM `' + table + '` ';
      if (max) {
        query += ' WHERE `updatedAt` >= "' + dateformat(max, 'yyyy-mm-dd HH:MM:ss') + '"';
      }

      // Read the source table in chunks of `queryLimit` rows
      source.getAll(query, queryLimit, function (error, rows, next) {
        if (error) { return callback(error); }
        destination.insertData(table, rows, function (error) {
          if (error) { return callback(error); }
          next(); // ask getAll for the next chunk
        });
      }, callback);
    });
  };

  // The loader runs before the chapter's pages and adds one page per source table
  chapter.addLoader('determine extract queries', function (done) {
    source.tables.forEach(function (table) {
      chapter.addPage('extract table: ' + table, function (next) {
        extractTable(table, next);
      });
    });
    done();
  });
};
```

Empujar runs operations in series or in parallel, as defined by books, chapters, and pages:

```javascript
process.chdir(__dirname);

var Empujar = require('empujar');
var optimist = require('optimist');

// Command-line flags (e.g. --logLevel, --chapters) are passed to the book
var options = optimist.argv;
var book = new Empujar.book(options);

var errorHandler = function (error, context) {
  console.log('OH NO! (but I handled the error) | ' + error);
  setTimeout(process.exit, 5000);
};

book.on('error', errorHandler);

book.connect(function () {
  book.logger.log('I am a debug message', 'debug');
  // Arbitrary data can be stored on book.data
  book.data.stuff = 'something cool';

  var chapter1 = book.addChapter(1, 'Do the first thing in parallel', { threads: 10 });
  var chapter2 = book.addChapter(2, 'Do that next thing in serial', { threads: 1 });

  // 100 pages that each wait 100ms; up to 10 run at once
  var i = 0;
  while (i < 100) {
    chapter1.addPage('sleepy thing: ' + i, function (next) {
      setTimeout(next, 100);
    });
    i++;
  }

  // A loader runs before the chapter's pages
  chapter2.addLoader('do something before', function (next) {
    book.logger.log('I am the preloader');
    next();
  });

  chapter2.addPage('the final step', function (next) {
    next();
  });

  book.run(function () {
    setTimeout(process.exit, 5000);
  });
});
```

There is also a more formal example you can explore within this project. Check out /books/etl to learn more.

Empujar connects to the connections you define in book/config/connections/NAME.js. There should be a matching transport in /lib/connections/TYPE.js.

When book.run() completes, you will probably want to call process.exit() or shut down more gracefully.

You can subscribe to the 'error' and 'state' events with book.on('error') and book.on('state'). If you use Empujar as an ETL tool, it is useful to record these state events in your data warehouse:

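```javascript
book.on('state', function (data) {
  // Assumes your book has a connection named "datawarehouse" that supports insertData
  var datawarehouse = book.connections.datawarehouse.connection;
  datawarehouse.insertData('empujar', [data], function (error) {
    if (error) { console.error('could not record state event: ' + error); }
  });
});
```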

Project Layout

Create your project so that it looks like this:

```
|- books
|--- myBook
|----- book.js
|----- pids/
|----- logs/
|----- config/
|----- config/connections/
|----- config/connections/myDatabase.js
|----- chapters/
|----- chapters/chapter1.js
|----- chapters/chapter2.js
```

Launch Flags

The defaults for all launch flags are:

```javascript
{
  chapterFiles : path.normalize(process.cwd() + '/chapters/**/*.js'),
  configPath   : path.normalize(process.cwd() + '/config'),
  logPath      : path.normalize(process.cwd() + '/log'),
  pidsPath     : path.normalize(process.cwd() + '/pids'),
  logFile      : 'empujar.log',
  tmpPath      : path.normalize(process.cwd() + '/tmp'),
  logStdout    : true,
  logLevel     : 'info',
  chapters     : [],
  getAllLimit  : Infinity,
}
```

Examples:

```shell
# Run your book
node yourBook.js

# Run your book in verbose mode
node yourBook.js --logLevel debug

# Run only certain chapters in your book
node yourBook.js --chapters 1,4

# ... or a range of chapters
node yourBook.js --chapters 100-300

# Extract only a small subset of your data (great for testing)
node yourBook.js --getAllLimit 1000
```

With --getAllLimit 1000, every call to connection.getAll() finishes successfully after retrieving 1000 rows.
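The sketch below shows one way these flags could combine with the defaults listed above. resolveOptions and parseChapters are hypothetical helpers, not part of Empujar. Treating "100-300" as an inclusive range is an assumption.

```javascript
var path = require('path');

// Hypothetical: "1,4" -> [1, 4]; "100-300" -> [100, 101, ..., 300] (inclusive range assumed)
function parseChapters(value) {
  if (Array.isArray(value)) { return value; }
  var chapters = [];
  String(value).split(',').forEach(function (part) {
    if (part.trim() === '') { return; }
    var bounds = part.split('-').map(Number);
    if (bounds.length === 2) {
      for (var n = bounds[0]; n <= bounds[1]; n++) { chapters.push(n); }
    } else {
      chapters.push(bounds[0]);
    }
  });
  return chapters;
}

// Hypothetical: the documented defaults, overridden by parsed command-line flags
function resolveOptions(argv) {
  var defaults = {
    chapterFiles: path.normalize(process.cwd() + '/chapters/**/*.js'),
    configPath: path.normalize(process.cwd() + '/config'),
    logPath: path.normalize(process.cwd() + '/log'),
    pidsPath: path.normalize(process.cwd() + '/pids'),
    logFile: 'empujar.log',
    tmpPath: path.normalize(process.cwd() + '/tmp'),
    logStdout: true,
    logLevel: 'info',
    chapters: [],
    getAllLimit: Infinity,
  };
  var options = Object.assign({}, defaults, argv);
  options.chapters = parseChapters(options.chapters);
  options.getAllLimit = Number(options.getAllLimit); // Number(Infinity) stays Infinity
  return options;
}
```

A flag parser such as optimist hands --chapters 4 over as a number and --chapters 1,4 as a string. For that reason parseChapters converts its input with String() before splitting it.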

Connections

While you can create your own connections, Empujar ships with the tools to work with a number of the most common ones:

MySQL

```javascript
var connection = book.connections.mysql.connection;

// connection.connect(callback)
//   Connection method; handled by book.connect().
//   callback is passed (error)

// connection.showTables(callback)
//   List tables.
//   callback is passed (error, arrayOfTableNames)

// connection.showColumns(table, callback)
//   List the columns, plus metadata for each column.
//   callback is passed (error, hashOfColumnsAndMetadata)

// connection.query(query, data, callback)
//   Run a query. `data` is optional; it is used to fill in (interpolate) `?` placeholders.
//   callback is passed (error, rows), where rows is an array of column -> value hashes

// connection.getAll(queryBase, chunkSize, dataCallback, doneCallback)
//   Fetch data in chunks, normalized as an array of hashes. Data is already typecast.
//   queryBase    -> the base MySQL query (LIMIT and OFFSET are appended automatically)
//   chunkSize    -> number of rows per chunk (i.e. the LIMIT)
//   dataCallback -> called with each chunk as (error, data, next);
//                   data is normalized, and next() must be called to continue
//   doneCallback -> passed (error, rowsFound)

// connection.getMax(table, column, callback)
//   Find the maximum value of a column in a table.
//   callback is passed (error, maximumValueOrNull)

// connection.queryStream(query, callback)
//   Get a stream that emits the results of a query.
//   Events are listed at https://github.com/felixge/node-mysql#streaming-query-rows
//   callback is passed (error, stream)

// connection.insertData(table, data, callback, mergeOnDuplicates)
//   Add data to a table, creating the table if needed.
//   Data should be normalized (i.e. results from getAll).
//   callback is passed (error)

// connection.addColumn(table, column, rowData, callback)
//   Add a column to a table. rowData is an array of the data to insert into the column,
//   which can be used to determine the column's data type.
//   callback is passed (error)

// connection.alterColumn(table, column, definition, callback)
//   Change the data type of a column; definition is a MySQL statement.
//   callback is passed (error)

// connection.mergeTables(sourceTable, destinationTable, callback)
//   Merge the data from sourceTable into destinationTable.
//   destinationTable is created if it doesn't exist, and is erased and recreated
//   from sourceTable if there is no primary key present.
//   callback is passed (error)

// connection.copyTableSchema(sourceTable, destinationTable, callback)
//   Create a new table (destinationTable) with the same schema as sourceTable.
//   callback is passed (error)

// connection.dump(file, options, callback)
//   mysqldump the database to `file`. Option defaults:
//     binary:   'mysqldump'
//     database, password, host, port, user: taken from the connection's own options
//     tables:   []
//     gzip:     false
//   callback is passed (error)
```
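The getAll contract above says two things. LIMIT and OFFSET are appended to queryBase, and next() must be called before the following chunk is fetched. The sketch below models that loop in a self-contained way. It is not Empujar's source. runQuery(sql, callback) is a hypothetical stand-in for the real MySQL driver, and the limit parameter plays the role of the getAllLimit flag.

```javascript
// Hypothetical sketch of a chunked getAll (not Empujar's source).
// runQuery(sql, callback) is an assumed stand-in for the real database driver.
function getAll(runQuery, queryBase, chunkSize, limit, dataCallback, doneCallback) {
  var offset = 0;
  var rowsFound = 0;

  function fetchChunk() {
    // Stop early once the getAllLimit-style cap has been reached
    if (rowsFound >= limit) { return doneCallback(null, rowsFound); }
    var size = Math.min(chunkSize, limit - rowsFound);
    var sql = queryBase + ' LIMIT ' + size + ' OFFSET ' + offset;

    runQuery(sql, function (error, rows) {
      if (error) { return doneCallback(error, rowsFound); }
      if (rows.length === 0) { return doneCallback(null, rowsFound); }
      offset += rows.length;
      rowsFound += rows.length;
      // Hand the chunk to the caller; calling next() requests the following chunk
      dataCallback(null, rows, function next() {
        if (rows.length < size) { return doneCallback(null, rowsFound); } // short chunk: no more rows
        fetchChunk();
      });
    });
  }

  fetchChunk();
}
```

Because the next chunk is only requested from inside next(), a slow consumer such as an insertData call is never flooded with rows. This is why the chapter example calls next() only after its insert has finished.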

Elasticsearch

```javascript
var connection = book.connections.elasticsearch.connection;

// connection.connect(callback)
//   Connection method; handled by book.connect().
//   callback is passed (error)

// connection.showIndices(callback)
//   List the indices in the cluster.
//   callback is passed (error, indices), where indices is a hash of index names and metadata

// connection.insertData(index, data, callback)
//   Add data to an index, creating the index if needed.
//   Data should be normalized (i.e. results from getAll).
//   callback is passed (error)

// connection.getAll(index, query, fields, chunkSize, dataCallback, doneCallback)
//   Fetch data from the cluster, normalized as an array of hashes. Data is already typecast.
//   index        -> string name of the index
//   query        -> the Elasticsearch query (as a hash)
//   fields       -> array of fields to return; pass '*' to request all fields
//   chunkSize    -> number of results to return (from each server)
//   dataCallback -> called with each chunk as (error, data, next);
//                   data is normalized, and next() must be called to continue
//   doneCallback -> passed (error, rowsFound)
```

S3

```javascript
var connection = book.connections.s3.connection;

// connection.connect(callback)
//   Connection method; handled by book.connect().
//   callback is passed (error)

// connection.listFolders(prefix, callback)
//   List all folders in this S3 bucket that start with `prefix`.
//   prefix can be '*' or '' to get all folders in the bucket.
//   callback is passed (error, arrayOfFolderNames)

// connection.listObjects(prefix, callback)
//   List all objects in this S3 bucket that start with `prefix`.
//   prefix can be '*' or '' to get all objects in the bucket.
//   callback is passed (error, arrayOfObjectNames)

// connection.deleteFolder(prefix, callback)
//   Delete the folder starting with `prefix` and all objects contained within it
//   (like `rm -rf prefix`). prefix can be '*' or '' to delete all folders and files in the bucket.
//   callback is passed (error)

// connection.objectExists(filename, callback)
//   Check whether a file exists in this bucket.
//   callback is passed (error, exists), where exists is a boolean

// connection.delete(filename, callback)
//   Delete a file from this bucket.
//   callback is passed (error)

// connection.streamingUpload(inputStream, filename, callback)
//   Upload a file to S3 with the name `filename`.
//   The input should be a readable stream created with fs.createReadStream.
//   callback is passed (error)
```
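S3 has no real directories. A "folder" is just a shared key prefix, so the prefix rules above boil down to string matching on object keys. The in-memory sketch below illustrates that idea. objectsUnder, foldersUnder and the sample keys are hypothetical. The sketch makes no AWS calls and does not claim to show how the connection itself queries S3.

```javascript
// Hypothetical in-memory illustration of the prefix rules (no AWS calls).
function normalizePrefix(prefix) {
  // '*' and '' both mean "the whole bucket"
  return (prefix === '*' || prefix === undefined) ? '' : prefix;
}

// Objects a prefix selects, e.g. what a deleteFolder(prefix) call would remove
function objectsUnder(keys, prefix) {
  var p = normalizePrefix(prefix);
  return keys.filter(function (key) { return key.indexOf(p) === 0; });
}

// Distinct "folders" (key paths up to the last '/') among the selected objects
function foldersUnder(keys, prefix) {
  var folders = {};
  objectsUnder(keys, prefix).forEach(function (key) {
    var slash = key.lastIndexOf('/');
    if (slash > 0) { folders[key.slice(0, slash)] = true; }
  });
  return Object.keys(folders).sort();
}
```

Plain prefix matching has a consequence worth keeping in mind before calling deleteFolder. A prefix of 'logs' also matches keys under 'logs-old/', so passing 'logs/' with a trailing slash is the safer way to express "this folder only".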

FTP

```javascript
var connection = book.connections.ftp.connection;

// connection.connect(callback)
//   Connection method; handled by book.connect().
//   callback is passed (error)

// connection.get(file, callback)
//   Download a file from the FTP server.
//   callback is passed (error, stream); pipe the stream to a file on disk, to S3, etc.

// connection.listFiles(dir, callback)
//   List files in a remote directory.
//   callback is passed (error, files), where files is an array of remote file names
```

Amazon Redshift

```javascript
var connection = book.connections.redshift.connection;

// connection.connect(callback)
//   Connection method; handled by book.connect().
//   callback is passed (error)

// connection.showTables(callback)
//   List tables.
//   callback is passed (error, arrayOfTableNames)

// connection.showColumns(table, callback)
//   List the columns, plus metadata for each column.
//   callback is passed (error, hashOfColumnsAndMetadata)

// connection.query(query, callback)
//   Run a query.
//   callback is passed (error, rows), where rows is an array of column -> value hashes

// connection.getAll(queryBase, chunkSize, dataCallback, doneCallback)
//   Fetch data in chunks, normalized as an array of hashes. Data is already typecast.
//   queryBase    -> the base SQL query (LIMIT and OFFSET are appended automatically)
//   chunkSize    -> number of rows per chunk (i.e. the LIMIT)
//   dataCallback -> called with each chunk as (error, data, next);
//                   data is normalized, and next() must be called to continue
//   doneCallback -> passed (error, rowsFound)

// connection.insertData(table, data, callback)
//   Add data to a table, creating the table if needed.
//   Data should be normalized (i.e. results from getAll).
//   callback is passed (error)

// connection.mergeTables(sourceTable, destinationTable, callback)
//   Merge the data from sourceTable into destinationTable.
//   destinationTable is created if it doesn't exist, and is erased and recreated
//   from sourceTable if there is no primary key present.
//   callback is passed (error)

// connection.addColumn(table, column, rowData, callback)
//   Add a column to a table. rowData is an array of the data to insert into the column,
//   which can be used to determine the column's data type.
//   callback is passed (error)

// connection.alterColumn(table, column, definition, callback)
//   Change the data type of a column; definition is a SQL statement.
//   callback is passed (error)

// connection.copyTableSchema(sourceTable, destinationTable, callback)
//   Create a new table (destinationTable) with the same schema as sourceTable.
//   callback is passed (error)

// connection.getMax(table, column, callback)
//   Find the maximum value of a column in a table.
//   callback is passed (error, maximumValueOrNull)
```
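The MySQL and Redshift connections describe mergeTables with the same rules. The in-memory model below shows those rules. It assumes tables are arrays of row objects and that the primary key column is passed in explicitly, which is a hypothetical parameter; the real method works against the table's schema. The sketch also assumes row-level replacement: a source row overwrites the destination row with the same key. It is not the SQL the connections actually run.

```javascript
// Hypothetical in-memory model of mergeTables' documented behavior.
// destination may be undefined (table doesn't exist yet); primaryKey may be null (none present).
function mergeTables(source, destination, primaryKey) {
  // No destination yet, or no primary key: destination becomes a fresh copy of source
  if (!destination || !primaryKey) {
    return source.map(function (row) { return Object.assign({}, row); });
  }

  // Otherwise upsert: a source row replaces the destination row with the same key,
  // and source rows with new keys are appended (Map keeps insertion order)
  var byKey = new Map();
  destination.forEach(function (row) { byKey.set(row[primaryKey], Object.assign({}, row)); });
  source.forEach(function (row) { byKey.set(row[primaryKey], Object.assign({}, row)); });
  return Array.from(byKey.values());
}
```

The no-primary-key branch explains why a table without a primary key gets rebuilt rather than merged. Without a key there is no way to tell which existing row an incoming row should replace.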

Creating your own connections

It's easy to add your own connections to Empujar. All you need is a /connections folder in your project, plus a few conventions. The basic building block of a connection looks like this:

```javascript
var connection = function (name, type, options, book) {
  this.name = name;
  this.type = type;
  this.options = options;
  this.book = book;
  this.connection = null;
};

connection.prototype.connect = function (callback) {
  var self = this;
  // Open your client here, then signal completion
  callback();
};

exports.connection = connection;
```

... and then extend your connection model with more prototypes.
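The sketch below applies that pattern to a hypothetical in-memory connection. It is not the delighted.js connection, and nothing in it is part of Empujar. options.rows as a table-to-rows hash, passing a table name as getAll's first argument, and the chunking logic are all assumptions. The connection adds showTables, getAll and getMax prototypes that use the callback shapes documented for the built-in connections.

```javascript
// Hypothetical connection: serves rows from options.rows ({ tableName: [row, ...] })
var connection = function (name, type, options, book) {
  this.name = name;
  this.type = type;
  this.options = options;
  this.book = book;
  this.connection = null;
};

connection.prototype.connect = function (callback) {
  // A real connection would open a client here; this one just keeps a reference to the data
  this.connection = this.options.rows;
  callback();
};

connection.prototype.showTables = function (callback) {
  callback(null, Object.keys(this.connection));
};

// Same callback shape as the built-in getAll: dataCallback(error, rows, next), doneCallback(error, rowsFound)
connection.prototype.getAll = function (table, chunkSize, dataCallback, doneCallback) {
  var rows = this.connection[table] || [];
  var offset = 0;
  var nextChunk = function () {
    if (offset >= rows.length) { return doneCallback(null, offset); }
    var chunk = rows.slice(offset, offset + chunkSize);
    offset += chunk.length;
    dataCallback(null, chunk, nextChunk);
  };
  nextChunk();
};

// Mirrors getMax: callback receives the largest value in the column, or null
connection.prototype.getMax = function (table, column, callback) {
  var max = null;
  (this.connection[table] || []).forEach(function (row) {
    if (row[column] !== undefined && (max === null || row[column] > max)) { max = row[column]; }
  });
  callback(null, max);
};

exports.connection = connection;
```

Matching the built-in callback shapes is the point of the exercise. It lets a chapter like the extract example read from this connection without knowing what sits behind it.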

For example, here's a connection, delighted.js, which TaskRabbit uses to import NPS survey data from our partner Delighted. We extend their library to match the getAll method of the built-in connections above.