Reland: [dart:io] Adds Socket.startConnect

This is a reland of https://dart-review.googlesource.com/c/sdk/+/62484
with the following changes:
- _NativeSocket.connect now drops references to pending sockets on
  an error or successful connection.
- eventhandlers are updated to ignore unset Dart ports on a close
  command.
- Test updated to account for new SocketException.

This is the second part of https://dart-review.googlesource.com/c/sdk/+/62484

This CL adds a startConnect method to Socket types that returns
a ConnectionTask object that can be cancelled. Cancelling
a ConnectionTask closes any sockets that were opened for the
connection attempt that are not yet connected to the host.

This allows a closing HttpClient to close sockets for pending
requests whose sockets weren't fully connected yet.

related https://github.com/flutter/flutter/issues/18617

Change-Id: I47fe3564e41197d622079aad4bb644bbdfe0bfe8
Reviewed-on: https://dart-review.googlesource.com/63040
Reviewed-by: Zach Anderson <zra@google.com>
Commit-Queue: Zach Anderson <zra@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f5844e..4552410 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,9 @@
 
   * `dart:io`
     * Adds `HttpClient.connectionTimeout`.
+    * Adds `{Socket,RawSocket,SecureSocket}.startConnect`. These return a
+      `ConnectionTask`, which can be used to cancel an in-flight connection
+      attempt.
 
 ## 2.0.0-dev.66.0
 
diff --git a/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart b/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
index 20cb5ff..b05a024 100644
--- a/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
+++ b/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
@@ -461,6 +461,12 @@
       {sourceAddress, Duration timeout}) {
     throw UnsupportedError("RawSocket constructor");
   }
+
+  @patch
+  static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+      {sourceAddress}) {
+    throw UnsupportedError("RawSocket constructor");
+  }
 }
 
 @patch
@@ -470,6 +476,12 @@
       {sourceAddress, Duration timeout}) {
     throw UnsupportedError("Socket constructor");
   }
+
+  @patch
+  static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+      {sourceAddress}) {
+    throw UnsupportedError("Socket constructor");
+  }
 }
 
 @patch
diff --git a/runtime/bin/eventhandler_android.cc b/runtime/bin/eventhandler_android.cc
index d4d49aa..07e6c04 100644
--- a/runtime/bin/eventhandler_android.cc
+++ b/runtime/bin/eventhandler_android.cc
@@ -214,7 +214,9 @@
         // message.
         intptr_t old_mask = di->Mask();
         Dart_Port port = msg[i].dart_port;
-        di->RemovePort(port);
+        if (port != ILLEGAL_PORT) {
+          di->RemovePort(port);
+        }
         intptr_t new_mask = di->Mask();
         UpdateEpollInstance(old_mask, di);
 
diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc
index b2dd6e2..6a7ae35 100644
--- a/runtime/bin/eventhandler_fuchsia.cc
+++ b/runtime/bin/eventhandler_fuchsia.cc
@@ -385,7 +385,9 @@
     // message.
     const intptr_t old_mask = di->Mask();
     Dart_Port port = msg->dart_port;
-    di->RemovePort(port);
+    if (port != ILLEGAL_PORT) {
+      di->RemovePort(port);
+    }
     const intptr_t new_mask = di->Mask();
     UpdatePort(old_mask, di);
 
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc
index 9f2760b..2b20572 100644
--- a/runtime/bin/eventhandler_linux.cc
+++ b/runtime/bin/eventhandler_linux.cc
@@ -223,11 +223,14 @@
         // message.
         intptr_t old_mask = di->Mask();
         Dart_Port port = msg[i].dart_port;
-        di->RemovePort(port);
+        if (port != ILLEGAL_PORT) {
+          di->RemovePort(port);
+        }
         intptr_t new_mask = di->Mask();
         UpdateEpollInstance(old_mask, di);
 
         intptr_t fd = di->fd();
+        ASSERT(fd == socket->fd());
         if (di->IsListeningSocket()) {
           // We only close the socket file descriptor from the operating
           // system if there are no other dart socket objects which
diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc
index f271969..974ab8a 100644
--- a/runtime/bin/eventhandler_macos.cc
+++ b/runtime/bin/eventhandler_macos.cc
@@ -230,7 +230,9 @@
         // message.
         intptr_t old_mask = di->Mask();
         Dart_Port port = msg[i].dart_port;
-        di->RemovePort(port);
+        if (port != ILLEGAL_PORT) {
+          di->RemovePort(port);
+        }
         intptr_t new_mask = di->Mask();
         UpdateKQueueInstance(old_mask, di);
 
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index 7388863..7dc6561 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -1056,7 +1056,9 @@
         listen_socket->SetPortAndMask(msg->dart_port, events);
         TryDispatchingPendingAccepts(listen_socket);
       } else if (IS_COMMAND(msg->data, kCloseCommand)) {
-        listen_socket->RemovePort(msg->dart_port);
+        if (msg->dart_port != ILLEGAL_PORT) {
+          listen_socket->RemovePort(msg->dart_port);
+        }
 
         // We only close the socket file descriptor from the operating
         // system if there are no other dart socket objects which
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
index 5f5d220..0d66baf 100644
--- a/runtime/bin/socket_patch.dart
+++ b/runtime/bin/socket_patch.dart
@@ -20,6 +20,12 @@
       {sourceAddress, Duration timeout}) {
     return _RawSocket.connect(host, port, sourceAddress, timeout);
   }
+
+  @patch
+  static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+      {sourceAddress}) {
+    return _RawSocket.startConnect(host, port, sourceAddress);
+  }
 }
 
 @patch
@@ -350,8 +356,8 @@
 
   static Future<List<InternetAddress>> lookup(String host,
       {InternetAddressType type: InternetAddressType.any}) {
-    return _IOService._dispatch(
-        _IOService.socketLookup, [host, type._value]).then((response) {
+    return _IOService._dispatch(_IOService.socketLookup, [host, type._value])
+        .then((response) {
       if (isErrorResponse(response)) {
         throw createError(response, "Failed host lookup: '$host'");
       } else {
@@ -378,8 +384,8 @@
       {bool includeLoopback: false,
       bool includeLinkLocal: false,
       InternetAddressType type: InternetAddressType.any}) {
-    return _IOService._dispatch(
-        _IOService.socketListInterfaces, [type._value]).then((response) {
+    return _IOService._dispatch(_IOService.socketListInterfaces, [type._value])
+        .then((response) {
       if (isErrorResponse(response)) {
         throw createError(response, "Failed listing interfaces");
       } else {
@@ -400,8 +406,8 @@
     });
   }
 
-  static Future<_NativeSocket> connect(
-      host, int port, sourceAddress, Duration timeout) {
+  static Future<ConnectionTask<_NativeSocket>> startConnect(
+      host, int port, sourceAddress) {
     _throwOnBadPort(port);
     if (sourceAddress != null && sourceAddress is! _InternetAddress) {
       if (sourceAddress is String) {
@@ -421,29 +427,11 @@
       var it = (addresses as List<InternetAddress>).iterator;
       var error = null;
       var connecting = new HashMap();
-      Timer timeoutTimer = null;
-      void timeoutHandler() {
-        connecting.forEach((s, t) {
-          t.cancel();
-          s.close();
-          s.setHandlers();
-          s.setListening(read: false, write: false);
-          error = createError(
-              null, "Connection timed out, host: ${host}, port: ${port}");
-          completer.completeError(error);
-        });
-      }
 
       void connectNext() {
-        if ((timeout != null) && (timeoutTimer == null)) {
-          timeoutTimer = new Timer(timeout, timeoutHandler);
-        }
         if (!it.moveNext()) {
           if (connecting.isEmpty) {
             assert(error != null);
-            if (timeoutTimer != null) {
-              timeoutTimer.cancel();
-            }
             completer.completeError(error);
           }
           return;
@@ -471,11 +459,13 @@
           }
           connectNext();
         } else {
-          // Query the local port, for error messages.
+          // Query the local port for error messages.
           try {
             socket.port;
           } catch (e) {
-            error = createError(e, "Connection failed", address, port);
+            if (error == null) {
+              error = createError(e, "Connection failed", address, port);
+            }
             connectNext();
           }
           // Set up timer for when we should retry the next address
@@ -490,9 +480,6 @@
           // indicate that the socket is fully connected.
           socket.setHandlers(write: () {
             timer.cancel();
-            if (timeoutTimer != null) {
-              timeoutTimer.cancel();
-            }
             socket.setListening(read: false, write: false);
             completer.complete(socket);
             connecting.remove(socket);
@@ -502,6 +489,7 @@
               s.setHandlers();
               s.setListening(read: false, write: false);
             });
+            connecting.clear();
           }, error: (e) {
             timer.cancel();
             socket.close();
@@ -514,8 +502,42 @@
         }
       }
 
+      void onCancel() {
+        connecting.forEach((s, t) {
+          t.cancel();
+          s.close();
+          s.setHandlers();
+          s.setListening(read: false, write: false);
+          if (error == null) {
+            error = createError(null,
+                "Connection attempt cancelled, host: ${host}, port: ${port}");
+          }
+        });
+        connecting.clear();
+        if (!completer.isCompleted) {
+          completer.completeError(error);
+        }
+      }
+
       connectNext();
-      return completer.future;
+      return new ConnectionTask<_NativeSocket>._(
+          socket: completer.future, onCancel: onCancel);
+    });
+  }
+
+  static Future<_NativeSocket> connect(
+      host, int port, sourceAddress, Duration timeout) {
+    return startConnect(host, port, sourceAddress)
+        .then((ConnectionTask<_NativeSocket> task) {
+      Future<_NativeSocket> socketFuture = task.socket;
+      if (timeout != null) {
+        socketFuture = socketFuture.timeout(timeout, onTimeout: () {
+          task.cancel();
+          throw createError(
+              null, "Connection timed out, host: ${host}, port: ${port}");
+        });
+      }
+      return socketFuture;
     });
   }
 
@@ -1134,8 +1156,7 @@
       address, int port, int backlog, bool v6Only, bool shared) {
     _throwOnBadPort(port);
     if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
-    return _NativeSocket
-        .bind(address, port, backlog, v6Only, shared)
+    return _NativeSocket.bind(address, port, backlog, v6Only, shared)
         .then((socket) => new _RawServerSocket(socket, v6Only));
   }
 
@@ -1228,11 +1249,21 @@
 
   static Future<RawSocket> connect(
       host, int port, sourceAddress, Duration timeout) {
-    return _NativeSocket
-        .connect(host, port, sourceAddress, timeout)
+    return _NativeSocket.connect(host, port, sourceAddress, timeout)
         .then((socket) => new _RawSocket(socket));
   }
 
+  static Future<ConnectionTask<_RawSocket>> startConnect(
+      host, int port, sourceAddress) {
+    return _NativeSocket.startConnect(host, port, sourceAddress)
+        .then((ConnectionTask<_NativeSocket> nativeTask) {
+      final Future<_RawSocket> raw = nativeTask.socket
+          .then((_NativeSocket nativeSocket) => new _RawSocket(nativeSocket));
+      return new ConnectionTask<_RawSocket>._(
+          socket: raw, onCancel: nativeTask._onCancel);
+    });
+  }
+
   _RawSocket(this._socket) {
     var zone = Zone.current;
     _controller = new StreamController(
@@ -1381,8 +1412,7 @@
 
   static Future<_ServerSocket> bind(
       address, int port, int backlog, bool v6Only, bool shared) {
-    return _RawServerSocket
-        .bind(address, port, backlog, v6Only, shared)
+    return _RawServerSocket.bind(address, port, backlog, v6Only, shared)
         .then((socket) => new _ServerSocket(socket));
   }
 
@@ -1414,10 +1444,22 @@
   @patch
   static Future<Socket> _connect(host, int port,
       {sourceAddress, Duration timeout}) {
-    return RawSocket
-        .connect(host, port, sourceAddress: sourceAddress, timeout: timeout)
+    return RawSocket.connect(host, port,
+            sourceAddress: sourceAddress, timeout: timeout)
         .then((socket) => new _Socket(socket));
   }
+
+  @patch
+  static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+      {sourceAddress}) {
+    return RawSocket.startConnect(host, port, sourceAddress: sourceAddress)
+        .then((rawTask) {
+      Future<Socket> socket =
+          rawTask.socket.then((rawSocket) => new _Socket(rawSocket));
+      return new ConnectionTask<Socket>._(
+          socket: socket, onCancel: rawTask._onCancel);
+    });
+  }
 }
 
 class _SocketStreamConsumer extends StreamConsumer<List<int>> {
@@ -1774,8 +1816,7 @@
 
   static Future<RawDatagramSocket> bind(host, int port, bool reuseAddress) {
     _throwOnBadPort(port);
-    return _NativeSocket
-        .bindDatagram(host, port, reuseAddress)
+    return _NativeSocket.bindDatagram(host, port, reuseAddress)
         .then((socket) => new _RawDatagramSocket(socket));
   }
 
diff --git a/sdk/lib/_http/http_impl.dart b/sdk/lib/_http/http_impl.dart
index 528c2bfb..983a4fb 100644
--- a/sdk/lib/_http/http_impl.dart
+++ b/sdk/lib/_http/http_impl.dart
@@ -1039,8 +1039,9 @@
 
   Future<HttpClientResponse> get done {
     if (_response == null) {
-      _response = Future.wait([_responseCompleter.future, super.done],
-          eagerError: true).then((list) => list[0]);
+      _response =
+          Future.wait([_responseCompleter.future, super.done], eagerError: true)
+              .then((list) => list[0]);
     }
     return _response;
   }
@@ -1254,7 +1255,8 @@
           outbound.headers.chunkedTransferEncoding) {
         List acceptEncodings =
             response._httpRequest.headers[HttpHeaders.acceptEncodingHeader];
-        List contentEncoding = outbound.headers[HttpHeaders.contentEncodingHeader];
+        List contentEncoding =
+            outbound.headers[HttpHeaders.contentEncodingHeader];
         if (acceptEncodings != null &&
             acceptEncodings
                 .expand((list) => list.split(","))
@@ -1657,8 +1659,8 @@
     if (proxy.isAuthenticated) {
       // If the proxy configuration contains user information use that
       // for proxy basic authorization.
-      String auth = _CryptoUtils
-          .bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
+      String auth = _CryptoUtils.bytesToBase64(
+          utf8.encode("${proxy.username}:${proxy.password}"));
       request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
     } else if (!proxy.isDirect && _httpClient._proxyCredentials.length > 0) {
       proxyCreds = _httpClient._findProxyCredentials(proxy);
@@ -1766,7 +1768,8 @@
   void close() {
     closed = true;
     _httpClient._connectionClosed(this);
-    _streamFuture.timeout(_httpClient.idleTimeout)
+    _streamFuture
+        .timeout(_httpClient.idleTimeout)
         .then((_) => _socket.destroy());
   }
 
@@ -1777,8 +1780,8 @@
     if (proxy.isAuthenticated) {
       // If the proxy configuration contains user information use that
       // for proxy basic authorization.
-      String auth = _CryptoUtils
-          .bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
+      String auth = _CryptoUtils.bytesToBase64(
+          utf8.encode("${proxy.username}:${proxy.password}"));
       request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
     }
     return request.close().then((response) {
@@ -1837,6 +1840,7 @@
   final SecurityContext context;
   final Set<_HttpClientConnection> _idle = new HashSet();
   final Set<_HttpClientConnection> _active = new HashSet();
+  final Set<ConnectionTask> _socketTasks = new HashSet();
   final Queue _pending = new ListQueue();
   int _connecting = 0;
 
@@ -1884,6 +1888,14 @@
   }
 
   void close(bool force) {
+    // Always cancel pending socket connections.
+    for (var t in _socketTasks.toList()) {
+      // Make sure the socket is destroyed if the ConnectionTask is cancelled.
+      t.socket.then((s) {
+        s.destroy();
+      }, onError: (e) {});
+      t.cancel();
+    }
     if (force) {
       for (var c in _idle.toList()) {
         c.destroy();
@@ -1920,35 +1932,48 @@
       return currentBadCertificateCallback(certificate, uriHost, uriPort);
     }
 
-    Future socketFuture = (isSecure && proxy.isDirect
-        ? SecureSocket.connect(host, port,
-            context: context, onBadCertificate: callback,
-            timeout: client.connectionTimeout)
-        : Socket.connect(host, port, timeout: client.connectionTimeout));
+    Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
+        ? SecureSocket.startConnect(host, port,
+            context: context, onBadCertificate: callback)
+        : Socket.startConnect(host, port));
     _connecting++;
-    return socketFuture.then((socket) {
-      _connecting--;
-      socket.setOption(SocketOption.tcpNoDelay, true);
-      var connection =
-          new _HttpClientConnection(key, socket, client, false, context);
-      if (isSecure && !proxy.isDirect) {
-        connection._dispose = true;
-        return connection
-            .createProxyTunnel(uriHost, uriPort, proxy, callback)
-            .then((tunnel) {
-          client
-              ._getConnectionTarget(uriHost, uriPort, true)
-              .addNewActive(tunnel);
-          return new _ConnectionInfo(tunnel, proxy);
+    return connectionTask.then((ConnectionTask task) {
+      _socketTasks.add(task);
+      Future socketFuture = task.socket;
+      if (client.connectionTimeout != null) {
+        socketFuture =
+            socketFuture.timeout(client.connectionTimeout, onTimeout: () {
+          _socketTasks.remove(task);
+          task.cancel();
         });
-      } else {
-        addNewActive(connection);
-        return new _ConnectionInfo(connection, proxy);
       }
-    }, onError: (error) {
-      _connecting--;
-      _checkPending();
-      throw error;
+      return socketFuture.then((socket) {
+        _connecting--;
+        socket.setOption(SocketOption.tcpNoDelay, true);
+        var connection =
+            new _HttpClientConnection(key, socket, client, false, context);
+        if (isSecure && !proxy.isDirect) {
+          connection._dispose = true;
+          return connection
+              .createProxyTunnel(uriHost, uriPort, proxy, callback)
+              .then((tunnel) {
+            client
+                ._getConnectionTarget(uriHost, uriPort, true)
+                .addNewActive(tunnel);
+            _socketTasks.remove(task);
+            return new _ConnectionInfo(tunnel, proxy);
+          });
+        } else {
+          addNewActive(connection);
+          _socketTasks.remove(task);
+          return new _ConnectionInfo(connection, proxy);
+        }
+      }, onError: (error) {
+        _connecting--;
+        _socketTasks.remove(task);
+        _checkPending();
+        throw error;
+      });
     });
   }
 }
@@ -2103,9 +2128,8 @@
     bool isSecure = (uri.scheme == "https");
     int port = uri.port;
     if (port == 0) {
-      port = isSecure
-          ? HttpClient.defaultHttpsPort
-          : HttpClient.defaultHttpPort;
+      port =
+          isSecure ? HttpClient.defaultHttpsPort : HttpClient.defaultHttpPort;
     }
     // Check to see if a proxy server should be used for this connection.
     var proxyConf = const _ProxyConfiguration.direct();
@@ -2502,8 +2526,8 @@
 
   static Future<HttpServer> bind(
       address, int port, int backlog, bool v6Only, bool shared) {
-    return ServerSocket
-        .bind(address, port, backlog: backlog, v6Only: v6Only, shared: shared)
+    return ServerSocket.bind(address, port,
+            backlog: backlog, v6Only: v6Only, shared: shared)
         .then<HttpServer>((socket) {
       return new _HttpServer._(socket, true);
     });
@@ -2517,8 +2541,7 @@
       bool v6Only,
       bool requestClientCertificate,
       bool shared) {
-    return SecureServerSocket
-        .bind(address, port, context,
+    return SecureServerSocket.bind(address, port, context,
             backlog: backlog,
             v6Only: v6Only,
             requestClientCertificate: requestClientCertificate,
@@ -2977,8 +3000,8 @@
     if (scheme != null && credentials.scheme != scheme) return false;
     if (uri.host != this.uri.host) return false;
     int thisPort =
-        this.uri.port == 0 ? HttpClient.defaultHttpPort: this.uri.port;
-    int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort: uri.port;
+        this.uri.port == 0 ? HttpClient.defaultHttpPort : this.uri.port;
+    int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort : uri.port;
     if (otherPort != thisPort) return false;
     return uri.path.startsWith(this.uri.path);
   }
@@ -3116,14 +3139,14 @@
   }
 
   void authorize(_Credentials credentials, HttpClientRequest request) {
-    request.headers
-        .set(HttpHeaders.authorizationHeader, authorization(credentials, request));
+    request.headers.set(
+        HttpHeaders.authorizationHeader, authorization(credentials, request));
   }
 
   void authorizeProxy(
       _ProxyCredentials credentials, HttpClientRequest request) {
-    request.headers.set(
-        HttpHeaders.proxyAuthorizationHeader, authorization(credentials, request));
+    request.headers.set(HttpHeaders.proxyAuthorizationHeader,
+        authorization(credentials, request));
   }
 }
 
diff --git a/sdk/lib/_internal/js_runtime/lib/io_patch.dart b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
index 9cfb7ef..8a4c7bb 100644
--- a/sdk/lib/_internal/js_runtime/lib/io_patch.dart
+++ b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
@@ -461,6 +461,12 @@
       {sourceAddress, Duration timeout}) {
     throw new UnsupportedError("RawSocket constructor");
   }
+
+  @patch
+  static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+      {sourceAddress}) {
+    throw new UnsupportedError("RawSocket constructor");
+  }
 }
 
 @patch
@@ -470,6 +476,12 @@
       {sourceAddress, Duration timeout}) {
     throw new UnsupportedError("Socket constructor");
   }
+
+  @patch
+  static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+      {sourceAddress}) {
+    throw new UnsupportedError("Socket constructor");
+  }
 }
 
 @patch
diff --git a/sdk/lib/io/overrides.dart b/sdk/lib/io/overrides.dart
index f68d417..1a71881 100644
--- a/sdk/lib/io/overrides.dart
+++ b/sdk/lib/io/overrides.dart
@@ -83,6 +83,9 @@
       Future<Socket> Function(dynamic, int,
               {dynamic sourceAddress, Duration timeout})
           socketConnect,
+      Future<ConnectionTask<Socket>> Function(dynamic, int,
+              {dynamic sourceAddress})
+          socketStartConnect,
 
       // Optional Zone parameters
       ZoneSpecification zoneSpecification,
@@ -116,6 +119,7 @@
 
       // Socket
       socketConnect,
+      socketStartConnect,
     );
     return _asyncRunZoned<R>(body,
         zoneValues: {_ioOverridesToken: overrides},
@@ -254,12 +258,22 @@
   /// Asynchronously returns a [Socket] connected to the given host and port.
   ///
   /// When this override is installed, this functions overrides the behavior of
-  /// `Socet.connect(...)`.
+  /// `Socket.connect(...)`.
   Future<Socket> socketConnect(host, int port,
       {sourceAddress, Duration timeout}) {
     return Socket._connect(host, port,
         sourceAddress: sourceAddress, timeout: timeout);
   }
+
+  /// Asynchronously returns a [ConnectionTask] that connects to the given host
+  /// and port when successful.
+  ///
+  /// When this override is installed, this functions overrides the behavior of
+  /// `Socket.startConnect(...)`.
+  Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+      {sourceAddress}) {
+    return Socket._startConnect(host, port, sourceAddress: sourceAddress);
+  }
 }
 
 class _IOOverridesScope extends IOOverrides {
@@ -294,6 +308,8 @@
   // Socket
   Future<Socket> Function(dynamic, int,
       {dynamic sourceAddress, Duration timeout}) _socketConnect;
+  Future<ConnectionTask<Socket>> Function(dynamic, int, {dynamic sourceAddress})
+      _socketStartConnect;
 
   _IOOverridesScope(
     // Directory
@@ -324,6 +340,7 @@
 
     // Socket
     this._socketConnect,
+    this._socketStartConnect,
   );
 
   // Directory
@@ -448,4 +465,17 @@
     return super.socketConnect(host, port,
         sourceAddress: sourceAddress, timeout: timeout);
   }
+
+  @override
+  Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+      {sourceAddress}) {
+    if (_socketStartConnect != null) {
+      return _socketStartConnect(host, port, sourceAddress: sourceAddress);
+    }
+    if (_previous != null) {
+      return _previous.socketStartConnect(host, port,
+          sourceAddress: sourceAddress);
+    }
+    return super.socketStartConnect(host, port, sourceAddress: sourceAddress);
+  }
 }
diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart
index 52e41d5..4b3f28f 100644
--- a/sdk/lib/io/secure_socket.dart
+++ b/sdk/lib/io/secure_socket.dart
@@ -46,8 +46,7 @@
       bool onBadCertificate(X509Certificate certificate),
       List<String> supportedProtocols,
       Duration timeout}) {
-    return RawSecureSocket
-        .connect(host, port,
+    return RawSecureSocket.connect(host, port,
             context: context,
             onBadCertificate: onBadCertificate,
             supportedProtocols: supportedProtocols,
@@ -55,6 +54,25 @@
         .then((rawSocket) => new SecureSocket._(rawSocket));
   }
 
+  /// Like [connect], but returns a [Future] that completes with a
+  /// [ConnectionTask] that can be cancelled if the [SecureSocket] is no
+  /// longer needed.
+  static Future<ConnectionTask<SecureSocket>> startConnect(host, int port,
+      {SecurityContext context,
+      bool onBadCertificate(X509Certificate certificate),
+      List<String> supportedProtocols}) {
+    return RawSecureSocket.startConnect(host, port,
+            context: context,
+            onBadCertificate: onBadCertificate,
+            supportedProtocols: supportedProtocols)
+        .then((rawState) {
+      Future<SecureSocket> socket =
+          rawState.socket.then((rawSocket) => new SecureSocket._(rawSocket));
+      return new ConnectionTask<SecureSocket>._(
+          socket: socket, onCancel: rawState._onCancel);
+    });
+  }
+
   /**
    * Takes an already connected [socket] and starts client side TLS
    * handshake to make the communication secure. When the returned
@@ -215,6 +233,26 @@
     });
   }
 
+  /// Like [connect], but returns a [Future] that completes with a
+  /// [ConnectionTask] that can be cancelled if the [RawSecureSocket] is no
+  /// longer needed.
+  static Future<ConnectionTask<RawSecureSocket>> startConnect(host, int port,
+      {SecurityContext context,
+      bool onBadCertificate(X509Certificate certificate),
+      List<String> supportedProtocols}) {
+    return RawSocket.startConnect(host, port)
+        .then((ConnectionTask<RawSocket> rawState) {
+      Future<RawSecureSocket> socket = rawState.socket.then((rawSocket) {
+        return secure(rawSocket,
+            context: context,
+            onBadCertificate: onBadCertificate,
+            supportedProtocols: supportedProtocols);
+      });
+      return new ConnectionTask<RawSecureSocket>._(
+          socket: socket, onCancel: rawState._onCancel);
+    });
+  }
+
   /**
    * Takes an already connected [socket] and starts client side TLS
    * handshake to make the communication secure. When the returned
@@ -990,8 +1028,7 @@
       args[2 * i + 3] = bufs[i].end;
     }
 
-    return _IOService
-        ._dispatch(_IOService.sslProcessFilter, args)
+    return _IOService._dispatch(_IOService.sslProcessFilter, args)
         .then((response) {
       if (response.length == 2) {
         if (wasInHandshake) {
diff --git a/sdk/lib/io/socket.dart b/sdk/lib/io/socket.dart
index 0ab315e..1d8a263 100644
--- a/sdk/lib/io/socket.dart
+++ b/sdk/lib/io/socket.dart
@@ -426,6 +426,32 @@
   }
 }
 
+/// Returned by the `startConnect` methods on client-side socket types `S`,
+/// `ConnectionTask<S>` allows cancelling an attempt to connect to a host.
+class ConnectionTask<S> {
+  /// A `Future` that completes with value that `S.connect()` would return
+  /// unless [cancel] is called on this [ConnectionTask].
+  ///
+  /// If [cancel] is called, the `Future` completes with a [SocketException]
+  /// error whose message indicates that the connection attempt was cancelled.
+  final Future<S> socket;
+  final void Function() _onCancel;
+
+  ConnectionTask._({Future<S> socket, void Function() onCancel})
+      : assert(socket != null),
+        assert(onCancel != null),
+        this.socket = socket,
+        this._onCancel = onCancel;
+
+  /// Cancels the connection attempt.
+  ///
+  /// This also causes the [socket] `Future` to complete with a
+  /// [SocketException] error.
+  void cancel() {
+    _onCancel();
+  }
+}
+
 /**
  * The [RawSocket] is a low-level interface to a socket, exposing the raw
  * events signaled by the system. It's a [Stream] of [RawSocketEvent]s.
@@ -470,6 +496,12 @@
   external static Future<RawSocket> connect(host, int port,
       {sourceAddress, Duration timeout});
 
+  /// Like [connect], but returns a [Future] that completes with a
+  /// [ConnectionTask] that can be cancelled if the [RawSocket] is no
+  /// longer needed.
+  external static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+      {sourceAddress});
+
   /**
    * Returns the number of received and non-read bytes in the socket that
    * can be read.
@@ -583,9 +615,25 @@
         sourceAddress: sourceAddress, timeout: timeout);
   }
 
+  /// Like [connect], but returns a [Future] that completes with a
+  /// [ConnectionTask] that can be cancelled if the [Socket] is no
+  /// longer needed.
+  static Future<ConnectionTask<Socket>> startConnect(host, int port,
+      {sourceAddress}) {
+    final IOOverrides overrides = IOOverrides.current;
+    if (overrides == null) {
+      return Socket._startConnect(host, port, sourceAddress: sourceAddress);
+    }
+    return overrides.socketStartConnect(host, port,
+        sourceAddress: sourceAddress);
+  }
+
   external static Future<Socket> _connect(host, int port,
       {sourceAddress, Duration timeout});
 
+  external static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+      {sourceAddress});
+
   /**
    * Destroy the socket in both directions. Calling [destroy] will make the
    * send a close event on the stream and will no longer react on data being
diff --git a/tests/standalone/io/io_override_test.dart b/tests/standalone/io/io_override_test.dart
index 012e7ff..1e5983a 100644
--- a/tests/standalone/io/io_override_test.dart
+++ b/tests/standalone/io/io_override_test.dart
@@ -162,6 +162,11 @@
   return null;
 }
 
+Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+    {sourceAddress}) {
+  return null;
+}
+
 Future<Null> ioOverridesRunTest() async {
   Future<Null> f = IOOverrides.runZoned(
     () async {
@@ -181,6 +186,7 @@
       Expect.isNull(new Directory("directory").watch());
       Expect.isTrue(new Link("link") is LinkMock);
       Expect.isNull(Socket.connect(null, 0));
+      Expect.isNull(Socket.startConnect(null, 0));
     },
     createDirectory: DirectoryMock.createDirectory,
     getCurrentDirectory: DirectoryMock.getCurrent,
@@ -197,6 +203,7 @@
     fsWatchIsSupported: FileSystemWatcherMock.watchSupported,
     createLink: LinkMock.createLink,
     socketConnect: socketConnect,
+    socketStartConnect: socketStartConnect,
   );
   Expect.isFalse(new Directory("directory") is DirectoryMock);
   Expect.isTrue(new Directory("directory") is Directory);
diff --git a/tests/standalone_2/io/http_shutdown_test.dart b/tests/standalone_2/io/http_shutdown_test.dart
index a3410db..c04a537 100644
--- a/tests/standalone_2/io/http_shutdown_test.dart
+++ b/tests/standalone_2/io/http_shutdown_test.dart
@@ -160,7 +160,8 @@
             return request.close();
           })
           .then((response) {})
-          .catchError((e) {}, test: (e) => e is HttpException);
+          .catchError((e) {},
+              test: (e) => e is HttpException || e is SocketException);
     }
     bool clientClosed = false;
     new Timer.periodic(new Duration(milliseconds: 100), (timer) {
diff --git a/tests/standalone_2/io/raw_socket_test.dart b/tests/standalone_2/io/raw_socket_test.dart
index f5506cc..d46d1d6 100644
--- a/tests/standalone_2/io/raw_socket_test.dart
+++ b/tests/standalone_2/io/raw_socket_test.dart
@@ -74,6 +74,21 @@
   });
 }
 
+void testCancelConnect() {
+  asyncStart();
+  RawSocket.startConnect(InternetAddress.loopbackIPv4, 0)
+      .then((ConnectionTask<RawSocket> task) {
+    task.cancel();
+    task.socket.catchError((error) {
+      Expect.isTrue(error is SocketException);
+      asyncEnd();
+    });
+    task.socket.then((s) {
+      Expect.fail("Unreachable");
+    });
+  });
+}
+
 void testCloseOneEnd(String toClose) {
   asyncStart();
   Completer serverDone = new Completer();
@@ -467,6 +482,7 @@
   testCloseOneEnd("server");
   testInvalidBind();
   testSimpleConnect();
+  testCancelConnect();
   testServerListenAfterConnect();
   testSimpleReadWrite(dropReads: false);
   testSimpleReadWrite(dropReads: true);
diff --git a/tests/standalone_2/io/socket_cancel_connect_test.dart b/tests/standalone_2/io/socket_cancel_connect_test.dart
new file mode 100644
index 0000000..7a2e112
--- /dev/null
+++ b/tests/standalone_2/io/socket_cancel_connect_test.dart
@@ -0,0 +1,32 @@
+// Copyright (c) 2018, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+//
+// VMOptions=
+// VMOptions=--short_socket_read
+// VMOptions=--short_socket_write
+// VMOptions=--short_socket_read --short_socket_write
+
+import "dart:async";
+import "dart:io";
+
+import "package:async_helper/async_helper.dart";
+import "package:expect/expect.dart";
+
+void main() {
+  asyncStart();
+  Duration timeout = new Duration(milliseconds: 20);
+  Socket.startConnect("8.8.8.7", 80).then((task) {
+    task.socket.timeout(timeout, onTimeout: () {
+      task.cancel();
+    });
+    task.socket.then((socket) {
+      Expect.fail("Unexpected connection made.");
+      asyncEnd();
+    }).catchError((e) {
+      print(e);
+      Expect.isTrue(e is SocketException);
+      asyncEnd();
+    });
+  });
+}