Add `asyncWhere` (dart-lang/stream_transform#13) Relatively few tests since most of the complex stuff is handled by StreamTransformer.fromHandlers
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md index 6cee6e0..6f632db 100644 --- a/pkgs/stream_transform/CHANGELOG.md +++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,6 +1,7 @@ ## 0.0.5 - Bug Fix: Allow compiling switchLatest with Dart2Js. +- Add `asyncWhere`: Like `where` but allows an asynchronous predicate. ## 0.0.4 - Add `scan`: fold which returns intermediate values
diff --git a/pkgs/stream_transform/lib/src/async_where.dart b/pkgs/stream_transform/lib/src/async_where.dart new file mode 100644 index 0000000..1bb2d64 --- /dev/null +++ b/pkgs/stream_transform/lib/src/async_where.dart
@@ -0,0 +1,21 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +/// Like [Stream.where] but allows the [test] to return a [Future]. +StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) { + var valuesWaiting = 0; + var sourceDone = false; + return new StreamTransformer<T, T>.fromHandlers(handleData: (element, sink) { + valuesWaiting++; + () async { + if (await test(element)) sink.add(element); + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, handleDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + }); +}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart index ba49b0b..8b4d601 100644 --- a/pkgs/stream_transform/lib/stream_transform.dart +++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -2,12 +2,13 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +export 'src/async_where.dart'; +export 'src/audit.dart'; export 'src/buffer.dart'; export 'src/concat.dart'; export 'src/debounce.dart'; export 'src/merge.dart'; +export 'src/scan.dart'; export 'src/switch.dart'; export 'src/tap.dart'; -export 'src/scan.dart'; export 'src/throttle.dart'; -export 'src/audit.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml index 6da6df8..e826730 100644 --- a/pkgs/stream_transform/pubspec.yaml +++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@ description: A collection of utilities to transform and manipulate streams. author: Dart Team <misc@dartlang.org> homepage: https://www.github.com/dart-lang/stream_transform -version: 0.0.5-dev +version: 0.0.5 environment: sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart new file mode 100644 index 0000000..919f931 --- /dev/null +++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -0,0 +1,37 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +import 'package:test/test.dart'; + +import 'package:stream_transform/stream_transform.dart'; + +void main() { + test('forwards only events that pass the predicate', () async { + var values = new Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(asyncWhere((e) async => e > 2)); + expect(await filtered.toList(), [3, 4]); + }); + + test('allows predicates that go through event loop', () async { + var values = new Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(asyncWhere((e) async { + await new Future(() {}); + return e > 2; + })); + expect(await filtered.toList(), [3, 4]); + }); + + test('allows synchronous predicate', () async { + var values = new Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(asyncWhere((e) => e > 2)); + expect(await filtered.toList(), [3, 4]); + }); + + test('can result in empty stream', () async { + var values = new Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.transform(asyncWhere((e) => e > 4)); + expect(await filtered.isEmpty, true); + }); +}