Implement StreamTransformer.fromBind

Closes #32021

Change-Id: I9b8c680ace8b3d7a7bb479ffe19fa0e01fc2cf2f
Reviewed-on: https://dart-review.googlesource.com/71923
Commit-Queue: Nate Bosch <nbosch@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d79cfa2..4c65b1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -27,6 +27,10 @@
 *   Fix a bug where calling `stream.take(0).drain(value)` would not correctly
     forward the `value` through the returned `Future`.
 
+#### `dart:async`
+
+*   Add a `StreamTransformer.fromBind` constructor.
+
 ## 2.1.0-dev.3.0
 
 ### Core library changes
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index a0efcb7..da9c337 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -2007,6 +2007,21 @@
       void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
 
   /**
+   * Creates a [StreamTransformer] based on a [bind] callback.
+   *
+   * The returned stream transformer uses the [bind] argument to implement the
+   * [StreamTransformer.bind] API and can be used when the transformation is
+   * available as a stream-to-stream function.
+   *
+   * ```dart
+   * final splitDecoded = StreamTransformer<List<int>, String>.fromBind(
+   *     (stream) => stream.transform(utf8.decoder).transform(LineSplitter()));
+   * ```
+   */
+  factory StreamTransformer.fromBind(Stream<T> Function(Stream<S>) bind) =
+      _StreamBindTransformer<S, T>;
+
+  /**
    * Adapts [source] to be a `StreamTransfomer<TS, TT>`.
    *
    * This allows [source] to be used at the new type, but at run-time it
diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart
index 37bbc02..5488d55 100644
--- a/sdk/lib/async/stream_transformers.dart
+++ b/sdk/lib/async/stream_transformers.dart
@@ -282,6 +282,16 @@
   }
 }
 
+/**
+ * A StreamTransformer that overrides [StreamTransformer.bind] with a callback.
+ */
+class _StreamBindTransformer<S, T> extends StreamTransformerBase<S, T> {
+  final Stream<T> Function(Stream<S>) _bind;
+  _StreamBindTransformer(this._bind);
+
+  Stream<T> bind(Stream<S> stream) => _bind(stream);
+}
+
 /// A closure mapping a stream and cancelOnError to a StreamSubscription.
 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>(
     Stream<S> stream, bool cancelOnError);
diff --git a/tests/lib_2/async/stream_transformer_from_bind_test.dart b/tests/lib_2/async/stream_transformer_from_bind_test.dart
new file mode 100644
index 0000000..fcc63a1
--- /dev/null
+++ b/tests/lib_2/async/stream_transformer_from_bind_test.dart
@@ -0,0 +1,25 @@
+// Copyright (c) 2018, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async_helper/async_helper.dart";
+import 'package:expect/expect.dart';
+
+import 'event_helper.dart';
+
+void main() {
+  asyncStart();
+  var transformer =
+      new StreamTransformer<int, String>.fromBind((s) => s.map((v) => '$v'));
+  var controller = new StreamController<int>(sync: true);
+  Events expected = new Events.fromIterable(['1', '2']);
+  Events input = new Events.fromIterable([1, 2]);
+  Events actual = new Events.capture(controller.stream.transform(transformer));
+  actual.onDone(() {
+    Expect.listEquals(expected.events, actual.events);
+    asyncEnd();
+  });
+  input.replay(controller);
+}