Make pub strong-mode clean.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//2184303002 .
diff --git a/.analysis_options b/.analysis_options
new file mode 100644
index 0000000..a10d4c5
--- /dev/null
+++ b/.analysis_options
@@ -0,0 +1,2 @@
+analyzer:
+  strong-mode: true
diff --git a/lib/src/ascii_tree.dart b/lib/src/ascii_tree.dart
index 8a7734e..ec70f7f 100644
--- a/lib/src/ascii_tree.dart
+++ b/lib/src/ascii_tree.dart
@@ -60,12 +60,13 @@
 /// will have their contents truncated. Defaults to `false`.
 String fromFiles(List<String> files, {String baseDir, bool showAllChildren}) {
   // Parse out the files into a tree of nested maps.
-  var root = {};
+  var root = <String, Map>{};
   for (var file in files) {
     if (baseDir != null) file = path.relative(file, from: baseDir);
     var directory = root;
     for (var part in path.split(file)) {
-      directory = directory.putIfAbsent(part, () => {});
+      directory = directory.putIfAbsent(part, () => <String, Map>{})
+          as Map<String, Map>;
     }
   }
 
@@ -97,7 +98,7 @@
 ///
 /// If [showAllChildren] is `false`, then directories with more than ten items
 /// will have their contents truncated. Defaults to `false`.
-String fromMap(Map map, {bool showAllChildren}) {
+String fromMap(Map<String, Map> map, {bool showAllChildren}) {
   var buffer = new StringBuffer();
   _draw(buffer, "", null, map, showAllChildren: showAllChildren);
   return buffer.toString();
@@ -125,8 +126,9 @@
   return log.gray("|   ");
 }
 
-void _draw(StringBuffer buffer, String prefix, String name, Map children,
-           {bool showAllChildren, bool isLast: false}) {
+void _draw(
+    StringBuffer buffer, String prefix, String name, Map<String, Map> children,
+    {bool showAllChildren, bool isLast: false}) {
   if (showAllChildren == null) showAllChildren = false;
 
   // Don't draw a line for the root node.
@@ -137,8 +139,13 @@
 
   drawChild(bool isLastChild, String child) {
     var childPrefix = _getPrefix(name == null, isLast);
-    _draw(buffer, '$prefix$childPrefix', child, children[child],
-        showAllChildren: showAllChildren, isLast: isLastChild);
+    _draw(
+        buffer,
+        '$prefix$childPrefix',
+        child,
+        children[child] as Map<String, Map>,
+        showAllChildren: showAllChildren,
+        isLast: isLastChild);
   }
 
   if (name == null || showAllChildren || childNames.length <= 10) {
diff --git a/lib/src/asset/dart/serialize.dart b/lib/src/asset/dart/serialize.dart
index f95022d..7af2941 100644
--- a/lib/src/asset/dart/serialize.dart
+++ b/lib/src/asset/dart/serialize.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:isolate';
 
+import 'package:async/async.dart';
 import 'package:barback/barback.dart';
 
 //# if source_span
@@ -12,7 +13,6 @@
 //# end
 
 import 'serialize/exception.dart';
-import 'utils.dart';
 
 export 'serialize/aggregate_transform.dart';
 export 'serialize/exception.dart';
@@ -77,7 +77,7 @@
 /// Converts [stream] into a serializable map.
 ///
 /// [serializeEvent] is used to serialize each event from the stream.
-Map serializeStream(Stream stream, serializeEvent(event)) {
+Map serializeStream/*<T>*/(Stream/*<T>*/ stream, serializeEvent(/*=T*/ event)) {
   var receivePort = new ReceivePort();
   var map = {'replyTo': receivePort.sendPort};
 
@@ -102,8 +102,9 @@
 /// Converts a serializable map into a [Stream].
 ///
 /// [deserializeEvent] is used to deserialize each event from the stream.
-Stream deserializeStream(Map stream, deserializeEvent(event)) {
-  return callbackStream(() {
+Stream/*<T>*/ deserializeStream/*<T>*/(Map stream,
+    /*=T*/ deserializeEvent(event)) {
+  return new LazyStream(() {
     var receivePort = new ReceivePort();
     stream['replyTo'].send({'replyTo': receivePort.sendPort});
 
@@ -133,15 +134,18 @@
 ///
 /// The returned Future will complete to the value or error returned by
 /// [respond].
-Future call(SendPort port, message) {
+Future/*<T>*/ call/*<T>*/(SendPort port, message) {
   var receivePort = new ReceivePort();
   port.send({
     'message': message,
     'replyTo': receivePort.sendPort
   });
 
-  return receivePort.first.then((response) {
-    if (response['type'] == 'success') return response['value'];
+  return new Future.sync(() async {
+    var response = await receivePort.first;
+    if (response['type'] == 'success') {
+      return response['value'] as dynamic/*=T*/;
+    }
     assert(response['type'] == 'error');
     var exception = deserializeException(response['error']);
     return new Future.error(exception, exception.stackTrace);
diff --git a/lib/src/asset/dart/serialize/aggregate_transform.dart b/lib/src/asset/dart/serialize/aggregate_transform.dart
index e529cae..7b61354 100644
--- a/lib/src/asset/dart/serialize/aggregate_transform.dart
+++ b/lib/src/asset/dart/serialize/aggregate_transform.dart
@@ -19,7 +19,8 @@
 /// serialized transform. [methodHandlers] is a set of additional methods. Each
 /// value should take a JSON message and return the response (which may be a
 /// Future).
-Map _serializeBaseAggregateTransform(transform, Map additionalFields,
+Map _serializeBaseAggregateTransform(transform,
+    Map<String, dynamic> additionalFields,
     Map<String, Function> methodHandlers) {
   var receivePort = new ReceivePort();
   receivePort.listen((wrappedMessage) {
diff --git a/lib/src/asset/dart/serialize/exception.dart b/lib/src/asset/dart/serialize/exception.dart
index d870a6e..f74537a 100644
--- a/lib/src/asset/dart/serialize/exception.dart
+++ b/lib/src/asset/dart/serialize/exception.dart
@@ -20,7 +20,8 @@
 
   /// The exception's message, or its [toString] if it didn't expose a `message`
   /// property.
-  final String message;
+  String get message => _message;
+  final String _message;
 
   /// The exception's stack chain, or `null` if no stack chain was available.
   final Chain stackTrace;
@@ -30,7 +31,7 @@
   /// [error] should be the result of [CrossIsolateException.serialize].
   CrossIsolateException.deserialize(Map error)
       : type = error['type'],
-        message = error['message'],
+        _message = error['message'],
         stackTrace = error['stack'] == null ? null :
             new Chain.parse(error['stack']);
 
diff --git a/lib/src/asset/dart/serialize/get_input_transform.dart b/lib/src/asset/dart/serialize/get_input_transform.dart
index be22f57..328d72d 100644
--- a/lib/src/asset/dart/serialize/get_input_transform.dart
+++ b/lib/src/asset/dart/serialize/get_input_transform.dart
@@ -5,10 +5,9 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:async/async.dart';
 import 'package:barback/barback.dart';
 
-import '../utils.dart';
-
 /// A mixin for transforms that support [getInput] and the associated suite of
 /// methods.
 abstract class GetInputTransform {
@@ -21,7 +20,7 @@
   }
 
   Stream<List<int>> readInput(AssetId id) =>
-      futureStream(getInput(id).then((input) => input.read()));
+      StreamCompleter.fromFuture(getInput(id).then((input) => input.read()));
 
   Future<bool> hasInput(AssetId id) {
     return getInput(id).then((_) => true).catchError((error) {
diff --git a/lib/src/asset/dart/serialize/transform.dart b/lib/src/asset/dart/serialize/transform.dart
index f56408d..14aa6d2 100644
--- a/lib/src/asset/dart/serialize/transform.dart
+++ b/lib/src/asset/dart/serialize/transform.dart
@@ -18,7 +18,7 @@
 /// serialized transform. [methodHandlers] is a set of additional methods. Each
 /// value should take a JSON message and return the response (which may be a
 /// Future).
-Map _serializeBaseTransform(transform, Map additionalFields,
+Map _serializeBaseTransform(transform, Map<String, dynamic> additionalFields,
     Map<String, Function> methodHandlers) {
   var receivePort = new ReceivePort();
   receivePort.listen((wrappedMessage) {
@@ -48,7 +48,8 @@
     });
   });
 
-  return {'port': receivePort.sendPort}..addAll(additionalFields);
+  return <String, dynamic>{'port': receivePort.sendPort}
+      ..addAll(additionalFields);
 }
 
 /// Converts [transform] into a serializable map.
diff --git a/lib/src/asset/dart/utils.dart b/lib/src/asset/dart/utils.dart
index 7baf27c..5b910d8 100644
--- a/lib/src/asset/dart/utils.dart
+++ b/lib/src/asset/dart/utils.dart
@@ -4,7 +4,6 @@
 
 /// Functions go in this file as opposed to lib/src/utils.dart if they need to
 /// be accessible to the transformer-loading isolate.
-import 'dart:async';
 
 /// A regular expression to match the exception prefix that some exceptions'
 /// [Object.toString] values contain.
@@ -16,69 +15,3 @@
 /// [toString], so we remove that if it exists.
 String getErrorMessage(error) =>
   error.toString().replaceFirst(_exceptionPrefix, '');
-
-/// Returns a buffered stream that will emit the same values as the stream
-/// returned by [future] once [future] completes.
-///
-/// If [future] completes to an error, the return value will emit that error and
-/// then close.
-///
-/// If [broadcast] is true, a broadcast stream is returned. This assumes that
-/// the stream returned by [future] will be a broadcast stream as well.
-/// [broadcast] defaults to false.
-Stream futureStream(Future<Stream> future, {bool broadcast: false}) {
-  var subscription;
-  var controller;
-
-  future = future.catchError((e, stackTrace) {
-    // Since [controller] is synchronous, it's likely that emitting an error
-    // will cause it to be cancelled before we call close.
-    if (controller != null) controller.addError(e, stackTrace);
-    if (controller != null) controller.close();
-    controller = null;
-  });
-
-  onListen() {
-    future.then((stream) {
-      if (controller == null) return;
-      subscription = stream.listen(
-          controller.add,
-          onError: controller.addError,
-          onDone: controller.close);
-    });
-  }
-
-  onCancel() {
-    if (subscription != null) subscription.cancel();
-    subscription = null;
-    controller = null;
-  }
-
-  if (broadcast) {
-    controller = new StreamController.broadcast(
-        sync: true, onListen: onListen, onCancel: onCancel);
-  } else {
-    controller = new StreamController(
-        sync: true, onListen: onListen, onCancel: onCancel);
-  }
-  return controller.stream;
-}
-
-/// Returns a [Stream] that will emit the same values as the stream returned by
-/// [callback].
-///
-/// [callback] will only be called when the returned [Stream] gets a subscriber.
-Stream callbackStream(Stream callback()) {
-  var subscription;
-  var controller;
-  controller = new StreamController(onListen: () {
-    subscription = callback().listen(controller.add,
-        onError: controller.addError,
-        onDone: controller.close);
-  },
-      onCancel: () => subscription.cancel(),
-      onPause: () => subscription.pause(),
-      onResume: () => subscription.resume(),
-      sync: true);
-  return controller.stream;
-}
diff --git a/lib/src/barback.dart b/lib/src/barback.dart
index 6fd6245..a13ec89 100644
--- a/lib/src/barback.dart
+++ b/lib/src/barback.dart
@@ -37,7 +37,8 @@
 final pubConstraints = {
   "barback": new VersionConstraint.parse(">=0.15.0 <0.15.3"),
   "source_span": new VersionConstraint.parse(">=1.0.0 <2.0.0"),
-  "stack_trace": new VersionConstraint.parse(">=0.9.1 <2.0.0")
+  "stack_trace": new VersionConstraint.parse(">=0.9.1 <2.0.0"),
+  "async": new VersionConstraint.parse(">=1.8.0 <2.0.0")
 };
 
 /// Converts [id] to a "package:" URI.
diff --git a/lib/src/barback/asset_environment.dart b/lib/src/barback/asset_environment.dart
index 8b04005..c776894 100644
--- a/lib/src/barback/asset_environment.dart
+++ b/lib/src/barback/asset_environment.dart
@@ -196,7 +196,7 @@
   /// that completes to the bound server.
   ///
   /// If [rootDirectory] is already being served, returns that existing server.
-  Future<BarbackServer> serveDirectory(String rootDirectory) {
+  Future<BarbackServer> serveDirectory(String rootDirectory) async {
     // See if there is already a server bound to the directory.
     var directory = _directories[rootDirectory];
     if (directory != null) {
@@ -231,11 +231,9 @@
         this, rootDirectory, _hostname, port);
     _directories[rootDirectory] = sourceDirectory;
 
-    return _provideDirectorySources(rootPackage, rootDirectory)
-        .then((subscription) {
-      sourceDirectory.watchSubscription = subscription;
-      return sourceDirectory.serve();
-    });
+    sourceDirectory.watchSubscription =
+        await _provideDirectorySources(rootPackage, rootDirectory);
+    return await sourceDirectory.serve();
   }
 
   /// Binds a new port to serve assets from within the "bin" directory of
@@ -308,18 +306,15 @@
   /// Also removes any source files within that directory from barback. Returns
   /// the URL of the unbound server, of `null` if [rootDirectory] was not
   /// bound to a server.
-  Future<Uri> unserveDirectory(String rootDirectory) {
+  Future<Uri> unserveDirectory(String rootDirectory) async {
     log.fine("Unserving $rootDirectory.");
     var directory = _directories.remove(rootDirectory);
     if (directory == null) return new Future.value();
 
-    return directory.server.then((server) {
-      var url = server.url;
-      return directory.close().then((_) {
-        _removeDirectorySources(rootDirectory);
-        return url;
-      });
-    });
+    var url = (await directory.server).url;
+    await directory.close();
+    _removeDirectorySources(rootDirectory);
+    return url;
   }
 
   /// Gets the source directory that contains [assetPath] within the entrypoint
@@ -333,15 +328,12 @@
           .directory;
 
   /// Return all URLs serving [assetPath] in this environment.
-  Future<List<Uri>> getUrlsForAssetPath(String assetPath) {
+  Future<List<Uri>> getUrlsForAssetPath(String assetPath) async {
     // Check the three (mutually-exclusive) places the path could be pointing.
-    return _lookUpPathInServerRoot(assetPath).then((urls) {
-      if (urls.isNotEmpty) return urls;
-      return _lookUpPathInPackagesDirectory(assetPath);
-    }).then((urls) {
-      if (urls.isNotEmpty) return urls;
-      return _lookUpPathInDependency(assetPath);
-    });
+    var urls = await _lookUpPathInServerRoot(assetPath);
+    if (urls.isEmpty) urls = await _lookUpPathInPackagesDirectory(assetPath);
+    if (urls.isEmpty) urls = await _lookUpPathInDependency(assetPath);
+    return urls;
   }
 
   /// Look up [assetPath] in the root directories of servers running in the
diff --git a/lib/src/barback/barback_server.dart b/lib/src/barback/barback_server.dart
index 19082c9..6d0a5dc 100644
--- a/lib/src/barback/barback_server.dart
+++ b/lib/src/barback/barback_server.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:io';
 
+import 'package:async/async.dart';
 import 'package:barback/barback.dart';
 import 'package:mime/mime.dart';
 import 'package:path/path.dart' as path;
@@ -156,9 +157,10 @@
   /// Returns the body of [asset] as a response to [request].
   Future<shelf.Response> _serveAsset(shelf.Request request, Asset asset) async {
     try {
-      var pair = tee(await validateStream(asset.read()));
-      var responseStream = pair.first;
-      var hashStream = pair.last;
+      var streams = StreamSplitter.splitFrom(
+          await validateStream(asset.read()));
+      var responseStream = streams.first;
+      var hashStream = streams.last;
 
       // Allow the asset to be cached based on its content hash.
       var assetSha = await sha1Stream(hashStream);
diff --git a/lib/src/barback/cycle_exception.dart b/lib/src/barback/cycle_exception.dart
index 45f4362..8931ddf 100644
--- a/lib/src/barback/cycle_exception.dart
+++ b/lib/src/barback/cycle_exception.dart
@@ -27,7 +27,7 @@
     if (_step == null) return [];
 
     var exception = this;
-    var steps = [];
+    var steps = <String>[];
     while (exception != null) {
       steps.add(exception._step);
       exception = exception._next;
diff --git a/lib/src/barback/dart2js_transformer.dart b/lib/src/barback/dart2js_transformer.dart
index 431c590..94d05d1 100644
--- a/lib/src/barback/dart2js_transformer.dart
+++ b/lib/src/barback/dart2js_transformer.dart
@@ -7,6 +7,7 @@
 
 import 'package:analyzer/analyzer.dart';
 import 'package:barback/barback.dart';
+import 'package:collection/collection.dart';
 import 'package:path/path.dart' as p;
 import 'package:pool/pool.dart';
 
@@ -154,7 +155,7 @@
 
     var options = _settings.configuration['commandLineOptions'];
     if (options is List && options.every((option) => option is String)) {
-      return options;
+      return DelegatingList.typed(options);
     }
 
     throw new FormatException('Invalid value for '
@@ -172,7 +173,9 @@
     if (environment is Map &&
         environment.keys.every((key) => key is String) &&
         environment.values.every((key) => key is String)) {
-      return mergeMaps(environment, _environment.environmentConstants);
+      return mergeMaps(
+          DelegatingMap.typed(environment),
+          _environment.environmentConstants);
     }
 
     throw new FormatException('Invalid value for \$dart2js.environment: '
diff --git a/lib/src/barback/dependency_computer.dart b/lib/src/barback/dependency_computer.dart
index 1166184..fedc208 100644
--- a/lib/src/barback/dependency_computer.dart
+++ b/lib/src/barback/dependency_computer.dart
@@ -69,7 +69,7 @@
   /// `T1` to `T2`.
   Map<TransformerId, Set<TransformerId>> transformersNeededByTransformers(
       [Iterable<TransformerId> transformers]) {
-    var result = {};
+    var result = <TransformerId, Set<TransformerId>>{};
 
     if (transformers == null) {
       transformers = ordered(_graph.packages.keys).expand((packageName) {
@@ -162,8 +162,8 @@
       return _transformersNeededByPackages[rootPackage];
     }
 
-    var results = new Set();
-    var seen = new Set();
+    var results = new Set<TransformerId>();
+    var seen = new Set<String>();
 
     traversePackage(packageName) {
       if (seen.contains(packageName)) return;
@@ -389,10 +389,10 @@
       return _transitiveExternalDirectives[rootLibrary];
     }
 
-    var results = new Set();
-    var seen = new Set();
+    var results = new Set<Uri>();
+    var seen = new Set<String>();
 
-    traverseLibrary(library) {
+    traverseLibrary(String library) {
       library = p.normalize(library);
       if (seen.contains(library)) return true;
       seen.add(library);
diff --git a/lib/src/barback/foreign_transformer.dart b/lib/src/barback/foreign_transformer.dart
index 5ec7734..147dbdd 100644
--- a/lib/src/barback/foreign_transformer.dart
+++ b/lib/src/barback/foreign_transformer.dart
@@ -123,9 +123,11 @@
   final String _toString;
 
   _ForeignGroup(TransformerConfig config, Map map)
-      : phases = map['phases'].map((phase) {
-          return phase.map((transformer) => deserializeTransformerLike(
-              transformer, config)).toList();
+      : phases = (map['phases'] as List).map((phase) {
+          return (phase as List)
+              .map((transformer) =>
+                  deserializeTransformerLike(transformer, config))
+              .toList();
         }).toList(),
         _toString = map['toString'];
 
diff --git a/lib/src/barback/load_all_transformers.dart b/lib/src/barback/load_all_transformers.dart
index f21df9e..3ebc1d7 100644
--- a/lib/src/barback/load_all_transformers.dart
+++ b/lib/src/barback/load_all_transformers.dart
@@ -32,7 +32,7 @@
 
   // If we only need to load transformers for a specific set of entrypoints,
   // remove any other transformers from [transformersNeededByTransformers].
-  var necessaryTransformers;
+  Set<TransformerId> necessaryTransformers;
   if (entrypoints != null) {
     if (entrypoints.isEmpty) return;
 
@@ -126,10 +126,10 @@
     Map<TransformerId, Set<TransformerId>> transformerDependencies) {
   // A map from transformer ids to the indices of the stages that those
   // transformer ids should end up in. Populated by [stageNumberFor].
-  var stageNumbers = {};
-  var stages = [];
+  var stageNumbers = <TransformerId, int>{};
+  var stages = <Set<TransformerId>>[];
 
-  stageNumberFor(id) {
+  stageNumberFor(TransformerId id) {
     // Built-in transformers don't have to be loaded in stages, since they're
     // run from pub's source. Return -1 so that the "next stage" is 0.
     if (id.isBuiltInTransformer) return -1;
@@ -155,7 +155,7 @@
 /// transformer.
 Map<TransformerId, Set<String>> _packagesThatUseTransformers(
     PackageGraph graph) {
-  var results = {};
+  var results = <TransformerId, Set<String>>{};
   for (var package in graph.packages.values) {
     for (var phase in package.pubspec.transformers) {
       for (var config in phase) {
diff --git a/lib/src/barback/pub_package_provider.dart b/lib/src/barback/pub_package_provider.dart
index 13dc9c9..71aafcf 100644
--- a/lib/src/barback/pub_package_provider.dart
+++ b/lib/src/barback/pub_package_provider.dart
@@ -5,14 +5,17 @@
 import 'dart:async';
 import 'dart:io';
 
+import 'package:async/async.dart';
 import 'package:barback/barback.dart';
+import 'package:collection/collection.dart';
 import 'package:path/path.dart' as path;
+import 'package:pub_semver/pub_semver.dart';
 
 import '../io.dart';
+import '../package.dart';
 import '../package_graph.dart';
 import '../preprocess.dart';
 import '../sdk.dart' as sdk;
-import '../utils.dart';
 
 /// The path to the lib directory of the compiler_unsupported package used by
 /// pub.
@@ -63,7 +66,8 @@
         return new Asset.fromPath(id, file);
       }
 
-      var versions = mapMap(_graph.packages,
+      var versions = mapMap/*<String, Package, String, Version>*/(
+          _graph.packages,
           value: (_, package) => package.version);
       var contents = readTextFile(file);
       contents = preprocess(contents, versions, path.toUri(file));
@@ -95,7 +99,7 @@
       var file = path.join(_compilerUnsupportedLib, 'sdk',
           path.joinAll(parts.skip(1))) + "_";
       _assertExists(file, id);
-      return new Asset.fromStream(id, callbackStream(() =>
+      return new Asset.fromStream(id, new LazyStream(() =>
           _zlib.decoder.bind(new File(file).openRead())));
     }
 
diff --git a/lib/src/barback/transformer_config.dart b/lib/src/barback/transformer_config.dart
index 67a8cba..896c7d6 100644
--- a/lib/src/barback/transformer_config.dart
+++ b/lib/src/barback/transformer_config.dart
@@ -79,7 +79,7 @@
           configuration);
 
   factory TransformerConfig(TransformerId id, YamlMap configurationNode) {
-    parseField(key) {
+    Set<Glob> parseField(String key) {
       if (!configurationNode.containsKey(key)) return null;
       var fieldNode = configurationNode.nodes[key];
       var field = fieldNode.value;
@@ -103,11 +103,10 @@
       }));
     }
 
-    var includes = null;
-    var excludes = null;
-
-    var configuration;
-    var span;
+    Set<Glob> includes;
+    Set<Glob> excludes;
+    Map configuration;
+    SourceSpan span;
     if (configurationNode == null) {
       configuration = {};
       span = id.span;
diff --git a/lib/src/barback/transformer_isolate.dart b/lib/src/barback/transformer_isolate.dart
index ae025c8..95a61e2 100644
--- a/lib/src/barback/transformer_isolate.dart
+++ b/lib/src/barback/transformer_isolate.dart
@@ -49,59 +49,59 @@
   /// path once the isolate is loaded.
   static Future<TransformerIsolate> spawn(AssetEnvironment environment,
       BarbackServer transformerServer, List<TransformerId> ids,
-      {String snapshot}) {
-    return mapFromIterableAsync(ids, value: (id) {
-      return id.getAssetId(environment.barback);
-    }).then((idsToAssetIds) {
-      var baseUrl = transformerServer.url;
-      var idsToUrls = mapMap(idsToAssetIds, value: (id, assetId) {
-        var path = assetId.path.replaceFirst('lib/', '');
-        return Uri.parse('package:${id.package}/$path');
-      });
+      {String snapshot}) async {
+    var idsToAssetIds = <TransformerId, AssetId>{};
+    var idsToUrls = <TransformerId, Uri>{};
+    await Future.wait(ids.map((id) async {
+      var assetId = await id.getAssetId(environment.barback);
+      idsToAssetIds[id] = assetId;
 
-      var code = new StringBuffer();
-      code.writeln("import 'dart:isolate';");
+      var path = assetId.path.replaceFirst('lib/', '');
+      idsToUrls[id] = Uri.parse('package:${id.package}/$path');
+    }));
 
-      for (var url in idsToUrls.values) {
-        code.writeln("import '$url';");
-      }
+    var code = new StringBuffer();
+    code.writeln("import 'dart:isolate';");
 
-      code.writeln("import r'package:\$pub/transformer_isolate.dart';");
-      code.writeln(
-          "void main(_, SendPort replyTo) => loadTransformers(replyTo);");
+    for (var url in idsToUrls.values) {
+      code.writeln("import '$url';");
+    }
 
-      log.fine("Loading transformers from $ids");
+    code.writeln("import r'package:\$pub/transformer_isolate.dart';");
+    code.writeln(
+        "void main(_, SendPort replyTo) => loadTransformers(replyTo);");
 
-      var port = new ReceivePort();
-      return dart.runInIsolate(code.toString(), port.sendPort,
-              packageRoot: baseUrl.resolve('packages'),
-              snapshot: snapshot)
-          .then((_) => port.first)
-          .then((sendPort) {
-        return new TransformerIsolate._(sendPort, environment.mode, idsToUrls);
-      }).catchError((error, stackTrace) {
-        if (error is! CrossIsolateException) throw error;
-        if (error.type != 'IsolateSpawnException') throw error;
+    log.fine("Loading transformers from $ids");
 
-        // TODO(nweiz): don't parse this as a string once issues 12617 and 12689
-        // are fixed.
-        var firstErrorLine = error.message.split('\n')[1];
+    var port = new ReceivePort();
+    try {
+      await dart.runInIsolate(code.toString(), port.sendPort,
+          packageRoot: transformerServer.url.resolve('packages'),
+          snapshot: snapshot);
+      return new TransformerIsolate._(
+          await port.first, environment.mode, idsToUrls);
+    } on CrossIsolateException catch (error, stackTrace) {
+      if (error.type != 'IsolateSpawnException') rethrow;
 
-        // The isolate error message contains the fully expanded path, not the
-        // "package:" URI, so we have to be liberal in what we look for in the
-        // error message.
-        var missingTransformer = idsToUrls.keys.firstWhere((id) =>
-            firstErrorLine.startsWith('Could not import "${idsToUrls[id]}"'),
-            orElse: () => throw error);
-        var packageUri = idToPackageUri(idsToAssetIds[missingTransformer]);
+      // TODO(nweiz): don't parse this as a string once issues 12617 and 12689
+      // are fixed.
+      var firstErrorLine = error.message.split('\n')[1];
 
-        // If there was an IsolateSpawnException and the import that actually
-        // failed was the one we were loading transformers from, throw an
-        // application exception with a more user-friendly message.
-        fail('Transformer library "$packageUri" not found.',
-            error, stackTrace);
-      });
-    });
+      // The isolate error message contains the fully expanded path, not the
+      // "package:" URI, so we have to be liberal in what we look for in the
+      // error message.
+      var missingTransformer = idsToUrls.keys.firstWhere((id) =>
+          firstErrorLine.startsWith('Could not import "${idsToUrls[id]}"'),
+          orElse: () => throw error);
+      var packageUri = idToPackageUri(idsToAssetIds[missingTransformer]);
+
+      // If there was an IsolateSpawnException and the import that actually
+      // failed was the one we were loading transformers from, throw an
+      // application exception with a more user-friendly message.
+      fail('Transformer library "$packageUri" not found.',
+          error, stackTrace);
+      return null;
+    }
   }
 
   TransformerIsolate._(this._port, this._mode, this._idsToUrls);
@@ -111,20 +111,20 @@
   ///
   /// If there are no transformers defined in the given library, this will
   /// return an empty set.
-  Future<Set<Transformer>> create(TransformerConfig config) {
-    return call(_port, {
-      'library': _idsToUrls[config.id].toString(),
-      'mode': _mode.name,
-      'configuration': JSON.encode(config.configuration)
-    }).then((transformers) {
-      transformers = transformers.map(
-          (transformer) => deserializeTransformerLike(transformer, config))
+  Future<Set<Transformer>> create(TransformerConfig config) async {
+    try {
+      var transformers = (await call/*<List>*/(_port, {
+        'library': _idsToUrls[config.id].toString(),
+        'mode': _mode.name,
+        'configuration': JSON.encode(config.configuration)
+      }))
+          .map((transformer) => deserializeTransformerLike(transformer, config))
           .toSet();
       log.fine("Transformers from $config: $transformers");
       return transformers;
-    }).catchError((error, stackTrace) {
+    } catch (error) {
       throw new TransformerLoadError(error, config.span);
-    });
+    }
   }
 }
 
diff --git a/lib/src/barback/web_socket_api.dart b/lib/src/barback/web_socket_api.dart
index a6ed9ad..b0a502e 100644
--- a/lib/src/barback/web_socket_api.dart
+++ b/lib/src/barback/web_socket_api.dart
@@ -28,7 +28,7 @@
   bool _exitOnClose = false;
 
   WebSocketApi(WebSocketChannel socket, this._environment)
-      : _server = new json_rpc.Server(socket) {
+      : _server = new json_rpc.Server(socket.cast()) {
     _server.registerMethod("urlToAssetId", _urlToAssetId);
     _server.registerMethod("pathToUrls", _pathToUrls);
     _server.registerMethod("serveDirectory", _serveDirectory);
@@ -116,7 +116,7 @@
       // see if assets exist, consider supporting implicit index.html at that
       // point.
 
-      var result = {"package": id.package, "path": id.path};
+      var result = <String, Object>{"package": id.package, "path": id.path};
 
       // Map the line.
       // TODO(rnystrom): Right now, source maps are not supported and it just
@@ -188,7 +188,9 @@
             'Asset path "$assetPath" is not currently being served.');
       }
 
-      var result = {"urls": urls.map((url) => url.toString()).toList()};
+      var result = <String, Object>{
+        "urls": urls.map((url) => url.toString()).toList()
+      };
 
       // Map the line.
       // TODO(rnystrom): Right now, source maps are not supported and it just
diff --git a/lib/src/command.dart b/lib/src/command.dart
index f201b4e..d19f900 100644
--- a/lib/src/command.dart
+++ b/lib/src/command.dart
@@ -84,6 +84,7 @@
       return int.parse(intString);
     } on FormatException catch (_) {
       usageException('Could not parse $name "$intString".');
+      return null;
     }
   }
 }
diff --git a/lib/src/command/barback.dart b/lib/src/command/barback.dart
index 1ac2ea9..3e07773 100644
--- a/lib/src/command/barback.dart
+++ b/lib/src/command/barback.dart
@@ -111,7 +111,7 @@
 
     // Make sure the directories don't overlap.
     var sources = sourceDirectories.toList();
-    var overlapping = new Set();
+    var overlapping = new Set<String>();
     for (var i = 0; i < sources.length; i++) {
       for (var j = i + 1; j < sources.length; j++) {
         if (path.isWithin(sources[i], sources[j]) ||
diff --git a/lib/src/command/build.dart b/lib/src/command/build.dart
index 38397d1..b4457a5 100644
--- a/lib/src/command/build.dart
+++ b/lib/src/command/build.dart
@@ -53,17 +53,19 @@
     var errorsJson = [];
     var logJson = [];
 
-    var environmentConstants = new Map.fromIterable(argResults["define"],
+    var environmentConstants = new Map<String, String>.fromIterable(
+        argResults["define"],
         key: (pair) => pair.split("=").first,
         value: (pair) => pair.split("=").last);
 
-    // Since this server will only be hit by the transformer loader and isn't
-    // user-facing, just use an IPv4 address to avoid a weird bug on the
-    // OS X buildbots.
-    return AssetEnvironment.create(entrypoint, mode,
+    try {
+      // Since this server will only be hit by the transformer loader and isn't
+      // user-facing, just use an IPv4 address to avoid a weird bug on the OS X
+      // buildbots.
+      var environment = await AssetEnvironment.create(entrypoint, mode,
             environmentConstants: environmentConstants,
-            useDart2JS: true)
-        .then((environment) {
+            useDart2JS: true);
+
       // Show in-progress errors, but not results. Those get handled
       // implicitly by getAllAssets().
       environment.barback.errors.listen((error) {
@@ -85,41 +87,38 @@
             (entry) => logJson.add(_logEntryToJson(entry)));
       }
 
-      return log.progress("Building ${entrypoint.root.name}", () {
+      var assets = await log.progress("Building ${entrypoint.root.name}",
+          () async {
         // Register all of the build directories.
         // TODO(rnystrom): We don't actually need to bind servers for these, we
         // just need to add them to barback's sources. Add support to
         // BuildEnvironment for going the latter without the former.
-        return Future.wait(sourceDirectories.map(
-            (dir) => environment.serveDirectory(dir))).then((_) {
+        await Future.wait(sourceDirectories.map(
+            (dir) => environment.serveDirectory(dir)));
 
-          return environment.barback.getAllAssets();
-        });
-      }).then((assets) {
-        // Find all of the JS entrypoints we built.
-        var dart2JSEntrypoints = assets
-            .where((asset) => asset.id.path.endsWith(".dart.js"))
-            .map((asset) => asset.id);
-
-        return Future.wait(assets.map(_writeAsset)).then((_) {
-          return _copyBrowserJsFiles(dart2JSEntrypoints, assets);
-        }).then((_) {
-          log.message('Built $builtFiles ${pluralize('file', builtFiles)} '
-              'to "$outputDirectory".');
-
-          log.json.message({
-            "buildResult": "success",
-            "outputDirectory": outputDirectory,
-            "numFiles": builtFiles,
-            "log": logJson
-          });
-        });
+        return environment.barback.getAllAssets();
       });
-    }).catchError((error) {
+
+      // Find all of the JS entrypoints we built.
+      var dart2JSEntrypoints = assets
+          .where((asset) => asset.id.path.endsWith(".dart.js"))
+          .map((asset) => asset.id);
+
+      await Future.wait(assets.map(_writeAsset));
+      await _copyBrowserJsFiles(dart2JSEntrypoints, assets);
+
+      log.message('Built $builtFiles ${pluralize('file', builtFiles)} '
+          'to "$outputDirectory".');
+
+      log.json.message({
+        "buildResult": "success",
+        "outputDirectory": outputDirectory,
+        "numFiles": builtFiles,
+        "log": logJson
+      });
+    } on BarbackException catch (_) {
       // If [getAllAssets()] throws a BarbackException, the error has already
       // been reported.
-      if (error is! BarbackException) throw error;
-
       log.error(log.red("Build failed."));
       log.json.message({
         "buildResult": "failure",
@@ -128,7 +127,7 @@
       });
 
       return flushThenExit(exit_codes.DATA);
-    });
+    }
   }
 
   /// Writes [asset] to the appropriate build directory.
diff --git a/lib/src/command/cache_repair.dart b/lib/src/command/cache_repair.dart
index 6c238fa..902f445 100644
--- a/lib/src/command/cache_repair.dart
+++ b/lib/src/command/cache_repair.dart
@@ -25,11 +25,11 @@
 
     // Repair every cached source.
     for (var source in cache.sources.all.map(cache.source)) {
-      if (source is! CachedSource) continue;
-
-      var results = await source.repairCachedPackages();
-      successes.addAll(results.first);
-      failures.addAll(results.last);
+      if (source is CachedSource) {
+        var results = await source.repairCachedPackages();
+        successes.addAll(results.first);
+        failures.addAll(results.last);
+      }
     }
 
     if (successes.length > 0) {
diff --git a/lib/src/command/deps.dart b/lib/src/command/deps.dart
index 8822978..dbbec13 100644
--- a/lib/src/command/deps.dart
+++ b/lib/src/command/deps.dart
@@ -144,11 +144,11 @@
     // The work list for the breadth-first traversal. It contains the package
     // being added to the tree, and the parent map that will receive that
     // package.
-    var toWalk = new Queue<Pair<Package, Map>>();
+    var toWalk = new Queue<Pair<Package, Map<String, Map>>>();
     var visited = new Set<String>.from([entrypoint.root.name]);
 
     // Start with the root dependencies.
-    var packageTree = {};
+    var packageTree = <String, Map>{};
     var immediateDependencies = entrypoint.root.immediateDependencies.toSet();
     if (!_includeDev) {
       immediateDependencies.removeAll(entrypoint.root.devDependencies);
@@ -171,7 +171,7 @@
       visited.add(package.name);
 
       // Populate the map with this package's dependencies.
-      var childMap = {};
+      var childMap = <String, Map>{};
       map[_labelPackage(package)] = childMap;
 
       for (var dep in package.dependencies) {
@@ -221,5 +221,6 @@
     if (package != null) return package;
     dataError('The pubspec.yaml file has changed since the pubspec.lock file '
         'was generated, please run "pub get" again.');
+    return null;
   }
 }
diff --git a/lib/src/command/global_activate.dart b/lib/src/command/global_activate.dart
index 5c389fa..0183d70 100644
--- a/lib/src/command/global_activate.dart
+++ b/lib/src/command/global_activate.dart
@@ -35,13 +35,13 @@
 
   Future run() {
     // Default to `null`, which means all executables.
-    var executables;
+    List<String> executables;
     if (argResults.wasParsed("executable")) {
       if (argResults.wasParsed("no-executables")) {
         usageException("Cannot pass both --no-executables and --executable.");
       }
 
-      executables = argResults["executable"];
+      executables = argResults["executable"] as List<String>;
     } else if (argResults["no-executables"]) {
       // An empty list means no executables.
       executables = [];
diff --git a/lib/src/command/lish.dart b/lib/src/command/lish.dart
index 3d10b89..3ed4c49 100644
--- a/lib/src/command/lish.dart
+++ b/lib/src/command/lish.dart
@@ -56,7 +56,7 @@
         help: 'The package server to which to upload this package.');
   }
 
-  Future _publish(packageBytes) async {
+  Future _publish(List<int> packageBytes) async {
     var cloudStorageUrl;
     try {
       await oauth2.withClient(cache, (client) {
diff --git a/lib/src/command/list_package_dirs.dart b/lib/src/command/list_package_dirs.dart
index b9d1b01..b1faeb6 100644
--- a/lib/src/command/list_package_dirs.dart
+++ b/lib/src/command/list_package_dirs.dart
@@ -2,6 +2,7 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'package:collection/collection.dart';
 import 'package:path/path.dart' as p;
 
 import '../command.dart';
diff --git a/lib/src/command/serve.dart b/lib/src/command/serve.dart
index 1094b49..9052d7c 100644
--- a/lib/src/command/serve.dart
+++ b/lib/src/command/serve.dart
@@ -86,7 +86,8 @@
     var watcherType = argResults['force-poll'] ?
         WatcherType.POLLING : WatcherType.AUTO;
 
-    var environmentConstants = new Map.fromIterable(argResults["define"],
+    var environmentConstants = new Map<String, String>.fromIterable(
+        argResults["define"],
         key: (pair) => pair.split("=").first,
         value: (pair) => pair.split("=").last);
 
diff --git a/lib/src/command_runner.dart b/lib/src/command_runner.dart
index 09ecf40..22fb5c6 100644
--- a/lib/src/command_runner.dart
+++ b/lib/src/command_runner.dart
@@ -79,7 +79,7 @@
     addCommand(new VersionCommand());
   }
 
-  Future run(List<String> arguments) async {
+  Future run(Iterable<String> arguments) async {
     var options;
     try {
       options = super.parse(arguments);
diff --git a/lib/src/dart.dart b/lib/src/dart.dart
index c298ace..9449164 100644
--- a/lib/src/dart.dart
+++ b/lib/src/dart.dart
@@ -34,7 +34,7 @@
   /// the input file at that URI.
   ///
   /// The future can complete to a string or a list of bytes.
-  Future/*<String | List<int>>*/ provideInput(Uri uri);
+  Future provideInput(Uri uri);
 
   /// Reports a diagnostic message from dart2js to the user.
   void handleDiagnostic(Uri uri, int begin, int end, String message,
@@ -192,7 +192,7 @@
     if (snapshot == null) return;
 
     ensureDir(p.dirname(snapshot));
-    var snapshotArgs = [];
+    var snapshotArgs = <String>[];
     if (packageRoot != null) snapshotArgs.add('--package-root=$packageRoot');
     snapshotArgs.addAll(['--snapshot=$snapshot', dartPath]);
     var result = await runProcess(Platform.executable, snapshotArgs);
diff --git a/lib/src/entrypoint.dart b/lib/src/entrypoint.dart
index cc58a3e..31aa24d 100644
--- a/lib/src/entrypoint.dart
+++ b/lib/src/entrypoint.dart
@@ -107,7 +107,8 @@
     if (_packageGraph != null) return _packageGraph;
 
     assertUpToDate();
-    var packages = new Map.fromIterable(lockFile.packages.values,
+    var packages = new Map<String, Package>.fromIterable(
+        lockFile.packages.values,
         key: (id) => id.name,
         value: (id) => cache.load(id));
     packages[root.name] = root;
@@ -294,7 +295,7 @@
   ///
   /// If [changed] is passed, only dependencies whose contents might be changed
   /// if one of the given packages changes will be returned.
-  Set<String> _dependenciesToPrecompile({Iterable<String> changed}) {
+  Set<String> _dependenciesToPrecompile({Set<String> changed}) {
     return packageGraph.packages.values.where((package) {
       if (package.pubspec.transformers.isEmpty) return false;
       if (packageGraph.isPackageMutable(package.name)) return false;
@@ -695,13 +696,15 @@
   /// Recursively lists the contents of [dir], excluding hidden `.DS_Store`
   /// files and `package` files.
   List<String> _listDirWithoutPackages(dir) {
-    return flatten(listDir(dir).map((file) {
+    // `expand` yields a lazy Iterable rather than a List, so materialize it to
+    // match the declared return type.
+    return listDir(dir).expand/*<String>*/((file) {
       if (p.basename(file) == 'packages') return [];
       if (!dirExists(file)) return [];
       var fileAndSubfiles = [file];
       fileAndSubfiles.addAll(_listDirWithoutPackages(file));
       return fileAndSubfiles;
-    }));
+    }).toList();
   }
 
   /// If [packageSymlinks] is true, creates a symlink to the "packages"
diff --git a/lib/src/error_group.dart b/lib/src/error_group.dart
index da86be1..77a1c8d 100644
--- a/lib/src/error_group.dart
+++ b/lib/src/error_group.dart
@@ -65,7 +65,7 @@
   ///
   /// If all members of [this] have already completed successfully or with an
   /// error, it's a [StateError] to try to register a new [Future].
-  Future registerFuture(Future future) {
+  Future/*<T>*/ registerFuture/*<T>*/(Future/*<T>*/ future) {
     if (_isDone) {
       throw new StateError("Can't register new members on a complete "
           "ErrorGroup.");
@@ -88,7 +88,7 @@
   ///
   /// If all members of [this] have already completed successfully or with an
   /// error, it's a [StateError] to try to register a new [Stream].
-  Stream registerStream(Stream stream) {
+  Stream/*<T>*/ registerStream/*<T>*/(Stream/*<T>*/ stream) {
     if (_isDone) {
       throw new StateError("Can't register new members on a complete "
           "ErrorGroup.");
@@ -161,7 +161,7 @@
 ///
 /// It also notifies its parent [ErrorGroup] when it completes successfully or
 /// receives an error.
-class _ErrorGroupFuture implements Future {
+class _ErrorGroupFuture<T> implements Future<T> {
   /// The parent [ErrorGroup].
   final ErrorGroup _group;
 
@@ -169,14 +169,14 @@
   var _isDone = false;
 
   /// The underlying [Completer] for [this].
-  final _completer = new Completer();
+  final _completer = new Completer<T>();
 
   /// Whether [this] has any listeners.
   bool _hasListeners = false;
 
   /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
   /// [inner].
-  _ErrorGroupFuture(this._group, Future inner) {
+  _ErrorGroupFuture(this._group, Future<T> inner) {
     inner.then((value) {
       if (!_isDone) _completer.complete(value);
       _isDone = true;
@@ -188,27 +188,27 @@
     _completer.future.catchError((_) {});
   }
 
-  Future then(onValue(value), {Function onError}) {
+  Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(/*=T*/ value), {Function onError}) {
     _hasListeners = true;
     return _completer.future.then(onValue, onError: onError);
   }
 
-  Future catchError(Function onError, {bool test(Object error)}) {
+  Future<T> catchError(Function onError, {bool test(Object error)}) {
     _hasListeners = true;
     return _completer.future.catchError(onError, test: test);
   }
 
-  Future whenComplete(void action()) {
+  Future<T> whenComplete(void action()) {
     _hasListeners = true;
     return _completer.future.whenComplete(action);
   }
 
-  Future timeout(Duration timeLimit, {void onTimeout()}) {
+  Future<T> timeout(Duration timeLimit, {onTimeout()}) {
     _hasListeners = true;
     return _completer.future.timeout(timeLimit, onTimeout: onTimeout);
   }
 
-  Stream asStream() {
+  Stream<T> asStream() {
     _hasListeners = true;
     return _completer.future.asStream();
   }
@@ -229,7 +229,7 @@
 ///
 /// It also notifies its parent [ErrorGroup] when it completes successfully or
 /// receives an error.
-class _ErrorGroupStream extends Stream {
+class _ErrorGroupStream<T> extends Stream<T> {
   /// The parent [ErrorGroup].
   final ErrorGroup _group;
 
@@ -237,24 +237,24 @@
   var _isDone = false;
 
   /// The underlying [StreamController] for [this].
-  final StreamController _controller;
+  final StreamController<T> _controller;
 
   /// The controller's [Stream].
   ///
   /// May be different than `_controller.stream` if the wrapped stream is a
   /// broadcasting stream.
-  Stream _stream;
+  Stream<T> _stream;
 
   /// The [StreamSubscription] that connects the wrapped [Stream] to
   /// [_controller].
-  StreamSubscription _subscription;
+  StreamSubscription<T> _subscription;
 
   /// Whether [this] has any listeners.
   bool get _hasListeners => _controller.hasListener;
 
   /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
   /// [inner].
-  _ErrorGroupStream(this._group, Stream inner)
+  _ErrorGroupStream(this._group, Stream<T> inner)
     : _controller = new StreamController(sync: true) {
     // Use old-style asBroadcastStream behavior - cancel source _subscription
     // the first time the stream has no listeners.
@@ -272,7 +272,7 @@
     });
   }
 
-  StreamSubscription listen(void onData(value),
+  StreamSubscription<T> listen(void onData(T value),
       {Function onError, void onDone(),
        bool cancelOnError}) {
     return _stream.listen(onData,
diff --git a/lib/src/executable.dart b/lib/src/executable.dart
index c8e38ff..adc134b 100644
--- a/lib/src/executable.dart
+++ b/lib/src/executable.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:io';
 
+import 'package:async/async.dart';
 import 'package:barback/barback.dart';
 import 'package:path/path.dart' as p;
 
@@ -93,7 +94,7 @@
   // "bin".
   if (p.split(executable).length == 1) executable = p.join("bin", executable);
 
-  var vmArgs = [];
+  var vmArgs = <String>[];
 
   // Run in checked mode.
   if (checked) vmArgs.add("--checked");
@@ -218,7 +219,7 @@
     String packagesFile, bool checked: false}) async {
   // TODO(nweiz): pass a flag to silence the "Wrong full snapshot version"
   // message when issue 20784 is fixed.
-  var vmArgs = [];
+  var vmArgs = <String>[];
   if (checked) vmArgs.add("--checked");
 
   if (packagesFile != null) {
@@ -239,9 +240,9 @@
   if (recompile == null) {
     stdin1 = stdin;
   } else {
-    var pair = tee(stdin);
-    stdin1 = pair.first;
-    stdin2 = pair.last;
+    var stdins = StreamSplitter.splitFrom(stdin);
+    stdin1 = stdins.first;
+    stdin2 = stdins.last;
   }
 
   runProcess(input) async {
diff --git a/lib/src/global_packages.dart b/lib/src/global_packages.dart
index 1c05501..2265e53 100644
--- a/lib/src/global_packages.dart
+++ b/lib/src/global_packages.dart
@@ -211,9 +211,7 @@
     if (id.isRoot) return;
 
     var source = cache.source(id.source);
-    if (source is! CachedSource) return;
-
-    await source.downloadToSystemCache(id);
+    if (source is CachedSource) await source.downloadToSystemCache(id);
   }
 
   /// Finishes activating package [package] by saving [lockFile] in the cache.
@@ -432,7 +430,7 @@
   /// were successfully re-activated; the second indicates which failed.
   Future<Pair<List<String>, List<String>>> repairActivatedPackages()
       async {
-    var executables = {};
+    var executables = <String, List<String>>{};
     if (dirExists(_binStubDir)) {
       for (var entry in listDir(_binStubDir)) {
         try {
@@ -459,8 +457,8 @@
       }
     }
 
-    var successes = [];
-    var failures = [];
+    var successes = <String>[];
+    var failures = <String>[];
     if (dirExists(_directory)) {
       for (var entry in listDir(_directory)) {
         var id;
@@ -551,8 +549,8 @@
 
     ensureDir(_binStubDir);
 
-    var installed = [];
-    var collided = {};
+    var installed = <String>[];
+    var collided = <String, String>{};
     var allExecutables = ordered(package.pubspec.executables.keys);
     for (var executable in allExecutables) {
       if (executables != null && !executables.contains(executable)) continue;
diff --git a/lib/src/io.dart b/lib/src/io.dart
index 22ed041..69248cd 100644
--- a/lib/src/io.dart
+++ b/lib/src/io.dart
@@ -8,6 +8,7 @@
 import 'dart:convert';
 import 'dart:io';
 
+import 'package:async/async.dart';
 import 'package:path/path.dart' as path;
 import 'package:pool/pool.dart';
 import 'package:http/http.dart' show ByteStream;
@@ -194,11 +195,10 @@
   // TODO(nweiz): remove extra logging when we figure out the windows bot issue.
   log.io("Creating $file from stream.");
 
-  return _descriptorPool.withResource(() {
-    return stream.pipe(new File(file).openWrite()).then((_) {
-      log.fine("Created $file from stream.");
-      return file;
-    });
+  return _descriptorPool.withResource/*<Future<String>>*/(() async {
+    await stream.pipe(new File(file).openWrite());
+    log.fine("Created $file from stream.");
+    return file;
   });
 }
 
@@ -639,10 +639,11 @@
 /// Returns a [EventSink] that pipes all data to [consumer] and a [Future] that
 /// will succeed when [EventSink] is closed or fail with any errors that occur
 /// while writing.
-Pair<EventSink, Future> consumerToSink(StreamConsumer consumer) {
-  var controller = new StreamController(sync: true);
+Pair<EventSink/*<T>*/, Future> consumerToSink/*<T>*/(
+    StreamConsumer/*<T>*/ consumer) {
+  var controller = new StreamController/*<T>*/(sync: true);
   var done = controller.stream.pipe(consumer);
-  return new Pair<EventSink, Future>(controller.sink, done);
+  return new Pair(controller.sink, done);
 }
 
 // TODO(nweiz): remove this when issue 7786 is fixed.
@@ -801,12 +802,10 @@
     _stdin = pair.first;
     _stdinClosed = errorGroup.registerFuture(pair.last);
 
-    _stdout = new ByteStream(
-        errorGroup.registerStream(process.stdout));
-    _stderr = new ByteStream(
-        errorGroup.registerStream(process.stderr));
+    _stdout = new ByteStream(errorGroup.registerStream(process.stdout));
+    _stderr = new ByteStream(errorGroup.registerStream(process.stderr));
 
-    var exitCodeCompleter = new Completer();
+    var exitCodeCompleter = new Completer<int>();
     _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
     _process.exitCode.then((code) => exitCodeCompleter.complete(code));
   }
@@ -829,7 +828,7 @@
   // have any path separators in it), then spawn it through a shell.
   if ((Platform.operatingSystem == "windows") &&
       (executable.indexOf('\\') == -1)) {
-    args = flatten(["/c", executable, args]);
+    args = ["/c", executable]..addAll(args);
     executable = "cmd";
   }
 
@@ -858,12 +857,13 @@
 ///
 /// Returns a future that completes to the value that the future returned from
 /// [fn] completes to.
-Future withTempDir(Future fn(String path)) {
-  return new Future.sync(() {
-    var tempDir = createSystemTempDir();
-    return new Future.sync(() => fn(tempDir))
-        .whenComplete(() => deleteEntry(tempDir));
-  });
+Future/*<T>*/ withTempDir/*<T>*/(Future/*<T>*/ fn(String path)) async {
+  var tempDir = createSystemTempDir();
+  try {
+    return await fn(tempDir);
+  } finally {
+    deleteEntry(tempDir);
+  }
 }
 
 /// Binds an [HttpServer] to [host] and [port].
@@ -995,8 +995,8 @@
 /// working directory.
 ///
 /// Returns a [ByteStream] that emits the contents of the archive.
-ByteStream createTarGz(List contents, {baseDir}) {
-  return new ByteStream(futureStream(new Future.sync(() async {
+ByteStream createTarGz(List contents, {String baseDir}) {
+  return new ByteStream(StreamCompleter.fromFuture(new Future.sync(() async {
     var buffer = new StringBuffer();
     buffer.write('Creating .tag.gz stream containing:\n');
     contents.forEach((file) => buffer.write('$file\n'));
diff --git a/lib/src/lock_file.dart b/lib/src/lock_file.dart
index b00bfdb..3a19c2f 100644
--- a/lib/src/lock_file.dart
+++ b/lib/src/lock_file.dart
@@ -76,32 +76,33 @@
     var parsed = loadYamlNode(contents, sourceUrl: sourceUrl);
 
     _validate(parsed is Map, 'The lockfile must be a YAML mapping.', parsed);
+    var parsedMap = parsed as YamlMap;
 
     var dartSdkConstraint = VersionConstraint.any;
     VersionConstraint flutterSdkConstraint;
-    var sdkNode = parsed.nodes['sdk'];
+    var sdkNode = parsedMap.nodes['sdk'];
     if (sdkNode != null) {
       // Lockfiles produced by pub versions from 1.14.0 through 1.18.0 included
       // a top-level "sdk" field which encoded the unified constraint on the
       // Dart SDK. They had no way of specifying constraints on other SDKs.
       dartSdkConstraint = _parseVersionConstraint(sdkNode);
-    } else if ((parsed as Map).containsKey('sdks')) {
-      var sdksField = parsed['sdks'];
+    } else if (parsedMap.containsKey('sdks')) {
+      var sdksField = parsedMap['sdks'];
       _validate(
           sdksField is Map,
           'The "sdks" field must be a mapping.',
-          parsed.nodes['sdks']);
+          parsedMap.nodes['sdks']);
 
       dartSdkConstraint = _parseVersionConstraint(sdksField.nodes['dart']);
       flutterSdkConstraint =
           _parseVersionConstraint(sdksField.nodes['flutter']);
     }
 
-    var packages = {};
-    var packageEntries = parsed['packages'];
+    var packages = <String, PackageId>{};
+    var packageEntries = parsedMap['packages'];
     if (packageEntries != null) {
       _validate(packageEntries is Map, 'The "packages" field must be a map.',
-          parsed.nodes['packages']);
+          parsedMap.nodes['packages']);
 
       packageEntries.forEach((name, spec) {
         // Parse the version.
@@ -182,7 +183,7 @@
   LockFile setPackage(PackageId id) {
     if (id.isRoot) return this;
 
-    var packages = new Map.from(this.packages);
+    var packages = new Map<String, PackageId>.from(this.packages);
     packages[id.name] = id;
     return new LockFile._(packages, dartSdkConstraint, flutterSdkConstraint);
   }
@@ -193,7 +194,7 @@
   LockFile removePackage(String name) {
     if (!this.packages.containsKey(name)) return this;
 
-    var packages = new Map.from(this.packages);
+    var packages = new Map<String, PackageId>.from(this.packages);
     packages.remove(name);
     return new LockFile._(packages, dartSdkConstraint, flutterSdkConstraint);
   }
@@ -205,7 +206,8 @@
   String packagesFile(SystemCache cache, [String entrypoint]) {
     var header = "Generated by pub on ${new DateTime.now()}.";
 
-    var map = new Map.fromIterable(ordered(packages.keys), value: (name) {
+    var map = new Map<String, Uri>.fromIterable(ordered(packages.keys),
+        value: (name) {
       var id = packages[name];
       var source = cache.source(id.source);
       return p.toUri(p.join(source.getDirectory(id), "lib"));
diff --git a/lib/src/log.dart b/lib/src/log.dart
index a235f05..0bbf6e0 100644
--- a/lib/src/log.dart
+++ b/lib/src/log.dart
@@ -399,7 +399,8 @@
 /// [progress]) that cancels the progress animation, although the total time
 /// will still be printed once it finishes. If [fine] is passed, the progress
 /// information will only be visible at [Level.FINE].
-Future progress(String message, Future callback(), {bool fine: false}) {
+Future/*<T>*/ progress/*<T>*/(String message, Future/*<T>*/ callback(),
+    {bool fine: false}) {
   _stopProgress();
 
   var progress = new Progress(message, fine: fine);
diff --git a/lib/src/package.dart b/lib/src/package.dart
index d7a87c4..d39c3d9 100644
--- a/lib/src/package.dart
+++ b/lib/src/package.dart
@@ -60,8 +60,8 @@
   /// All immediate dependencies this package specifies.
   ///
   /// This includes regular, dev dependencies, and overrides.
-  Set<PackageDep> get immediateDependencies {
-    var deps = {};
+  List<PackageDep> get immediateDependencies {
+    var deps = <String, PackageDep>{};
 
     addToMap(dep) {
       deps[dep.name] = dep;
@@ -73,7 +73,7 @@
     // Make sure to add these last so they replace normal dependencies.
     dependencyOverrides.forEach(addToMap);
 
-    return deps.values.toSet();
+    return deps.values.toList();
   }
 
   /// Returns a list of asset ids for all Dart executables in this package's bin
@@ -223,7 +223,7 @@
     // readability than most code in pub. In particular, it avoids using the
     // path package, since re-parsing a path is very expensive relative to
     // string operations.
-    var files;
+    Iterable<String> files;
     if (useGitIgnore && _inGitRepo) {
       // Later versions of git do not allow a path for ls-files that appears to
       // be outside of the repo, so make sure we give it a relative path.
@@ -439,8 +439,8 @@
 
   /// Creates an ID for a magic package (see [isMagic]).
   PackageId.magic(String name)
-      : super._magic(name),
-        version = Version.none;
+      : version = Version.none,
+        super._magic(name);
 
   /// Creates an ID for the given root package.
   PackageId.root(Package package)
@@ -473,8 +473,8 @@
       : super._(name, source, description);
 
   PackageDep.magic(String name)
-      : super._magic(name),
-        constraint = Version.none;
+      : constraint = Version.none,
+        super._magic(name);
 
   String toString() {
     if (isRoot) return "$name $constraint (root)";
diff --git a/lib/src/package_graph.dart b/lib/src/package_graph.dart
index f6b4a93..9e0959e 100644
--- a/lib/src/package_graph.dart
+++ b/lib/src/package_graph.dart
@@ -2,13 +2,14 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'package:collection/collection.dart';
+
 import 'barback/transformer_cache.dart';
 import 'entrypoint.dart';
 import 'lock_file.dart';
 import 'package.dart';
 import 'solver/version_solver.dart';
 import 'source/cached.dart';
-import 'utils.dart';
 
 /// A holistic view of the entire transitive dependency graph for an entrypoint.
 class PackageGraph {
@@ -42,7 +43,7 @@
   /// the packages' pubspecs are already fully-parsed.
   factory PackageGraph.fromSolveResult(Entrypoint entrypoint,
       SolveResult result) {
-    var packages = new Map.fromIterable(result.packages,
+    var packages = new Map<String, Package>.fromIterable(result.packages,
         key: (id) => id.name,
         value: (id) {
       if (id.name == entrypoint.root.name) return entrypoint.root;
@@ -78,10 +79,19 @@
     if (package == entrypoint.root.name) return packages.values.toSet();
 
     if (_transitiveDependencies == null) {
-      var closure = transitiveClosure(mapMap(packages,
-          value: (_, package) => package.dependencies.map((dep) => dep.name)));
-      _transitiveDependencies = mapMap(closure,
-          value: (_, names) => names.map((name) => packages[name]).toSet());
+      var closure = transitiveClosure(
+          mapMap/*<String, Package, String, Iterable<String>>*/(
+              packages,
+              value: (_, package) =>
+                  package.dependencies.map((dep) => dep.name)));
+      _transitiveDependencies =
+          mapMap/*<String, Set<String>, String, Set<Package>>*/(
+              closure,
+              value: (depender, names) {
+                var set = names.map((name) => packages[name]).toSet();
+                set.add(packages[depender]);
+                return set;
+              });
     }
 
     return _transitiveDependencies[package];
diff --git a/lib/src/pubspec.dart b/lib/src/pubspec.dart
index 254c188..2186cde 100644
--- a/lib/src/pubspec.dart
+++ b/lib/src/pubspec.dart
@@ -151,7 +151,7 @@
           fields.nodes['transformers'].span);
     }
 
-    _transformers = transformers.nodes.map((phase) {
+    _transformers = (transformers as YamlList).nodes.map((phase) {
       var phaseNodes = phase is YamlList ? phase.nodes : [phase];
       return phaseNodes.map((transformerNode) {
         var transformer = transformerNode.value;
@@ -428,14 +428,17 @@
   factory Pubspec.parse(String contents, SourceRegistry sources,
       {String expectedName, Uri location}) {
     var pubspecNode = loadYamlNode(contents, sourceUrl: location);
+    Map pubspecMap;
     if (pubspecNode is YamlScalar && pubspecNode.value == null) {
-      pubspecNode = new YamlMap(sourceUrl: location);
-    } else if (pubspecNode is! YamlMap) {
+      pubspecMap = new YamlMap(sourceUrl: location);
+    } else if (pubspecNode is YamlMap) {
+      pubspecMap = pubspecNode;
+    } else {
       throw new PubspecException(
           'The pubspec must be a YAML mapping.', pubspecNode.span);
     }
 
-    return new Pubspec.fromMap(pubspecNode, sources,
+    return new Pubspec.fromMap(pubspecMap, sources,
         expectedName: expectedName, location: location);
   }
 
diff --git a/lib/src/solver/backtracking_solver.dart b/lib/src/solver/backtracking_solver.dart
index 66d4d2e..4735aad 100644
--- a/lib/src/solver/backtracking_solver.dart
+++ b/lib/src/solver/backtracking_solver.dart
@@ -139,7 +139,7 @@
 
   /// Creates [_implicitPubspec].
   static Pubspec _makeImplicitPubspec(SystemCache systemCache) {
-    var dependencies = [];
+    var dependencies = <PackageDep>[];
     barback.pubConstraints.forEach((name, constraint) {
       dependencies.add(
           systemCache.sources.hosted.refFor(name)
@@ -174,7 +174,7 @@
       logSolve();
       var packages = await _solve();
 
-      var pubspecs = {};
+      var pubspecs = <String, Pubspec>{};
       for (var id in packages) {
         pubspecs[id.name] = await _getPubspec(id);
       }
@@ -204,17 +204,14 @@
   /// because we weren't trying to upgrade it, we will just know the current
   /// version.
   Map<String, List<Version>> _getAvailableVersions(List<PackageId> packages) {
-    var availableVersions = new Map<String, List<Version>>();
+    var availableVersions = <String, List<Version>>{};
     for (var package in packages) {
       var cached = cache.getCachedVersions(package.toRef());
-      var versions;
-      if (cached != null) {
-        versions = cached.map((id) => id.version).toList();
-      } else {
-        // If the version list was never requested, just use the one known
-        // version.
-        versions = [package.version];
-      }
+      // If the version list was never requested, just use the one known
+      // version.
+      var versions = cached == null
+          ? [package.version]
+          : cached.map((id) => id.version).toList();
 
       availableVersions[package.name] = versions;
     }
diff --git a/lib/src/solver/solve_report.dart b/lib/src/solver/solve_report.dart
index 367d963..20f40a9 100644
--- a/lib/src/solver/solve_report.dart
+++ b/lib/src/solver/solve_report.dart
@@ -104,9 +104,9 @@
     removed.removeAll(names);
     if (removed.isNotEmpty) {
       _output.writeln("These packages are no longer being depended on:");
-      removed = removed.toList();
-      removed.sort();
-      removed.forEach((name) => _reportPackage(name, alwaysShow: true));
+      for (var name in ordered(removed)) {
+        _reportPackage(name, alwaysShow: true);
+      }
     }
 
     log.message(_output);
diff --git a/lib/src/solver/version_solver.dart b/lib/src/solver/version_solver.dart
index 3c4553c..2de19f4 100644
--- a/lib/src/solver/version_solver.dart
+++ b/lib/src/solver/version_solver.dart
@@ -210,7 +210,7 @@
     _versionCacheMisses++;
 
     var source = _cache.source(package.source);
-    var ids;
+    List<PackageId> ids;
     try {
       ids = await source.getVersions(package);
     } catch (error, stackTrace) {
@@ -344,8 +344,8 @@
   final String _message;
 
   BadSdkVersionException(String package, String message)
-      : super(package, null),
-        _message = message;
+      : _message = message,
+        super(package, null);
 }
 
 /// Exception thrown when the [VersionConstraint] used to match a package is
diff --git a/lib/src/source.dart b/lib/src/source.dart
index 6263d7f..508dad1 100644
--- a/lib/src/source.dart
+++ b/lib/src/source.dart
@@ -60,7 +60,7 @@
   /// package during version solving.
   ///
   /// Defaults to `false`.
-  final bool hasMultipleVersions = false;
+  bool get hasMultipleVersions => false;
 
   /// Records the system cache to which this source belongs.
   ///
diff --git a/lib/src/source/git.dart b/lib/src/source/git.dart
index 90f4177..b246f36 100644
--- a/lib/src/source/git.dart
+++ b/lib/src/source/git.dart
@@ -156,11 +156,10 @@
   /// package.
   Future<String> getPackageNameFromRepo(String repo) {
     // Clone the repo to a temp directory.
-    return withTempDir((tempDir) {
-      return _clone(repo, tempDir, shallow: true).then((_) {
-        var pubspec = new Pubspec.load(tempDir, systemCache.sources);
-        return pubspec.name;
-      });
+    return withTempDir((tempDir) async {
+      await _clone(repo, tempDir, shallow: true);
+      var pubspec = new Pubspec.load(tempDir, systemCache.sources);
+      return pubspec.name;
     });
   }
 
@@ -249,8 +248,8 @@
   Future<Pair<List<PackageId>, List<PackageId>>> repairCachedPackages() async {
     if (!dirExists(systemCacheRoot)) return new Pair([], []);
 
-    var successes = [];
-    var failures = [];
+    var successes = <PackageId>[];
+    var failures = <PackageId>[];
 
     var packages = listDir(systemCacheRoot)
         .where((entry) => dirExists(path.join(entry, ".git")))
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index ddee56b..5c2b76b 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -212,8 +212,8 @@
   Future<Pair<List<PackageId>, List<PackageId>>> repairCachedPackages() async {
     if (!dirExists(systemCacheRoot)) return new Pair([], []);
 
-    var successes = [];
-    var failures = [];
+    var successes = <PackageId>[];
+    var failures = <PackageId>[];
 
     for (var serverDir in listDir(systemCacheRoot)) {
       var url = _directoryToUrl(p.basename(serverDir));
diff --git a/lib/src/system_cache.dart b/lib/src/system_cache.dart
index 0580547..0ad1541 100644
--- a/lib/src/system_cache.dart
+++ b/lib/src/system_cache.dart
@@ -101,11 +101,8 @@
   bool contains(PackageId id) {
     var source = this.source(id.source);
 
-    if (source is! CachedSource) {
-      throw new ArgumentError("Package $id is not cacheable.");
-    }
-
-    return source.isInSystemCache(id);
+    if (source is CachedSource) return source.isInSystemCache(id);
+    throw new ArgumentError("Package $id is not cacheable.");
   }
 
   /// Create a new temporary directory within the system cache.
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 5d35fd2..2bead98 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -55,47 +55,6 @@
   int get hashCode => first.hashCode ^ last.hashCode;
 }
 
-/// A completer that waits until all added [Future]s complete.
-// TODO(rnystrom): Copied from web_components. Remove from here when it gets
-// added to dart:core. (See #6626.)
-class FutureGroup<T> {
-  int _pending = 0;
-  Completer<List<T>> _completer = new Completer<List<T>>();
-  final List<Future<T>> futures = <Future<T>>[];
-  bool completed = false;
-
-  final List<T> _values = <T>[];
-
-  /// Wait for [task] to complete.
-  Future<T> add(Future<T> task) {
-    if (completed) {
-      throw new StateError("The FutureGroup has already completed.");
-    }
-
-    _pending++;
-    futures.add(task.then((value) {
-      if (completed) return;
-
-      _pending--;
-      _values.add(value);
-
-      if (_pending <= 0) {
-        completed = true;
-        _completer.complete(_values);
-      }
-    }).catchError((e, stackTrace) {
-      if (completed) return;
-
-      completed = true;
-      _completer.completeError(e, stackTrace);
-    }));
-
-    return task;
-  }
-
-  Future<List> get future => _completer.future;
-}
-
 /// Like [new Future], but avoids around issue 11911 by using [new Future.value]
 /// under the covers.
 Future newFuture(callback()) => new Future.value().then((_) => callback());
@@ -145,7 +104,7 @@
 /// only returns once all Futures have completed, successfully or not.
 ///
 /// This will wrap the first error thrown in a [SilentException] and rethrow it.
-Future waitAndPrintErrors(Iterable<Future> futures) {
+Future<List/*<T>*/> waitAndPrintErrors/*<T>*/(Iterable<Future/*<T>*/> futures) {
   return Future.wait(futures.map((future) {
     return future.catchError((error, stackTrace) {
       log.exception(error, stackTrace);
@@ -160,8 +119,8 @@
 /// completes.
 ///
 /// The stream will be passed through unchanged.
-StreamTransformer onDoneTransformer(void onDone()) {
-  return new StreamTransformer.fromHandlers(handleDone: (sink) {
+StreamTransformer/*<T, T>*/ onDoneTransformer/*<T>*/(void onDone()) {
+  return new StreamTransformer/*<T, T>*/.fromHandlers(handleDone: (sink) {
     onDone();
     sink.close();
   });
@@ -274,23 +233,6 @@
   }
 }
 
-/// Flattens nested lists inside an iterable into a single list containing only
-/// non-list elements.
-List flatten(Iterable nested) {
-  var result = [];
-  helper(list) {
-    for (var element in list) {
-      if (element is List) {
-        helper(element);
-      } else {
-        result.add(element);
-      }
-    }
-  }
-  helper(nested);
-  return result;
-}
-
 /// Randomly chooses a single element in [elements].
 /*=T*/ choose/*<T>*/(List/*<T>*/ elements) =>
     elements[random.nextInt(elements.length)];
@@ -312,7 +254,7 @@
 }
 
 /// Returns a list containing the sorted elements of [iter].
-List ordered(Iterable<Comparable> iter) {
+List/*<T>*/ ordered/*<T extends Comparable<T>>*/(Iterable/*<T>*/ iter) {
   var list = iter.toList();
   list.sort();
   return list;
@@ -346,84 +288,13 @@
   });
 }
 
-/// Creates a new map from [map] with new keys and values.
-///
-/// The return values of [key] are used as the keys and the return values of
-/// [value] are used as the values for the new map.
-///
-/// [key] defaults to returning the original key and [value] defaults to
-/// returning the original value.
-Map mapMap(Map map, {key(key, value), value(key, value)}) {
-  if (key == null) key = (key, _) => key;
-  if (value == null) value = (_, value) => value;
-
-  var result = {};
-  map.forEach((mapKey, mapValue) {
-    result[key(mapKey, mapValue)] = value(mapKey, mapValue);
-  });
-  return result;
-}
-
-/// Like [Map.fromIterable], but [key] and [value] may return [Future]s.
-Future<Map> mapFromIterableAsync(Iterable iter, {key(element),
-    value(element)}) {
-  if (key == null) key = (element) => element;
-  if (value == null) value = (element) => element;
-
-  var map = new Map();
-  return Future.wait(iter.map((element) {
-    return Future.wait([
-      new Future.sync(() => key(element)),
-      new Future.sync(() => value(element))
-    ]).then((results) {
-      map[results[0]] = results[1];
-    });
-  })).then((_) => map);
-}
-
-/// Returns a new map with all entries in both [map1] and [map2].
-///
-/// If there are overlapping keys, [map2]'s value wins.
-Map mergeMaps(Map map1, Map map2) {
-  var result = {};
-  result.addAll(map1);
-  result.addAll(map2);
-  return result;
-}
-
-/// Returns the transitive closure of [graph].
-///
-/// This assumes [graph] represents a graph with a vertex for each key and an
-/// edge betweek each key and the values for that key.
-Map<dynamic, Set> transitiveClosure(Map<dynamic, Iterable> graph) {
-  // This uses the Floyd-Warshall algorithm
-  // (https://en.wikipedia.org/wiki/Floyd%E2%80%93Warshall_algorithm).
-  var result = {};
-  graph.forEach((vertex, edges) {
-    result[vertex] = new Set.from(edges)..add(vertex);
-  });
-
-  for (var vertex1 in graph.keys) {
-    for (var vertex2 in graph.keys) {
-      for (var vertex3 in graph.keys) {
-        if (result[vertex2].contains(vertex1) &&
-            result[vertex1].contains(vertex3)) {
-          result[vertex2].add(vertex3);
-        }
-      }
-    }
-  }
-
-  return result;
-}
-
 /// Given a list of filenames, returns a set of patterns that can be used to
 /// filter for those filenames.
 ///
 /// For a given path, that path ends with some string in the returned set if
 /// and only if that path's basename is in [files].
 Set<String> createFileFilter(Iterable<String> files) {
-  return files.expand((file) {
+  return files.expand/*<String>*/((file) {
     var result = ["/$file"];
     if (Platform.operatingSystem == 'windows') result.add("\\$file");
     return result;
@@ -436,7 +307,7 @@
 /// For a given path, that path contains some string in the returned set if
 /// and only if one of that path's components is in [dirs].
 Set<String> createDirectoryFilter(Iterable<String> dirs) {
-  return dirs.expand((dir) {
+  return dirs.expand/*<String>*/((dir) {
     var result = ["/$dir/"];
     if (Platform.operatingSystem == 'windows') {
       result..add("/$dir\\")..add("\\$dir/")..add("\\$dir\\");
@@ -533,7 +404,7 @@
 /// emitting the same values and errors as [stream], but only if at least one
 /// value can be read successfully. If an error occurs before any values are
 /// emitted, the returned Future completes to that error.
-Future<Stream> validateStream(Stream stream) {
+Future<Stream/*<T>*/> validateStream/*<T>*/(Stream/*<T>*/ stream) {
   var completer = new Completer<Stream>();
   var controller = new StreamController(sync: true);
 
@@ -594,66 +465,6 @@
   return new Pair<Stream, StreamSubscription>(controller.stream, subscription);
 }
 
-// TODO(nweiz): remove this when issue 7787 is fixed.
-/// Creates two single-subscription [Stream]s that each emit all values and
-/// errors from [stream].
-///
-/// This is useful if [stream] is single-subscription but multiple subscribers
-/// are necessary.
-Pair<Stream, Stream> tee(Stream stream) {
-  var controller1 = new StreamController(sync: true);
-  var controller2 = new StreamController(sync: true);
-  stream.listen((value) {
-    controller1.add(value);
-    controller2.add(value);
-  }, onError: (error, [stackTrace]) {
-    controller1.addError(error, stackTrace);
-    controller2.addError(error, stackTrace);
-  }, onDone: () {
-    controller1.close();
-    controller2.close();
-  });
-  return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
-}
-
-/// Merges [stream1] and [stream2] into a single stream that emits events from
-/// both sources.
-Stream mergeStreams(Stream stream1, Stream stream2) {
-  var doneCount = 0;
-  var controller = new StreamController(sync: true);
-
-  for (var stream in [stream1, stream2]) {
-    stream.listen(
-        controller.add,
-        onError: controller.addError,
-        onDone: () {
-      doneCount++;
-      if (doneCount == 2) controller.close();
-    });
-  }
-
-  return controller.stream;
-}
-
-/// Returns a [Stream] that will emit the same values as the stream returned by
-/// [callback].
-///
-/// [callback] will only be called when the returned [Stream] gets a subscriber.
-Stream callbackStream(Stream callback()) {
-  var subscription;
-  var controller;
-  controller = new StreamController(onListen: () {
-    subscription = callback().listen(controller.add,
-        onError: controller.addError,
-        onDone: controller.close);
-  },
-      onCancel: () => subscription.cancel(),
-      onPause: () => subscription.pause(),
-      onResume: () => subscription.resume(),
-      sync: true);
-  return controller.stream;
-}
-
 /// A regular expression matching a trailing CR character.
 final _trailingCR = new RegExp(r"\r$");
 
@@ -728,7 +539,7 @@
 /// Convert a URL query string (or `application/x-www-form-urlencoded` body)
 /// into a [Map] from parameter names to values.
 Map<String, String> queryToMap(String queryList) {
-  var map = {};
+  var map = <String, String>{};
   for (var pair in queryList.split("&")) {
     var split = split1(pair, "=");
     if (split.isEmpty) continue;
@@ -755,7 +566,7 @@
 }
 
 /// Returns the union of all elements in each set in [sets].
-Set unionAll(Iterable<Set> sets) =>
+Set/*<T>*/ unionAll/*<T>*/(Iterable<Set/*<T>*/> sets) =>
   sets.fold(new Set(), (union, set) => union.union(set));
 
 // TODO(nweiz): remove this when issue 9068 has been fixed.
@@ -795,13 +606,12 @@
 
   // If we're using verbose logging, be more verbose but more accurate when
   // reporting timing information.
-  if (log.verbosity.isLevelVisible(log.Level.FINE)) {
-    ms = padLeft(ms.toString(), 3, '0');
-  } else {
-    ms ~/= 100;
-  }
+  var msString = log.verbosity.isLevelVisible(log.Level.FINE)
+      ? padLeft(ms.toString(), 3, '0')
+      : (ms ~/ 100).toString();
 
-  return "$result${hasMinutes ? padLeft(s.toString(), 2, '0') : s}.${ms}s";
+  return "$result${hasMinutes ? padLeft(s.toString(), 2, '0') : s}"
+      ".${msString}s";
 }
 
 /// Decodes a URL-encoded string.
@@ -815,26 +625,27 @@
 /// within.
 ///
 /// Completes with the fully resolved structure.
-Future awaitObject(object) {
+Future/*<T>*/ awaitObject/*<T>*/(/*=T*/ object) async {
   // Unroll nested futures.
-  if (object is Future) return object.then(awaitObject);
-  if (object is Iterable) {
-    return Future.wait(object.map(awaitObject).toList());
-  }
-  if (object is! Map) return new Future.value(object);
+  if (object is Future) return await awaitObject(await object);
 
-  var pairs = <Future<Pair>>[];
-  object.forEach((key, value) {
-    pairs.add(awaitObject(value)
-        .then((resolved) => new Pair(key, resolved)));
-  });
-  return Future.wait(pairs).then((resolvedPairs) {
-    var map = {};
-    for (var pair in resolvedPairs) {
-      map[pair.first] = pair.last;
-    }
-    return map;
-  });
+  if (object is Iterable) {
+    // TODO(nweiz): Remove the unnecessary as check when sdk#26965 is fixed.
+    return await Future.wait((object as Iterable).map(awaitObject))
+        as List/*=T*/;
+  }
+
+  if (object is Map) {
+    // TODO(nweiz): Remove the unnecessary as check when sdk#26965 is fixed.
+    var oldMap = object as Map;
+    var newMap = {};
+    await Future.wait(oldMap.keys.map((key) async {
+      newMap[key] = await awaitObject(await oldMap[key]);
+    }));
+    return newMap as Map/*=T*/;
+  }
+
+  return object;
 }
 
 /// Whether "special" strings such as Unicode characters or color escapes are
diff --git a/lib/src/validator.dart b/lib/src/validator.dart
index 57b106c..b2b925d 100644
--- a/lib/src/validator.dart
+++ b/lib/src/validator.dart
@@ -75,10 +75,10 @@
 
     return Future.wait(validators.map((validator) => validator.validate()))
       .then((_) {
-      var errors =
-          flatten(validators.map((validator) => validator.errors));
-      var warnings =
-          flatten(validators.map((validator) => validator.warnings));
+      // Collect into lists, as the removed `flatten` helper did, instead of
+      // keeping lazy iterables that are re-expanded on every access.
+      var errors = validators.expand((v) => v.errors).toList();
+      var warnings = validators.expand((v) => v.warnings).toList();
 
       if (!errors.isEmpty) {
         log.error("Missing requirements:");
diff --git a/lib/src/validator/dependency.dart b/lib/src/validator/dependency.dart
index a4fdf63..38451ef 100644
--- a/lib/src/validator/dependency.dart
+++ b/lib/src/validator/dependency.dart
@@ -36,23 +36,24 @@
     : super(entrypoint);
 
   Future validate() async {
-    var caretDeps = [];
+    var caretDeps = <PackageDep>[];
 
     for (var dependency in entrypoint.root.pubspec.dependencies) {
+      var constraint = dependency.constraint;
       if (dependency.source is! HostedSource) {
         await _warnAboutSource(dependency);
-      } else if (dependency.constraint.isAny) {
+      } else if (constraint.isAny) {
         _warnAboutNoConstraint(dependency);
-      } else if (dependency.constraint is Version) {
+      } else if (constraint is Version) {
         _warnAboutSingleVersionConstraint(dependency);
-      } else if (dependency.constraint is VersionRange) {
-        if (dependency.constraint.min == null) {
+      } else if (constraint is VersionRange) {
+        if (constraint.min == null) {
           _warnAboutNoConstraintLowerBound(dependency);
-        } else if (dependency.constraint.max == null) {
+        } else if (constraint.max == null) {
           _warnAboutNoConstraintUpperBound(dependency);
         }
 
-        if (dependency.constraint.toString().startsWith("^")) {
+        if (constraint.toString().startsWith("^")) {
           caretDeps.add(dependency);
         }
       }
@@ -65,7 +66,7 @@
 
   /// Warn that dependencies should use the hosted source.
   Future _warnAboutSource(PackageDep dep) async {
-    var versions;
+    List<Version> versions;
     try {
       var ids = await entrypoint.cache.hosted
           .getVersions(entrypoint.cache.sources.hosted.refFor(dep.name));
diff --git a/lib/src/validator/sdk_constraint.dart b/lib/src/validator/sdk_constraint.dart
index aa57baa..479a1dd 100644
--- a/lib/src/validator/sdk_constraint.dart
+++ b/lib/src/validator/sdk_constraint.dart
@@ -21,7 +21,8 @@
 
   Future validate() async {
     var dartConstraint = entrypoint.root.pubspec.dartSdkConstraint;
-    if (dartConstraint.toString().startsWith("^")) {
+    if (dartConstraint is VersionRange &&
+        dartConstraint.toString().startsWith("^")) {
       errors.add(
           "^ version constraints aren't allowed for SDK constraints since "
             "older versions of pub don't support them.\n"
diff --git a/pubspec.yaml b/pubspec.yaml
index 849843c..d1527a5 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -7,8 +7,10 @@
   # Note: Pub's test infrastructure assumes that any dependencies used in tests
   # will be hosted dependencies.
   analyzer: ">=0.25.0 <0.28.0"
-  args: "^0.13.0"
+  args: "^0.13.5"
+  async: "^1.5.0"
   barback: "^0.15.2"
+  collection: "^1.8.0"
   compiler_unsupported: "^1.13.0-dev"
   crypto: ">=1.0.0 <3.0.0"
   glob: "^1.0.0"
diff --git a/test/dependency_computer/utils.dart b/test/dependency_computer/utils.dart
index fc4aa5c..8f626ff 100644
--- a/test/dependency_computer/utils.dart
+++ b/test/dependency_computer/utils.dart
@@ -3,7 +3,10 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'package:barback/barback.dart';
+import 'package:collection/collection.dart';
 import 'package:path/path.dart' as p;
+import 'package:scheduled_test/scheduled_test.dart';
+
 import 'package:pub/src/barback/cycle_exception.dart';
 import 'package:pub/src/barback/dependency_computer.dart';
 import 'package:pub/src/entrypoint.dart';
@@ -12,7 +15,6 @@
 import 'package:pub/src/package_graph.dart';
 import 'package:pub/src/system_cache.dart';
 import 'package:pub/src/utils.dart';
-import 'package:scheduled_test/scheduled_test.dart';
 
 import '../test_pub.dart';
 
diff --git a/test/descriptor.dart b/test/descriptor.dart
index de57c14..d4855be 100644
--- a/test/descriptor.dart
+++ b/test/descriptor.dart
@@ -56,7 +56,7 @@
 /// Describes a file named `pubspec.yaml` for an application package with the
 /// given [dependencies].
 Descriptor appPubspec([Map dependencies]) {
-  var map = {"name": "myapp"};
+  var map = <String, dynamic>{"name": "myapp"};
   if (dependencies != null) map["dependencies"] = dependencies;
   return pubspec(map);
 }
@@ -201,5 +201,5 @@
 /// entries (one per key in [dependencies]), each with a path that contains
 /// either the version string (for a reference to the pub cache) or a
 /// path to a path dependency, relative to the application directory.
-Descriptor packagesFile([Map dependencies]) =>
+Descriptor packagesFile([Map<String, String> dependencies]) =>
   new PackagesFileDescriptor(dependencies);
diff --git a/test/descriptor/git.dart b/test/descriptor/git.dart
index 9e9656b..0a1687c 100644
--- a/test/descriptor/git.dart
+++ b/test/descriptor/git.dart
@@ -42,9 +42,12 @@
   /// referred to by [ref] at the current point in the scheduled test run.
   ///
   /// [parent] defaults to [defaultRoot].
-  Future<String> revParse(String ref, [String parent]) => schedule(() {
-    return _runGit(['rev-parse', ref], parent).then((output) => output[0]);
-  }, 'parsing revision $ref for Git repo:\n${describe()}');
+  Future<String> revParse(String ref, [String parent]) {
+    return schedule/*<Future<String>>*/(() async {
+      var output = await _runGit(['rev-parse', ref], parent);
+      return output[0];
+    }, 'parsing revision $ref for Git repo:\n${describe()}');
+  }
 
   /// Schedule a Git command to run in this repository.
   ///
@@ -53,8 +56,11 @@
     return _runGit(args, parent);
   }, "running 'git ${args.join(' ')}' in Git repo:\n${describe()}");
 
-  Future _runGitCommands(String parent, List<List<String>> commands) =>
-      Future.forEach(commands, (command) => _runGit(command, parent));
+  Future _runGitCommands(String parent, List<List<String>> commands) async {
+    for (var command in commands) {
+      await _runGit(command, parent);
+    }
+  }
 
   Future<List<String>> _runGit(List<String> args, String parent) {
     // Explicitly specify the committer information. Git needs this to commit
diff --git a/test/descriptor/packages.dart b/test/descriptor/packages.dart
index c22f5a7..a7470c6 100644
--- a/test/descriptor/packages.dart
+++ b/test/descriptor/packages.dart
@@ -35,7 +35,7 @@
     if (parent == null) parent = defaultRoot;
     var contents = const <int>[];
     if (_dependencies != null) {
-      var mapping = {};
+      var mapping = <String, Uri>{};
       _dependencies.forEach((package, version) {
         var packagePath;
         if (_semverRE.hasMatch(version)) {
diff --git a/test/descriptor/tar.dart b/test/descriptor/tar.dart
index 2258310..18c34e6 100644
--- a/test/descriptor/tar.dart
+++ b/test/descriptor/tar.dart
@@ -18,23 +18,24 @@
 
   /// Creates the files and directories within this tar file, then archives
   /// them, compresses them, and saves the result to [parentDir].
-  Future<String> create([String parent]) => schedule(() {
-    if (parent == null) parent = defaultRoot;
-    return withTempDir((tempDir) {
-      return Future.wait(contents.map((entry) {
-        return entry.create(tempDir);
-      })).then((_) {
+  Future<String> create([String parent]) {
+    return schedule/*<Future<String>>*/(() async {
+      if (parent == null) parent = defaultRoot;
+      return await withTempDir((tempDir) async {
+        await Future.wait(contents.map((entry) => entry.create(tempDir)));
+
         var createdContents = listDir(tempDir,
             recursive: true,
             includeHidden: true);
-        return createTarGz(createdContents, baseDir: tempDir).toBytes();
-      }).then((bytes) {
+        var bytes = await createTarGz(createdContents, baseDir: tempDir)
+            .toBytes();
+
         var file = path.join(parent, name);
         writeBinaryFile(file, bytes);
         return file;
       });
-    });
-  }, 'creating tar file:\n${describe()}');
+    }, 'creating tar file:\n${describe()}');
+  }
 
   /// Validates that the `.tar.gz` file at [path] contains the expected
   /// contents.
diff --git a/test/implicit_barback_dependency_test.dart b/test/implicit_barback_dependency_test.dart
index b8dbfaf..23954cf 100644
--- a/test/implicit_barback_dependency_test.dart
+++ b/test/implicit_barback_dependency_test.dart
@@ -18,6 +18,7 @@
 
   var sourceSpanVersion = barback.pubConstraints["source_span"].min.toString();
   var stackTraceVersion = barback.pubConstraints["stack_trace"].min.toString();
+  var asyncVersion = barback.pubConstraints["async"].min.toString();
 
   forBothPubGetAndUpgrade((command) {
     integration("implicitly constrains barback to versions pub supports", () {
@@ -28,6 +29,7 @@
         builder.serve("barback", max);
         builder.serve("source_span", sourceSpanVersion);
         builder.serve("stack_trace", stackTraceVersion);
+        builder.serve("async", asyncVersion);
       });
 
       d.appDir({
@@ -49,6 +51,7 @@
         builder.serve("barback", max);
         builder.serve("source_span", sourceSpanVersion);
         builder.serve("stack_trace", stackTraceVersion);
+        builder.serve("async", asyncVersion);
       });
 
       d.dir("foo", [
@@ -75,6 +78,7 @@
       servePackages((builder) {
         builder.serve("source_span", sourceSpanVersion);
         builder.serve("stack_trace", stackTraceVersion);
+        builder.serve("async", asyncVersion);
       });
 
       d.dir('barback', [
@@ -105,6 +109,7 @@
       builder.serve("barback", current);
       builder.serve("source_span", sourceSpanVersion);
       builder.serve("stack_trace", stackTraceVersion);
+      builder.serve("async", asyncVersion);
     });
 
     d.appDir({"barback": "any"}).create();
@@ -128,6 +133,7 @@
       builder.serve("barback", previous);
       builder.serve("source_span", sourceSpanVersion);
       builder.serve("stack_trace", stackTraceVersion);
+      builder.serve("async", asyncVersion);
     });
 
     d.appDir({"barback": "any"}).create();
@@ -145,6 +151,7 @@
       builder.serve("barback", current);
       builder.serve("source_span", sourceSpanVersion);
       builder.serve("stack_trace", stackTraceVersion);
+      builder.serve("async", asyncVersion);
     });
 
     d.appDir({"barback": previous}).create();
diff --git a/test/implicit_dependency_test.dart b/test/implicit_dependency_test.dart
index 8a0abef..4c77978 100644
--- a/test/implicit_dependency_test.dart
+++ b/test/implicit_dependency_test.dart
@@ -18,6 +18,7 @@
         builder.serve("stack_trace", nextPatch("stack_trace"));
         builder.serve("stack_trace", max("stack_trace"));
         builder.serve("source_span", current("source_span"));
+        builder.serve("async", current("async"));
       });
 
       d.appDir({
@@ -35,6 +36,7 @@
         builder.serve("barback", current("barback"));
         builder.serve("stack_trace", nextPatch("stack_trace"));
         builder.serve("source_span", current("source_span"));
+        builder.serve("async", current("async"));
       });
 
       d.dir("stack_trace", [
@@ -69,6 +71,7 @@
         builder.serve("stack_trace", nextPatch("stack_trace"));
         builder.serve("stack_trace", max("stack_trace"));
         builder.serve("source_span", current("source_span"));
+        builder.serve("async", current("async"));
       });
 
       d.appDir({
@@ -88,6 +91,7 @@
       builder.serve("stack_trace", previous("stack_trace"));
       builder.serve("stack_trace", current("stack_trace"));
       builder.serve("source_span", current("source_span"));
+      builder.serve("async", current("async"));
     });
 
     d.appDir({"barback": "any"}).create();
diff --git a/test/package_server.dart b/test/package_server.dart
index 7561a5a..6fedf61 100644
--- a/test/package_server.dart
+++ b/test/package_server.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:async/async.dart';
 import 'package:path/path.dart' as p;
 import 'package:pub/src/io.dart';
 import 'package:pub/src/utils.dart';
@@ -150,16 +151,13 @@
   ///
   /// If [contents] is passed, it's used as the contents of the package. By
   /// default, a package just contains a dummy lib directory.
-  void serve(String name, String version, {Map deps, Map pubspec,
-      Iterable<d.Descriptor> contents}) {
-    _futures.add(Future.wait([
-      awaitObject(deps),
-      awaitObject(pubspec)
-    ]).then((pair) {
-      var resolvedDeps = pair.first;
-      var resolvedPubspec = pair.last;
+  void serve(String name, String version, {Map<String, dynamic> deps,
+      Map<String, dynamic> pubspec, Iterable<d.Descriptor> contents}) {
+    _futures.add(new Future.sync(() async {
+      var resolvedDeps = await awaitObject(deps);
+      var resolvedPubspec = await awaitObject(pubspec);
 
-      var pubspecFields = {
+      var pubspecFields = <String, dynamic>{
         "name": name,
         "version": version
       };
@@ -205,11 +203,10 @@
 
   /// Returns a Future that completes once all the [serve] calls have been fully
   /// processed.
-  Future _await() {
-    if (_futures.futures.isEmpty) return new Future.value();
-    return _futures.future.then((_) {
-      _futures = new FutureGroup();
-    });
+  Future _await() async {
+    _futures.close();
+    await _futures.future;
+    _futures = new FutureGroup();
   }
 
   /// Clears all existing packages from this builder.
diff --git a/test/pub_uploader_test.dart b/test/pub_uploader_test.dart
index 4eb85ee..3f4c1b3 100644
--- a/test/pub_uploader_test.dart
+++ b/test/pub_uploader_test.dart
@@ -32,7 +32,7 @@
 ScheduledProcess startPubUploader(ScheduledServer server, List<String> args) {
   var tokenEndpoint = server.url.then((url) =>
       url.resolve('/token').toString());
-  args = flatten(['uploader', '--server', tokenEndpoint, args]);
+  args = ['uploader', '--server', tokenEndpoint]..addAll(args);
   return startPub(args: args, tokenEndpoint: tokenEndpoint);
 }
 
diff --git a/test/test_pub.dart b/test/test_pub.dart
index d23a3a1..24cd440 100644
--- a/test/test_pub.dart
+++ b/test/test_pub.dart
@@ -12,6 +12,7 @@
 import 'dart:io';
 import 'dart:math';
 
+import 'package:async/async.dart';
 import 'package:http/testing.dart';
 import 'package:path/path.dart' as p;
 import 'package:pub/src/entrypoint.dart';
@@ -160,7 +161,7 @@
 /// "pub run".
 ///
 /// Returns the `pub run` process.
-ScheduledProcess pubRun({bool global: false, Iterable<String> args}) {
+PubProcess pubRun({bool global: false, Iterable<String> args}) {
   var pubArgs = global ? ["global", "run"] : ["run"];
   pubArgs.addAll(args);
   var pub = startPub(args: pubArgs);
@@ -243,7 +244,7 @@
     var actualError = (await pub.stderrStream().toList()).join("\n");
     var actualSilent = (await pub.silentStream().toList()).join("\n");
 
-    var failures = [];
+    var failures = <String>[];
     if (outputJson == null) {
       _validateOutput(failures, 'stdout', output, actualOutput);
     } else {
@@ -263,11 +264,11 @@
 /// package server.
 ///
 /// Any futures in [args] will be resolved before the process is started.
-ScheduledProcess startPublish(ScheduledServer server, {List args}) {
+PubProcess startPublish(ScheduledServer server, {List args}) {
   var tokenEndpoint = server.url.then((url) =>
       url.resolve('/token').toString());
   if (args == null) args = [];
-  args = flatten(['lish', '--server', tokenEndpoint, args]);
+  args = ['lish', '--server', tokenEndpoint]..addAll(args);
   return startPub(args: args, tokenEndpoint: tokenEndpoint);
 }
 
@@ -311,15 +312,17 @@
   return environment;
 }
 
-/// Starts a Pub process and returns a [ScheduledProcess] that supports
-/// interaction with that process.
+/// Starts a Pub process and returns a [PubProcess] that supports interaction
+/// with that process.
 ///
 /// Any futures in [args] will be resolved before the process is started.
 ///
 /// If [environment] is given, any keys in it will override the environment
 /// variables passed to the spawned process.
-ScheduledProcess startPub({List args, Future<String> tokenEndpoint,
+PubProcess startPub({List args, Future<String> tokenEndpoint,
     Map<String, String> environment}) {
+  args ??= [];
+
   schedule(() {
     ensureDir(_pathInSandbox(appPath));
   }, "ensuring $appPath exists");
@@ -342,7 +345,7 @@
   var pubPath = p.absolute(p.join(pubRoot, 'bin/pub.dart'));
   if (fileExists('$pubPath.snapshot')) pubPath += '.snapshot';
 
-  var dartArgs = [
+  var dartArgs = <dynamic>[
     '--package-root=${p.toUri(p.absolute(p.fromUri(Platform.packageRoot)))}',
     pubPath,
     '--verbose'
@@ -381,14 +384,15 @@
 
   Stream<Pair<log.Level, String>> _logStream() {
     if (_log == null) {
-      _log = mergeStreams(
+      _log = StreamGroup.merge([
         _outputToLog(super.stdoutStream(), log.Level.MESSAGE),
-        _outputToLog(super.stderrStream(), log.Level.ERROR));
+        _outputToLog(super.stderrStream(), log.Level.ERROR)
+      ]);
     }
 
-    var pair = tee(_log);
-    _log = pair.first;
-    return pair.last;
+    var logs = StreamSplitter.splitFrom(_log);
+    _log = logs.first;
+    return logs.last;
   }
 
   final _logLineRegExp = new RegExp(r"^([A-Z ]{4})[:|] (.*)$");
@@ -422,9 +426,9 @@
       });
     }
 
-    var pair = tee(_stdout);
-    _stdout = pair.first;
-    return pair.last;
+    var stdouts = StreamSplitter.splitFrom(_stdout);
+    _stdout = stdouts.first;
+    return stdouts.last;
   }
 
   Stream<String> stderrStream() {
@@ -438,9 +442,9 @@
       });
     }
 
-    var pair = tee(_stderr);
-    _stderr = pair.first;
-    return pair.last;
+    var stderrs = StreamSplitter.splitFrom(_stderr);
+    _stderr = stderrs.first;
+    return stderrs.last;
   }
 
   /// A stream of log messages that are silent by default.
@@ -454,9 +458,9 @@
       });
     }
 
-    var pair = tee(_silent);
-    _silent = pair.first;
-    return pair.last;
+    var silents = StreamSplitter.splitFrom(_silent);
+    _silent = silents.first;
+    return silents.last;
   }
 }
 
@@ -581,7 +585,7 @@
 /// Describes a map representing a library package with the given [name],
 /// [version], and [dependencies].
 Map packageMap(String name, String version, [Map dependencies]) {
-  var package = {
+  var package = <String, dynamic>{
     "name": name,
     "version": version,
     "author": "Natalie Weizenbaum <nweiz@google.com>",
@@ -663,7 +667,7 @@
     expectedLines.removeLast();
   }
 
-  var results = [];
+  var results = <String>[];
   var failed = false;
 
   // Compare them line by line to see which ones match.
@@ -728,14 +732,11 @@
 /// by that validator.
 Future<Pair<List<String>, List<String>>> schedulePackageValidation(
     ValidatorCreator fn) {
-  return schedule(() {
+  return schedule/*<Future<Pair<List<String>, List<String>>>>*/(() async {
     var cache = new SystemCache(rootDir: p.join(sandboxDir, cachePath));
-    return new Future.sync(() {
-      var validator = fn(new Entrypoint(p.join(sandboxDir, appPath), cache));
-      return validator.validate().then((_) {
-        return new Pair(validator.errors, validator.warnings);
-      });
-    });
+    var validator = fn(new Entrypoint(p.join(sandboxDir, appPath), cache));
+    await validator.validate();
+    return new Pair(validator.errors, validator.warnings);
   }, "validating package");
 }