Add a StreamSinkTransformer class.
This makes it easier to programmatically manipulate sinks. In
particular, StreamSinkTransformer.fromStreamTransformer makes it easy to
use existing encoders and codecs when emitting data.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1566603002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6db6f36..8cb7880 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,10 @@
- Added `CancelableOperation.valueOrCancellation()`, which allows users to be
notified when an operation is canceled elsewhere.
+- Added `StreamSinkTransformer`, which transforms events before they're passed
+ to a `StreamSink`, similarly to how `StreamTransformer` transforms events
+ after they're emitted by a stream.
+
## 1.5.0
- Added `LazyStream`, which forwards to the return value of a callback that's
diff --git a/lib/async.dart b/lib/async.dart
index 1d127fd..8f14bac 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -20,6 +20,7 @@
export "src/stream_completer.dart";
export "src/stream_group.dart";
export "src/stream_queue.dart";
+export "src/stream_sink_transformer.dart";
export "src/stream_splitter.dart";
export "src/subscription_stream.dart";
export "stream_zip.dart";
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
new file mode 100644
index 0000000..a8410ab
--- /dev/null
+++ b/lib/src/stream_sink_transformer.dart
@@ -0,0 +1,50 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer;
+
+import 'dart:async';
+
+import 'stream_sink_transformer/handler_transformer.dart';
+import 'stream_sink_transformer/stream_transformer_wrapper.dart';
+
+/// A [StreamSinkTransformer] transforms the events being passed to a sink.
+///
+/// This works on the same principle as a [StreamTransformer]. Each transformer
+/// defines a [bind] method that takes in the original [StreamSink] and returns
+/// the transformed version. However, where a [StreamTransformer] transforms
+/// events after they leave the stream, this transforms them before they enter
+/// the sink.
+///
+/// Transformers must be able to have `bind` called multiple times.
+abstract class StreamSinkTransformer<S, T> {
+ /// Creates a [StreamSinkTransformer] that transforms events and errors
+ /// using [transformer].
+ ///
+ /// This is equivalent to piping all events from the outer sink through a
+ /// stream transformed by [transformer] and from there into the inner sink.
+ const factory StreamSinkTransformer.fromStreamTransformer(
+ StreamTransformer<S, T> transformer) =
+ StreamTransformerWrapper<S, T>;
+
+ /// Creates a [StreamSinkTransformer] that delegates events to the given
+ /// handlers.
+ ///
+ /// The handlers work exactly as they do for [StreamTransformer.fromHandlers].
+ /// They're called for each incoming event, and any actions on the sink
+ /// they're passed are forwarded to the inner sink. If a handler is omitted,
+ /// the event is passed through unaltered.
+ factory StreamSinkTransformer.fromHandlers({
+ void handleData(S data, EventSink<T> sink),
+ void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
+ void handleDone(EventSink<T> sink)}) {
+ return new HandlerTransformer<S, T>(handleData, handleError, handleDone);
+ }
+
+ /// Transforms the events passed to [sink].
+ ///
+ /// Creates a new sink. When events are passed to the returned sink, it will
+ /// transform them and pass the transformed versions to [sink].
+ StreamSink<S> bind(StreamSink<T> sink);
+}
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
new file mode 100644
index 0000000..e5d8e3c
--- /dev/null
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -0,0 +1,107 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer.handler_transformer;
+
+import 'dart:async';
+
+import '../stream_sink_transformer.dart';
+import '../delegate/stream_sink.dart';
+
+/// The type of the callback for handling data events.
+typedef void HandleData<S, T>(S data, EventSink<T> sink);
+
+/// The type of the callback for handling error events.
+typedef void HandleError<T>(
+ Object error, StackTrace stackTrace, EventSink<T> sink);
+
+/// The type of the callback for handling done events.
+typedef void HandleDone<T>(EventSink<T> sink);
+
+/// A [StreamSinkTransformer] that delegates events to the given handlers.
+class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
+ /// The handler for data events.
+ final HandleData<S, T> _handleData;
+
+ /// The handler for error events.
+ final HandleError<T> _handleError;
+
+ /// The handler for done events.
+ final HandleDone<T> _handleDone;
+
+ HandlerTransformer(
+ this._handleData, this._handleError, this._handleDone);
+
+ StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
+}
+
+/// A sink created by [HandlerTransformer].
+class _HandlerSink<S, T> implements StreamSink<S> {
+ /// The transformer that created this sink.
+ final HandlerTransformer<S, T> _transformer;
+
+ /// The original sink that's being transformed.
+ final StreamSink<T> _inner;
+
+ /// The wrapper for [_inner] whose [StreamSink.close] method can't emit
+ /// errors.
+ final StreamSink<T> _safeCloseInner;
+
+ Future get done => _inner.done;
+
+ _HandlerSink(this._transformer, StreamSink<T> inner)
+ : _inner = inner,
+ _safeCloseInner = new _SafeCloseSink<T>(inner);
+
+ void add(S event) {
+ if (_transformer._handleData == null) {
+ // [event] is an S and [_inner.add] takes a T. This style of conversion
+ // will throw an error in checked mode if [_inner] is actually a
+ // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
+ // an extra check in unchecked mode.
+ _inner.add(event as dynamic);
+ } else {
+ _transformer._handleData(event, _safeCloseInner);
+ }
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_transformer._handleError == null) {
+ _inner.addError(error, stackTrace);
+ } else {
+ _transformer._handleError(error, stackTrace, _safeCloseInner);
+ }
+ }
+
+ Future addStream(Stream<S> stream) {
+ return _inner.addStream(stream.transform(
+ new StreamTransformer<S, T>.fromHandlers(
+ handleData: _transformer._handleData,
+ handleError: _transformer._handleError,
+ handleDone: _closeSink)));
+ }
+
+ Future close() {
+ if (_transformer._handleDone == null) return _inner.close();
+
+ _transformer._handleDone(_safeCloseInner);
+ return _inner.done;
+ }
+}
+
+/// A wrapper for [StreamSink]s that swallows any errors returned by [close].
+///
+/// [HandlerTransformer] passes this to its handlers to ensure that when they
+/// call [close], they don't leave any dangling [Future]s behind that might emit
+/// unhandleable errors.
+class _SafeCloseSink<T> extends DelegatingStreamSink<T> {
+ _SafeCloseSink(StreamSink<T> inner) : super(inner);
+
+ Future close() => super.close().catchError((_) {});
+}
+
+/// A function to pass as a [StreamTransformer]'s `handleDone` callback.
+void _closeSink(EventSink sink) {
+ sink.close();
+}
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
new file mode 100644
index 0000000..b53f208
--- /dev/null
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -0,0 +1,62 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_transformer.stream_transformer_wrapper;
+
+import 'dart:async';
+
+import '../stream_sink_transformer.dart';
+
+/// A [StreamSinkTransformer] that wraps a pre-existing [StreamTransformer].
+class StreamTransformerWrapper<S, T> implements StreamSinkTransformer<S, T> {
+ /// The wrapped transformer.
+ final StreamTransformer<S, T> _transformer;
+
+ const StreamTransformerWrapper(this._transformer);
+
+ StreamSink<S> bind(StreamSink<T> sink) =>
+ new _StreamTransformerWrapperSink<S, T>(_transformer, sink);
+}
+
+/// A sink created by [StreamTransformerWrapper].
+class _StreamTransformerWrapperSink<S, T> implements StreamSink<S> {
+ /// The controller through which events are passed.
+ ///
+ /// This is used to create a stream that can be transformed by the wrapped
+ /// transformer.
+ final _controller = new StreamController<S>(sync: true);
+
+ /// The original sink that's being transformed.
+ final StreamSink<T> _inner;
+
+ Future get done => _inner.done;
+
+ _StreamTransformerWrapperSink(StreamTransformer<S, T> transformer,
+ this._inner) {
+ _controller.stream.transform(transformer).listen(
+ _inner.add,
+ onError: _inner.addError,
+ onDone: () {
+ // Ignore any errors that come from this call to [_inner.close]. The
+ // user can access them through [done] or the value returned from
+ // [this.close], and we don't want them to get top-leveled.
+ _inner.close().catchError((_) {});
+ });
+ }
+
+ void add(S event) {
+ _controller.add(event);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ _controller.addError(error, stackTrace);
+ }
+
+ Future addStream(Stream<S> stream) => _controller.addStream(stream);
+
+ Future close() {
+ _controller.close();
+ return _inner.done;
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 16e2ea2..e13f6ab 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.6.0-dev
+version: 1.6.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
new file mode 100644
index 0000000..70e7e6c
--- /dev/null
+++ b/test/stream_sink_transformer_test.dart
@@ -0,0 +1,218 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+void main() {
+ var controller;
+ setUp(() {
+ controller = new StreamController();
+ });
+
+ group("fromStreamTransformer", () {
+ test("transforms data events", () {
+ var transformer = new StreamSinkTransformer.fromStreamTransformer(
+ new StreamTransformer.fromHandlers(handleData: (i, sink) {
+ sink.add(i * 2);
+ }));
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(results.add, onDone: expectAsync(() {
+ expect(results, equals([2, 4, 6]));
+ }));
+
+ sink.add(1);
+ sink.add(2);
+ sink.add(3);
+ sink.close();
+ });
+
+ test("transforms error events", () {
+ var transformer = new StreamSinkTransformer.fromStreamTransformer(
+ new StreamTransformer.fromHandlers(
+ handleError: (i, stackTrace, sink) {
+ sink.addError(i * 2, stackTrace);
+ }));
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(expectAsync((_) {}, count: 0),
+ onError: (error, stackTrace) {
+ results.add(error);
+ },
+ onDone: expectAsync(() {
+ expect(results, equals([2, 4, 6]));
+ }));
+
+ sink.addError(1);
+ sink.addError(2);
+ sink.addError(3);
+ sink.close();
+ });
+
+ test("transforms done events", () {
+ var transformer = new StreamSinkTransformer.fromStreamTransformer(
+ new StreamTransformer.fromHandlers(
+ handleDone: (sink) {
+ sink.add(1);
+ sink.close();
+ }));
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(results.add, onDone: expectAsync(() {
+ expect(results, equals([1]));
+ }));
+
+ sink.close();
+ });
+
+ test("forwards the future from inner.close", () async {
+ var transformer = new StreamSinkTransformer.fromStreamTransformer(
+ new StreamTransformer.fromHandlers());
+ var innerSink = new CompleterStreamSink();
+ var sink = transformer.bind(innerSink);
+
+ // The futures shouldn't complete until the inner sink's close future
+ // completes.
+ var doneResult = new ResultFuture(sink.done);
+ doneResult.catchError((_) {});
+ var closeResult = new ResultFuture(sink.close());
+ closeResult.catchError((_) {});
+ await flushMicrotasks();
+ expect(doneResult.isComplete, isFalse);
+ expect(closeResult.isComplete, isFalse);
+
+ // Once the inner sink is completed, the futures should fire.
+ innerSink.completer.complete();
+ await flushMicrotasks();
+ expect(doneResult.isComplete, isTrue);
+ expect(closeResult.isComplete, isTrue);
+ });
+
+ test("doesn't top-level the future from inner.close", () async {
+ var transformer = new StreamSinkTransformer.fromStreamTransformer(
+ new StreamTransformer.fromHandlers(handleData: (_, sink) {
+ sink.close();
+ }));
+ var innerSink = new CompleterStreamSink();
+ var sink = transformer.bind(innerSink);
+
+ // This will close the inner sink, but it shouldn't top-level the error.
+ sink.add(1);
+ innerSink.completer.completeError("oh no");
+ await flushMicrotasks();
+
+ // The error should be piped through done and close even if they're called
+ // after the underlying sink is closed.
+ expect(sink.done, throwsA("oh no"));
+ expect(sink.close(), throwsA("oh no"));
+ });
+ });
+
+ group("fromHandlers", () {
+ test("transforms data events", () {
+ var transformer = new StreamSinkTransformer.fromHandlers(
+ handleData: (i, sink) {
+ sink.add(i * 2);
+ });
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(results.add, onDone: expectAsync(() {
+ expect(results, equals([2, 4, 6]));
+ }));
+
+ sink.add(1);
+ sink.add(2);
+ sink.add(3);
+ sink.close();
+ });
+
+ test("transforms error events", () {
+ var transformer = new StreamSinkTransformer.fromHandlers(
+ handleError: (i, stackTrace, sink) {
+ sink.addError(i * 2, stackTrace);
+ });
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(expectAsync((_) {}, count: 0),
+ onError: (error, stackTrace) {
+ results.add(error);
+ },
+ onDone: expectAsync(() {
+ expect(results, equals([2, 4, 6]));
+ }));
+
+ sink.addError(1);
+ sink.addError(2);
+ sink.addError(3);
+ sink.close();
+ });
+
+ test("transforms done events", () {
+ var transformer = new StreamSinkTransformer.fromHandlers(
+ handleDone: (sink) {
+ sink.add(1);
+ sink.close();
+ });
+ var sink = transformer.bind(controller.sink);
+
+ var results = [];
+ controller.stream.listen(results.add, onDone: expectAsync(() {
+ expect(results, equals([1]));
+ }));
+
+ sink.close();
+ });
+
+ test("forwards the future from inner.close", () async {
+ var transformer = new StreamSinkTransformer.fromHandlers();
+ var innerSink = new CompleterStreamSink();
+ var sink = transformer.bind(innerSink);
+
+ // The futures shouldn't complete until the inner sink's close future
+ // completes.
+ var doneResult = new ResultFuture(sink.done);
+ doneResult.catchError((_) {});
+ var closeResult = new ResultFuture(sink.close());
+ closeResult.catchError((_) {});
+ await flushMicrotasks();
+ expect(doneResult.isComplete, isFalse);
+ expect(closeResult.isComplete, isFalse);
+
+ // Once the inner sink is completed, the futures should fire.
+ innerSink.completer.complete();
+ await flushMicrotasks();
+ expect(doneResult.isComplete, isTrue);
+ expect(closeResult.isComplete, isTrue);
+ });
+
+ test("doesn't top-level the future from inner.close", () async {
+ var transformer = new StreamSinkTransformer.fromHandlers(
+ handleData: (_, sink) {
+ sink.close();
+ });
+ var innerSink = new CompleterStreamSink();
+ var sink = transformer.bind(innerSink);
+
+ // This will close the inner sink, but it shouldn't top-level the error.
+ sink.add(1);
+ innerSink.completer.completeError("oh no");
+ await flushMicrotasks();
+
+ // The error should be piped through done and close even if they're called
+ // after the underlying sink is closed.
+ expect(sink.done, throwsA("oh no"));
+ expect(sink.close(), throwsA("oh no"));
+ });
+ });
+}
diff --git a/test/utils.dart b/test/utils.dart
index b2ea885..0dc47fa 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -24,3 +24,19 @@
throw new UnimplementedError("Gotcha!");
}
}
+
+/// A dummy [StreamSink] for testing the routing of the [done] and [close]
+/// futures.
+///
+/// The [completer] field allows the user to control the future returned by
+/// [done] and [close].
+class CompleterStreamSink<T> implements StreamSink<T> {
+ final completer = new Completer();
+
+ Future get done => completer.future;
+
+ void add(T event) {}
+ void addError(error, [StackTrace stackTrace]) {}
+ Future addStream(Stream<T> stream) async {}
+ Future close() => completer.future;
+}