// blob: 422de0e23908c1eaf9f9e617712ad35c0cb8281a [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
/// Returns a single-subscription stream that emits the results of [operations]
/// in the order they complete.
///
/// An error produced by an operation is forwarded as an error event on the
/// stream. The stream closes once every operation has completed; if
/// [operations] is empty, it closes immediately.
///
/// If the subscription is canceled, any pending operations are canceled as
/// well.
Stream<T> inCompletionOrder<T>(Iterable<CancelableOperation<T>> operations) {
  // Snapshot the operations into a set. Entries are removed as they finish,
  // so whatever remains in the set is exactly what is still pending.
  var operationSet = operations.toSet();
  var controller = StreamController<T>(
      // Synchronous controller: each result is handed to the listener as part
      // of the `add` call rather than being deferred again.
      sync: true,
      onCancel: () =>
          Future.wait(operationSet.map((operation) => operation.cancel())));

  // With nothing to wait for, no completion callback would ever run to close
  // the controller, so close it up front.
  if (operationSet.isEmpty) {
    controller.close();
    return controller.stream;
  }

  for (var operation in operationSet) {
    operation.value
        .then((value) => controller.add(value),
            onError: controller.addError)
        .whenComplete(() {
      // Runs after both success and failure, so the stream closes only once
      // the last outstanding operation has reported in.
      operationSet.remove(operation);
      if (operationSet.isEmpty) controller.close();
    });
  }

  return controller.stream;
}
/// Accepts [f] and deliberately does nothing with it.
///
/// Presumably exists so call sites can flag a future as intentionally not
/// awaited (e.g. to satisfy the `unawaited_futures` lint) — confirm against
/// callers.
void unawaited(Future<void> f) {
  // Intentionally empty: the future is neither awaited nor observed here.
}