Date: 2021-05-04
This is a reimagining of what a push-style stream could be.
push-stream harkens back to the original node.js classic streams, but without several of the blunders of classic streams that went unrecognized at the time. The differences are:
A paused property instead of write() returning paused. This means you can know whether your destination is paused before you call write.
A resume method on the source stream, which stays the same for the life of the stream, instead of assigning and reassigning "drain" listeners.
The only property that changes while streaming is the paused boolean. No closures are necessary (closures are associated with memory leaks and are not optimized well by JS engines).
A stream may have a buffer (an array), but otherwise most simple streams should not need to allocate any memory.
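To make the first point concrete, here is a minimal sketch of a writer that reads the paused property before every write. LimitedSink and writeWhileUnpaused are hypothetical names invented for this illustration; they are not part of push-stream.

```javascript
// Hypothetical sink that accepts `limit` items and then pauses itself.
function LimitedSink (limit) {
  return {
    paused: false,
    items: [],
    write: function (data) {
      this.items.push(data)
      if (this.items.length >= limit) this.paused = true
    },
    end: function (err) {
      this.ended = err || true
    }
  }
}

// Hypothetical writer loop: the pause state is a plain property, so it can
// be checked before each write instead of being learned from a return value.
function writeWhileUnpaused (sink, data) {
  var written = 0
  while (!sink.paused && written < data.length) {
    sink.write(data[written++])
  }
  return written
}

var sink = LimitedSink(2)
var count = writeWhileUnpaused(sink, ['a', 'b', 'c'])
console.log(count, sink.items, sink.paused)
```

The third item is never written, because the loop sees paused === true before calling write again.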
push-stream also incorporates the lessons learned over several years of working with pull-streams. For additional context, learn about the history of node streams and the simplicity of pull-streams.
I wrote this because I wanted muxrpc to have back pressure. muxrpc@<=6 wrapped a simple streaming model called "packet-stream", which internally used a stream model called "weird streams" (yes, that is a sign I didn't really know what to do there). It was much easier to write a multiplexer with a push model: I tried writing a pull-based muxer and got stuck in many deadlocks.
push-mux was relatively straightforward once I figured out what the push-stream api looked like.
I'm not planning on rewriting everything that uses pull-streams to use this! So I also made push-stream-to-pull-stream.
sink.write(data) writes one chunk of data to a stream. It must not be called if sink.paused == true.
sink.end(err) ends the stream. If err is an Error, the stream has aborted with a fatal error: throw away any buffered data and stop immediately.
When sink.end is called with an error, it does not need to respect pause, because the stream is discarded at that point. If it's an ordinary end, the caller should wait until the stream is unpaused.
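The two kinds of end could be handled by a caller roughly like this. It is only a sketch of the rule as worded above; PausedSink and finish are hypothetical names, not push-stream APIs.

```javascript
// Hypothetical sink that starts out paused, so the difference is visible.
function PausedSink () {
  return {
    paused: true,
    ended: false,
    write: function (data) {},
    end: function (err) {
      this.ended = err || true
    }
  }
}

// Hypothetical helper applying the rule above.
// Returns true if the sink was ended, false if the end has to wait.
function finish (sink, err) {
  if (err) {
    sink.end(err) // fatal error: pause is not respected
    return true
  }
  if (sink.paused) return false // ordinary end: wait until unpaused
  sink.end()
  return true
}

var a = PausedSink()
var deferred = finish(a) // sink is paused, so nothing happens yet
a.paused = false
var done = finish(a) // now the ordinary end goes through

var b = PausedSink()
var boom = new Error('boom')
var aborted = finish(b, boom) // ends immediately even though b is paused

console.log(deferred, done, aborted)
```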
sink.paused is a boolean holding the current pause state. When writing to a stream, check the value of paused both before and after. If the sink is paused, a transform stream will usually also pause.
source.pipe(sink) attaches this stream to a sink stream (or to a pipeline starting with a sink). pipe should call source.resume(), and while the sink is not paused the source should write any available data to it.
If a sink sets paused = true, the source should stop writing. When the sink decides it is ready for data again, it must call the source via the resume method. If there is data available, the source should write it to the sink. If the source is a transform stream (i.e. has its own source), it would call resume on that source, passing the resume signal along.
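The pause and resume handshake described above can be sketched with a sink that pauses after every item. FromArray, StepSink and the ready method are hypothetical names made up for this example; the source is assumed to end as soon as its data runs out.

```javascript
// Hypothetical array source, kept as small as possible.
function FromArray (ary) {
  var i = 0
  return {
    resume: function () {
      if (!this.sink || this.ended) return
      while (!this.sink.paused && i < ary.length) this.sink.write(ary[i++])
      if (i === ary.length) {
        this.ended = true
        this.sink.end()
      }
    },
    pipe: function (sink) {
      this.sink = sink
      sink.source = this
      this.resume()
      return sink
    }
  }
}

// Hypothetical sink that sets paused = true after each item.
function StepSink () {
  return {
    paused: false,
    received: [],
    write: function (data) {
      this.received.push(data)
      this.paused = true // the source should stop writing now
    },
    end: function (err) {
      this.ended = err || true
    },
    // Called when the sink is ready again: unpause, then call the source.
    ready: function () {
      this.paused = false
      if (this.source) this.source.resume()
    }
  }
}

var sink = FromArray([1, 2, 3]).pipe(StepSink())
var afterPipe = sink.received.slice() // only the first item arrived
sink.ready()
sink.ready()
console.log(afterPipe, sink.received, sink.ended)
```

Each call to ready() lets exactly one more item through, because the sink pauses itself again inside write.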
There are two basic types of streams, source and sink, and these two types can be combined in two ways: transform and duplex.
Source: a stream that pipes data to a sink.
Sink: a stream that receives data from a source.
Transform: a stream that receives data from a source, possibly transforms it in some way, and then pipes it to another sink. When the transform's sink pauses, the transform should also pause.
Duplex: a stream that is both a source and a sink, but whose source and sink sides are not internally connected as they are in a transform stream. Duplex streams are used for communication: two processes are connected via a "wire" (serialized bytes are transmitted, then parsed by the receiver). On a duplex stream, the pause state of the input is not connected to the pause state of the output; instead it's probably connected to the internal state of the system the duplex stream connects to.
This is a subtle distinction, and probably only advanced use cases require writing duplex streams (you are doing protocol design if you are writing a duplex stream).
In node.js streams, there is a complicated "pipe" method that defines how data flows between the streams. In push-stream the pipe method just connects the streams together, and instead of emitting events, the streams just call write and end on the next stream in the pipeline directly.
A push-stream source is a simple object with pipe, resume, and abort methods.
A push-stream pipeline is just a doubly-linked list. The sink property is a reference to the next stream in the pipeline, and source is a reference to the previous stream. The words Source and Sink are also used to refer to the first and last streams in a pipeline.
The pipe method just sets up these references, and the resume method starts the data flowing. The data should flow until the sink stream is paused, as indicated by it setting the paused property to true.
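The doubly-linked structure can be seen by walking the references in both directions. This sketch only sets up the links; connect is a hypothetical helper standing in for what pipe is described as doing, and the three objects are placeholders rather than working streams.

```javascript
// Hypothetical helper: set both references, as pipe is described as doing.
function connect (source, sink) {
  source.sink = sink
  sink.source = source
  return sink // returning the sink lets calls be chained
}

var first = { name: 'first' }
var middle = { name: 'middle' }
var last = { name: 'last' }
connect(connect(first, middle), last)

// Walk forward through the sink references...
var forward = []
for (var s = first; s; s = s.sink) forward.push(s.name)

// ...and backward through the source references.
var backward = []
for (var t = last; t; t = t.source) backward.push(t.name)

console.log(forward, backward)
```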
// a stream that reads an array
function Values (ary) {
  var i = 0
  return {
    resume: function () {
      if (!this.sink || this.ended) return
      while (!this.sink.paused && i < ary.length)
        this.sink.write(ary[i++])
      // note: end does not check pause state.
      // pause does not block end.
      if (i === ary.length) {
        this.ended = true // guard so a later resume() cannot end the sink twice
        this.sink.end()
      }
    },
    // pipe() can be as simple as connecting streams together!
    pipe: function (sink) {
      this.sink = sink
      sink.source = this
      this.resume()
      return sink
    },
    // abort ends the stream immediately.
    abort: function (err) {
      this.ended = err || true
      // only end the sink if it has not already ended.
      if (this.sink && !this.sink.ended) this.sink.end(err)
    }
  }
}
(See a pull-stream version.)
A sink stream has write and end methods and a paused property.
In pull-streams, the sink is responsible for calling the source
but in push-streams it's the reverse - so the push-stream sink
doesn't need very much at all.
function Log (name) {
  return {
    paused: false,
    write: function (data) {
      console.log(data)
      // if you set paused=true here, the source should stop writing.
    },
    end: function (err) {
      this.ended = err || true
    }
  }
}
(See a pull-stream version.)
The through stream is more complicated in push-streams because it needs to have the APIs of both source and sink. This is a very simple example through stream that does not have its own internal buffer; it just writes to the sink immediately. In some situations this may mean writing when the sink is paused. If this is a problem, drop in a buffering stream.
Note: push-stream throughs must start out with paused=true, while sinks start out with paused=false. If a through stream is piped to a destination that is unpaused, it should resume, which will propagate the resume signal back up the pipeline, and data will start flowing.
function Map (fn) {
  return {
    paused: true,
    write: function (data) {
      this.sink.write(fn(data))
      this.paused = this.sink.paused
    },
    end: function (err) {
      this.ended = err || true
      this.sink.end(err)
    },
    resume: function () {
      this.source.resume()
    },
    pipe: function (sink) {
      this.sink = sink
      sink.source = this
      this.paused = this.sink.paused
      if (!this.sink.paused)
        this.resume()
      return sink
    },
    abort: function (err) {
      this.source.abort(err)
    }
  }
}
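Since Map can end up writing to a paused sink, a buffering through is one way around it. What follows is only a rough sketch of what such a stream might look like, not the buffering stream referred to above: Buffered, flush, done, StepSink and ready are all hypothetical names, and the demo calls write directly to simulate an upstream that writes a burst without checking paused.

```javascript
// Hypothetical buffering through stream (not part of push-stream itself).
function Buffered () {
  return {
    paused: true, // throughs start out paused
    ended: false,
    done: false,
    buffer: [],
    write: function (data) {
      this.buffer.push(data)
      this.flush()
    },
    end: function (err) {
      this.ended = err || true
      if (err) this.buffer.length = 0 // fatal error: throw away buffered data
      this.flush()
    },
    // Pass buffered items on while the sink accepts them, then pass on the end.
    flush: function () {
      var sink = this.sink
      if (!sink || this.done) return
      while (this.buffer.length && !sink.paused) sink.write(this.buffer.shift())
      this.paused = sink.paused
      var fatal = this.ended !== false && this.ended !== true
      // An error ends the sink at once; an ordinary end waits for an empty
      // buffer and an unpaused sink.
      if (fatal || (this.ended && !this.buffer.length && !sink.paused)) {
        this.done = true
        sink.end(fatal ? this.ended : undefined)
      }
    },
    resume: function () {
      this.flush()
      if (!this.paused && !this.ended && this.source) this.source.resume()
    },
    pipe: function (sink) {
      this.sink = sink
      sink.source = this
      this.paused = sink.paused
      if (!sink.paused) this.resume()
      return sink
    },
    abort: function (err) {
      if (this.source) this.source.abort(err)
    }
  }
}

// Hypothetical sink that pauses after every item until ready() is called.
function StepSink () {
  return {
    paused: false,
    received: [],
    write: function (data) {
      this.received.push(data)
      this.paused = true
    },
    end: function (err) {
      this.ended = err || true
    },
    ready: function () {
      this.paused = false
      if (this.source) this.source.resume()
    }
  }
}

var through = Buffered()
var sink = through.pipe(StepSink())
// Simulate an upstream that writes a burst without checking paused.
through.write(1)
through.write(2)
through.write(3)
through.end()
var held = through.buffer.slice() // items waiting for the sink
sink.ready()
sink.ready()
sink.ready() // buffer is empty and sink is unpaused, so the end is passed on
console.log(held, sink.received, sink.ended)
```

The sink never receives a write while it is paused; the extra items sit in the buffer until the sink calls resume.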
Thanks to @ahdinosaur for giving me the push-stream npm repo! His push-stream module (essentially a simple observable) is still available at push-stream@2
License: MIT