Add a StreamSinkTransformer class.

This makes it easier to programmatically manipulate sinks. In
particular, StreamSinkTransformer.fromStreamTransformer makes it easy to
use existing encoders and codecs when emitting data.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1566603002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6db6f36..8cb7880 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,10 @@
 - Added `CancelableOperation.valueOrCancellation()`, which allows users to be
   notified when an operation is canceled elsewhere.
 
+- Added `StreamSinkTransformer` which transforms events before they're passed to
+  a `StreamSink`, similarly to how `StreamTransformer` transforms events after
+  they're emitted by a stream.
+
 ## 1.5.0
 
 - Added `LazyStream`, which forwards to the return value of a callback that's
diff --git a/lib/async.dart b/lib/async.dart
index 1d127fd..8f14bac 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -20,6 +20,7 @@
 export "src/stream_completer.dart";
 export "src/stream_group.dart";
 export "src/stream_queue.dart";
+export "src/stream_sink_transformer.dart";
 export "src/stream_splitter.dart";
 export "src/subscription_stream.dart";
 export "stream_zip.dart";
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
new file mode 100644
index 0000000..a8410ab
--- /dev/null
+++ b/lib/src/stream_sink_transformer.dart
@@ -0,0 +1,50 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer;
+
+import 'dart:async';
+
+import 'stream_sink_transformer/handler_transformer.dart';
+import 'stream_sink_transformer/stream_transformer_wrapper.dart';
+
+/// A [StreamSinkTransformer] transforms the events being passed to a sink.
+///
+/// This works on the same principle as a [StreamTransformer]. Each transformer
+/// defines a [bind] method that takes in the original [StreamSink] and returns
+/// the transformed version. However, where a [StreamTransformer] transforms
+/// events after they leave the stream, this transforms them before they enter
+/// the sink.
+///
+/// Transformers must be able to have `bind` called used multiple times.
+abstract class StreamSinkTransformer<S, T> {
+  /// Creates a [StreamSinkTransformer] that transforms events and errors
+  /// using [transformer].
+  ///
+  /// This is equivalent to piping all events from the outer sink through a
+  /// stream transformed by [transformer] and from there into the inner sink.
+  const factory StreamSinkTransformer.fromStreamTransformer(
+          StreamTransformer<S, T> transformer) =
+      StreamTransformerWrapper<S, T>;
+
+  /// Creates a [StreamSinkTransformer] that delegates events to the given
+  /// handlers.
+  ///
+  /// The handlers work exactly as they do for [StreamTransformer.fromHandlers].
+  /// They're called for each incoming event, and any actions on the sink
+  /// they're passed are forwarded to the inner sink. If a handler is omitted,
+  /// the event is passed through unaltered.
+  factory StreamSinkTransformer.fromHandlers({
+      void handleData(S data, EventSink<T> sink),
+      void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
+      void handleDone(EventSink<T> sink)}) {
+    return new HandlerTransformer<S, T>(handleData, handleError, handleDone);
+  }
+
+  /// Transforms the events passed to [sink].
+  ///
+  /// Creates a new sink. When events are passed to the returned sink, it will
+  /// transform them and pass the transformed versions to [sink].
+  StreamSink<S> bind(StreamSink<T> sink);
+}
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
new file mode 100644
index 0000000..e5d8e3c
--- /dev/null
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -0,0 +1,107 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer.handler_transformer;
+
+import 'dart:async';
+
+import '../stream_sink_transformer.dart';
+import '../delegate/stream_sink.dart';
+
+/// The type of the callback for handling data events.
+typedef void HandleData<S, T>(S data, EventSink<T> sink);
+
+/// The type of the callback for handling error events.
+typedef void HandleError<T>(
+    Object error, StackTrace stackTrace, EventSink<T> sink);
+
+/// The type of the callback for handling done events.
+typedef void HandleDone<T>(EventSink<T> sink);
+
+/// A [StreamSinkTransformer] that delegates events to the given handlers.
+class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
+  /// The handler for data events.
+  final HandleData<S, T> _handleData;
+
+  /// The handler for error events.
+  final HandleError<T> _handleError;
+
+  /// The handler for done events.
+  final HandleDone<T> _handleDone;
+
+  HandlerTransformer(
+      this._handleData, this._handleError, this._handleDone);
+
+  StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
+}
+
+/// A sink created by [HandlerTransformer].
+class _HandlerSink<S, T> implements StreamSink<S> {
+  /// The transformer that created this sink.
+  final HandlerTransformer<S, T> _transformer;
+
+  /// The original sink that's being transformed.
+  final StreamSink<T> _inner;
+
+  /// The wrapper for [_inner] whose [StreamSink.close] method can't emit
+  /// errors.
+  final StreamSink<T> _safeCloseInner;
+
+  Future get done => _inner.done;
+
+  _HandlerSink(this._transformer, StreamSink<T> inner)
+      : _inner = inner,
+        _safeCloseInner = new _SafeCloseSink<T>(inner);
+
+  void add(S event) {
+    if (_transformer._handleData == null) {
+      // [event] is an S and [_inner.add] takes a T. This style of conversion
+      // will throw an error in checked mode if [_inner] is actually a
+      // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
+      // an extra check in unchecked mode.
+      _inner.add(event as dynamic);
+    } else {
+      _transformer._handleData(event, _safeCloseInner);
+    }
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_transformer._handleError == null) {
+      _inner.addError(error, stackTrace);
+    } else {
+      _transformer._handleError(error, stackTrace, _safeCloseInner);
+    }
+  }
+
+  Future addStream(Stream<S> stream) {
+    return _inner.addStream(stream.transform(
+        new StreamTransformer<S, T>.fromHandlers(
+            handleData: _transformer._handleData,
+            handleError: _transformer._handleError,
+            handleDone: _closeSink)));
+  }
+
+  Future close() {
+    if (_transformer._handleDone == null) return _inner.close();
+
+    _transformer._handleDone(_safeCloseInner);
+    return _inner.done;
+  }
+}
+
+/// A wrapper for [StreamSink]s that swallows any errors returned by [close].
+///
+/// [HandlerTransformer] passes this to its handlers to ensure that when they
+/// call [close], they don't leave any dangling [Future]s behind that might emit
+/// unhandleable errors.
+class _SafeCloseSink<T> extends DelegatingStreamSink<T> {
+  _SafeCloseSink(StreamSink<T> inner) : super(inner);
+
+  Future close() => super.close().catchError((_) {});
+}
+
+/// A function to pass as a [StreamTransformer]'s `handleDone` callback.
+void _closeSink(EventSink sink) {
+  sink.close();
+}
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
new file mode 100644
index 0000000..b53f208
--- /dev/null
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -0,0 +1,62 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer.stream_transformer_wrapper;
+
+import 'dart:async';
+
+import '../stream_sink_transformer.dart';
+
+/// A [StreamSinkTransformer] that wraps a pre-existing [StreamTransformer].
+class StreamTransformerWrapper<S, T> implements StreamSinkTransformer<S, T> {
+  /// The wrapped transformer.
+  final StreamTransformer<S, T> _transformer;
+
+  const StreamTransformerWrapper(this._transformer);
+
+  StreamSink<S> bind(StreamSink<T> sink) =>
+      new _StreamTransformerWrapperSink<S, T>(_transformer, sink);
+}
+
+/// A sink created by [StreamTransformerWrapper].
+class _StreamTransformerWrapperSink<S, T> implements StreamSink<S> {
+  /// The controller through which events are passed.
+  ///
+  /// This is used to create a stream that can be transformed by the wrapped
+  /// transformer.
+  final _controller = new StreamController<S>(sync: true);
+
+  /// The original sink that's being transformed.
+  final StreamSink<T> _inner;
+
+  Future get done => _inner.done;
+
+  _StreamTransformerWrapperSink(StreamTransformer<S, T> transformer,
+      this._inner) {
+    _controller.stream.transform(transformer).listen(
+        _inner.add,
+        onError: _inner.addError,
+        onDone: () {
+          // Ignore any errors that come from this call to [_inner.close]. The
+          // user can access them through [done] or the value returned from
+          // [this.close], and we don't want them to get top-leveled.
+          _inner.close().catchError((_) {});
+        });
+  }
+
+  void add(S event) {
+    _controller.add(event);
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    _controller.addError(error, stackTrace);
+  }
+
+  Future addStream(Stream<S> stream) => _controller.addStream(stream);
+
+  Future close() {
+    _controller.close();
+    return _inner.done;
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 16e2ea2..e13f6ab 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.6.0-dev
+version: 1.6.0
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
new file mode 100644
index 0000000..70e7e6c
--- /dev/null
+++ b/test/stream_sink_transformer_test.dart
@@ -0,0 +1,218 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE filevents.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+void main() {
+  var controller;
+  setUp(() {
+    controller = new StreamController();
+  });
+
+  group("fromStreamTransformer", () {
+    test("transforms data events", () {
+      var transformer = new StreamSinkTransformer.fromStreamTransformer(
+          new StreamTransformer.fromHandlers(handleData: (i, sink) {
+            sink.add(i * 2);
+          }));
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(results.add, onDone: expectAsync(() {
+        expect(results, equals([2, 4, 6]));
+      }));
+
+      sink.add(1);
+      sink.add(2);
+      sink.add(3);
+      sink.close();
+    });
+
+    test("transforms error events", () {
+      var transformer = new StreamSinkTransformer.fromStreamTransformer(
+          new StreamTransformer.fromHandlers(
+              handleError: (i, stackTrace, sink) {
+                sink.addError(i * 2, stackTrace);
+              }));
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(expectAsync((_) {}, count: 0),
+          onError: (error, stackTrace) {
+            results.add(error);
+          },
+          onDone: expectAsync(() {
+            expect(results, equals([2, 4, 6]));
+          }));
+
+      sink.addError(1);
+      sink.addError(2);
+      sink.addError(3);
+      sink.close();
+    });
+
+    test("transforms done events", () {
+      var transformer = new StreamSinkTransformer.fromStreamTransformer(
+          new StreamTransformer.fromHandlers(
+              handleDone: (sink) {
+                sink.add(1);
+                sink.close();
+              }));
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(results.add, onDone: expectAsync(() {
+        expect(results, equals([1]));
+      }));
+
+      sink.close();
+    });
+
+    test("forwards the future from inner.close", () async {
+      var transformer = new StreamSinkTransformer.fromStreamTransformer(
+          new StreamTransformer.fromHandlers());
+      var innerSink = new CompleterStreamSink();
+      var sink = transformer.bind(innerSink);
+
+      // The futures shouldn't complete until the inner sink's close future
+      // completes.
+      var doneResult = new ResultFuture(sink.done);
+      doneResult.catchError((_) {});
+      var closeResult = new ResultFuture(sink.close());
+      closeResult.catchError((_) {});
+      await flushMicrotasks();
+      expect(doneResult.isComplete, isFalse);
+      expect(closeResult.isComplete, isFalse);
+
+      // Once the inner sink is completed, the futures should fire. 
+      innerSink.completer.complete();
+      await flushMicrotasks();
+      expect(doneResult.isComplete, isTrue);
+      expect(closeResult.isComplete, isTrue);
+    });
+
+    test("doesn't top-level the future from inner.close", () async {
+      var transformer = new StreamSinkTransformer.fromStreamTransformer(
+          new StreamTransformer.fromHandlers(handleData: (_, sink) {
+            sink.close();
+          }));
+      var innerSink = new CompleterStreamSink();
+      var sink = transformer.bind(innerSink);
+
+      // This will close the inner sink, but it shouldn't top-level the error.
+      sink.add(1);
+      innerSink.completer.completeError("oh no");
+      await flushMicrotasks();
+
+      // The error should be piped through done and close even if they're called
+      // after the underlying sink is closed.
+      expect(sink.done, throwsA("oh no"));
+      expect(sink.close(), throwsA("oh no"));
+    });
+  });
+
+  group("fromHandlers", () {
+    test("transforms data events", () {
+      var transformer = new StreamSinkTransformer.fromHandlers(
+          handleData: (i, sink) {
+            sink.add(i * 2);
+          });
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(results.add, onDone: expectAsync(() {
+        expect(results, equals([2, 4, 6]));
+      }));
+
+      sink.add(1);
+      sink.add(2);
+      sink.add(3);
+      sink.close();
+    });
+
+    test("transforms error events", () {
+      var transformer = new StreamSinkTransformer.fromHandlers(
+          handleError: (i, stackTrace, sink) {
+            sink.addError(i * 2, stackTrace);
+          });
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(expectAsync((_) {}, count: 0),
+          onError: (error, stackTrace) {
+            results.add(error);
+          },
+          onDone: expectAsync(() {
+            expect(results, equals([2, 4, 6]));
+          }));
+
+      sink.addError(1);
+      sink.addError(2);
+      sink.addError(3);
+      sink.close();
+    });
+
+    test("transforms done events", () {
+      var transformer = new StreamSinkTransformer.fromHandlers(
+          handleDone: (sink) {
+            sink.add(1);
+            sink.close();
+          });
+      var sink = transformer.bind(controller.sink);
+
+      var results = [];
+      controller.stream.listen(results.add, onDone: expectAsync(() {
+        expect(results, equals([1]));
+      }));
+
+      sink.close();
+    });
+
+    test("forwards the future from inner.close", () async {
+      var transformer = new StreamSinkTransformer.fromHandlers();
+      var innerSink = new CompleterStreamSink();
+      var sink = transformer.bind(innerSink);
+
+      // The futures shouldn't complete until the inner sink's close future
+      // completes.
+      var doneResult = new ResultFuture(sink.done);
+      doneResult.catchError((_) {});
+      var closeResult = new ResultFuture(sink.close());
+      closeResult.catchError((_) {});
+      await flushMicrotasks();
+      expect(doneResult.isComplete, isFalse);
+      expect(closeResult.isComplete, isFalse);
+
+      // Once the inner sink is completed, the futures should fire. 
+      innerSink.completer.complete();
+      await flushMicrotasks();
+      expect(doneResult.isComplete, isTrue);
+      expect(closeResult.isComplete, isTrue);
+    });
+
+    test("doesn't top-level the future from inner.close", () async {
+      var transformer = new StreamSinkTransformer.fromHandlers(
+          handleData: (_, sink) {
+            sink.close();
+          });
+      var innerSink = new CompleterStreamSink();
+      var sink = transformer.bind(innerSink);
+
+      // This will close the inner sink, but it shouldn't top-level the error.
+      sink.add(1);
+      innerSink.completer.completeError("oh no");
+      await flushMicrotasks();
+
+      // The error should be piped through done and close even if they're called
+      // after the underlying sink is closed.
+      expect(sink.done, throwsA("oh no"));
+      expect(sink.close(), throwsA("oh no"));
+    });
+  });
+}
diff --git a/test/utils.dart b/test/utils.dart
index b2ea885..0dc47fa 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -24,3 +24,19 @@
     throw new UnimplementedError("Gotcha!");
   }
 }
+
+/// A dummy [StreamSink] for testing the routing of the [done] and [close]
+/// futures.
+///
+/// The [completer] field allows the user to control the future returned by
+/// [done] and [close].
+class CompleterStreamSink<T> implements StreamSink<T> {
+  final completer = new Completer();
+
+  Future get done => completer.future;
+
+  void add(T event) {}
+  void addError(error, [StackTrace stackTrace]) {}
+  Future addStream(Stream<T> stream) async {}
+  Future close() => completer.future;
+}