Add a subscriptionTransformer() function. (#10)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 615e33e..43fcf54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@
* Add `StreamQueue.cancelable()`, which allows users to easily make a
`CancelableOperation` that can be canceled without affecting the queue.
+* Add a `subscriptionTransformer()` function to create `StreamTransformer`s that
+ modify the behavior of subscriptions to a stream.
+
## 1.11.3
* Fix strong-mode warning against the signature of Future.then
diff --git a/lib/async.dart b/lib/async.dart
index 2526ae5..c11f0b5 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -29,6 +29,7 @@
export "src/stream_sink_completer.dart";
export "src/stream_sink_transformer.dart";
export "src/stream_splitter.dart";
+export "src/stream_subscription_transformer.dart";
export "src/stream_zip.dart";
export "src/subscription_stream.dart";
export "src/typed_stream_transformer.dart";
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
new file mode 100644
index 0000000..060f466
--- /dev/null
+++ b/lib/src/stream_subscription_transformer.dart
@@ -0,0 +1,105 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'async_memoizer.dart';
+import 'delegate/stream_subscription.dart';
+
+typedef Future _AsyncHandler<T>(StreamSubscription<T> inner);
+
+typedef void _VoidHandler<T>(StreamSubscription<T> inner);
+
+/// Creates a [StreamTransformer] that modifies the behavior of subscriptions to
+/// a stream.
+///
+/// When [StreamSubscription.cancel], [StreamSubscription.pause], or
+/// [StreamSubscription.resume] is called, the corresponding handler is invoked.
+/// By default, handlers just forward to the underlying subscription.
+///
+/// Guarantees that none of the [StreamSubscription] callbacks and none of the
+/// callbacks passed to `subscriptionTransformer()` will be invoked once the
+/// transformed [StreamSubscription] has been canceled and `handleCancel()` has
+/// run. The [handlePause] and [handleResume] are invoked regardless of whether
+/// the subscription is paused already or not.
+///
+/// In order to preserve [StreamSubscription] guarantees, **all callbacks must
+/// synchronously call the corresponding method** on the inner
+/// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
+/// must call `pause()`, and [handleResume] must call `resume()`.
+StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/(
+ {Future handleCancel(StreamSubscription/*<T>*/ inner),
+ void handlePause(StreamSubscription/*<T>*/ inner),
+ void handleResume(StreamSubscription/*<T>*/ inner)}) {
+ return new StreamTransformer((stream, cancelOnError) {
+ return new _TransformedSubscription(
+ stream.listen(null, cancelOnError: cancelOnError),
+ handleCancel ?? (inner) => inner.cancel(),
+ handlePause ?? (inner) {
+ inner.pause();
+ },
+ handleResume ?? (inner) {
+ inner.resume();
+ });
+ });
+}
+
+/// A [StreamSubscription] wrapper that calls callbacks for subscription
+/// methods.
+class _TransformedSubscription<T> implements StreamSubscription<T> {
+ /// The wrapped subscription.
+ StreamSubscription<T> _inner;
+
+ /// The callback to run when [cancel] is called.
+ final _AsyncHandler<T> _handleCancel;
+
+ /// The callback to run when [pause] is called.
+ final _VoidHandler<T> _handlePause;
+
+ /// The callback to run when [resume] is called.
+ final _VoidHandler<T> _handleResume;
+
+ bool get isPaused => _inner?.isPaused ?? false;
+
+ _TransformedSubscription(this._inner, this._handleCancel, this._handlePause,
+ this._handleResume);
+
+ void onData(void handleData(T data)) {
+ _inner?.onData(handleData);
+ }
+
+ void onError(Function handleError) {
+ _inner?.onError(handleError);
+ }
+
+ void onDone(void handleDone()) {
+ _inner?.onDone(handleDone);
+ }
+
+ Future cancel() => _cancelMemoizer.runOnce(() {
+ var inner = _inner;
+ _inner.onData(null);
+ _inner.onDone(null);
+
+ // Setting onError to null will cause errors to be top-leveled.
+ _inner.onError((_, __) {});
+ _inner = null;
+ return _handleCancel(inner);
+ });
+ final _cancelMemoizer = new AsyncMemoizer();
+
+ void pause([Future resumeFuture]) {
+ if (_cancelMemoizer.hasRun) return;
+ if (resumeFuture != null) resumeFuture.whenComplete(resume);
+ _handlePause(_inner);
+ }
+
+ void resume() {
+ if (_cancelMemoizer.hasRun) return;
+ _handleResume(_inner);
+ }
+
+ Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) =>
+ _inner?.asFuture(futureValue) ?? new Completer().future;
+}
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
new file mode 100644
index 0000000..cdac4f8
--- /dev/null
+++ b/test/subscription_transformer_test.dart
@@ -0,0 +1,292 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ group("with no callbacks", () {
+ test("forwards cancellation", () async {
+ var isCanceled = false;
+ var cancelCompleter = new Completer();
+ var controller = new StreamController(onCancel: expectAsync0(() {
+ isCanceled = true;
+ return cancelCompleter.future;
+ }));
+ var subscription = controller.stream
+ .transform(subscriptionTransformer())
+ .listen(expectAsync1((_) {}, count: 0));
+
+ var cancelFired = false;
+ subscription.cancel().then(expectAsync1((_) {
+ cancelFired = true;
+ }));
+
+ await flushMicrotasks();
+ expect(isCanceled, isTrue);
+ expect(cancelFired, isFalse);
+
+ cancelCompleter.complete();
+ await flushMicrotasks();
+ expect(cancelFired, isTrue);
+
+ // This shouldn't call the onCancel callback again.
+ expect(subscription.cancel(), completes);
+ });
+
+ test("forwards pausing and resuming", () async {
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer())
+ .listen(expectAsync1((_) {}, count: 0));
+
+ subscription.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ subscription.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ subscription.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ subscription.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ });
+
+ test("forwards pausing with a resume future", () async {
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer())
+ .listen(expectAsync1((_) {}, count: 0));
+
+ var completer = new Completer();
+ subscription.pause(completer.future);
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ completer.complete();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ });
+ });
+
+ group("with a cancel callback", () {
+ test("invokes the callback when the subscription is canceled", () async {
+ var isCanceled = false;
+ var callbackInvoked = false;
+ var controller = new StreamController(onCancel: expectAsync0(() {
+ isCanceled = true;
+ }));
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handleCancel: expectAsync1((inner) {
+ callbackInvoked = true;
+ inner.cancel();
+ })))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ await flushMicrotasks();
+ expect(callbackInvoked, isFalse);
+ expect(isCanceled, isFalse);
+
+ subscription.cancel();
+ await flushMicrotasks();
+ expect(callbackInvoked, isTrue);
+ expect(isCanceled, isTrue);
+ });
+
+ test("invokes the callback once and caches its result", () async {
+ var completer = new Completer();
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handleCancel: expectAsync1((inner) => completer.future)))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ var cancelFired1 = false;
+ subscription.cancel().then(expectAsync1((_) {
+ cancelFired1 = true;
+ }));
+
+ var cancelFired2 = false;
+ subscription.cancel().then(expectAsync1((_) {
+ cancelFired2 = true;
+ }));
+
+ await flushMicrotasks();
+ expect(cancelFired1, isFalse);
+ expect(cancelFired2, isFalse);
+
+ completer.complete();
+ await flushMicrotasks();
+ expect(cancelFired1, isTrue);
+ expect(cancelFired2, isTrue);
+ });
+ });
+
+ group("with a pause callback", () {
+ test("invokes the callback when pause is called", () async {
+ var pauseCount = 0;
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(handlePause: expectAsync1((inner) {
+ pauseCount++;
+ inner.pause();
+ }, count: 3)))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ await flushMicrotasks();
+ expect(pauseCount, equals(0));
+
+ subscription.pause();
+ await flushMicrotasks();
+ expect(pauseCount, equals(1));
+
+ subscription.pause();
+ await flushMicrotasks();
+ expect(pauseCount, equals(2));
+
+ subscription.resume();
+ subscription.resume();
+ await flushMicrotasks();
+ expect(pauseCount, equals(2));
+
+ subscription.pause();
+ await flushMicrotasks();
+ expect(pauseCount, equals(3));
+ });
+
+ test("doesn't invoke the callback when the subscription has been canceled",
+ () async {
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handlePause: expectAsync1((_) {}, count: 0)))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ subscription.cancel();
+ subscription.pause();
+ subscription.pause();
+ subscription.pause();
+ });
+ });
+
+ group("with a resume callback", () {
+ test("invokes the callback when resume is called", () async {
+ var resumeCount = 0;
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handleResume: expectAsync1((inner) {
+ resumeCount++;
+ inner.resume();
+ }, count: 3)))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ await flushMicrotasks();
+ expect(resumeCount, equals(0));
+
+ subscription.resume();
+ await flushMicrotasks();
+ expect(resumeCount, equals(1));
+
+ subscription.pause();
+ subscription.pause();
+ await flushMicrotasks();
+ expect(resumeCount, equals(1));
+
+ subscription.resume();
+ await flushMicrotasks();
+ expect(resumeCount, equals(2));
+
+ subscription.resume();
+ await flushMicrotasks();
+ expect(resumeCount, equals(3));
+ });
+
+ test("invokes the callback when a resume future completes", () async {
+ var resumed = false;
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handleResume: expectAsync1((inner) {
+ resumed = true;
+ inner.resume();
+ })))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ var completer = new Completer();
+ subscription.pause(completer.future);
+ await flushMicrotasks();
+ expect(resumed, isFalse);
+
+ completer.complete();
+ await flushMicrotasks();
+ expect(resumed, isTrue);
+ });
+
+ test("doesn't invoke the callback when the subscription has been canceled",
+ () async {
+ var controller = new StreamController();
+ var subscription = controller.stream
+ .transform(subscriptionTransformer(
+ handlePause: expectAsync1((_) {}, count: 0)))
+ .listen(expectAsync1((_) {}, count: 0));
+
+ subscription.cancel();
+ subscription.resume();
+ subscription.resume();
+ subscription.resume();
+ });
+ });
+
+ group("when the outer subscription is canceled but the inner is not", () {
+ StreamSubscription subscription;
+ setUp(() {
+ var controller = new StreamController();
+ subscription = controller.stream
+ .transform(subscriptionTransformer(handleCancel: (_) {}))
+ .listen(
+ expectAsync1((_) {}, count: 0),
+ onError: expectAsync2((_, __) {}, count: 0),
+ onDone: expectAsync0(() {}, count: 0));
+ subscription.cancel();
+ controller.add(1);
+ controller.addError("oh no!");
+ controller.close();
+ });
+
+ test("doesn't call a new onData", () async {
+ subscription.onData(expectAsync1((_) {}, count: 0));
+ await flushMicrotasks();
+ });
+
+ test("doesn't call a new onError", () async {
+ subscription.onError(expectAsync2((_, __) {}, count: 0));
+ await flushMicrotasks();
+ });
+
+ test("doesn't call a new onDone", () async {
+ subscription.onDone(expectAsync0(() {}, count: 0));
+ await flushMicrotasks();
+ });
+
+ test("isPaused returns false", () {
+ expect(subscription.isPaused, isFalse);
+ });
+
+ test("asFuture never completes", () async {
+ subscription.asFuture().then(expectAsync1((_) {}, count: 0));
+ await flushMicrotasks();
+ });
+ });
+}