Fix Dart 1.17 strong-mode warnings.

R=kevmoo@google.com

Review URL: https://codereview.chromium.org//2161283002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index de22933..1d47eaf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 1.11.1
+
+* Fix new strong-mode warnings introduced in Dart 1.17.0.
+
 ## 1.11.0
 
 * Add a `typedStreamTransformer()` function. This wraps an untyped
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index 3f7ea72..8b81076 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -24,7 +24,7 @@
 
   Stream<T> asStream() => _future.asStream();
 
-  Future catchError(Function onError, {bool test(Object error)}) =>
+  Future<T> catchError(Function onError, {bool test(Object error)}) =>
     _future.catchError(onError, test: test);
 
   Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index b86e91f..5fa7b01 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -51,7 +51,8 @@
 
   Future cancel() => _source.cancel();
 
-  Future asFuture([futureValue]) => _source.asFuture(futureValue);
+  Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) =>
+      _source.asFuture(futureValue);
 
   bool get isPaused => _source.isPaused;
 }
diff --git a/lib/src/typed/future.dart b/lib/src/typed/future.dart
index a269593..f53ec5f 100644
--- a/lib/src/typed/future.dart
+++ b/lib/src/typed/future.dart
@@ -11,8 +11,8 @@
 
   Stream<T> asStream() => _future.then((value) => value as T).asStream();
 
-  Future catchError(Function onError, {bool test(Object error)}) =>
-      _future.catchError(onError, test: test);
+  Future<T> catchError(Function onError, {bool test(Object error)}) async =>
+      new TypeSafeFuture<T>(_future.catchError(onError, test: test));
 
   Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
       _future.then((value) => onValue(value as T), onError: onError);
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
index 3db9f69..afa4462 100644
--- a/lib/src/typed/stream.dart
+++ b/lib/src/typed/stream.dart
@@ -8,6 +8,7 @@
 
 import '../utils.dart';
 import 'stream_subscription.dart';
+import '../delegate/event_sink.dart';
 
 class TypeSafeStream<T> implements Stream<T> {
   final Stream _stream;
@@ -36,20 +37,19 @@
                   onCancel(new TypeSafeStreamSubscription<T>(subscription))));
   }
 
-  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
-  Stream asyncExpand(Stream convert(T event)) =>
+  Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) =>
       _stream.asyncExpand(_validateType(convert));
 
-  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
-  Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
+  Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) =>
+      _stream.asyncMap(_validateType(convert));
 
   Stream<T> distinct([bool equals(T previous, T next)]) =>
       new TypeSafeStream<T>(_stream.distinct(equals == null
           ? null
           : (previous, next) => equals(previous as T, next as T)));
 
-  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
-  Future drain([futureValue]) => _stream.drain(futureValue);
+  Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) =>
+      _stream.drain(futureValue);
 
   Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
       _stream.expand(_validateType(convert));
@@ -99,8 +99,10 @@
   Stream<T> takeWhile(bool test(T element)) =>
       new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
 
-  Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
-      _stream.timeout(timeLimit, onTimeout: onTimeout);
+  Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) =>
+      new TypeSafeStream<T>(_stream.timeout(
+          timeLimit,
+          onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink))));
 
   Future<List<T>> toList() async =>
       DelegatingList.typed/*<T>*/(await _stream.toList());
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
index 800a4ba..2cd4ddf 100644
--- a/lib/src/typed/stream_subscription.dart
+++ b/lib/src/typed/stream_subscription.dart
@@ -32,5 +32,7 @@
   }
 
   Future cancel() => _subscription.cancel();
-  Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
+
+  Future/*<E>*/ asFuture/*<E>*/([/*=E*/ futureValue]) =>
+      _subscription.asFuture(futureValue);
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index c6843a4..9ea1832 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.11.0
+version: 1.11.1
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/restartable_timer_test.dart b/test/restartable_timer_test.dart
index 7dbbc2e..f1c53fc 100644
--- a/test/restartable_timer_test.dart
+++ b/test/restartable_timer_test.dart
@@ -10,7 +10,7 @@
   test("runs the callback once the duration has elapsed", () {
     new FakeAsync().run((async) {
       var fired = false;
-      var timer = new RestartableTimer(new Duration(seconds: 5), () {
+      new RestartableTimer(new Duration(seconds: 5), () {
         fired = true;
       });
 
@@ -99,7 +99,7 @@
 
   test("only runs the callback once if the timer isn't reset", () {
     new FakeAsync().run((async) {
-      var timer = new RestartableTimer(
+      new RestartableTimer(
           new Duration(seconds: 5),
           expectAsync(() {}, count: 1));
       async.elapse(new Duration(seconds: 10));
diff --git a/test/result_test.dart b/test/result_test.dart
index 7af3fcf..1c5f335 100644
--- a/test/result_test.dart
+++ b/test/result_test.dart
@@ -57,7 +57,7 @@
 
   test("complete with value", () {
     Result<int> result = new ValueResult<int>(42);
-    Completer c = new Completer<int>();
+    var c = new Completer<int>();
     c.future.then(expectAsync((int v) { expect(v, equals(42)); }),
                   onError: (e, s) { fail("Unexpected error"); });
     result.complete(c);
@@ -65,7 +65,7 @@
 
   test("complete with error", () {
     Result<bool> result = new ErrorResult("BAD", stack);
-    Completer c = new Completer<bool>();
+    var c = new Completer<bool>();
     c.future.then((bool v) { fail("Unexpected value $v"); },
                   onError: expectAsync((e, s) {
                     expect(e, equals("BAD"));
@@ -75,7 +75,7 @@
   });
 
   test("add sink value", () {
-    Result<int> result = new ValueResult<int>(42);
+    var result = new ValueResult<int>(42);
     EventSink<int> sink = new TestSink(
         onData: expectAsync((v) { expect(v, equals(42)); })
     );
@@ -189,7 +189,7 @@
   test("release stream", () {
     StreamController<Result<int>> c = new StreamController<Result<int>>();
     Stream<int> stream = Result.releaseStream(c.stream);
-    List events = [new Result<int>.value(42),
+    var events = [new Result<int>.value(42),
                    new Result<int>.error("BAD", stack),
                    new Result<int>.value(37)];
     // Expect the data events, and an extra error event.
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index 76d81d1..6eb5138 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -130,7 +130,7 @@
 
   test("source stream isn't listened to until completer stream is", () async {
     var completer = new StreamCompleter();
-    var controller;
+    StreamController controller;
     controller = new StreamController(onListen: () {
       scheduleMicrotask(controller.close);
     });
@@ -281,7 +281,7 @@
     var completer = new StreamCompleter();
     var controller = new StreamController();
     var subscription = completer.stream.listen(null);
-    var lastEvent = 0;
+    Object lastEvent = 0;
     subscription.onData((value) => lastEvent = value);
     subscription.onError((value) => lastEvent = "$value");
     subscription.onDone(() => lastEvent = -1);
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 228ba8a..5140890 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -12,7 +12,7 @@
 main() {
   group("source stream", () {
     test("is listened to on first request, paused between requests", () async {
-      var controller = new StreamController();
+      var controller = new StreamController<int>();
       var events = new StreamQueue<int>(controller.stream);
       await flushMicrotasks();
       expect(controller.hasListener, isFalse);
@@ -288,7 +288,7 @@
 
     test("forwards to underlying stream", () async {
       var cancel = new Completer();
-      var controller = new StreamController(onCancel: () => cancel.future);
+      var controller = new StreamController<int>(onCancel: () => cancel.future);
       var events = new StreamQueue<int>(controller.stream);
       expect(controller.hasListener, isFalse);
       var next = events.next;
@@ -347,14 +347,14 @@
     test("cancels underlying subscription when called before any event",
         () async {
       var cancelFuture = new Future.value(42);
-      var controller = new StreamController(onCancel: () => cancelFuture);
+      var controller = new StreamController<int>(onCancel: () => cancelFuture);
       var events = new StreamQueue<int>(controller.stream);
       expect(await events.cancel(), 42);
     });
 
     test("cancels underlying subscription, returns result", () async {
       var cancelFuture = new Future.value(42);
-      var controller = new StreamController(onCancel: () => cancelFuture);
+      var controller = new StreamController<int>(onCancel: () => cancelFuture);
       var events = new StreamQueue<int>(controller.stream);
       controller.add(1);
       expect(await events.next, 1);
@@ -373,7 +373,7 @@
       });
 
       test("cancels the underlying subscription immediately", () async {
-        var controller = new StreamController();
+        var controller = new StreamController<int>();
         controller.add(1);
 
         var events = new StreamQueue<int>(controller.stream);
@@ -387,7 +387,7 @@
       test("cancels the underlying subscription when called before any event",
           () async {
         var cancelFuture = new Future.value(42);
-        var controller = new StreamController(onCancel: () => cancelFuture);
+        var controller = new StreamController<int>(onCancel: () => cancelFuture);
 
         var events = new StreamQueue<int>(controller.stream);
         expect(await events.cancel(immediate: true), 42);
@@ -404,7 +404,7 @@
 
       test("returns the result of closing the underlying subscription",
           () async {
-        var controller = new StreamController(
+        var controller = new StreamController<int>(
             onCancel: () => new Future.value(42));
         var events = new StreamQueue<int>(controller.stream);
         expect(await events.cancel(immediate: true), 42);
@@ -413,7 +413,7 @@
       test("listens and then cancels a stream that hasn't been listened to yet",
           () async {
         var wasListened = false;
-        var controller = new StreamController(
+        var controller = new StreamController<int>(
             onListen: () => wasListened = true);
         var events = new StreamQueue<int>(controller.stream);
         expect(wasListened, isFalse);
@@ -448,7 +448,7 @@
 
     test("true when enqueued", () async {
       var events = new StreamQueue<int>(createStream());
-      var values = [];
+      var values = <int>[];
       for (int i = 1; i <= 3; i++) {
         events.next.then(values.add);
       }
@@ -459,7 +459,7 @@
 
     test("false when enqueued", () async {
       var events = new StreamQueue<int>(createStream());
-      var values = [];
+      var values = <int>[];
       for (int i = 1; i <= 4; i++) {
         events.next.then(values.add);
       }
@@ -469,7 +469,7 @@
     });
 
     test("true when data event", () async {
-      var controller = new StreamController();
+      var controller = new StreamController<int>();
       var events = new StreamQueue<int>(controller.stream);
 
       var hasNext;
@@ -483,7 +483,7 @@
     });
 
     test("true when error event", () async {
-      var controller = new StreamController();
+      var controller = new StreamController<int>();
       var events = new StreamQueue<int>(controller.stream);
 
       var hasNext;
@@ -525,7 +525,7 @@
 
     test("- next after true, enqueued", () async {
       var events = new StreamQueue<int>(createStream());
-      var responses = [];
+      var responses = <Object>[];
       events.next.then(responses.add);
       events.hasNext.then(responses.add);
       events.next.then(responses.add);
@@ -683,7 +683,7 @@
 }
 
 Stream<int> createErrorStream() {
-  StreamController controller = new StreamController<int>();
+  var controller = new StreamController<int>();
   () async {
     controller.add(1);
     await flushMicrotasks();
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
index ffd0c87..077ba22 100644
--- a/test/stream_splitter_test.dart
+++ b/test/stream_splitter_test.dart
@@ -8,7 +8,7 @@
 import 'package:test/test.dart';
 
 main() {
-  var controller;
+  StreamController<int> controller;
   var splitter;
   setUp(() {
     controller = new StreamController<int>();
diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart
index 47f3d8d..a00d8bd 100644
--- a/test/stream_zip_test.dart
+++ b/test/stream_zip_test.dart
@@ -36,7 +36,7 @@
 
 main() {
   // Test that zipping [streams] gives the results iterated by [expectedData].
-  testZip(Iterable streams, Iterable expectedData) {
+  testZip(Iterable<Stream> streams, Iterable expectedData) {
     List data = [];
     Stream zip = new StreamZip(streams);
     zip.listen(data.add, onDone: expectAsync(() {
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart
index 0d1870b..699babe 100644
--- a/test/subscription_stream_test.dart
+++ b/test/subscription_stream_test.dart
@@ -22,7 +22,7 @@
     var stream = createStream();
     var skips = 0;
     var completer = new Completer();
-    var subscription;
+    StreamSubscription<int> subscription;
     subscription = stream.listen((value) {
       ++skips;
       expect(value, skips);
@@ -168,7 +168,7 @@
     await flushMicrotasks();
     yield 2;
     await flushMicrotasks();
-    yield* new Future.error("To err is divine!").asStream();
+    yield* new Future<int>.error("To err is divine!").asStream();
     await flushMicrotasks();
     yield 4;
     await flushMicrotasks();
diff --git a/test/typed_wrapper/future_test.dart b/test/typed_wrapper/future_test.dart
index 6601ac1..b3ff0a7 100644
--- a/test/typed_wrapper/future_test.dart
+++ b/test/typed_wrapper/future_test.dart
@@ -4,7 +4,6 @@
 
 import 'dart:async';
 
-import "package:async/async.dart";
 import "package:async/src/typed/future.dart";
 import "package:test/test.dart";
 
@@ -13,7 +12,7 @@
 void main() {
   group("with valid types, forwards", () {
     var wrapper;
-    var errorWrapper;
+    TypeSafeFuture<int> errorWrapper;
     setUp(() {
       wrapper = new TypeSafeFuture<int>(new Future<Object>.value(12));
 
@@ -35,11 +34,11 @@
 
       expect(errorWrapper.catchError(expectAsync((error) {
         expect(error, equals("oh no"));
-        return "value";
+        return 42;
       }), test: expectAsync((error) {
         expect(error, equals("oh no"));
         return true;
-      })), completion(equals("value")));
+      })), completion(equals(42)));
     });
 
     test("then()", () {
@@ -71,7 +70,7 @@
   });
 
   group("with invalid types", () {
-    var wrapper;
+    TypeSafeFuture<int> wrapper;
     setUp(() {
       wrapper = new TypeSafeFuture<int>(new Future<Object>.value("foo"));
     });
@@ -104,14 +103,5 @@
           throwsCastError);
       });
     });
-
-    group("doesn't throw a CastError for", () {
-      test("catchError()", () {
-        // catchError has a Future<dynamic> return type, so even if there's no
-        // error we don't re-wrap the returned future.
-        expect(wrapper.catchError(expectAsync((_) {}, count: 0)),
-            completion(equals("foo")));
-      });
-    });
   });
 }
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
index 30d3892..ca2ba86 100644
--- a/test/typed_wrapper/stream_subscription_test.dart
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -4,7 +4,6 @@
 
 import 'dart:async';
 
-import "package:async/async.dart";
 import "package:async/src/typed/stream_subscription.dart";
 import "package:test/test.dart";
 
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
index 4936120..05cde3a 100644
--- a/test/typed_wrapper/stream_test.dart
+++ b/test/typed_wrapper/stream_test.dart
@@ -4,7 +4,6 @@
 
 import 'dart:async';
 
-import "package:async/async.dart";
 import "package:async/src/typed/stream.dart";
 import "package:test/test.dart";
 
@@ -245,7 +244,7 @@
     });
 
     test("pipe()", () {
-      var consumer = new StreamController<T>();
+      var consumer = new StreamController();
       expect(wrapper.pipe(consumer), completes);
       expect(consumer.stream.toList(), completion(equals([1, 2, 3, 4, 5])));
     });
diff --git a/test/utils.dart b/test/utils.dart
index 7b65bcb..50e106b 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -11,10 +11,13 @@
 /// A zero-millisecond timer should wait until after all microtasks.
 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
 
+typedef void OptionalArgAction([a, b]);
+
 /// A generic unreachable callback function.
 ///
 /// Returns a function that fails the test if it is ever called.
-unreachable(String name) => ([a, b]) => fail("Unreachable: $name");
+OptionalArgAction unreachable(String name) =>
+        ([a, b]) => fail("Unreachable: $name");
 
 // TODO(nweiz): Use the version of this in test when test#418 is fixed.
 /// A matcher that runs a callback in its own zone and asserts that that zone