Creates a StreamTransformer.
The returned instance takes responsibility for implementing bind. When the user invokes bind, it returns a new "bound" stream. Only when the user starts listening to the bound stream does the listen method invoke the given transformer closure.
The transformer closure receives the stream that was bound as an argument and returns a StreamSubscription. In almost all cases, the closure itself listens to the stream it is given.
The bound stream (created by the bind method above) then sets the handlers it received as part of the listen call on the StreamSubscription that the closure returned.
Conceptually this can be summarized as follows:
var transformer = StreamTransformer(transformerClosure); creates a StreamTransformer that supports the bind method.
var boundStream = stream.transform(transformer); binds the stream and returns a bound stream that holds a reference to stream.
boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b) starts the listening and transformation. This is accomplished in two steps. First, the boundStream invokes the transformerClosure with the stream it captured: transformerClosure(stream, b). Second, the resulting subscription, a StreamSubscription, is updated to receive its handlers: subscription.onData(f1), subscription.onError(f2), subscription.onDone(f3). Finally, the subscription is returned as the result of the listen call.
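The two steps performed by listen can be sketched as a small Stream subclass. This is only an illustration under stated assumptions: the class name SketchBoundStream and the helper passThrough are hypothetical and are not part of dart:async, and the library's own bound stream may differ in detail.

```dart
import 'dart:async';

/// Hypothetical stand-in for the bound stream returned by `bind`.
/// It captures the source stream and the transformer closure.
class SketchBoundStream<S, T> extends Stream<T> {
  SketchBoundStream(this._source, this._transformerClosure);

  final Stream<S> _source;
  final StreamSubscription<T> Function(Stream<S> stream, bool cancelOnError)
      _transformerClosure;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    // Step 1: invoke the closure with the captured stream.
    final subscription = _transformerClosure(_source, cancelOnError ?? false);
    // Step 2: install the handlers that were passed to listen.
    subscription.onData(onData);
    subscription.onError(onError);
    subscription.onDone(onDone);
    // The closure's subscription is what the caller receives.
    return subscription;
  }
}

/// Hypothetical pass-through closure, used only to exercise the sketch.
StreamSubscription<int> passThrough(Stream<int> input, bool cancelOnError) =>
    input.listen(null, cancelOnError: cancelOnError);
```

Nothing happens until listen is called. For example, awaiting SketchBoundStream<int, int>(Stream.fromIterable([1, 2, 3]), passThrough).toList() inside an async function invokes passThrough once and collects the source events unchanged.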
There are two common ways to create a StreamSubscription:
By creating a new class that implements StreamSubscription. Note that the subscription should run its callbacks in the Zone in which the stream was listened to.
By allocating a StreamController and returning the result of listening to its stream.
Example use of a duplicating transformer:
// Requires: import 'dart:async';
stringStream.transform(StreamTransformer<String, String>(
    (Stream<String> input, bool cancelOnError) {
  // Declared `late` because the callbacks below refer to both variables
  // before they are assigned.
  late StreamController<String> controller;
  late StreamSubscription<String> subscription;
  controller = StreamController<String>(
      onListen: () {
        subscription = input.listen((data) {
          // Duplicate the data.
          controller.add(data);
          controller.add(data);
        },
            onError: controller.addError,
            onDone: controller.close,
            cancelOnError: cancelOnError);
      },
      // Wrapped in closures: `subscription` is only assigned in onListen.
      onPause: () => subscription.pause(),
      onResume: () => subscription.resume(),
      onCancel: () => subscription.cancel(),
      sync: true);
  return controller.stream.listen(null);
}));
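To try the duplicating transformer end to end, it can be wrapped in a function and applied to a small stream. The following is a sketch assuming null-safe Dart. The names duplicating and collectDuplicated are hypothetical helpers introduced only for this illustration. The pause, resume, and cancel callbacks are written as closures so that subscription is read only after onListen has assigned it.

```dart
import 'dart:async';

/// Hypothetical helper: builds a transformer that emits every event twice.
StreamTransformer<T, T> duplicating<T>() {
  return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
    late StreamController<T> controller;
    late StreamSubscription<T> subscription;
    controller = StreamController<T>(
        onListen: () {
          // Start listening to the source only once the output is listened to.
          subscription = input.listen((data) {
            controller.add(data);
            controller.add(data);
          },
              onError: controller.addError,
              onDone: controller.close,
              cancelOnError: cancelOnError);
        },
        // Forward flow control from the output to the source subscription.
        onPause: () => subscription.pause(),
        onResume: () => subscription.resume(),
        onCancel: () => subscription.cancel(),
        sync: true);
    // Handlers are installed later by the bound stream's listen method.
    return controller.stream.listen(null);
  });
}

/// Hypothetical helper: collects the transformed events into a list.
Future<List<String>> collectDuplicated(List<String> items) {
  return Stream.fromIterable(items).transform(duplicating<String>()).toList();
}
```

Awaiting collectDuplicated(['a', 'b']) inside an async function yields ['a', 'a', 'b', 'b'].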