Add StreamChannelTransformer.
This makes StreamChannel.transform() take a StreamChannelTransformer
rather than a codec. This is better, since a codec is explicitly
intended to convert chunked data, while transformers are more general.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1632903004 .
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index c664543..46375c9 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -128,7 +128,7 @@
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
}
- if (_isDone) return;
+ if (_isDone) return new Future.value();
_inAddStream = true;
var completer = new Completer.sync();
diff --git a/lib/src/stream_channel_transformer.dart b/lib/src/stream_channel_transformer.dart
new file mode 100644
index 0000000..be032c6
--- /dev/null
+++ b/lib/src/stream_channel_transformer.dart
@@ -0,0 +1,52 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannelTransformer] transforms the events being passed to and
+/// emitted by a [StreamChannel].
+///
+/// This works on the same principle as [StreamTransformer] and
+/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes
+/// in the original [StreamChannel] and returns the transformed version.
+///
+/// Transformers must be able to have `bind` called multiple times.
+class StreamChannelTransformer<S, T> {
+ /// The transformer to use on the channel's stream.
+ final StreamTransformer _streamTransformer;
+
+ /// The transformer to use on the channel's sink.
+ final StreamSinkTransformer _sinkTransformer;
+
+ /// Creates a [StreamChannelTransformer] from existing stream and sink
+ /// transformers.
+ const StreamChannelTransformer(
+ this._streamTransformer, this._sinkTransformer);
+
+ /// Creates a [StreamChannelTransformer] from a codec's encoder and decoder.
+ ///
+ /// All input to the inner channel's sink is encoded using [Codec.encoder],
+ /// and all output from its stream is decoded using [Codec.decoder].
+ StreamChannelTransformer.fromCodec(Codec<S, T> codec)
+ : this(
+ codec.decoder,
+ new StreamSinkTransformer.fromStreamTransformer(codec.encoder));
+
+ /// Transforms the events sent to and emitted by [channel].
+ ///
+ /// Creates a new channel. When events are passed to the returned channel's
+ /// sink, the transformer will transform them and pass the transformed
+ /// versions to `channel.sink`. When events are emitted from the
+ /// `channel.straem`, the transformer will transform them and pass the
+ /// transformed versions to the returned channel's stream.
+ StreamChannel<S> bind(StreamChannel<T> channel) =>
+ new StreamChannel<S>(
+ channel.stream.transform(_streamTransformer),
+ _sinkTransformer.bind(channel.sink));
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 2885e2d..e7fb055 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -3,14 +3,14 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
-import 'dart:convert';
-import 'package:async/async.dart';
+import 'src/stream_channel_transformer.dart';
export 'src/delegating_stream_channel.dart';
export 'src/isolate_channel.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
+export 'src/stream_channel_transformer.dart';
/// An abstract class representing a two-way communication channel.
///
@@ -70,12 +70,10 @@
/// directly to the other.
void pipe(StreamChannel<T> other);
- /// Transforms [this] using [codec].
+ /// Transforms [this] using [transformer].
///
- /// This returns a stream channel that encodes all input using [Codec.encoder]
- /// before passing it to this channel's [sink], and decodes all output from
- /// this channel's [stream] using [Codec.decoder].
- StreamChannel transform(Codec<dynamic, T> codec);
+ /// This is identical to calling `transformer.bind(channel)`.
+ StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer);
}
/// An implementation of [StreamChannel] that simply takes a stream and a sink
@@ -98,11 +96,6 @@
other.stream.pipe(sink);
}
- StreamChannel transform(Codec<dynamic, T> codec) {
- var sinkTransformer =
- new StreamSinkTransformer.fromStreamTransformer(codec.encoder);
- return new _StreamChannel(
- stream.transform(codec.decoder),
- sinkTransformer.bind(sink));
- }
+ StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer) =>
+ transformer.bind(this);
}
diff --git a/test/stream_channel_test.dart b/test/stream_channel_test.dart
index 30a7db4..5f006c3 100644
--- a/test/stream_channel_test.dart
+++ b/test/stream_channel_test.dart
@@ -44,7 +44,8 @@
});
test("transform() transforms the channel", () {
- var transformed = channel.transform(UTF8);
+ var transformed = channel.transform(
+ new StreamChannelTransformer.fromCodec(UTF8));
streamController.add([102, 111, 111, 98, 97, 114]);
streamController.close();