blob: 83f684797915784ac2d64fa54d585e677b50a2da [file] [log] [blame]
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Copied from package:stream_transform.
/// Starts emitting values from [next] after the original stream is complete.
/// If the initial stream never finishes, the [next] stream will never be
/// listened to.
/// If a single-subscription follows the a broadcast stream it may be listened
/// to and never canceled.
/// If a broadcast stream follows any other stream it will miss any events which
/// occur before the first stream is done. If a broadcast stream follows a
/// single-subscription stream, pausing the stream while it is listening to the
/// second stream will cause events to be dropped rather than buffered.
StreamTransformer<T, T> followedBy<T>(Stream<T> next) => _FollowedBy<T>(next);
class _FollowedBy<T> extends StreamTransformerBase<T, T> {
final Stream<T> _next;
Stream<T> bind(Stream<T> first) {
var controller = first.isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
var next = first.isBroadcast && !_next.isBroadcast
? _next.asBroadcastStream()
: _next;
StreamSubscription<T>? subscription;
var currentStream = first;
var firstDone = false;
var secondDone = false;
late Function currentDoneHandler;
listen() {
subscription = currentStream.listen(controller.add,
onError: controller.addError, onDone: () => currentDoneHandler());
onSecondDone() {
secondDone = true;
onFirstDone() {
firstDone = true;
currentStream = next;
currentDoneHandler = onSecondDone;
currentDoneHandler = onFirstDone;
controller.onListen = () {
assert(subscription == null);
final sub = subscription!;
if (!first.isBroadcast) {
..onPause = () {
if (!firstDone || !next.isBroadcast) return sub.pause();
subscription = null;
..onResume = () {
if (!firstDone || !next.isBroadcast) return sub.resume();
controller.onCancel = () {
if (secondDone) return null;
var toCancel = subscription!;
subscription = null;
return toCancel.cancel();
StreamTransformer<T, T> startWithMany<T>(Iterable<T> initial) =>
StreamTransformer<T, T> startWithStream<T>(Stream<T> initial) =>
StreamTransformer.fromBind((values) {
if (values.isBroadcast && !initial.isBroadcast) {
initial = initial.asBroadcastStream();
return initial.transform(followedBy(values));