Add support for stream- and sink-specific changes.

These are especially useful when doing stream-specific transformations
like error handling.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1657943002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d101ea0..ab08072 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
-## 1.0.2
+## 1.1.0
+
+* Add `StreamChannel.transformStream()`, `StreamChannel.transformSink()`,
+  `StreamChannel.changeStream()`, and `StreamChannel.changeSink()` to support
+  changing only the stream or only the sink of a channel.
 
 * Be more explicit about `JsonDocumentTransformer`'s error-handling behavior.
 
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index f914188..ff31042 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:async/async.dart';
+
 import 'src/stream_channel_transformer.dart';
 
 export 'src/delegating_stream_channel.dart';
@@ -75,6 +77,20 @@
   ///
   /// This is identical to calling `transformer.bind(channel)`.
   StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer);
+
+  /// Returns a copy of [this] with [stream] transformed by [transformer].
+  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
+
+  /// Returns a copy of [this] with [sink] transformed by [transformer].
+  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
+
+  /// Returns a copy of [this] whose [stream] is the result of passing the
+  /// current [stream] to [change]; the [sink] is carried over unchanged.
+  StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream));
+
+  /// Returns a copy of [this] whose [sink] is the result of passing the
+  /// current [sink] to [change]; the [stream] is carried over unchanged.
+  StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink));
 }
 
 /// An implementation of [StreamChannel] that simply takes a stream and a sink
@@ -99,4 +115,16 @@
 
   StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer) =>
       transformer.bind(this);
+
+  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
+      changeStream(transformer.bind);
+
+  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
+      changeSink(transformer.bind);
+
+  StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
+      new StreamChannel<T>(change(stream), sink);
+
+  StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
+      new StreamChannel<T>(stream, change(sink));
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 54e9198..7ef1101 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.0.2
+version: 1.1.0
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_test.dart b/test/stream_channel_test.dart
index 5f006c3..f525177 100644
--- a/test/stream_channel_test.dart
+++ b/test/stream_channel_test.dart
@@ -6,6 +6,7 @@
 import 'dart:convert';
 import 'dart:isolate';
 
+import 'package:async/async.dart';
 import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
 
@@ -56,4 +57,62 @@
     expect(sinkController.stream.toList(),
         completion(equals([[102, 98, 108, 116, 104, 112]])));
   });
+
+  test("transformStream() transforms only the stream", () {
+    var decoded = channel.transformStream(UTF8.decoder);
+
+    // Bytes arriving on the stream side come out decoded as a string.
+    streamController..add([102, 111, 111, 98, 97, 114])..close();
+    expect(decoded.stream.toList(), completion(equals(["foobar"])));
+
+    // Values written to the sink reach the underlying sink unmodified.
+    decoded.sink..add("fblthp")..close();
+    expect(sinkController.stream.toList(),
+        completion(equals(["fblthp"])));
+  });
+
+  test("transformSink() transforms only the sink", () {
+    var encoded = channel.transformSink(
+        new StreamSinkTransformer.fromStreamTransformer(UTF8.encoder));
+
+    // Stream events pass through untouched.
+    streamController..add([102, 111, 111, 98, 97, 114])..close();
+    expect(encoded.stream.toList(),
+        completion(equals([[102, 111, 111, 98, 97, 114]])));
+
+    // Strings written to the sink reach the underlying sink as UTF-8 bytes.
+    encoded.sink..add("fblthp")..close();
+    expect(sinkController.stream.toList(),
+        completion(equals([[102, 98, 108, 116, 104, 112]])));
+  });
+
+  test("changeStream() changes the stream", () {
+    var replacement = new StreamController();
+    var changed = channel.changeStream((original) {
+      expect(original, equals(channel.stream));
+      return replacement.stream;
+    });
+
+    // Only events from the replacement stream should be observed.
+    replacement..add(10)..close();
+
+    // Events on the original stream must not leak into the new channel.
+    streamController..add(20)..close();
+
+    expect(changed.stream.toList(), completion(equals([10])));
+  });
+
+  test("changeSink() changes the sink", () {
+    var newController = new StreamController();
+    var changed = channel.changeSink((sink) {
+      expect(sink, equals(channel.sink));
+      return newController.sink;
+    });
+
+    expect(newController.stream.toList(), completion(equals([10])));
+    // The original sink must stay silent once it has been swapped out.
+    sinkController.stream.listen(expectAsync((_) {}, count: 0));
+
+    changed.sink..add(10)..close();
+  });
 }