blob: 6a4fc329f17d61d6b11c4e4f270d5cc26fe71067 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'from_handlers.dart';
/// Like [Stream.where] but allows the [test] to return a [Future].
StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
var valuesWaiting = 0;
var sourceDone = false;
return fromHandlers(handleData: (element, sink) {
valuesWaiting++;
() async {
if (await test(element)) sink.add(element);
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
}, handleDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
});
}