Add NullStreamSink, and *Completer.setError, and StreamSinkCompleter.fromFuture.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1615253002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index af87e24..15570d7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,12 @@
- Added `StreamSinkCompleter`, for creating a `StreamSink` now and providing its
destination later as another sink.
+- Added `StreamCompleter.setError`, a shortcut for emitting a single error event
+ on the resulting stream.
+
+- Added `NullStreamSink`, an implementation of `StreamSink` that discards all
+ events.
+
## 1.7.0
- Added `SingleSubscriptionTransformer`, a `StreamTransformer` that converts a
diff --git a/lib/async.dart b/lib/async.dart
index ef59b73..ef1ac04 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -15,6 +15,7 @@
export "src/delegate/stream_subscription.dart";
export "src/future_group.dart";
export "src/lazy_stream.dart";
+export "src/null_stream_sink.dart";
export "src/restartable_timer.dart";
export "src/result_future.dart";
export "src/single_subscription_transformer.dart";
diff --git a/lib/src/null_stream_sink.dart b/lib/src/null_stream_sink.dart
new file mode 100644
index 0000000..aa85924
--- /dev/null
+++ b/lib/src/null_stream_sink.dart
@@ -0,0 +1,90 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.null_stream_sink;
+
+import 'dart:async';
+
+/// A [StreamSink] that discards all events.
+///
+/// The sink silently drops events until [close] is called, at which point it
+/// throws [StateError]s when events are added. This is the same behavior as a
+/// sink whose remote end has closed, such as when a [WebSocket] connection has
+/// been closed.
+///
+/// This can be used when a sink is needed but no events are actually intended
+/// to be added. The [new NullStreamSink.error] constructor can be used to
+/// represent errors when creating a sink, since [StreamSink.done] exposes sink
+/// errors. For example:
+///
+/// ```dart
+/// StreamSink<List<int>> openForWrite(String filename) {
+/// try {
+/// return new RandomAccessSink(new File(filename).openSync());
+/// } on IOException catch (error, stackTrace) {
+/// return new NullStreamSink.error(error, stackTrace);
+/// }
+/// }
+/// ```
+class NullStreamSink<T> implements StreamSink<T> {
+ final Future done;
+
+ /// Whether the sink has been closed.
+ var _closed = false;
+
+ /// Whether an [addStream] call is pending.
+ ///
+ /// We don't actually add any events from streams, but [addStream] does return
+ /// the [StreamSubscription.cancel] future, so to be [StreamSink]-compliant we
+ /// reject events until that completes.
+ var _addingStream = false;
+
+ /// Creates a null sink.
+ ///
+ /// If [done] is passed, it's used as the [StreamSink.done] future. Otherwise,
+ /// a completed future is used.
+ NullStreamSink({Future done}) : done = done ?? new Future.value();
+
+ /// Creates a null sink whose [done] future emits [error].
+ ///
+ /// Note that this error will not be considered uncaught.
+ NullStreamSink.error(error, [StackTrace stackTrace])
+ : done = new Future.error(error, stackTrace)
+ // Don't top-level the error. This gives the user a chance to call
+ // [close] or [done], and matches the behavior of a remote endpoint
+ // experiencing an error.
+ ..catchError((_) {});
+
+ void add(T data) {
+ _checkEventAllowed();
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ _checkEventAllowed();
+ }
+
+ Future addStream(Stream<T> stream) {
+ _checkEventAllowed();
+
+ _addingStream = true;
+ var future = stream.listen(null).cancel() ?? new Future.value();
+ return future.whenComplete(() {
+ _addingStream = false;
+ });
+ }
+
+ /// Throws a [StateError] if [close] has been called or an [addStream] call is
+ /// pending.
+ void _checkEventAllowed() {
+ if (_closed) throw new StateError("Cannot add to a closed sink.");
+ if (_addingStream) {
+ throw new StateError("Cannot add to a sink while adding a stream.");
+ }
+ }
+
+ Future close() {
+ _closed = true;
+ return done;
+ }
+}
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index c343e6e..a6260dc 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -39,9 +39,7 @@
static Stream fromFuture(Future<Stream> streamFuture) {
var completer = new StreamCompleter();
streamFuture.then(completer.setSourceStream,
- onError: (e, s) {
- completer.setSourceStream(streamFuture.asStream());
- });
+ onError: completer.setError);
return completer.stream;
}
@@ -76,8 +74,8 @@
/// it is immediately listened to, and its events are forwarded to the
/// existing subscription.
///
- /// Either [setSourceStream] or [setEmpty] may be called at most once.
- /// Trying to call either of them again will fail.
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
void setSourceStream(Stream<T> sourceStream) {
if (_stream._isSourceStreamSet) {
throw new StateError("Source stream already set");
@@ -87,14 +85,24 @@
/// Equivalent to setting an empty stream using [setSourceStream].
///
- /// Either [setSourceStream] or [setEmpty] may be called at most once.
- /// Trying to call either of them again will fail.
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
void setEmpty() {
if (_stream._isSourceStreamSet) {
throw new StateError("Source stream already set");
}
_stream._setEmpty();
}
+
+ /// Completes this to a stream that emits [error] and then closes.
+ ///
+ /// This is useful when the process of creating the data for the stream fails.
+ ///
+ /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+ /// most once. Trying to call any of them again will fail.
+ void setError(error, [StackTrace stackTrace]) {
+ setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace)));
+ }
}
/// Stream completed by [StreamCompleter].
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index 10caa06..0b6dc46 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -6,6 +6,8 @@
import 'dart:async';
+import 'null_stream_sink.dart';
+
/// A [sink] where the destination is provided later.
///
/// The [sink] is a normal sink that you can add events to to immediately, but
@@ -29,6 +31,20 @@
/// Returns [sink] typed as a [_CompleterSink].
_CompleterSink<T> get _sink => sink;
+ /// Convert a `Future<StreamSink>` to a `StreamSink`.
+ ///
+ /// This creates a sink using a sink completer, and sets the destination sink
+ /// to the result of the future when the future completes.
+ ///
+ /// If the future completes with an error, the returned sink will instead
+ /// be closed. Its [StreamSink.done] future will contain the error.
+ static StreamSink fromFuture(Future<StreamSink> sinkFuture) {
+ var completer = new StreamSinkCompleter();
+ sinkFuture.then(completer.setDestinationSink,
+ onError: completer.setError);
+ return completer.sink;
+ }
+
/// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
/// [sink].
///
@@ -41,12 +57,25 @@
/// buffered until the destination is available.
///
/// A destination sink may be set at most once.
+ ///
+ /// Either of [setDestinationSink] or [setError] may be called at most once.
+ /// Trying to call either of them again will fail.
void setDestinationSink(StreamSink<T> destinationSink) {
if (_sink._destinationSink != null) {
throw new StateError("Destination sink already set");
}
_sink._setDestinationSink(destinationSink);
}
+
+ /// Completes this to a closed sink whose [StreamSink.done] emits [error].
+ ///
+ /// This is useful when the process of loading the sink fails.
+ ///
+ /// Either of [setDestinationSink] or [setError] may be called at most once.
+ /// Trying to call either of them again will fail.
+ void setError(error, [StackTrace stackTrace]) {
+ setDestinationSink(new NullStreamSink.error(error, stackTrace));
+ }
}
/// [StreamSink] completed by [StreamSinkCompleter].
diff --git a/pubspec.yaml b/pubspec.yaml
index ccfa970..7560836 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.8.0-dev
+version: 1.8.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/null_stream_sink_test.dart b/test/null_stream_sink_test.dart
new file mode 100644
index 0000000..244f99c
--- /dev/null
+++ b/test/null_stream_sink_test.dart
@@ -0,0 +1,113 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+void main() {
+ group("constructors", () {
+ test("done defaults to a completed future", () {
+ var sink = new NullStreamSink();
+ expect(sink.done, completes);
+ });
+
+ test("a custom future may be passed to done", () async {
+ var completer = new Completer();
+ var sink = new NullStreamSink(done: completer.future);
+
+ var doneFired = false;
+ sink.done.then((_) {
+ doneFired = true;
+ });
+ await flushMicrotasks();
+ expect(doneFired, isFalse);
+
+ completer.complete();
+ await flushMicrotasks();
+ expect(doneFired, isTrue);
+ });
+
+ test("NullStreamSink.error passes an error to done", () {
+ var sink = new NullStreamSink.error("oh no");
+ expect(sink.done, throwsA("oh no"));
+ });
+ });
+
+ group("events", () {
+ test("are silently dropped before close", () {
+ var sink = new NullStreamSink();
+ sink.add(1);
+ sink.addError("oh no");
+ });
+
+ test("throw StateErrors after close", () {
+ var sink = new NullStreamSink();
+ expect(sink.close(), completes);
+
+ expect(() => sink.add(1), throwsStateError);
+ expect(() => sink.addError("oh no"), throwsStateError);
+ expect(() => sink.addStream(new Stream.empty()), throwsStateError);
+ });
+
+ group("addStream", () {
+ test("listens to the stream then cancels immediately", () async {
+ var sink = new NullStreamSink();
+ var canceled = false;
+ var controller = new StreamController(onCancel: () {
+ canceled = true;
+ });
+
+ expect(sink.addStream(controller.stream), completes);
+ await flushMicrotasks();
+ expect(canceled, isTrue);
+ });
+
+ test("returns the cancel future", () async {
+ var completer = new Completer();
+ var sink = new NullStreamSink();
+ var controller = new StreamController(onCancel: () => completer.future);
+
+ var addStreamFired = false;
+ sink.addStream(controller.stream).then((_) {
+ addStreamFired = true;
+ });
+ await flushMicrotasks();
+ expect(addStreamFired, isFalse);
+
+ completer.complete();
+ await flushMicrotasks();
+ expect(addStreamFired, isTrue);
+ });
+
+ test("pipes errors from the cancel future through addStream", () async {
+ var sink = new NullStreamSink();
+ var controller = new StreamController(onCancel: () => throw "oh no");
+ expect(sink.addStream(controller.stream), throwsA("oh no"));
+ });
+
+ test("causes events to throw StateErrors until the future completes",
+ () async {
+ var sink = new NullStreamSink();
+ var future = sink.addStream(new Stream.empty());
+ expect(() => sink.add(1), throwsStateError);
+ expect(() => sink.addError("oh no"), throwsStateError);
+ expect(() => sink.addStream(new Stream.empty()), throwsStateError);
+
+ await future;
+ sink.add(1);
+ sink.addError("oh no");
+ expect(sink.addStream(new Stream.empty()), completes);
+ });
+ });
+ });
+
+ test("close returns the done future", () {
+ var sink = new NullStreamSink.error("oh no");
+ expect(sink.close(), throwsA("oh no"));
+ });
+}
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index cd3ceb9..76d81d1 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -337,6 +337,34 @@
});
expect(controller.hasListener, isFalse);
});
+
+ group("setError()", () {
+ test("produces a stream that emits a single error", () {
+ var completer = new StreamCompleter();
+ completer.stream.listen(
+ unreachable("data"),
+ onError: expectAsync((error, stackTrace) {
+ expect(error, equals("oh no"));
+ }),
+ onDone: expectAsync(() {}));
+
+ completer.setError("oh no");
+ });
+
+ test("produces a stream that emits a single error on a later listen",
+ () async {
+ var completer = new StreamCompleter();
+ completer.setError("oh no");
+ await flushMicrotasks();
+
+ completer.stream.listen(
+ unreachable("data"),
+ onError: expectAsync((error, stackTrace) {
+ expect(error, equals("oh no"));
+ }),
+ onDone: expectAsync(() {}));
+ });
+ });
}
Stream<int> createStream() async* {
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index 3892d26..82393fc 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -245,6 +245,48 @@
expect(completer.sink.close(), completes);
});
+ group("fromFuture()", () {
+ test("with a successful completion", () async {
+ var futureCompleter = new Completer();
+ var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
+ sink.add(1);
+ sink.add(2);
+ sink.add(3);
+ sink.close();
+
+ var testSink = new TestSink();
+ futureCompleter.complete(testSink);
+ await testSink.done;
+
+ expect(testSink.results[0].asValue.value, equals(1));
+ expect(testSink.results[1].asValue.value, equals(2));
+ expect(testSink.results[2].asValue.value, equals(3));
+ });
+
+ test("with an error", () async {
+ var futureCompleter = new Completer();
+ var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
+ expect(sink.done, throwsA("oh no"));
+ futureCompleter.completeError("oh no");
+ });
+ });
+
+ group("setError()", () {
+ test("produces a closed sink with the error", () {
+ completer.setError("oh no");
+ expect(completer.sink.done, throwsA("oh no"));
+ expect(completer.sink.close(), throwsA("oh no"));
+ });
+
+ test("produces an error even if done was accessed earlier", () async {
+ expect(completer.sink.done, throwsA("oh no"));
+ expect(completer.sink.close(), throwsA("oh no"));
+ await flushMicrotasks();
+
+ completer.setError("oh no");
+ });
+ });
+
test("doesn't allow the destination sink to be set multiple times", () {
completer.setDestinationSink(new TestSink());
expect(() => completer.setDestinationSink(new TestSink()),