Add support for stream- and sink-specific changes.
These are especially useful when doing stream-specific transformations
like error handling.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1657943002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d101ea0..ab08072 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
-## 1.0.2
+## 1.1.0
+
+* Add `StreamChannel.transformStream()`, `StreamChannel.transformSink()`,
+ `StreamChannel.changeStream()`, and `StreamChannel.changeSink()` to support
+ changing only the stream or only the sink of a channel.
* Be more explicit about `JsonDocumentTransformer`'s error-handling behavior.
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index f914188..ff31042 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:async/async.dart';
+
import 'src/stream_channel_transformer.dart';
export 'src/delegating_stream_channel.dart';
@@ -75,6 +77,20 @@
///
/// This is identical to calling `transformer.bind(channel)`.
StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer);
+
+ /// Transforms only the [stream] component of [this] using [transformer].
+ StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
+
+ /// Transforms only the [sink] component of [this] using [transformer].
+ StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
+
+ /// Returns a copy of [this] with [stream] replaced by [change]'s return
+ /// value.
+ StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream));
+
+ /// Returns a copy of [this] with [sink] replaced by [change]'s return
+ /// value.
+ StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink));
}
/// An implementation of [StreamChannel] that simply takes a stream and a sink
@@ -99,4 +115,16 @@
StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer) =>
transformer.bind(this);
+
+ StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
+ changeStream(transformer.bind);
+
+ StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
+ changeSink(transformer.bind);
+
+ StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
+ new StreamChannel(change(stream), sink);
+
+ StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
+ new StreamChannel(stream, change(sink));
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 54e9198..7ef1101 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.0.2
+version: 1.1.0
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_test.dart b/test/stream_channel_test.dart
index 5f006c3..f525177 100644
--- a/test/stream_channel_test.dart
+++ b/test/stream_channel_test.dart
@@ -6,6 +6,7 @@
import 'dart:convert';
import 'dart:isolate';
+import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
@@ -56,4 +57,62 @@
expect(sinkController.stream.toList(),
completion(equals([[102, 98, 108, 116, 104, 112]])));
});
+
+ test("transformStream() transforms only the stream", () {
+ var transformed = channel.transformStream(UTF8.decoder);
+
+ streamController.add([102, 111, 111, 98, 97, 114]);
+ streamController.close();
+ expect(transformed.stream.toList(), completion(equals(["foobar"])));
+
+ transformed.sink.add("fblthp");
+ transformed.sink.close();
+ expect(sinkController.stream.toList(),
+ completion(equals(["fblthp"])));
+ });
+
+ test("transformSink() transforms only the sink", () {
+ var transformed = channel.transformSink(
+ new StreamSinkTransformer.fromStreamTransformer(UTF8.encoder));
+
+ streamController.add([102, 111, 111, 98, 97, 114]);
+ streamController.close();
+ expect(transformed.stream.toList(),
+ completion(equals([[102, 111, 111, 98, 97, 114]])));
+
+ transformed.sink.add("fblthp");
+ transformed.sink.close();
+ expect(sinkController.stream.toList(),
+ completion(equals([[102, 98, 108, 116, 104, 112]])));
+ });
+
+ test("changeStream() changes the stream", () {
+ var newController = new StreamController();
+ var changed = channel.changeStream((stream) {
+ expect(stream, equals(channel.stream));
+ return newController.stream;
+ });
+
+ newController.add(10);
+ newController.close();
+
+ streamController.add(20);
+ streamController.close();
+
+ expect(changed.stream.toList(), completion(equals([10])));
+ });
+
+ test("changeSink() changes the sink", () {
+ var newController = new StreamController();
+ var changed = channel.changeSink((sink) {
+ expect(sink, equals(channel.sink));
+ return newController.sink;
+ });
+
+ expect(newController.stream.toList(), completion(equals([10])));
+ streamController.stream.listen(expectAsync((_) {}, count: 0));
+
+ changed.sink.add(10);
+ changed.sink.close();
+ });
}