blob: d8c44e05bce8b97c1f0d02eef7b5fa656f585d6d [file] [log] [blame]
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
part of quiver.streams;
/**
 * Returns a stream that is the concatenation of the given [streams].
 *
 * Once the returned stream is listened to, [streams] is iterated
 * asynchronously: every event (data and error alike) of the current stream is
 * forwarded to the returned stream, and only when that stream is done is the
 * iterator advanced and the next stream listened to. If advancing the
 * iterator throws, the returned stream ends immediately with that error.
 *
 * Pausing or resuming a subscription to the returned stream pauses or resumes
 * the subscription to whichever input stream is currently being listened to.
 *
 * Note: events emitted by pre-existing broadcast streams before the iteration
 * reaches them will be dropped.
 *
 * Example:
 *
 *     concat(files.map((file) =>
 *         file.openRead().transform(const LineSplitter())))
 *
 */
Stream concat(Iterable<Stream> streams) {
  return new _ConcatStream(streams);
}
/**
 * A stream that emits the events of each stream in [_streams], one stream
 * after another, in iteration order.
 */
class _ConcatStream extends Stream {
  final Iterable<Stream> _streams;

  _ConcatStream(Iterable<Stream> streams) : _streams = streams;

  /**
   * Starts iterating [_streams] and returns a subscription to the combined
   * output.
   *
   * The first input stream is subscribed to before this method returns. Data
   * and error events of the current input stream are added to an internal
   * [StreamController]; when that input is done, the iterator is advanced and
   * the next input is subscribed to. The controller is closed once the
   * iterator is exhausted, or right after the error is added when
   * `moveNext()` throws.
   *
   * A `null` [cancelOnError] is treated as `false`; the resulting flag is
   * used both for the input subscriptions and for the returned subscription.
   */
  StreamSubscription listen(void onData(var data),
      {Function onError, void onDone(), bool cancelOnError}) {
    // Normalise the optional flag so that null becomes false.
    final bool stopOnError = (cancelOnError == true);

    // Subscription to the input stream currently being forwarded, if any.
    StreamSubscription active;
    // Declared up front because the callbacks below close over it; it is
    // assigned before any of them can run.
    StreamController output;
    final Iterator cursor = _streams.iterator;

    // Moves on to the next input stream, or finishes the output.
    void advance() {
      bool more;
      try {
        more = cursor.moveNext();
      } catch (error, trace) {
        // A failing iterator ends the output with that error.
        output.addError(error, trace);
        output.close();
        return;
      }
      if (!more) {
        output.close();
        return;
      }
      active = cursor.current.listen(output.add,
          onError: output.addError,
          onDone: advance,
          cancelOnError: stopOnError);
    }

    // Pause, resume and cancel requests on the output are relayed to the
    // input subscription that is current at that moment.
    void pauseActive() {
      if (active != null) active.pause();
    }

    void resumeActive() {
      if (active != null) active.resume();
    }

    cancelActive() {
      if (active == null) return null;
      return active.cancel();
    }

    output = new StreamController(
        onPause: pauseActive, onResume: resumeActive, onCancel: cancelActive);

    advance();
    return output.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: stopOnError);
  }
}