Merge asyncWhere and whereType to same extension (dart-lang/stream_transform#85)
diff --git a/pkgs/stream_transform/lib/src/async_where.dart b/pkgs/stream_transform/lib/src/async_where.dart deleted file mode 100644 index f2fe108..0000000 --- a/pkgs/stream_transform/lib/src/async_where.dart +++ /dev/null
@@ -1,80 +0,0 @@ -// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. -import 'dart:async'; - -import 'from_handlers.dart'; - -/// An asynchronous [where]. -extension AsyncWhere<T> on Stream<T> { - /// Like [where] but allows the [test] to return a [Future]. - /// - /// Events on the result stream will be emitted in the order that [test] - /// completes which may not match the order of the original stream. - /// - /// If the source stream is a broadcast stream the result will be as well. When - /// used with a broadcast stream behavior also differs from [Stream.where] in - /// that the [test] function is only called once per event, rather than once - /// per listener per event. - /// - /// Errors from the source stream are forwarded directly to the result stream. - /// Errors from [test] are also forwarded to the result stream. - /// - /// The result stream will not close until the source stream closes and all - /// pending [test] calls have finished. - Stream<T> asyncWhere(FutureOr<bool> test(T element)) { - var valuesWaiting = 0; - var sourceDone = false; - return transform(fromHandlers(handleData: (element, sink) { - valuesWaiting++; - () async { - try { - if (await test(element)) sink.add(element); - } catch (e, st) { - sink.addError(e, st); - } - valuesWaiting--; - if (valuesWaiting <= 0 && sourceDone) sink.close(); - }(); - }, handleDone: (sink) { - sourceDone = true; - if (valuesWaiting <= 0) sink.close(); - })); - } -} - -/// Like [Stream.where] but allows the [test] to return a [Future]. -/// -/// Events on the result stream will be emitted in the order that [test] -/// completes which may not match the order of the original stream. -/// -/// If the source stream is a broadcast stream the result will be as well. When -/// used with a broadcast stream behavior also differs from [Stream.where] in -/// that the [test] function is only called once per event, rather than once -/// per listener per event. -/// -/// Errors from the source stream are forwarded directly to the result stream. -/// Errors during the conversion are also forwarded to the result stream. -/// -/// The result stream will not close until the source stream closes and all -/// pending [test] calls have finished. -@Deprecated('Use the extension instead') -StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) { - var valuesWaiting = 0; - var sourceDone = false; - return fromHandlers(handleData: (element, sink) { - valuesWaiting++; - () async { - try { - if (await test(element)) sink.add(element); - } catch (e, st) { - sink.addError(e, st); - } - valuesWaiting--; - if (valuesWaiting <= 0 && sourceDone) sink.close(); - }(); - }, handleDone: (sink) { - sourceDone = true; - if (valuesWaiting <= 0) sink.close(); - }); -}
diff --git a/pkgs/stream_transform/lib/src/where.dart b/pkgs/stream_transform/lib/src/where.dart new file mode 100644 index 0000000..33c0f7e --- /dev/null +++ b/pkgs/stream_transform/lib/src/where.dart
@@ -0,0 +1,143 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'from_handlers.dart'; + +/// Utilities to filter events. +extension Where<T> on Stream<T> { + /// Returns a stream which emits only the events which have type [S]. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// + /// [S] should be a subtype of the stream's generic type, otherwise nothing of + /// type [S] could possibly be emitted, however there is no static or runtime + /// checking that this is the case. + Stream<S> whereType<S>() => transform(_WhereType<S>()); + + /// Like [where] but allows the [test] to return a [Future]. + /// + /// Events on the result stream will be emitted in the order that [test] + /// completes which may not match the order of the original stream. + /// + /// If the source stream is a broadcast stream the result will be as well. When + /// used with a broadcast stream behavior also differs from [Stream.where] in + /// that the [test] function is only called once per event, rather than once + /// per listener per event. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// Errors from [test] are also forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and all + /// pending [test] calls have finished. + Stream<T> asyncWhere(FutureOr<bool> test(T element)) { + var valuesWaiting = 0; + var sourceDone = false; + return transform(fromHandlers(handleData: (element, sink) { + valuesWaiting++; + () async { + try { + if (await test(element)) sink.add(element); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, handleDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + })); + } +} + +/// Emits only the events which have type [R]. +/// +/// If the source stream is a broadcast stream the result will be as well. +/// +/// Errors from the source stream are forwarded directly to the result stream. +/// +/// The static type of the returned transformer takes `Null` so that it can +/// satisfy the subtype requirements for the `stream.transform()` argument on +/// any source stream. The argument to `bind` has been broadened to take +/// `Stream<Object>` since it will never be passed a `Stream<Null>` at runtime. +/// This is safe to use on any source stream. +/// +/// [R] should be a subtype of the stream's generic type, otherwise nothing of +/// type [R] could possibly be emitted, however there is no static or runtime +/// checking that this is the case. +@Deprecated('Use the extension instead') +StreamTransformer<Null, R> whereType<R>() => _WhereType<R>(); + +class _WhereType<R> extends StreamTransformerBase<Null, R> { + @override + Stream<R> bind(Stream<Object> source) { + var controller = source.isBroadcast + ? StreamController<R>.broadcast(sync: true) + : StreamController<R>(sync: true); + + StreamSubscription<Object> subscription; + controller.onListen = () { + assert(subscription == null); + subscription = source.listen( + (value) { + if (value is R) controller.add(value); + }, + onError: controller.addError, + onDone: () { + subscription = null; + controller.close(); + }); + if (!source.isBroadcast) { + controller + ..onPause = subscription.pause + ..onResume = subscription.resume; + } + controller.onCancel = () { + subscription?.cancel(); + subscription = null; + }; + }; + return controller.stream; + } +} + +/// Like [Stream.where] but allows the [test] to return a [Future]. +/// +/// Events on the result stream will be emitted in the order that [test] +/// completes which may not match the order of the original stream. +/// +/// If the source stream is a broadcast stream the result will be as well. When +/// used with a broadcast stream behavior also differs from [Stream.where] in +/// that the [test] function is only called once per event, rather than once +/// per listener per event. +/// +/// Errors from the source stream are forwarded directly to the result stream. +/// Errors during the conversion are also forwarded to the result stream. +/// +/// The result stream will not close until the source stream closes and all +/// pending [test] calls have finished. +@Deprecated('Use the extension instead') +StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) { + var valuesWaiting = 0; + var sourceDone = false; + return fromHandlers(handleData: (element, sink) { + valuesWaiting++; + () async { + try { + if (await test(element)) sink.add(element); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, handleDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + }); +}
diff --git a/pkgs/stream_transform/lib/src/where_type.dart b/pkgs/stream_transform/lib/src/where_type.dart deleted file mode 100644 index bfd82be..0000000 --- a/pkgs/stream_transform/lib/src/where_type.dart +++ /dev/null
@@ -1,70 +0,0 @@ -// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; - -/// A utility to filter events by type. -extension WhereType<T> on Stream<T> { - /// Returns a stream which emits only the events which have type [S]. - /// - /// If the source stream is a broadcast stream the result will be as well. - /// - /// Errors from the source stream are forwarded directly to the result stream. - /// - /// [S] should be a subtype of the stream's generic type, otherwise nothing of - /// type [S] could possibly be emitted, however there is no static or runtime - /// checking that this is the case. - Stream<S> whereType<S>() => transform(_WhereType<S>()); -} - -/// Emits only the events which have type [R]. -/// -/// If the source stream is a broadcast stream the result will be as well. -/// -/// Errors from the source stream are forwarded directly to the result stream. -/// -/// The static type of the returned transformer takes `Null` so that it can -/// satisfy the subtype requirements for the `stream.transform()` argument on -/// any source stream. The argument to `bind` has been broadened to take -/// `Stream<Object>` since it will never be passed a `Stream<Null>` at runtime. -/// This is safe to use on any source stream. -/// -/// [R] should be a subtype of the stream's generic type, otherwise nothing of -/// type [R] could possibly be emitted, however there is no static or runtime -/// checking that this is the case. -@Deprecated('Use the extension instead') -StreamTransformer<Null, R> whereType<R>() => _WhereType<R>(); - -class _WhereType<R> extends StreamTransformerBase<Null, R> { - @override - Stream<R> bind(Stream<Object> source) { - var controller = source.isBroadcast - ? StreamController<R>.broadcast(sync: true) - : StreamController<R>(sync: true); - - StreamSubscription<Object> subscription; - controller.onListen = () { - assert(subscription == null); - subscription = source.listen( - (value) { - if (value is R) controller.add(value); - }, - onError: controller.addError, - onDone: () { - subscription = null; - controller.close(); - }); - if (!source.isBroadcast) { - controller - ..onPause = subscription.pause - ..onResume = subscription.resume; - } - controller.onCancel = () { - subscription?.cancel(); - subscription = null; - }; - }; - return controller.stream; - } -}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart index 4f7ead8..2f7bf24 100644 --- a/pkgs/stream_transform/lib/stream_transform.dart +++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -3,7 +3,6 @@ // BSD-style license that can be found in the LICENSE file. export 'src/async_map.dart'; -export 'src/async_where.dart'; export 'src/chain_transformers.dart'; export 'src/combine_latest.dart'; export 'src/concatenate.dart'; @@ -14,4 +13,4 @@ export 'src/switch.dart'; export 'src/take_until.dart'; export 'src/tap.dart'; -export 'src/where_type.dart'; +export 'src/where.dart';