Fix most strong mode warnings.

The remaining warnings will need type-asserting wrappers to fix, which
are coming in a separate CL.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1841223002 .
diff --git a/.analysis_options b/.analysis_options
new file mode 100644
index 0000000..a10d4c5
--- /dev/null
+++ b/.analysis_options
@@ -0,0 +1,2 @@
+analyzer:
+  strong-mode: true
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 946d3a8..803b589 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## 1.9.1
+
+* Fix all strong mode warnings and add generic method annotations.
+
+* `new StreamQueue()` now takes a `Stream<T>` rather than a `Stream<dynamic>`.
+  Passing a type that wasn't `is`-compatible with `Stream<T>` would already
+  throw an error under some circumstances, so this is not considered a breaking
+  change.
+
+* `new SubscriptionStream()` now takes a `Stream<T>` rather than a
+  `Stream<dynamic>`. Passing a type that wasn't `is`-compatible with `Stream<T>`
+  would already throw an error under some circumstances, so this is not
+  considered a breaking change.
+
+* `ErrorResult` now takes a type parameter.
+
+* `Result.asError` now returns a `Result<T>`.
+
 ## 1.9.0
 
 * Deprecate top-level libraries other than `package:async/async.dart`, which
diff --git a/lib/src/async_memoizer.dart b/lib/src/async_memoizer.dart
index 3612a7d..0213f68 100644
--- a/lib/src/async_memoizer.dart
+++ b/lib/src/async_memoizer.dart
@@ -31,7 +31,7 @@
   ///
   /// This can be accessed at any time, and will fire once [runOnce] is called.
   Future<T> get future => _completer.future;
-  final _completer = new Completer();
+  final _completer = new Completer<T>();
 
   /// Whether [runOnce] has been called yet.
   bool get hasRun => _completer.isCompleted;
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 1ba6b3c..d0a1871 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -62,9 +62,9 @@
   /// If this operation is cancelled, the returned future waits for the future
   /// returned by [cancel], then completes to [cancellationValue].
   Future valueOrCancellation([T cancellationValue]) {
-    var completer = new Completer.sync();
-
-    value.then(completer.complete, onError: completer.completeError);
+    var completer = new Completer<T>.sync();
+    value.then((result) => completer.complete(result),
+        onError: completer.completeError);
 
     _completer._cancelMemo.future.then((_) {
       completer.complete(cancellationValue);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index 5e84e4f..bfc075b 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -7,20 +7,20 @@
 /// A wrapper that forwards calls to a [Future].
 class DelegatingFuture<T> implements Future<T> {
   /// The wrapped [Future].
-  final Future _future;
+  final Future<T> _future;
 
   DelegatingFuture(this._future);
 
   Stream<T> asStream() => _future.asStream();
 
-  Future catchError(Function onError, {bool test(error)}) =>
+  Future catchError(Function onError, {bool test(Object error)}) =>
     _future.catchError(onError, test: test);
 
-  Future then(onValue(T value), {Function onError}) =>
+  Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
     _future.then(onValue, onError: onError);
 
   Future<T> whenComplete(action()) => _future.whenComplete(action);
 
-  Future timeout(Duration timeLimit, {void onTimeout()}) =>
+  Future<T> timeout(Duration timeLimit, {void onTimeout()}) =>
     _future.timeout(timeLimit, onTimeout: onTimeout);
 }
diff --git a/lib/src/result.dart b/lib/src/result.dart
index 482cf9b..393dced 100644
--- a/lib/src/result.dart
+++ b/lib/src/result.dart
@@ -72,20 +72,14 @@
   factory Result.error(Object error, [StackTrace stackTrace]) =>
       new ErrorResult(error, stackTrace);
 
-  // Helper functions.
-  static _captureValue(value) => new ValueResult(value);
-  static _captureError(error, stack) => new ErrorResult(error, stack);
-  static _release(Result v) {
-    if (v.isValue) return v.asValue.value;  // Avoid wrapping in future.
-    return v.asFuture;
-  }
-
   /// Capture the result of a future into a `Result` future.
   ///
   /// The resulting future will never have an error.
   /// Errors have been converted to an [ErrorResult] value.
-  static Future<Result> capture(Future future) {
-    return future.then(_captureValue, onError: _captureError);
+  static Future<Result/*<T>*/> capture/*<T>*/(Future/*<T>*/ future) {
+    return future.then((value) => new ValueResult(value),
+        onError: (error, stackTrace) =>
+            new ErrorResult/*<T>*/(error, stackTrace));
   }
 
   /// Release the result of a captured future.
@@ -95,30 +89,23 @@
   ///
   /// If [future] completes with an error, the returned future completes with
   /// the same error.
-  static Future release(Future<Result> future) {
-    return future.then(_release);
-  }
+  static Future/*<T>*/ release/*<T>*/(Future<Result/*<T>*/> future) =>
+      future.then/*<Future<T>>*/((result) => result.asFuture);
 
   /// Capture the results of a stream into a stream of [Result] values.
   ///
   /// The returned stream will not have any error events.
   /// Errors from the source stream have been converted to [ErrorResult]s.
-  ///
-  /// Shorthand for transforming the stream using [captureStreamTransformer].
-  static Stream<Result> captureStream(Stream source) {
-    return source.transform(captureStreamTransformer);
-  }
+  static Stream<Result/*<T>*/> captureStream/*<T>*/(Stream/*<T>*/ source) =>
+      source.transform(new CaptureStreamTransformer/*<T>*/());
 
   /// Release a stream of [result] values into a stream of the results.
   ///
   /// `Result` values of the source stream become value or error events in
   /// the returned stream as appropriate.
   /// Errors from the source stream become errors in the returned stream.
-  ///
-  /// Shorthand for transforming the stream using [releaseStreamTransformer].
-  static Stream releaseStream(Stream<Result> source) {
-    return source.transform(releaseStreamTransformer);
-  }
+  static Stream/*<T>*/ releaseStream/*<T>*/(Stream<Result/*<T>*/> source) =>
+      source.transform(new ReleaseStreamTransformer/*<T>*/());
 
   /// Converts a result of a result to a single result.
   ///
@@ -126,9 +113,10 @@
   /// which is then an error, then a result with that error is returned.
   /// Otherwise both levels of results are value results, and a single
   /// result with the value is returned.
-  static Result flatten(Result<Result> result) {
-    if (result.isError) return result;
-    return result.asValue.value;
+  static Result/*<T>*/ flatten/*<T>*/(Result<Result/*<T>*/> result) {
+    if (result.isValue) return result.asValue.value;
+    return new ErrorResult/*<T>*/(
+        result.asError.error, result.asError.stackTrace);
   }
 
   /// Whether this result is a value result.
@@ -149,7 +137,7 @@
   /// If this is an error result, return itself.
   ///
   /// Otherwise return `null`.
-  ErrorResult get asError;
+  ErrorResult<T> get asError;
 
   /// Complete a completer with this result.
   void complete(Completer<T> completer);
diff --git a/lib/src/result/capture_transformer.dart b/lib/src/result/capture_transformer.dart
index ca4d0d3..6b1bbf2 100644
--- a/lib/src/result/capture_transformer.dart
+++ b/lib/src/result/capture_transformer.dart
@@ -7,7 +7,10 @@
 import '../result.dart';
 import 'capture_sink.dart';
 
-/// Use [Result.captureTransformer] instead.
+/// A stream transformer that captures a stream of events into [Result]s.
+///
+/// The result of the transformation is a stream of [Result] values and no
+/// error events. This is the transformer used by [captureStream].
 @Deprecated("Will be removed in async 2.0.0.")
 class CaptureStreamTransformer<T> implements StreamTransformer<T, Result<T>> {
   const CaptureStreamTransformer();
diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart
index eecf68e..b6d5859 100644
--- a/lib/src/result/error.dart
+++ b/lib/src/result/error.dart
@@ -8,14 +8,14 @@
 import 'value.dart';
 
 /// A result representing a thrown error.
-class ErrorResult implements Result {
+class ErrorResult<T> implements Result<T> {
   final error;
   final StackTrace stackTrace;
 
   bool get isValue => false;
   bool get isError => true;
-  ValueResult get asValue => null;
-  ErrorResult get asError => this;
+  ValueResult<T> get asValue => null;
+  ErrorResult<T> get asError => this;
 
   ErrorResult(this.error, this.stackTrace);
 
@@ -27,7 +27,7 @@
     sink.addError(error, stackTrace);
   }
 
-  Future get asFuture => new Future.error(error, stackTrace);
+  Future<T> get asFuture => new Future.error(error, stackTrace);
 
   /// Calls an error handler with the error and stacktrace.
   ///
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index db9dd82..209e8b1 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -20,11 +20,12 @@
   Result<T> _result;
 
   factory ResultFuture(Future<T> future) {
-    var resultFuture;
-    resultFuture = new ResultFuture._(Result.capture(future).then((result) {
+    ResultFuture<T> resultFuture;
+    resultFuture = new ResultFuture._(() async {
+      var result = await Result.capture(future);
       resultFuture._result = result;
-      return result.asFuture;
-    }));
+      return await result.asFuture;
+    }());
     return resultFuture;
   }
 
diff --git a/lib/src/result/value.dart b/lib/src/result/value.dart
index f065ab6..39fa048 100644
--- a/lib/src/result/value.dart
+++ b/lib/src/result/value.dart
@@ -14,7 +14,7 @@
   bool get isValue => true;
   bool get isError => false;
   ValueResult<T> get asValue => this;
-  ErrorResult get asError => null;
+  ErrorResult<T> get asError => null;
 
   ValueResult(this.value);
 
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 4255175..e01efac 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -9,15 +9,26 @@
 ///
 /// This buffers the broadcast stream's events, which means that it starts
 /// listening to a stream as soon as it's bound.
+///
+/// This also casts the source stream's events to type `T`. If the cast fails,
+/// the result stream will emit a [CastError]. This behavior is deprecated, and
+/// should not be relied upon.
 class SingleSubscriptionTransformer<S, T> implements StreamTransformer<S, T> {
   const SingleSubscriptionTransformer();
 
   Stream<T> bind(Stream<S> stream) {
     var subscription;
-    var controller = new StreamController(sync: true,
+    var controller = new StreamController<T>(sync: true,
         onCancel: () => subscription.cancel());
-    subscription = stream.listen(controller.add,
-        onError: controller.addError, onDone: controller.close);
+    subscription = stream.listen((value) {
+      // TODO(nweiz): When we release a new major version, get rid of the second
+      // type parameter and avoid this conversion.
+      try {
+        controller.add(value as T);
+      } on CastError catch (error, stackTrace) {
+        controller.addError(error, stackTrace);
+      }
+    }, onError: controller.addError, onDone: controller.close);
     return controller.stream;
   }
 }
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 73557b2..953985e 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -24,7 +24,7 @@
 /// the listen is performed directly on the source stream.
 class StreamCompleter<T> {
   /// The stream doing the actual work, is returned by [stream].
-  final _CompleterStream _stream = new _CompleterStream<T>();
+  final _stream = new _CompleterStream<T>();
 
   /// Convert a `Future<Stream>` to a `Stream`.
   ///
@@ -34,8 +34,8 @@
   ///
   /// If the future completes with an error, the returned stream will
   /// instead contain just that error.
-  static Stream fromFuture(Future<Stream> streamFuture) {
-    var completer = new StreamCompleter();
+  static Stream/*<T>*/ fromFuture/*<T>*/(Future<Stream/*<T>*/> streamFuture) {
+    var completer = new StreamCompleter/*<T>*/();
     streamFuture.then(completer.setSourceStream,
         onError: completer.setError);
     return completer.stream;
@@ -109,13 +109,13 @@
   ///
   /// Created if the user listens on this stream before the source stream
   /// is set, or if using [_setEmpty] so there is no source stream.
-  StreamController _controller;
+  StreamController<T> _controller;
 
   /// Source stream for the events provided by this stream.
   ///
   /// Set when the completer sets the source stream using [_setSourceStream]
   /// or [_setEmpty].
-  Stream _sourceStream;
+  Stream<T> _sourceStream;
 
   StreamSubscription<T> listen(onData(T data),
                                {Function onError,
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 6239fc2..889ccd8 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -12,8 +12,8 @@
 /// this means that events emitted by broadcast streams will be dropped until
 /// [stream] has a listener.**
 ///
-/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will be
-/// single-subscription. In this case, if [stream] is paused or canceled, all
+/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will
+/// be single-subscription. In this case, if [stream] is paused or canceled, all
 /// streams in the group will likewise be paused or canceled, respectively.
 ///
 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast],
@@ -53,8 +53,8 @@
   ///
   /// This is equivalent to adding [streams] to a group, closing that group, and
   /// returning its stream.
-  static Stream merge(Iterable<Stream> streams) {
-    var group = new StreamGroup();
+  static Stream/*<T>*/ merge/*<T>*/(Iterable<Stream/*<T>*/> streams) {
+    var group = new StreamGroup/*<T>*/();
     streams.forEach(group.add);
     group.close();
     return group.stream;
@@ -192,7 +192,7 @@
   /// Starts actively forwarding events from [stream] to [_controller].
   ///
   /// This will pause the resulting subscription if [this] is paused.
-  StreamSubscription _listenToStream(Stream stream) {
+  StreamSubscription<T> _listenToStream(Stream<T> stream) {
     var subscription = stream.listen(
         _controller.add,
         onError: _controller.addError,
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 8482e0c..4ce9e13 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -94,7 +94,7 @@
   final Queue<_EventRequest> _requestQueue = new Queue();
 
   /// Create a `StreamQueue` of the events of [source].
-  factory StreamQueue(Stream source) = _StreamQueue<T>;
+  factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
 
   StreamQueue._();
 
@@ -274,7 +274,7 @@
   /// Can only be used by the very last request (the stream queue must
   /// be closed by that request).
   /// Only used by [rest].
-  Stream _extractStream();
+  Stream<T> _extractStream();
 
   /// Requests that the event source pauses events.
   ///
@@ -342,13 +342,13 @@
 /// to when a request needs events.
 class _StreamQueue<T> extends StreamQueue<T> {
   /// Source of events.
-  final Stream _sourceStream;
+  final Stream<T> _sourceStream;
 
   /// Subscription on [_sourceStream] while listening for events.
   ///
   /// Set to subscription when listening, and set to `null` when the
   /// subscription is done (and [_isDone] is set to true).
-  StreamSubscription _subscription;
+  StreamSubscription<T> _subscription;
 
   _StreamQueue(this._sourceStream) : super._();
 
@@ -422,7 +422,7 @@
 ///
 /// The [close] method is also called immediately when the source stream
 /// is done.
-abstract class _EventRequest {
+abstract class _EventRequest<T> {
   /// Handle available events.
   ///
   /// The available events are provided as a queue. The `update` function
@@ -443,22 +443,22 @@
   /// If the function returns `false` when the stream has already closed
   /// ([isDone] is true), then the request must call
   /// [StreamQueue._updateRequests] itself when it's ready to continue.
-  bool update(Queue<Result> events, bool isDone);
+  bool update(Queue<Result<T>> events, bool isDone);
 }
 
 /// Request for a [StreamQueue.next] call.
 ///
 /// Completes the returned future when receiving the first event,
 /// and is then complete.
-class _NextRequest<T> implements _EventRequest {
+class _NextRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by [StreamQueue.next].
-  final Completer _completer;
+  final _completer = new Completer<T>();
 
-  _NextRequest() : _completer = new Completer<T>();
+  _NextRequest();
 
   Future<T> get future => _completer.future;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     if (events.isNotEmpty) {
       events.removeFirst().complete(_completer);
       return true;
@@ -474,9 +474,9 @@
 }
 
 /// Request for a [StreamQueue.skip] call.
-class _SkipRequest implements _EventRequest {
+class _SkipRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the skip call.
-  final Completer _completer = new Completer<int>();
+  final _completer = new Completer<int>();
 
   /// Number of remaining events to skip.
   ///
@@ -489,9 +489,9 @@
   _SkipRequest(this._eventsToSkip);
 
   /// The future completed when the correct number of events have been skipped.
-  Future get future => _completer.future;
+  Future<int> get future => _completer.future;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     while (_eventsToSkip > 0) {
       if (events.isEmpty) {
         if (isDone) break;
@@ -501,7 +501,7 @@
 
       var event = events.removeFirst();
       if (event.isError) {
-        event.complete(_completer);
+        _completer.completeError(event.asError.error, event.asError.stackTrace);
         return true;
       }
     }
@@ -511,12 +511,12 @@
 }
 
 /// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest {
+class _TakeRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the take call.
-  final Completer _completer;
+  final _completer = new Completer<List<T>>();
 
   /// List collecting events until enough have been seen.
-  final List _list = <T>[];
+  final _list = <T>[];
 
   /// Number of events to capture.
   ///
@@ -524,24 +524,24 @@
   /// this value.
   final int _eventsToTake;
 
-  _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+  _TakeRequest(this._eventsToTake);
 
   /// The future completed when the correct number of events have been captured.
-  Future get future => _completer.future;
+  Future<List<T>> get future => _completer.future;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     while (_list.length < _eventsToTake) {
       if (events.isEmpty) {
         if (isDone) break;
         return false;
       }
 
-      var result = events.removeFirst();
-      if (result.isError) {
-        result.complete(_completer);
+      var event = events.removeFirst();
+      if (event.isError) {
+        _completer.completeError(event.asError.error, event.asError.stackTrace);
         return true;
       }
-      _list.add(result.asValue.value);
+      _list.add(event.asValue.value);
     }
     _completer.complete(_list);
     return true;
@@ -553,9 +553,9 @@
 /// The request needs no events, it just waits in the request queue
 /// until all previous events are fulfilled, then it cancels the stream queue
 /// source subscription.
-class _CancelRequest implements _EventRequest {
+class _CancelRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the `cancel` call.
-  final Completer _completer = new Completer();
+  final _completer = new Completer();
 
   /// The [StreamQueue] object that has this request queued.
   ///
@@ -568,7 +568,7 @@
   /// The future completed when the cancel request is completed.
   Future get future => _completer.future;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     if (_streamQueue._isDone) {
       _completer.complete();
     } else {
@@ -584,22 +584,22 @@
 /// The request is always complete, it just waits in the request queue
 /// until all previous events are fulfilled, then it takes over the
 /// stream events subscription and creates a stream from it.
-class _RestRequest<T> implements _EventRequest {
+class _RestRequest<T> implements _EventRequest<T> {
   /// Completer for the stream returned by the `rest` call.
-  final StreamCompleter _completer = new StreamCompleter<T>();
+  final _completer = new StreamCompleter<T>();
 
   /// The [StreamQueue] object that has this request queued.
   ///
   /// When the event is completed, it needs to cancel the active subscription
   /// of the `StreamQueue` object, if any.
-  final StreamQueue _streamQueue;
+  final StreamQueue<T> _streamQueue;
 
   _RestRequest(this._streamQueue);
 
   /// The stream which will contain the remaining events of [_streamQueue].
   Stream<T> get stream => _completer.stream;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     if (events.isEmpty) {
       if (_streamQueue._isDone) {
         _completer.setEmpty();
@@ -627,12 +627,12 @@
 /// but doesn't consume the event.
 /// If the request is closed without seeing an event, then
 /// the [future] is completed with `false`.
-class _HasNextRequest<T> implements _EventRequest {
-  final Completer _completer = new Completer<bool>();
+class _HasNextRequest<T> implements _EventRequest<T> {
+  final _completer = new Completer<bool>();
 
   Future<bool> get future => _completer.future;
 
-  bool update(Queue<Result> events, bool isDone) {
+  bool update(Queue<Result<T>> events, bool isDone) {
     if (events.isNotEmpty) {
       _completer.complete(true);
       return true;
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index b53a8d2..00a7086 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -36,8 +36,9 @@
   ///
   /// If the future completes with an error, the returned sink will instead
   /// be closed. Its [Sink.done] future will contain the error.
-  static StreamSink fromFuture(Future<StreamSink> sinkFuture) {
-    var completer = new StreamSinkCompleter();
+  static StreamSink/*<T>*/ fromFuture/*<T>*/(
+      Future<StreamSink/*<T>*/> sinkFuture) {
+    var completer = new StreamSinkCompleter/*<T>*/();
     sinkFuture.then(completer.setDestinationSink,
         onError: completer.setError);
     return completer.sink;
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index a20a3fa..8cc3d01 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -54,11 +54,7 @@
 
   void add(S event) {
     if (_transformer._handleData == null) {
-      // [event] is an S and [_inner.add] takes a T. This style of conversion
-      // will throw an error in checked mode if [_inner] is actually a
-      // [StreamSink<T>], but will work if [_inner] isn't reified and won't add
-      // an extra check in unchecked mode.
-      _inner.add(event as dynamic);
+      _inner.add(event as T);
     } else {
       _transformer._handleData(event, _safeCloseInner);
     }
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index ec648aa..448fe2a 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -57,10 +57,11 @@
   ///
   /// [count] defaults to 2. This is the same as creating [count] branches and
   /// then closing the [StreamSplitter].
-  static List<Stream> splitFrom(Stream stream, [int count]) {
+  static List<Stream/*<T>*/> splitFrom/*<T>*/(Stream/*<T>*/ stream,
+      [int count]) {
     if (count == null) count = 2;
-    var splitter = new StreamSplitter(stream);
-    var streams = new List.generate(count, (_) => splitter.split());
+    var splitter = new StreamSplitter/*<T>*/(stream);
+    var streams = new List<Stream>.generate(count, (_) => splitter.split());
     splitter.close();
     return streams;
   }
@@ -75,12 +76,11 @@
       throw new StateError("Can't call split() on a closed StreamSplitter.");
     }
 
-    var controller;
-    controller = new StreamController<T>(
+    var controller = new StreamController<T>(
         onListen: _onListen,
         onPause: _onPause,
-        onResume: _onResume,
-        onCancel: () => _onCancel(controller));
+        onResume: _onResume);
+    controller.onCancel = () => _onCancel(controller);
 
     for (var result in _buffer) {
       result.addTo(controller);
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index 432cc22..f9e2082 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -17,22 +17,22 @@
 
   StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
 
-  StreamSubscription<List<T>> listen(void onData(List data), {
+  StreamSubscription<List<T>> listen(void onData(List<T> data), {
                                   Function onError,
                                   void onDone(),
                                   bool cancelOnError}) {
     cancelOnError = identical(true, cancelOnError);
-    List<StreamSubscription> subscriptions = <StreamSubscription>[];
-    StreamController controller;
-    List current;
+    var subscriptions = <StreamSubscription<T>>[];
+    StreamController<List<T>> controller;
+    List<T> current;
     int dataCount = 0;
 
     /// Called for each data from a subscription in [subscriptions].
-    void handleData(int index, data) {
+    void handleData(int index, T data) {
       current[index] = data;
       dataCount++;
       if (dataCount == subscriptions.length) {
-        List data = current;
+        var data = current;
         current = new List(subscriptions.length);
         dataCount = 0;
         for (int i = 0; i < subscriptions.length; i++) {
@@ -70,7 +70,7 @@
     }
 
     try {
-      for (Stream stream in _streams) {
+      for (var stream in _streams) {
         int index = subscriptions.length;
         subscriptions.add(stream.listen(
             (data) { handleData(index, data); },
@@ -87,7 +87,7 @@
 
     current = new List(subscriptions.length);
 
-    controller = new StreamController<List>(
+    controller = new StreamController<List<T>>(
       onPause: () {
         for (int i = 0; i < subscriptions.length; i++) {
           // This may pause some subscriptions more than once.
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 4f58c76..c448620 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -18,7 +18,7 @@
 /// If other code is accessing the subscription, results may be unpredictable.
 class SubscriptionStream<T> extends Stream<T> {
   /// The subscription providing the events for this stream.
-  StreamSubscription _source;
+  StreamSubscription<T> _source;
 
   /// Create a single-subscription `Stream` from [subscription].
   ///
@@ -29,7 +29,7 @@
   /// If the `subscription` doesn't send any `done` events, neither will this
   /// stream. That may be an issue if `subscription` was made to cancel on
   /// an error.
-  SubscriptionStream(StreamSubscription subscription)
+  SubscriptionStream(StreamSubscription<T> subscription)
        : _source = subscription {
     _source.pause();
     // Clear callbacks to avoid keeping them alive unnecessarily.
@@ -48,13 +48,10 @@
     cancelOnError = (true == cancelOnError);
     var subscription = _source;
     _source = null;
-    var result;
-    if (cancelOnError) {
-      result = new _CancelOnErrorSubscriptionWrapper<T>(subscription);
-    } else {
-      // Wrap the subscription to ensure correct type parameter.
-      result = new DelegatingStreamSubscription<T>(subscription);
-    }
+
+    var result = cancelOnError
+        ? new _CancelOnErrorSubscriptionWrapper<T>(subscription)
+        : subscription;
     result.onData(onData);
     result.onError(onError);
     result.onDone(onDone);
diff --git a/pubspec.yaml b/pubspec.yaml
index 2430d75..9e64244 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.9.0
+version: 1.9.1-dev
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
@@ -8,4 +8,4 @@
   stack_trace: "^1.0.0"
   test: "^0.12.0"
 environment:
-  sdk: ">=1.9.0 <2.0.0"
+  sdk: ">=1.12.0 <2.0.0"