Implement StreamTransformer.fromBind
Closes #32021
Change-Id: I9b8c680ace8b3d7a7bb479ffe19fa0e01fc2cf2f
Reviewed-on: https://dart-review.googlesource.com/71923
Commit-Queue: Nate Bosch <nbosch@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d79cfa2..4c65b1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,10 @@
* Fix a bug where calling `stream.take(0).drain(value)` would not correctly
forward the `value` through the returned `Future`.
+#### `dart:async`
+
+* Add a `StreamTransformer.fromBind` constructor.
+
## 2.1.0-dev.3.0
### Core library changes
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index a0efcb7..da9c337 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -2007,6 +2007,21 @@
void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
/**
+ * Creates a [StreamTransformer] based on a [bind] callback.
+ *
+ * The returned stream transformer uses the [bind] argument to implement the
+ * [StreamTransformer.bind] API and can be used when the transformation is
+ * available as a stream-to-stream function.
+ *
+ * ```dart
+ * final splitDecoded = StreamTransformer<List<int>, String>.fromBind(
+ * (stream) => stream.transform(utf8.decoder).transform(LineSplitter()));
+ * ```
+ */
+ factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind) =
+ _StreamBindTransformer<S, T>;
+
+ /**
* Adapts [source] to be a `StreamTransfomer<TS, TT>`.
*
* This allows [source] to be used at the new type, but at run-time it
diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart
index 37bbc02..5488d55 100644
--- a/sdk/lib/async/stream_transformers.dart
+++ b/sdk/lib/async/stream_transformers.dart
@@ -282,6 +282,16 @@
}
}
+/**
+ * A StreamTransformer that overrides [StreamTransformer.bind] with a callback.
+ */
+class _StreamBindTransformer<S, T> extends StreamTransformerBase<S, T> {
+ final Stream<T> Function(Stream<S>) _bind;
+ _StreamBindTransformer(this._bind);
+
+ Stream<T> bind(Stream<S> stream) => _bind(stream);
+}
+
/// A closure mapping a stream and cancelOnError to a StreamSubscription.
typedef StreamSubscription<T> _SubscriptionTransformer<S, T>(
Stream<S> stream, bool cancelOnError);
diff --git a/tests/lib_2/async/stream_transformer_from_bind_test.dart b/tests/lib_2/async/stream_transformer_from_bind_test.dart
new file mode 100644
index 0000000..fcc63a1
--- /dev/null
+++ b/tests/lib_2/async/stream_transformer_from_bind_test.dart
@@ -0,0 +1,25 @@
+// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async_helper/async_helper.dart";
+import 'package:expect/expect.dart';
+
+import 'event_helper.dart';
+
+void main() {
+ asyncStart();
+ var transformer =
+ new StreamTransformer<int, String>.fromBind((s) => s.map((v) => '$v'));
+ var controller = new StreamController<int>(sync: true);
+ Events expected = new Events.fromIterable(['1', '2']);
+ Events input = new Events.fromIterable([1, 2]);
+ Events actual = new Events.capture(controller.stream.transform(transformer));
+ actual.onDone(() {
+ Expect.listEquals(expected.events, actual.events);
+ asyncEnd();
+ });
+ input.replay(controller);
+}