Updates migrated from sdk repo (#67)

diff --git a/.travis.yml b/.travis.yml
index 99e217e..c1d0163 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,10 +1,10 @@
 language: dart
-sudo: false
+sudo: true
 
 before_script:
   # Add an IPv6 config - see the corresponding Travis issue
   # https://github.com/travis-ci/travis-ci/issues/8361
-  - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then 
+  - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then
       sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6';
     fi
 
diff --git a/lib/http_io.dart b/lib/http_io.dart
index 11540f7..074c9c6 100644
--- a/lib/http_io.dart
+++ b/lib/http_io.dart
@@ -10,6 +10,7 @@
 import 'dart:convert';
 import 'dart:io'
     show
+        ConnectionTask,
         BytesBuilder,
         gzip,
         HandshakeException,
@@ -1263,6 +1264,16 @@
   /// The default value is 15 seconds.
   Duration idleTimeout;
 
+  /// Gets and sets the connection timeout.
+  ///
+  /// When connecting to a new host exceeds this timeout, a [SocketException]
+  /// is thrown. The timeout applies only to connections initiated after the
+  /// timeout is set.
+  ///
+  /// When this is `null`, the OS default timeout is used. The default is
+  /// `null`.
+  Duration connectionTimeout;
+
   /**
    * Gets and sets the maximum number of live connections, to a single host.
    *
diff --git a/lib/src/http_impl.dart b/lib/src/http_impl.dart
index 43b5f72..79a890b 100644
--- a/lib/src/http_impl.dart
+++ b/lib/src/http_impl.dart
@@ -1697,7 +1697,7 @@
     closed = true;
     _httpClient._connectionClosed(this);
     _streamFuture
-        // TODO(ajohnsen): Add timeout.
+        .timeout(_httpClient.idleTimeout)
         .then((_) => _socket.destroy());
   }
 
@@ -1768,6 +1768,7 @@
   final SecurityContext context;
   final Set<_HttpClientConnection> _idle = new HashSet();
   final Set<_HttpClientConnection> _active = new HashSet();
+  final Set<ConnectionTask> _socketTasks = new HashSet();
   final Queue _pending = new ListQueue();
   int _connecting = 0;
 
@@ -1815,13 +1816,25 @@
   }
 
   void close(bool force) {
-    for (var c in _idle.toList()) {
-      c.close();
+    // Always cancel pending socket connections.
+    for (var t in _socketTasks.toList()) {
+      // Make sure the socket is destroyed if the ConnectionTask is cancelled.
+      t.socket.then((s) {
+        s.destroy();
+      }, onError: (e) {});
+      t.cancel();
     }
     if (force) {
+      for (var c in _idle.toList()) {
+        c.destroy();
+      }
       for (var c in _active.toList()) {
         c.destroy();
       }
+    } else {
+      for (var c in _idle.toList()) {
+        c.close();
+      }
     }
   }
 
@@ -1847,34 +1860,48 @@
       return currentBadCertificateCallback(certificate, uriHost, uriPort);
     }
 
-    Future<Socket> socketFuture = (isSecure && proxy.isDirect
-        ? SecureSocket.connect(host, port,
+    Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
+        ? SecureSocket.startConnect(host, port,
             context: context, onBadCertificate: callback)
-        : Socket.connect(host, port));
+        : Socket.startConnect(host, port));
     _connecting++;
-    return socketFuture.then((socket) {
-      _connecting--;
-      socket.setOption(SocketOption.tcpNoDelay, true);
-      var connection =
-          new _HttpClientConnection(key, socket, client, false, context);
-      if (isSecure && !proxy.isDirect) {
-        connection._dispose = true;
-        return connection
-            .createProxyTunnel(uriHost, uriPort, proxy, callback)
-            .then((tunnel) {
-          client
-              ._getConnectionTarget(uriHost, uriPort, true)
-              .addNewActive(tunnel);
-          return new _ConnectionInfo(tunnel, proxy);
+    return connectionTask.then((ConnectionTask task) {
+      _socketTasks.add(task);
+      Future socketFuture = task.socket;
+      if (client.connectionTimeout != null) {
+        socketFuture =
+            socketFuture.timeout(client.connectionTimeout, onTimeout: () {
+          _socketTasks.remove(task);
+          task.cancel();
         });
-      } else {
-        addNewActive(connection);
-        return new _ConnectionInfo(connection, proxy);
       }
-    }, onError: (error) {
-      _connecting--;
-      _checkPending();
-      throw error;
+      return socketFuture.then((socket) {
+        _connecting--;
+        socket.setOption(SocketOption.tcpNoDelay, true);
+        var connection =
+            new _HttpClientConnection(key, socket, client, false, context);
+        if (isSecure && !proxy.isDirect) {
+          connection._dispose = true;
+          return connection
+              .createProxyTunnel(uriHost, uriPort, proxy, callback)
+              .then((tunnel) {
+            client
+                ._getConnectionTarget(uriHost, uriPort, true)
+                .addNewActive(tunnel);
+            _socketTasks.remove(task);
+            return new _ConnectionInfo(tunnel, proxy);
+          });
+        } else {
+          addNewActive(connection);
+          _socketTasks.remove(task);
+          return new _ConnectionInfo(connection, proxy);
+        }
+      }, onError: (error) {
+        _connecting--;
+        _socketTasks.remove(task);
+        _checkPending();
+        throw error;
+      });
     });
   }
 }
@@ -1897,6 +1924,8 @@
 
   Duration get idleTimeout => _idleTimeout;
 
+  Duration connectionTimeout;
+
   int maxConnectionsPerHost;
 
   bool autoUncompress = true;
diff --git a/test/http_shutdown_test.dart b/test/http_shutdown_test.dart
index cb32e85..a07502e 100644
--- a/test/http_shutdown_test.dart
+++ b/test/http_shutdown_test.dart
@@ -3,6 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import "dart:async";
+import "dart:io" show SocketException;
 
 import "package:http_io/http_io.dart";
 import "package:test/test.dart";
@@ -170,7 +171,8 @@
             return request.close();
           })
           .then((response) {})
-          .catchError((e) {}, test: (e) => e is HttpException);
+          .catchError((e) {},
+              test: (e) => e is HttpException || e is SocketException);
     }
     bool clientClosed = false;
     new Timer.periodic(new Duration(milliseconds: 100), (timer) {