Add `asyncWhere` (dart-lang/stream_transform#13)
Relatively few tests since most of the complex stuff is handled by
StreamTransformer.fromHandlers
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 6cee6e0..6f632db 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,6 +1,7 @@
## 0.0.5
- Bug Fix: Allow compiling switchLatest with Dart2Js.
+- Add `asyncWhere`: Like `where` but allows an asynchronous predicate.
## 0.0.4
- Add `scan`: fold which returns intermediate values
diff --git a/pkgs/stream_transform/lib/src/async_where.dart b/pkgs/stream_transform/lib/src/async_where.dart
new file mode 100644
index 0000000..1bb2d64
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/async_where.dart
@@ -0,0 +1,21 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Like [Stream.where] but allows the [test] to return a [Future].
+StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
+ var valuesWaiting = 0;
+ var sourceDone = false;
+ return new StreamTransformer<T, T>.fromHandlers(handleData: (element, sink) {
+ valuesWaiting++;
+ () async {
+ if (await test(element)) sink.add(element);
+ valuesWaiting--;
+ if (valuesWaiting <= 0 && sourceDone) sink.close();
+ }();
+ }, handleDone: (sink) {
+ sourceDone = true;
+ if (valuesWaiting <= 0) sink.close();
+ });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index ba49b0b..8b4d601 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -2,12 +2,13 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+export 'src/async_where.dart';
+export 'src/audit.dart';
export 'src/buffer.dart';
export 'src/concat.dart';
export 'src/debounce.dart';
export 'src/merge.dart';
+export 'src/scan.dart';
export 'src/switch.dart';
export 'src/tap.dart';
-export 'src/scan.dart';
export 'src/throttle.dart';
-export 'src/audit.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 6da6df8..e826730 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
description: A collection of utilities to transform and manipulate streams.
author: Dart Team <misc@dartlang.org>
homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.5-dev
+version: 0.0.5
environment:
sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart
new file mode 100644
index 0000000..919f931
--- /dev/null
+++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -0,0 +1,37 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ test('forwards only events that pass the predicate', () async {
+ var values = new Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.transform(asyncWhere((e) async => e > 2));
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('allows predicates that go through event loop', () async {
+ var values = new Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.transform(asyncWhere((e) async {
+ await new Future(() {});
+ return e > 2;
+ }));
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('allows synchronous predicate', () async {
+ var values = new Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.transform(asyncWhere((e) => e > 2));
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('can result in empty stream', () async {
+ var values = new Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.transform(asyncWhere((e) => e > 4));
+ expect(await filtered.isEmpty, true);
+ });
+}