Version 1.20.0-dev.3.0
Merge commit '70c48b9a2ed6249495b69500622238b81c0bfb62' into dev
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 315e870..64e4b19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,10 @@
* `dart:async`
* `Future.wait` now catches synchronous errors and returns them in the
returned Future.
+ * More aggressively returns a Future on Stream.cancel operations.
+ Discourages returning `null` from `cancel`.
+ * Fixes a few bugs where the cancel future wasn't passed through
+ transformations.
* `dart:io`
* Added `WebSocket.addUtf8Text` to allow sending a pre-encoded text message
without a round-trip UTF-8 conversion.
diff --git a/pkg/compiler/lib/src/js_emitter/headers.dart b/pkg/compiler/lib/src/js_emitter/headers.dart
index 2bdb761..905cb04 100644
--- a/pkg/compiler/lib/src/js_emitter/headers.dart
+++ b/pkg/compiler/lib/src/js_emitter/headers.dart
@@ -28,7 +28,7 @@
// [args] is passed to [dartMainRunner].
//
// dartDeferredLibraryLoader(uri, successCallback, errorCallback):
-// if this function is defined, it will be called when a deferered library
+// if this function is defined, it will be called when a deferred library
// is loaded. It should load and eval the javascript of `uri`, and call
// successCallback. If it fails to do so, it should call errorCallback with
// an error.
diff --git a/sdk/lib/async/future.dart b/sdk/lib/async/future.dart
index c64e1c5..7d892c6 100644
--- a/sdk/lib/async/future.dart
+++ b/sdk/lib/async/future.dart
@@ -322,8 +322,13 @@
// The error must have been thrown while iterating over the futures
// list, or while installing a callback handler on the future.
if (remaining == 0 || eagerError) {
- // Just complete the error immediately.
- result._completeError(e, st);
+ // Return a new Future.error.
+ // Don't just call `result._completeError` since that would propagate
+ // the error too eagerly, not giving the callers time to install
+ // error handlers.
+ // Also, don't use `_asyncCompleteError` since that one doesn't give
+ // zones the chance to intercept the error.
+ return new Future.error(e, st);
} else {
// Don't allocate a list for values, thus indicating that there was an
// error.
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 7661d9f..f4abf9a 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -232,6 +232,7 @@
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
+ return Future._nullFuture;
});
return controller.stream;
}
@@ -441,7 +442,7 @@
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
- onCancel: () { subscription.cancel(); },
+ onCancel: () => subscription.cancel(),
sync: true
);
}
@@ -499,7 +500,7 @@
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
- onCancel: () { subscription.cancel(); },
+ onCancel: () => subscription.cancel(),
sync: true
);
}
@@ -1407,7 +1408,10 @@
* the subscription is canceled.
*
* Returns a future that is completed once the stream has finished
- * its cleanup. May also return `null` if no cleanup was necessary.
+ * its cleanup.
+ *
+ * For historical reasons, may also return `null` if no cleanup was necessary.
+ * Returning `null` is deprecated and should be avoided.
*
* Typically, futures are returned when the stream needs to release resources.
* For example, a stream might need to close an open file (as an asynchronous
@@ -1711,7 +1715,7 @@
* },
* onPause: () { subscription.pause(); },
* onResume: () { subscription.resume(); },
- * onCancel: () { subscription.cancel(); },
+ * onCancel: () => subscription.cancel(),
* sync: true);
* return controller.stream.listen(null);
* });
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 8c9af64..cb2e20d 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -188,9 +188,10 @@
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
- if (_isCanceled) return _cancelFuture;
- _cancel();
- return _cancelFuture;
+ if (!_isCanceled) {
+ _cancel();
+ }
+ return _cancelFuture ?? Future._nullFuture;
}
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
@@ -199,8 +200,14 @@
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
_onError = (error, stackTrace) {
- cancel();
- result._completeError(error, stackTrace);
+ Future cancelFuture = cancel();
+ if (!identical(cancelFuture, Future._nullFuture)) {
+ cancelFuture.whenComplete(() {
+ result._completeError(error, stackTrace);
+ });
+ } else {
+ result._completeError(error, stackTrace);
+ }
};
return result;
@@ -361,7 +368,8 @@
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
- if (_cancelFuture is Future) {
+ if (_cancelFuture is Future &&
+ !identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
@@ -389,7 +397,8 @@
_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
- if (_cancelFuture is Future) {
+ if (_cancelFuture is Future &&
+ !identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
@@ -778,7 +787,7 @@
}
}
- Future cancel() => null;
+ Future cancel() => Future._nullFuture;
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
_Future/*<E>*/ result = new _Future/*<E>*/();
@@ -916,7 +925,7 @@
Future cancel() {
_stream._cancelSubscription();
- return null;
+ return Future._nullFuture;
}
bool get isPaused {
@@ -1032,7 +1041,7 @@
Future cancel() {
StreamSubscription subscription = _subscription;
- if (subscription == null) return null;
+ if (subscription == null) return Future._nullFuture;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index 1125620..d26b0c5 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -29,7 +29,7 @@
error,
StackTrace stackTrace) {
var cancelFuture = subscription.cancel();
- if (cancelFuture is Future) {
+ if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
} else {
future._completeError(error, stackTrace);
@@ -61,7 +61,7 @@
before completing with a value. */
void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
var cancelFuture = subscription.cancel();
- if (cancelFuture is Future) {
+ if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
future._complete(value);
diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart
index e0982da..7542c6b 100644
--- a/sdk/lib/async/stream_transformers.dart
+++ b/sdk/lib/async/stream_transformers.dart
@@ -109,7 +109,7 @@
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
- subscription.cancel();
+ return subscription.cancel();
}
return null;
}
diff --git a/tests/lib/async/future_test.dart b/tests/lib/async/future_test.dart
index 71c509b..5ce2be9 100644
--- a/tests/lib/async/future_test.dart
+++ b/tests/lib/async/future_test.dart
@@ -879,7 +879,6 @@
void testWaitSyncError() {
var cms = const Duration(milliseconds: 100);
var cleanups = new List.filled(3, false);
- var uncaughts = new List.filled(3, false);
asyncStart();
asyncStart();
runZoned(() {
@@ -896,6 +895,43 @@
});
}
+void testWaitSyncError2() {
+ asyncStart();
+ Future.wait([null]).catchError((e, st) {
+ // Makes sure that the `catchError` is invoked.
+ // Regression test: an earlier version of `Future.wait` would propagate
+ // the error too soon for the code to install an error handler.
+ // `testWaitSyncError` didn't show this problem, because the `runZoned`
+ // was already installed.
+ asyncEnd();
+ });
+}
+
+// Future.wait transforms synchronous errors into asynchronous ones.
+// This function tests that zones can intercept them.
+void testWaitSyncError3() {
+ var caughtError;
+ var count = 0;
+
+ AsyncError errorCallback(
+ Zone self, ZoneDelegate parent, Zone zone, Object error,
+ StackTrace stackTrace) {
+ Expect.equals(0, count);
+ count++;
+ caughtError = error;
+ return parent.errorCallback(zone, error, stackTrace);
+ }
+
+ asyncStart();
+ runZoned(() {
+ Future.wait([null]).catchError((e, st) {
+ Expect.identical(e, caughtError);
+ Expect.equals(1, count);
+ asyncEnd();
+ });
+ }, zoneSpecification: new ZoneSpecification(errorCallback: errorCallback));
+}
+
void testBadFuture() {
var bad = new BadFuture();
// Completing with bad future (then call throws) puts error in result.
@@ -1096,6 +1132,8 @@
testWaitCleanUp();
testWaitCleanUpError();
testWaitSyncError();
+ testWaitSyncError2();
+ testWaitSyncError3();
testBadFuture();
diff --git a/tests/lib/async/stream_controller_test.dart b/tests/lib/async/stream_controller_test.dart
index afc7bb5..5b01932 100644
--- a/tests/lib/async/stream_controller_test.dart
+++ b/tests/lib/async/stream_controller_test.dart
@@ -16,237 +16,274 @@
void testMultiController() {
// Test normal flow.
- var c = new StreamController(sync: true);
- Events expectedEvents = new Events()
- ..add(42)
- ..add("dibs")
- ..error("error!")
- ..error("error too!")
- ..close();
- CaptureEvents actualEvents = new Events.capture(c.stream.asBroadcastStream());
- expectedEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ Events expectedEvents = new Events()
+ ..add(42)
+ ..add("dibs")
+ ..error("error!")
+ ..error("error too!")
+ ..close();
+ CaptureEvents actualEvents =
+ new Events.capture(c.stream.asBroadcastStream());
+ expectedEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test automatic unsubscription on error.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add(42)..error("error");
- actualEvents = new Events.capture(c.stream.asBroadcastStream(),
- cancelOnError: true);
- Events sentEvents =
- new Events()..add(42)..error("error")..add("Are you there?");
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add(42)..error("error");
+ var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
+ cancelOnError: true);
+ Events sentEvents =
+ new Events()..add(42)..error("error")..add("Are you there?");
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test manual unsubscription.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add(42)..error("error")..add(37);
- actualEvents = new Events.capture(c.stream.asBroadcastStream(),
- cancelOnError: false);
- expectedEvents.replay(c);
- actualEvents.subscription.cancel();
- c.add("Are you there"); // Not sent to actualEvents.
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add(42)..error("error")..add(37);
+ var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
+ cancelOnError: false);
+ expectedEvents.replay(c);
+ actualEvents.subscription.cancel();
+ c.add("Are you there"); // Not sent to actualEvents.
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test filter.
- c = new StreamController(sync: true);
- expectedEvents = new Events()
- ..add("a string")..add("another string")..close();
- sentEvents = new Events()
- ..add("a string")..add(42)..add("another string")..close();
- actualEvents = new Events.capture(c.stream
- .asBroadcastStream()
- .where((v) => v is String));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()
+ ..add("a string")..add("another string")..close();
+ var sentEvents = new Events()
+ ..add("a string")..add(42)..add("another string")..close();
+ var actualEvents = new Events.capture(c.stream
+ .asBroadcastStream()
+ .where((v) => v is String));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test map.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add("abab")..error("error")..close();
- sentEvents = new Events()..add("ab")..error("error")..close();
- actualEvents = new Events.capture(c.stream
- .asBroadcastStream()
- .map((v) => "$v$v"));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add("abab")..error("error")..close();
+ var sentEvents = new Events()..add("ab")..error("error")..close();
+ var actualEvents = new Events.capture(c.stream
+ .asBroadcastStream()
+ .map((v) => "$v$v"));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test handleError.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add("ab")..error("[foo]");
- sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
- actualEvents = new Events.capture(c.stream
- .asBroadcastStream()
- .handleError((error) {
- if (error is String) {
- // TODO(floitsch): this test originally changed the stacktrace.
- throw "[${error}]";
- }
- }), cancelOnError: true);
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add("ab")..error("[foo]");
+ var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
+ var actualEvents = new Events.capture(c.stream
+ .asBroadcastStream()
+ .handleError((error) {
+ if (error is String) {
+ // TODO(floitsch): this test originally changed the stacktrace.
+ throw "[${error}]";
+ }
+ }), cancelOnError: true);
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// reduce is tested asynchronously and therefore not in this file.
// Test expand
- c = new StreamController(sync: true);
- sentEvents = new Events()..add(3)..add(2)..add(4)..close();
- expectedEvents = new Events()..add(1)..add(2)..add(3)
- ..add(1)..add(2)
- ..add(1)..add(2)..add(3)..add(4)
- ..close();
- actualEvents = new Events.capture(c.stream.asBroadcastStream().expand((v) {
- var l = [];
- for (int i = 0; i < v; i++) l.add(i + 1);
- return l;
- }));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
+ var expectedEvents = new Events()..add(1)..add(2)..add(3)
+ ..add(1)..add(2)
+ ..add(1)..add(2)..add(3)..add(4)
+ ..close();
+ var actualEvents =
+ new Events.capture(c.stream.asBroadcastStream().expand((v) {
+ var l = [];
+ for (int i = 0; i < v; i++) l.add(i + 1);
+ return l;
+ }));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test transform.
- c = new StreamController(sync: true);
- sentEvents = new Events()..add("a")..error(42)..add("b")..close();
- expectedEvents =
- new Events()..error("a")..add(42)..error("b")..add("foo")..close();
- actualEvents = new Events.capture(c.stream.asBroadcastStream().transform(
- new StreamTransformer.fromHandlers(
- handleData: (v, s) { s.addError(v); },
- handleError: (e, st, s) { s.add(e); },
- handleDone: (s) {
- s.add("foo");
- s.close();
- })));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
+ var expectedEvents =
+ new Events()..error("a")..add(42)..error("b")..add("foo")..close();
+ var actualEvents =
+ new Events.capture(c.stream.asBroadcastStream().transform(
+ new StreamTransformer.fromHandlers(
+ handleData: (v, s) { s.addError(v); },
+ handleError: (e, st, s) { s.add(e); },
+ handleDone: (s) {
+ s.add("foo");
+ s.close();
+ })));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test multiple filters.
- c = new StreamController(sync: true);
- sentEvents = new Events()..add(42)
- ..add("snugglefluffy")
- ..add(7)
- ..add("42")
- ..error("not FormatException") // Unsubscribes.
- ..close();
- expectedEvents = new Events()..add(42)..error("not FormatException");
- actualEvents = new Events.capture(
- c.stream.asBroadcastStream().where((v) => v is String)
- .map((v) => int.parse(v))
- .handleError((error) {
- if (error is! FormatException) throw error;
- })
- .where((v) => v > 10),
- cancelOnError: true);
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add(42)
+ ..add("snugglefluffy")
+ ..add(7)
+ ..add("42")
+ ..error("not FormatException") // Unsubscribes.
+ ..close();
+ var expectedEvents = new Events()..add(42)..error("not FormatException");
+ var actualEvents = new Events.capture(
+ c.stream.asBroadcastStream().where((v) => v is String)
+ .map((v) => int.parse(v))
+ .handleError((error) {
+ if (error is! FormatException) throw error;
+ })
+ .where((v) => v > 10),
+ cancelOnError: true);
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test subscription changes while firing.
- c = new StreamController(sync: true);
- var sink = c.sink;
- var stream = c.stream.asBroadcastStream();
- var counter = 0;
- var subscription = stream.listen(null);
- subscription.onData((data) {
- counter += data;
- subscription.cancel();
- stream.listen((data) {
- counter += 10 * data;
+ {
+ var c = new StreamController(sync: true);
+ var sink = c.sink;
+ var stream = c.stream.asBroadcastStream();
+ var counter = 0;
+ var subscription = stream.listen(null);
+ subscription.onData((data) {
+ counter += data;
+ subscription.cancel();
+ stream.listen((data) {
+ counter += 10 * data;
+ });
+ var subscription2 = stream.listen(null);
+ subscription2.onData((data) {
+ counter += 100 * data;
+ if (data == 4) subscription2.cancel();
+ });
});
- var subscription2 = stream.listen(null);
- subscription2.onData((data) {
- counter += 100 * data;
- if (data == 4) subscription2.cancel();
- });
- });
- sink.add(1); // seen by stream 1
- sink.add(2); // seen by stream 10 and 100
- sink.add(3); // -"-
- sink.add(4); // -"-
- sink.add(5); // seen by stream 10
- Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
+ sink.add(1); // seen by stream 1
+ sink.add(2); // seen by stream 10 and 100
+ sink.add(3); // -"-
+ sink.add(4); // -"-
+ sink.add(5); // seen by stream 10
+ Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
+ }
}
testSingleController() {
// Test normal flow.
- var c = new StreamController(sync: true);
- Events expectedEvents = new Events()
- ..add(42)
- ..add("dibs")
- ..error("error!")
- ..error("error too!")
- ..close();
- CaptureEvents actualEvents = new Events.capture(c.stream);
- expectedEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ Events expectedEvents = new Events()
+ ..add(42)
+ ..add("dibs")
+ ..error("error!")
+ ..error("error too!")
+ ..close();
+ CaptureEvents actualEvents = new Events.capture(c.stream);
+ expectedEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test automatic unsubscription on error.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add(42)..error("error");
- actualEvents = new Events.capture(c.stream, cancelOnError: true);
- Events sentEvents =
- new Events()..add(42)..error("error")..add("Are you there?");
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add(42)..error("error");
+ var actualEvents = new Events.capture(c.stream, cancelOnError: true);
+ Events sentEvents =
+ new Events()..add(42)..error("error")..add("Are you there?");
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test manual unsubscription.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add(42)..error("error")..add(37);
- actualEvents = new Events.capture(c.stream, cancelOnError: false);
- expectedEvents.replay(c);
- actualEvents.subscription.cancel();
- c.add("Are you there"); // Not sent to actualEvents.
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add(42)..error("error")..add(37);
+ var actualEvents = new Events.capture(c.stream, cancelOnError: false);
+ expectedEvents.replay(c);
+ actualEvents.subscription.cancel();
+ c.add("Are you there"); // Not sent to actualEvents.
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test filter.
- c = new StreamController(sync: true);
- expectedEvents = new Events()
- ..add("a string")..add("another string")..close();
- sentEvents = new Events()
- ..add("a string")..add(42)..add("another string")..close();
- actualEvents = new Events.capture(c.stream.where((v) => v is String));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()
+ ..add("a string")..add("another string")..close();
+ var sentEvents = new Events()
+ ..add("a string")..add(42)..add("another string")..close();
+ var actualEvents = new Events.capture(c.stream.where((v) => v is String));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test map.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add("abab")..error("error")..close();
- sentEvents = new Events()..add("ab")..error("error")..close();
- actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add("abab")..error("error")..close();
+ var sentEvents = new Events()..add("ab")..error("error")..close();
+ var actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test handleError.
- c = new StreamController(sync: true);
- expectedEvents = new Events()..add("ab")..error("[foo]");
- sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
- actualEvents = new Events.capture(c.stream.handleError((error) {
- if (error is String) {
- // TODO(floitsch): this error originally changed the stack trace.
- throw "[${error}]";
- }
- }), cancelOnError: true);
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var expectedEvents = new Events()..add("ab")..error("[foo]");
+ var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
+ var actualEvents = new Events.capture(c.stream.handleError((error) {
+ if (error is String) {
+ // TODO(floitsch): this error originally changed the stack trace.
+ throw "[${error}]";
+ }
+ }), cancelOnError: true);
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// reduce is tested asynchronously and therefore not in this file.
// Test expand
- c = new StreamController(sync: true);
- sentEvents = new Events()..add(3)..add(2)..add(4)..close();
- expectedEvents = new Events()..add(1)..add(2)..add(3)
- ..add(1)..add(2)
- ..add(1)..add(2)..add(3)..add(4)
- ..close();
- actualEvents = new Events.capture(c.stream.expand((v) {
- var l = [];
- for (int i = 0; i < v; i++) l.add(i + 1);
- return l;
- }));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
+ var expectedEvents = new Events()..add(1)..add(2)..add(3)
+ ..add(1)..add(2)
+ ..add(1)..add(2)..add(3)..add(4)
+ ..close();
+ var actualEvents = new Events.capture(c.stream.expand((v) {
+ var l = [];
+ for (int i = 0; i < v; i++) l.add(i + 1);
+ return l;
+ }));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// test contains.
{
- c = new StreamController(sync: true);
+ var c = new StreamController(sync: true);
// Error after match is not important.
- sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
+ var sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
Future<bool> contains = c.stream.contains("x");
contains.then((var c) {
Expect.isTrue(c);
@@ -255,9 +292,9 @@
}
{
- c = new StreamController(sync: true);
+ var c = new StreamController(sync: true);
// Not matching is ok.
- sentEvents = new Events()..add("a")..add("x")..add("b")..close();
+ var sentEvents = new Events()..add("a")..add("x")..add("b")..close();
Future<bool> contains = c.stream.contains("y");
contains.then((var c) {
Expect.isFalse(c);
@@ -266,9 +303,9 @@
}
{
- c = new StreamController(sync: true);
+ var c = new StreamController(sync: true);
// Error before match makes future err.
- sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
+ var sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
Future<bool> contains = c.stream.contains("b");
contains.then((var c) {
Expect.fail("no value expected");
@@ -279,51 +316,57 @@
}
// Test transform.
- c = new StreamController(sync: true);
- sentEvents = new Events()..add("a")..error(42)..add("b")..close();
- expectedEvents =
- new Events()..error("a")..add(42)..error("b")..add("foo")..close();
- actualEvents = new Events.capture(c.stream.transform(
- new StreamTransformer.fromHandlers(
- handleData: (v, s) { s.addError(v); },
- handleError: (e, st, s) { s.add(e); },
- handleDone: (s) {
- s.add("foo");
- s.close();
- })));
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
+ var expectedEvents =
+ new Events()..error("a")..add(42)..error("b")..add("foo")..close();
+ var actualEvents = new Events.capture(c.stream.transform(
+ new StreamTransformer.fromHandlers(
+ handleData: (v, s) { s.addError(v); },
+ handleError: (e, st, s) { s.add(e); },
+ handleDone: (s) {
+ s.add("foo");
+ s.close();
+ })));
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test multiple filters.
- c = new StreamController(sync: true);
- sentEvents = new Events()..add(42)
- ..add("snugglefluffy")
- ..add(7)
- ..add("42")
- ..error("not FormatException") // Unsubscribes.
- ..close();
- expectedEvents = new Events()..add(42)..error("not FormatException");
- actualEvents = new Events.capture(
- c.stream.where((v) => v is String)
- .map((v) => int.parse(v))
- .handleError((error) {
- if (error is! FormatException) throw error;
- })
- .where((v) => v > 10),
- cancelOnError: true);
- sentEvents.replay(c);
- Expect.listEquals(expectedEvents.events, actualEvents.events);
+ {
+ var c = new StreamController(sync: true);
+ var sentEvents = new Events()..add(42)
+ ..add("snugglefluffy")
+ ..add(7)
+ ..add("42")
+ ..error("not FormatException") // Unsubscribes.
+ ..close();
+ var expectedEvents = new Events()..add(42)..error("not FormatException");
+ var actualEvents = new Events.capture(
+ c.stream.where((v) => v is String)
+ .map((v) => int.parse(v))
+ .handleError((error) {
+ if (error is! FormatException) throw error;
+ })
+ .where((v) => v > 10),
+ cancelOnError: true);
+ sentEvents.replay(c);
+ Expect.listEquals(expectedEvents.events, actualEvents.events);
+ }
// Test that only one subscription is allowed.
- c = new StreamController(sync: true);
- var sink = c.sink;
- var stream = c.stream;
- var counter = 0;
- var subscription = stream.listen((data) { counter += data; });
- Expect.throws(() => stream.listen(null), (e) => e is StateError);
- sink.add(1);
- Expect.equals(1, counter);
- c.close();
+ {
+ var c = new StreamController(sync: true);
+ var sink = c.sink;
+ var stream = c.stream;
+ var counter = 0;
+ var subscription = stream.listen((data) { counter += data; });
+ Expect.throws(() => stream.listen(null), (e) => e is StateError);
+ sink.add(1);
+ Expect.equals(1, counter);
+ c.close();
+ }
}
testExtraMethods() {
diff --git a/tests/lib/async/stream_periodic_test.dart b/tests/lib/async/stream_periodic_test.dart
index b333966..f755d76 100644
--- a/tests/lib/async/stream_periodic_test.dart
+++ b/tests/lib/async/stream_periodic_test.dart
@@ -16,7 +16,10 @@
subscription = stream.listen(expectAsync((data) {
expect(data, isNull);
receivedCount++;
- if (receivedCount == 5) subscription.cancel();
+ if (receivedCount == 5) {
+ var future = subscription.cancel();
+ expect(future, completes);
+ }
}, count: 5));
});
}
diff --git a/tests/lib/async/stream_subscription_as_future_test.dart b/tests/lib/async/stream_subscription_as_future_test.dart
index ac1293b..841298b 100644
--- a/tests/lib/async/stream_subscription_as_future_test.dart
+++ b/tests/lib/async/stream_subscription_as_future_test.dart
@@ -10,7 +10,7 @@
import 'package:unittest/unittest.dart';
main() {
- test("subscription.asStream success", () {
+ test("subscription.asFuture success", () {
Stream stream = new Stream.fromIterable([1, 2, 3]);
var output = [];
var subscription = stream.listen((x) { output.add(x); });
@@ -19,7 +19,7 @@
}));
});
- test("subscription.asStream success2", () {
+ test("subscription.asFuture success2", () {
StreamController controller = new StreamController(sync: true);
[1, 2, 3].forEach(controller.add);
controller.close();
@@ -31,7 +31,7 @@
}));
});
- test("subscription.asStream success 3", () {
+ test("subscription.asFuture success 3", () {
Stream stream = new Stream.fromIterable([1, 2, 3]).map((x) => x);
var output = [];
var subscription = stream.listen((x) { output.add(x); });
@@ -40,7 +40,7 @@
}));
});
- test("subscription.asStream different type", () {
+ test("subscription.asFuture different type", () {
Stream stream = new Stream<int>.fromIterable([1, 2, 3]);
var asyncCallback = expectAsync(() => {});
var output = [];
@@ -52,7 +52,7 @@
});
});
- test("subscription.asStream failure", () {
+ test("subscription.asFuture failure", () {
StreamController controller = new StreamController(sync: true);
[1, 2, 3].forEach(controller.add);
controller.addError("foo");
@@ -65,7 +65,7 @@
}));
});
- test("subscription.asStream failure2", () {
+ test("subscription.asFuture failure2", () {
Stream stream = new Stream.fromIterable([1, 2, 3, 4])
.map((x) {
if (x == 4) throw "foo";
@@ -77,4 +77,50 @@
Expect.equals(error, "foo");
}));
});
+
+ test("subscription.asFuture delayed cancel", () {
+ var completer = new Completer();
+ var controller =
+ new StreamController(onCancel: () => completer.future, sync: true);
+ [1, 2, 3].forEach(controller.add);
+ controller.addError("foo");
+ controller.close();
+ Stream stream = controller.stream;
+ var output = [];
+ var subscription = stream.listen((x) { output.add(x); });
+ bool catchErrorHasRun = false;
+ subscription.asFuture(output).catchError(expectAsync((error) {
+ Expect.equals(error, "foo");
+ catchErrorHasRun = true;
+ }));
+ Timer.run(expectAsync(() {
+ Expect.isFalse(catchErrorHasRun);
+ completer.complete();
+ }));
+ });
+
+ test("subscription.asFuture failure in cancel", () {
+ runZoned(() {
+ var completer = new Completer();
+ var controller =
+ new StreamController(onCancel: () => completer.future, sync: true);
+ [1, 2, 3].forEach(controller.add);
+ controller.addError("foo");
+ controller.close();
+ Stream stream = controller.stream;
+ var output = [];
+ var subscription = stream.listen((x) { output.add(x); });
+ bool catchErrorHasRun = false;
+ subscription.asFuture(output).catchError(expectAsync((error) {
+ Expect.equals(error, "foo");
+ catchErrorHasRun = true;
+ }));
+ Timer.run(expectAsync(() {
+ Expect.isFalse(catchErrorHasRun);
+ completer.completeError(499);
+ }));
+ }, onError: expectAsync((e) {
+ Expect.equals(499, e);
+ }));
+ });
}
diff --git a/tests/lib/async/stream_subscription_cancel_test.dart b/tests/lib/async/stream_subscription_cancel_test.dart
index d6c6987..48649b2 100644
--- a/tests/lib/async/stream_subscription_cancel_test.dart
+++ b/tests/lib/async/stream_subscription_cancel_test.dart
@@ -27,17 +27,22 @@
test('subscription.cancel after close', () {
var completer = new Completer();
StreamController controller = new StreamController(
- onCancel: completer.complete);
+ onCancel: () {
+ completer.complete();
+ return completer.future;
+ });
controller.close();
+ var completer2 = new Completer();
var sub;
void onDone() {
- expect(sub.cancel(), isNull);
+ sub.cancel().then(completer2.complete);
}
sub = controller.stream.listen(null, onDone: onDone);
expect(completer.future, completes);
+ expect(completer2.future, completes);
});
test('subscription.cancel after error', () {
@@ -135,4 +140,183 @@
.cancel();
expect(doneCompleter.future, completion(equals(true)));
});
+
+ test('subscription.cancel through map', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.map((x) => x).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through asyncMap', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.asyncMap((x) => x).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through asyncExpand', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.asyncExpand((x) => x).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through handleError', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.handleError((x) => x).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through skip', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.skip(1).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through take', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.take(1).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through skipWhile', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.skipWhile((x) => true).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through takeWhile', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.takeWhile((x) => true).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through timeOut', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var duration = const Duration(hours: 5);
+ var future = controller.stream.timeout(duration).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through transform', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var transformer =
+ new StreamTransformer.fromHandlers(handleData: (x, y) {});
+ var future = controller.stream.transform(transformer).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
+
+ test('subscription.cancel through where', () {
+ var completer = new Completer();
+ StreamController controller = new StreamController(
+ onCancel: () => completer.future);
+
+ bool done = false;
+ var future = controller.stream.where((x) => true).listen(null).cancel();
+
+ expect(future.then((_) => done = true), completion(equals(true)));
+
+ Timer.run(() {
+ expect(done, isFalse);
+ completer.complete();
+ });
+ });
}
diff --git a/tests/lib/lib.status b/tests/lib/lib.status
index c9d0afc..99f0273 100644
--- a/tests/lib/lib.status
+++ b/tests/lib/lib.status
@@ -187,6 +187,7 @@
async/stream_asyncmap_test: RuntimeError # Timer interface not supported: Issue 7728.
async/stream_transformation_broadcast_test: RuntimeError # Timer interface not supported: Issue 7728.
async/stream_controller_test: Fail # Timer interface not supported: Issue 7728.
+async/stream_subscription_cancel_test: Fail # Timer interface not supported: Issue 7728.
async/future_constructor2_test: Fail # Timer interface not supported: Issue 7728.
mirrors/mirrors_reader_test: Skip # Running in v8 suffices. Issue 16589 - RuntimeError. Issue 22130 - Crash (out of memory).
diff --git a/tools/VERSION b/tools/VERSION
index c335c43..b75955c 100644
--- a/tools/VERSION
+++ b/tools/VERSION
@@ -27,5 +27,5 @@
MAJOR 1
MINOR 20
PATCH 0
-PRERELEASE 2
+PRERELEASE 3
PRERELEASE_PATCH 0