blob: d2e53a6a1bdb685ad9c094f9502a97aa52cc9107 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Emits values from the source stream and [other] in any order as they arrive.
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the type of stream [other] is.
StreamTransformer<T, T> merge<T>(Stream<T> other) => new _Merge<T>([other]);
/// Emits values from the source stream and all streams in [others] in any order
/// as they arrive.
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the types of streams in [others].
StreamTransformer<T, T> mergeAll<T>(List<Stream<T>> others) =>
new _Merge<T>(others);
class _Merge<T> implements StreamTransformer<T, T> {
final List<Stream<T>> _others;
_Merge(this._others);
@override
Stream<T> bind(Stream<T> first) {
StreamController<T> controller;
if (first.isBroadcast) {
controller = new StreamController<T>.broadcast();
} else {
controller = new StreamController<T>();
}
List<StreamSubscription> subscriptions;
List<Stream<T>> allStreams = [first]..addAll(_others);
var activeStreamCount = 0;
controller.onListen = () {
if (subscriptions != null) return;
subscriptions = allStreams.map((stream) {
activeStreamCount++;
return stream.listen(controller.add, onError: controller.addError,
onDone: () {
if (--activeStreamCount <= 0) controller.close();
});
}).toList();
};
// Forward methods from listener
if (!first.isBroadcast) {
controller.onPause = () {
for (var subscription in subscriptions) {
subscription.pause();
}
};
controller.onResume = () {
for (var subscription in subscriptions) {
subscription.resume();
}
};
controller.onCancel =
() => Future.wait(subscriptions.map((s) => s.cancel()));
} else {
controller.onCancel = () {
if (controller?.hasListener ?? false) return new Future.value(null);
return Future.wait(subscriptions.map((s) => s.cancel()));
};
}
return controller.stream;
}
}