blob: 0330dd761a4cc358004444d65634df4004f79893 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Utilities to append or prepend to a stream.
extension Concatenate<T> on Stream<T> {
/// Emits all values and errors from [next] following all values and errors
/// from this stream.
///
/// If this stream never finishes, the [next] stream will never get a
/// listener.
///
/// If this stream is a broadcast stream, the result will be as well.
/// If a single-subscription follows a broadcast stream it may be listened
/// to and never canceled since there may be broadcast listeners added later.
///
/// If a broadcast stream follows any other stream it will miss any events or
/// errors which occur before this stream is done.
/// If a broadcast stream follows a single-subscription stream, pausing the
/// stream while it is listening to the second stream will cause events to be
/// dropped rather than buffered.
Stream<T> followedBy(Stream<T> next) {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
next = isBroadcast && !next.isBroadcast ? next.asBroadcastStream() : next;
StreamSubscription<T>? subscription;
var currentStream = this;
var thisDone = false;
var secondDone = false;
late void Function() currentDoneHandler;
void listen() {
subscription = currentStream.listen(controller.add,
onError: controller.addError, onDone: () => currentDoneHandler());
}
void onSecondDone() {
secondDone = true;
controller.close();
}
void onThisDone() {
thisDone = true;
currentStream = next;
currentDoneHandler = onSecondDone;
listen();
}
currentDoneHandler = onThisDone;
controller.onListen = () {
assert(subscription == null);
listen();
if (!isBroadcast) {
controller
..onPause = () {
if (!thisDone || !next.isBroadcast) return subscription!.pause();
subscription!.cancel();
subscription = null;
}
..onResume = () {
if (!thisDone || !next.isBroadcast) return subscription!.resume();
listen();
};
}
controller.onCancel = () {
if (secondDone) return null;
var toCancel = subscription!;
subscription = null;
return toCancel.cancel();
};
};
return controller.stream;
}
/// Emits [initial] before any values or errors from the this stream.
///
/// If this stream is a broadcast stream the result will be as well.
/// If this stream is a broadcast stream, the returned stream will only
/// contain events of this stream that are emitted after the [initial] value
/// has been emitted on the returned stream.
Stream<T> startWith(T initial) =>
startWithStream(Future.value(initial).asStream());
/// Emits all values in [initial] before any values or errors from this
/// stream.
///
/// If this stream is a broadcast stream the result will be as well.
/// If this stream is a broadcast stream it will miss any events which
/// occur before the initial values are all emitted.
Stream<T> startWithMany(Iterable<T> initial) =>
startWithStream(Stream.fromIterable(initial));
/// Emits all values and errors in [initial] before any values or errors from
/// this stream.
///
/// If this stream is a broadcast stream the result will be as well.
/// If this stream is a broadcast stream it will miss any events which occur
/// before [initial] closes.
Stream<T> startWithStream(Stream<T> initial) {
if (isBroadcast && !initial.isBroadcast) {
initial = initial.asBroadcastStream();
}
return initial.followedBy(this);
}
}