Add StreamChannelTransformer. This makes StreamChannel.transform() take a StreamChannelTransformer rather than a codec. This is better, since a codec is explicitly intended to convert chunked data, while transformers are more general. R=rnystrom@google.com Review URL: https://codereview.chromium.org//1632903004 .
diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart index c664543..46375c9 100644 --- a/pkgs/stream_channel/lib/src/isolate_channel.dart +++ b/pkgs/stream_channel/lib/src/isolate_channel.dart
@@ -128,7 +128,7 @@ if (_inAddStream) { throw new StateError("Cannot add stream while adding stream."); } - if (_isDone) return; + if (_isDone) return new Future.value(); _inAddStream = true; var completer = new Completer.sync();
diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart new file mode 100644 index 0000000..be032c6 --- /dev/null +++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
@@ -0,0 +1,52 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; + +import '../stream_channel.dart'; + +/// A [StreamChannelTransformer] transforms the events being passed to and +/// emitted by a [StreamChannel]. +/// +/// This works on the same principle as [StreamTransformer] and +/// [StreamSinkTransformer]. Each transformer defines a [bind] method that takes +/// in the original [StreamChannel] and returns the transformed version. +/// +/// Transformers must be able to have `bind` called multiple times. +class StreamChannelTransformer<S, T> { + /// The transformer to use on the channel's stream. + final StreamTransformer _streamTransformer; + + /// The transformer to use on the channel's sink. + final StreamSinkTransformer _sinkTransformer; + + /// Creates a [StreamChannelTransformer] from existing stream and sink + /// transformers. + const StreamChannelTransformer( + this._streamTransformer, this._sinkTransformer); + + /// Creates a [StreamChannelTransformer] from a codec's encoder and decoder. + /// + /// All input to the inner channel's sink is encoded using [Codec.encoder], + /// and all output from its stream is decoded using [Codec.decoder]. + StreamChannelTransformer.fromCodec(Codec<S, T> codec) + : this( + codec.decoder, + new StreamSinkTransformer.fromStreamTransformer(codec.encoder)); + + /// Transforms the events sent to and emitted by [channel]. + /// + /// Creates a new channel. When events are passed to the returned channel's + /// sink, the transformer will transform them and pass the transformed + /// versions to `channel.sink`. When events are emitted from the + /// `channel.straem`, the transformer will transform them and pass the + /// transformed versions to the returned channel's stream. + StreamChannel<S> bind(StreamChannel<T> channel) => + new StreamChannel<S>( + channel.stream.transform(_streamTransformer), + _sinkTransformer.bind(channel.sink)); +}
diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart index 2885e2d..e7fb055 100644 --- a/pkgs/stream_channel/lib/stream_channel.dart +++ b/pkgs/stream_channel/lib/stream_channel.dart
@@ -3,14 +3,14 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:convert'; -import 'package:async/async.dart'; +import 'src/stream_channel_transformer.dart'; export 'src/delegating_stream_channel.dart'; export 'src/isolate_channel.dart'; export 'src/multi_channel.dart'; export 'src/stream_channel_completer.dart'; +export 'src/stream_channel_transformer.dart'; /// An abstract class representing a two-way communication channel. /// @@ -70,12 +70,10 @@ /// directly to the other. void pipe(StreamChannel<T> other); - /// Transforms [this] using [codec]. + /// Transforms [this] using [transformer]. /// - /// This returns a stream channel that encodes all input using [Codec.encoder] - /// before passing it to this channel's [sink], and decodes all output from - /// this channel's [stream] using [Codec.decoder]. - StreamChannel transform(Codec<dynamic, T> codec); + /// This is identical to calling `transformer.bind(channel)`. + StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer); } /// An implementation of [StreamChannel] that simply takes a stream and a sink @@ -98,11 +96,6 @@ other.stream.pipe(sink); } - StreamChannel transform(Codec<dynamic, T> codec) { - var sinkTransformer = - new StreamSinkTransformer.fromStreamTransformer(codec.encoder); - return new _StreamChannel( - stream.transform(codec.decoder), - sinkTransformer.bind(sink)); - } + StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer) => + transformer.bind(this); }
diff --git a/pkgs/stream_channel/test/stream_channel_test.dart b/pkgs/stream_channel/test/stream_channel_test.dart index 30a7db4..5f006c3 100644 --- a/pkgs/stream_channel/test/stream_channel_test.dart +++ b/pkgs/stream_channel/test/stream_channel_test.dart
@@ -44,7 +44,8 @@ }); test("transform() transforms the channel", () { - var transformed = channel.transform(UTF8); + var transformed = channel.transform( + new StreamChannelTransformer.fromCodec(UTF8)); streamController.add([102, 111, 111, 98, 97, 114]); streamController.close();