Add a subscriptionTransformer() function. (#10)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 615e33e..43fcf54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@
 * Add `StreamQueue.cancelable()`, which allows users to easily make a
   `CancelableOperation` that can be canceled without affecting the queue.
 
+* Add a `subscriptionTransformer()` function to create `StreamTransformer`s that
+  modify the behavior of subscriptions to a stream.
+
 ## 1.11.3
 
 * Fix strong-mode warning against the signature of Future.then
diff --git a/lib/async.dart b/lib/async.dart
index 2526ae5..c11f0b5 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -29,6 +29,7 @@
 export "src/stream_sink_completer.dart";
 export "src/stream_sink_transformer.dart";
 export "src/stream_splitter.dart";
+export "src/stream_subscription_transformer.dart";
 export "src/stream_zip.dart";
 export "src/subscription_stream.dart";
 export "src/typed_stream_transformer.dart";
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
new file mode 100644
index 0000000..060f466
--- /dev/null
+++ b/lib/src/stream_subscription_transformer.dart
@@ -0,0 +1,105 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'async_memoizer.dart';
+import 'delegate/stream_subscription.dart';
+
+typedef Future _AsyncHandler<T>(StreamSubscription<T> inner);
+
+typedef void _VoidHandler<T>(StreamSubscription<T> inner);
+
+/// Creates a [StreamTransformer] that modifies the behavior of subscriptions to
+/// a stream.
+///
+/// When [StreamSubscription.cancel], [StreamSubscription.pause], or
+/// [StreamSubscription.resume] is called, the corresponding handler is invoked.
+/// By default, handlers just forward to the underlying subscription.
+///
+/// Guarantees that none of the [StreamSubscription] callbacks and none of the
+/// callbacks passed to `subscriptionTransformer()` will be invoked once the
+/// transformed [StreamSubscription] has been canceled and `handleCancel()` has
+/// run. The [handlePause] and [handleResume] are invoked regardless of whether
+/// the subscription is paused already or not.
+///
+/// In order to preserve [StreamSubscription] guarantees, **all callbacks must
+/// synchronously call the corresponding method** on the inner
+/// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
+/// must call `pause()`, and [handleResume] must call `resume()`.
+StreamTransformer/*<T, T>*/ subscriptionTransformer/*<T>*/(
+    {Future handleCancel(StreamSubscription/*<T>*/ inner),
+    void handlePause(StreamSubscription/*<T>*/ inner),
+    void handleResume(StreamSubscription/*<T>*/ inner)}) {
+  return new StreamTransformer((stream, cancelOnError) {
+    return new _TransformedSubscription(
+        stream.listen(null, cancelOnError: cancelOnError),
+        handleCancel ?? (inner) => inner.cancel(),
+        handlePause ?? (inner) {
+          inner.pause();
+        },
+        handleResume ?? (inner) {
+          inner.resume();
+        });
+  });
+}
+
+/// A [StreamSubscription] wrapper that calls callbacks for subscription
+/// methods.
+class _TransformedSubscription<T> implements StreamSubscription<T> {
+  /// The wrapped subscription.
+  StreamSubscription<T> _inner;
+
+  /// The callback to run when [cancel] is called.
+  final _AsyncHandler<T> _handleCancel;
+
+  /// The callback to run when [pause] is called.
+  final _VoidHandler<T> _handlePause;
+
+  /// The callback to run when [resume] is called.
+  final _VoidHandler<T> _handleResume;
+
+  bool get isPaused => _inner?.isPaused ?? false;
+
+  _TransformedSubscription(this._inner, this._handleCancel, this._handlePause,
+      this._handleResume);
+
+  void onData(void handleData(T data)) {
+    _inner?.onData(handleData);
+  }
+
+  void onError(Function handleError) {
+    _inner?.onError(handleError);
+  }
+
+  void onDone(void handleDone()) {
+    _inner?.onDone(handleDone);
+  }
+
+  Future cancel() => _cancelMemoizer.runOnce(() {
+    var inner = _inner;
+    _inner.onData(null);
+    _inner.onDone(null);
+
+    // Setting onError to null will cause errors to be top-leveled.
+    _inner.onError((_, __) {});
+    _inner = null;
+    return _handleCancel(inner);
+  });
+  final _cancelMemoizer = new AsyncMemoizer();
+
+  void pause([Future resumeFuture]) {
+    if (_cancelMemoizer.hasRun) return;
+    if (resumeFuture != null) resumeFuture.whenComplete(resume);
+    _handlePause(_inner);
+  }
+
+  void resume() {
+    if (_cancelMemoizer.hasRun) return;
+    _handleResume(_inner);
+  }
+
+  Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) =>
+      _inner?.asFuture(futureValue) ?? new Completer().future;
+}
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
new file mode 100644
index 0000000..cdac4f8
--- /dev/null
+++ b/test/subscription_transformer_test.dart
@@ -0,0 +1,292 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  group("with no callbacks", () {
+    test("forwards cancellation", () async {
+      var isCanceled = false;
+      var cancelCompleter = new Completer();
+      var controller = new StreamController(onCancel: expectAsync0(() {
+        isCanceled = true;
+        return cancelCompleter.future;
+      }));
+      var subscription = controller.stream
+          .transform(subscriptionTransformer())
+          .listen(expectAsync1((_) {}, count: 0));
+
+      var cancelFired = false;
+      subscription.cancel().then(expectAsync1((_) {
+        cancelFired = true;
+      }));
+
+      await flushMicrotasks();
+      expect(isCanceled, isTrue);
+      expect(cancelFired, isFalse);
+
+      cancelCompleter.complete();
+      await flushMicrotasks();
+      expect(cancelFired, isTrue);
+
+      // This shouldn't call the onCancel callback again.
+      expect(subscription.cancel(), completes);
+    });
+
+    test("forwards pausing and resuming", () async {
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer())
+          .listen(expectAsync1((_) {}, count: 0));
+
+      subscription.pause();
+      await flushMicrotasks();
+      expect(controller.isPaused, isTrue);
+
+      subscription.pause();
+      await flushMicrotasks();
+      expect(controller.isPaused, isTrue);
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(controller.isPaused, isTrue);
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(controller.isPaused, isFalse);
+    });
+
+    test("forwards pausing with a resume future", () async {
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer())
+          .listen(expectAsync1((_) {}, count: 0));
+
+      var completer = new Completer();
+      subscription.pause(completer.future);
+      await flushMicrotasks();
+      expect(controller.isPaused, isTrue);
+
+      completer.complete();
+      await flushMicrotasks();
+      expect(controller.isPaused, isFalse);
+    });
+  });
+
+  group("with a cancel callback", () {
+    test("invokes the callback when the subscription is canceled", () async {
+      var isCanceled = false;
+      var callbackInvoked = false;
+      var controller = new StreamController(onCancel: expectAsync0(() {
+        isCanceled = true;
+      }));
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handleCancel: expectAsync1((inner) {
+                callbackInvoked = true;
+                inner.cancel();
+              })))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      await flushMicrotasks();
+      expect(callbackInvoked, isFalse);
+      expect(isCanceled, isFalse);
+
+      subscription.cancel();
+      await flushMicrotasks();
+      expect(callbackInvoked, isTrue);
+      expect(isCanceled, isTrue);
+    });
+    
+    test("invokes the callback once and caches its result", () async {
+      var completer = new Completer();
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handleCancel: expectAsync1((inner) => completer.future)))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      var cancelFired1 = false;
+      subscription.cancel().then(expectAsync1((_) {
+        cancelFired1 = true;
+      }));
+
+      var cancelFired2 = false;
+      subscription.cancel().then(expectAsync1((_) {
+        cancelFired2 = true;
+      }));
+
+      await flushMicrotasks();
+      expect(cancelFired1, isFalse);
+      expect(cancelFired2, isFalse);
+
+      completer.complete();
+      await flushMicrotasks();
+      expect(cancelFired1, isTrue);
+      expect(cancelFired2, isTrue);
+    });
+  });
+
+  group("with a pause callback", () {
+    test("invokes the callback when pause is called", () async {
+      var pauseCount = 0;
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(handlePause: expectAsync1((inner) {
+            pauseCount++;
+            inner.pause();
+          }, count: 3)))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      await flushMicrotasks();
+      expect(pauseCount, equals(0));
+
+      subscription.pause();
+      await flushMicrotasks();
+      expect(pauseCount, equals(1));
+
+      subscription.pause();
+      await flushMicrotasks();
+      expect(pauseCount, equals(2));
+
+      subscription.resume();
+      subscription.resume();
+      await flushMicrotasks();
+      expect(pauseCount, equals(2));
+
+      subscription.pause();
+      await flushMicrotasks();
+      expect(pauseCount, equals(3));
+    });
+
+    test("doesn't invoke the callback when the subscription has been canceled",
+        () async {
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handlePause: expectAsync1((_) {}, count: 0)))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      subscription.cancel();
+      subscription.pause();
+      subscription.pause();
+      subscription.pause();
+    });
+  });
+
+  group("with a resume callback", () {
+    test("invokes the callback when resume is called", () async {
+      var resumeCount = 0;
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handleResume: expectAsync1((inner) {
+                resumeCount++;
+                inner.resume();
+              }, count: 3)))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      await flushMicrotasks();
+      expect(resumeCount, equals(0));
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(resumeCount, equals(1));
+
+      subscription.pause();
+      subscription.pause();
+      await flushMicrotasks();
+      expect(resumeCount, equals(1));
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(resumeCount, equals(2));
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(resumeCount, equals(3));
+    });
+
+    test("invokes the callback when a resume future completes", () async {
+      var resumed = false;
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handleResume: expectAsync1((inner) {
+                resumed = true;
+                inner.resume();
+              })))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      var completer = new Completer();
+      subscription.pause(completer.future);
+      await flushMicrotasks();
+      expect(resumed, isFalse);
+
+      completer.complete();
+      await flushMicrotasks();
+      expect(resumed, isTrue);
+    });
+
+    test("doesn't invoke the callback when the subscription has been canceled",
+        () async {
+      var controller = new StreamController();
+      var subscription = controller.stream
+          .transform(subscriptionTransformer(
+              handlePause: expectAsync1((_) {}, count: 0)))
+          .listen(expectAsync1((_) {}, count: 0));
+
+      subscription.cancel();
+      subscription.resume();
+      subscription.resume();
+      subscription.resume();
+    });
+  });
+
+  group("when the outer subscription is canceled but the inner is not", () {
+    StreamSubscription subscription;
+    setUp(() {
+      var controller = new StreamController();
+      subscription = controller.stream
+          .transform(subscriptionTransformer(handleCancel: (_) {}))
+          .listen(
+              expectAsync1((_) {}, count: 0),
+              onError: expectAsync2((_, __) {}, count: 0),
+              onDone: expectAsync0(() {}, count: 0));
+      subscription.cancel();
+      controller.add(1);
+      controller.addError("oh no!");
+      controller.close();
+    });
+
+    test("doesn't call a new onData", () async {
+      subscription.onData(expectAsync1((_) {}, count: 0));
+      await flushMicrotasks();
+    });
+
+    test("doesn't call a new onError", () async {
+      subscription.onError(expectAsync2((_, __) {}, count: 0));
+      await flushMicrotasks();
+    });
+
+    test("doesn't call a new onDone", () async {
+      subscription.onDone(expectAsync0(() {}, count: 0));
+      await flushMicrotasks();
+    });
+
+    test("isPaused returns false", () {
+      expect(subscription.isPaused, isFalse);
+    });
+
+    test("asFuture never completes", () async {
+      subscription.asFuture().then(expectAsync1((_) {}, count: 0));
+      await flushMicrotasks();
+    });
+  });
+}