Reland: [dart:io] Adds Socket.startConnect
This is a reland of https://dart-review.googlesource.com/c/sdk/+/62484
with the following changes:
- _NativeSocket.connect now drops references to pending sockets on
an error or successful connection.
- eventhandlers are updated to ignore unset Dart ports on a close
command.
- Test updated to account for new SocketException.
This is the second part of https://dart-review.googlesource.com/c/sdk/+/62484
This CL adds a startConnect method to Socket types that returns
a ConnectionTask object that can be cancelled. Cancelling
a ConnectionTask closes any sockets that were opened for the
connection attempt that are not yet connected to the host.
This allows a closing HttpClient to close sockets for pending
requests whose sockets weren't fully connected yet.
related https://github.com/flutter/flutter/issues/18617
Change-Id: I47fe3564e41197d622079aad4bb644bbdfe0bfe8
Reviewed-on: https://dart-review.googlesource.com/63040
Reviewed-by: Zach Anderson <zra@google.com>
Commit-Queue: Zach Anderson <zra@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f5844e..4552410 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,9 @@
* `dart:io`
* Adds `HttpClient.connectionTimeout`.
+ * Adds `{Socket,RawSocket,SecureSocket}.startConnect`. These return a
+ `ConnectionTask`, which can be used to cancel an in-flight connection
+ attempt.
## 2.0.0-dev.66.0
diff --git a/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart b/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
index 20cb5ff..b05a024 100644
--- a/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
+++ b/pkg/dev_compiler/tool/input_sdk/patch/io_patch.dart
@@ -461,6 +461,12 @@
{sourceAddress, Duration timeout}) {
throw UnsupportedError("RawSocket constructor");
}
+
+ @patch
+ static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+ {sourceAddress}) {
+ throw UnsupportedError("RawSocket constructor");
+ }
}
@patch
@@ -470,6 +476,12 @@
{sourceAddress, Duration timeout}) {
throw UnsupportedError("Socket constructor");
}
+
+ @patch
+ static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+ {sourceAddress}) {
+ throw UnsupportedError("Socket constructor");
+ }
}
@patch
diff --git a/runtime/bin/eventhandler_android.cc b/runtime/bin/eventhandler_android.cc
index d4d49aa..07e6c04 100644
--- a/runtime/bin/eventhandler_android.cc
+++ b/runtime/bin/eventhandler_android.cc
@@ -214,7 +214,9 @@
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
- di->RemovePort(port);
+ if (port != ILLEGAL_PORT) {
+ di->RemovePort(port);
+ }
intptr_t new_mask = di->Mask();
UpdateEpollInstance(old_mask, di);
diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc
index b2dd6e2..6a7ae35 100644
--- a/runtime/bin/eventhandler_fuchsia.cc
+++ b/runtime/bin/eventhandler_fuchsia.cc
@@ -385,7 +385,9 @@
// message.
const intptr_t old_mask = di->Mask();
Dart_Port port = msg->dart_port;
- di->RemovePort(port);
+ if (port != ILLEGAL_PORT) {
+ di->RemovePort(port);
+ }
const intptr_t new_mask = di->Mask();
UpdatePort(old_mask, di);
diff --git a/runtime/bin/eventhandler_linux.cc b/runtime/bin/eventhandler_linux.cc
index 9f2760b..2b20572 100644
--- a/runtime/bin/eventhandler_linux.cc
+++ b/runtime/bin/eventhandler_linux.cc
@@ -223,11 +223,14 @@
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
- di->RemovePort(port);
+ if (port != ILLEGAL_PORT) {
+ di->RemovePort(port);
+ }
intptr_t new_mask = di->Mask();
UpdateEpollInstance(old_mask, di);
intptr_t fd = di->fd();
+ ASSERT(fd == socket->fd());
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
diff --git a/runtime/bin/eventhandler_macos.cc b/runtime/bin/eventhandler_macos.cc
index f271969..974ab8a 100644
--- a/runtime/bin/eventhandler_macos.cc
+++ b/runtime/bin/eventhandler_macos.cc
@@ -230,7 +230,9 @@
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
- di->RemovePort(port);
+ if (port != ILLEGAL_PORT) {
+ di->RemovePort(port);
+ }
intptr_t new_mask = di->Mask();
UpdateKQueueInstance(old_mask, di);
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index 7388863..7dc6561 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -1056,7 +1056,9 @@
listen_socket->SetPortAndMask(msg->dart_port, events);
TryDispatchingPendingAccepts(listen_socket);
} else if (IS_COMMAND(msg->data, kCloseCommand)) {
- listen_socket->RemovePort(msg->dart_port);
+ if (msg->dart_port != ILLEGAL_PORT) {
+ listen_socket->RemovePort(msg->dart_port);
+ }
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
diff --git a/runtime/bin/socket_patch.dart b/runtime/bin/socket_patch.dart
index 5f5d220..0d66baf 100644
--- a/runtime/bin/socket_patch.dart
+++ b/runtime/bin/socket_patch.dart
@@ -20,6 +20,12 @@
{sourceAddress, Duration timeout}) {
return _RawSocket.connect(host, port, sourceAddress, timeout);
}
+
+ @patch
+ static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+ {sourceAddress}) {
+ return _RawSocket.startConnect(host, port, sourceAddress);
+ }
}
@patch
@@ -350,8 +356,8 @@
static Future<List<InternetAddress>> lookup(String host,
{InternetAddressType type: InternetAddressType.any}) {
- return _IOService._dispatch(
- _IOService.socketLookup, [host, type._value]).then((response) {
+ return _IOService._dispatch(_IOService.socketLookup, [host, type._value])
+ .then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed host lookup: '$host'");
} else {
@@ -378,8 +384,8 @@
{bool includeLoopback: false,
bool includeLinkLocal: false,
InternetAddressType type: InternetAddressType.any}) {
- return _IOService._dispatch(
- _IOService.socketListInterfaces, [type._value]).then((response) {
+ return _IOService._dispatch(_IOService.socketListInterfaces, [type._value])
+ .then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed listing interfaces");
} else {
@@ -400,8 +406,8 @@
});
}
- static Future<_NativeSocket> connect(
- host, int port, sourceAddress, Duration timeout) {
+ static Future<ConnectionTask<_NativeSocket>> startConnect(
+ host, int port, sourceAddress) {
_throwOnBadPort(port);
if (sourceAddress != null && sourceAddress is! _InternetAddress) {
if (sourceAddress is String) {
@@ -421,29 +427,11 @@
var it = (addresses as List<InternetAddress>).iterator;
var error = null;
var connecting = new HashMap();
- Timer timeoutTimer = null;
- void timeoutHandler() {
- connecting.forEach((s, t) {
- t.cancel();
- s.close();
- s.setHandlers();
- s.setListening(read: false, write: false);
- error = createError(
- null, "Connection timed out, host: ${host}, port: ${port}");
- completer.completeError(error);
- });
- }
void connectNext() {
- if ((timeout != null) && (timeoutTimer == null)) {
- timeoutTimer = new Timer(timeout, timeoutHandler);
- }
if (!it.moveNext()) {
if (connecting.isEmpty) {
assert(error != null);
- if (timeoutTimer != null) {
- timeoutTimer.cancel();
- }
completer.completeError(error);
}
return;
@@ -471,11 +459,13 @@
}
connectNext();
} else {
- // Query the local port, for error messages.
+ // Query the local port for error messages.
try {
socket.port;
} catch (e) {
- error = createError(e, "Connection failed", address, port);
+ if (error == null) {
+ error = createError(e, "Connection failed", address, port);
+ }
connectNext();
}
// Set up timer for when we should retry the next address
@@ -490,9 +480,6 @@
// indicate that the socket is fully connected.
socket.setHandlers(write: () {
timer.cancel();
- if (timeoutTimer != null) {
- timeoutTimer.cancel();
- }
socket.setListening(read: false, write: false);
completer.complete(socket);
connecting.remove(socket);
@@ -502,6 +489,7 @@
s.setHandlers();
s.setListening(read: false, write: false);
});
+ connecting.clear();
}, error: (e) {
timer.cancel();
socket.close();
@@ -514,8 +502,42 @@
}
}
+ void onCancel() {
+ connecting.forEach((s, t) {
+ t.cancel();
+ s.close();
+ s.setHandlers();
+ s.setListening(read: false, write: false);
+ if (error == null) {
+ error = createError(null,
+ "Connection attempt cancelled, host: ${host}, port: ${port}");
+ }
+ });
+ connecting.clear();
+ if (!completer.isCompleted) {
+ completer.completeError(error);
+ }
+ }
+
connectNext();
- return completer.future;
+ return new ConnectionTask<_NativeSocket>._(
+ socket: completer.future, onCancel: onCancel);
+ });
+ }
+
+ static Future<_NativeSocket> connect(
+ host, int port, sourceAddress, Duration timeout) {
+ return startConnect(host, port, sourceAddress)
+ .then((ConnectionTask<_NativeSocket> task) {
+ Future<_NativeSocket> socketFuture = task.socket;
+ if (timeout != null) {
+ socketFuture = socketFuture.timeout(timeout, onTimeout: () {
+ task.cancel();
+ throw createError(
+ null, "Connection timed out, host: ${host}, port: ${port}");
+ });
+ }
+ return socketFuture;
});
}
@@ -1134,8 +1156,7 @@
address, int port, int backlog, bool v6Only, bool shared) {
_throwOnBadPort(port);
if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
- return _NativeSocket
- .bind(address, port, backlog, v6Only, shared)
+ return _NativeSocket.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _RawServerSocket(socket, v6Only));
}
@@ -1228,11 +1249,21 @@
static Future<RawSocket> connect(
host, int port, sourceAddress, Duration timeout) {
- return _NativeSocket
- .connect(host, port, sourceAddress, timeout)
+ return _NativeSocket.connect(host, port, sourceAddress, timeout)
.then((socket) => new _RawSocket(socket));
}
+ static Future<ConnectionTask<_RawSocket>> startConnect(
+ host, int port, sourceAddress) {
+ return _NativeSocket.startConnect(host, port, sourceAddress)
+ .then((ConnectionTask<_NativeSocket> nativeTask) {
+ final Future<_RawSocket> raw = nativeTask.socket
+ .then((_NativeSocket nativeSocket) => new _RawSocket(nativeSocket));
+ return new ConnectionTask<_RawSocket>._(
+ socket: raw, onCancel: nativeTask._onCancel);
+ });
+ }
+
_RawSocket(this._socket) {
var zone = Zone.current;
_controller = new StreamController(
@@ -1381,8 +1412,7 @@
static Future<_ServerSocket> bind(
address, int port, int backlog, bool v6Only, bool shared) {
- return _RawServerSocket
- .bind(address, port, backlog, v6Only, shared)
+ return _RawServerSocket.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _ServerSocket(socket));
}
@@ -1414,10 +1444,22 @@
@patch
static Future<Socket> _connect(host, int port,
{sourceAddress, Duration timeout}) {
- return RawSocket
- .connect(host, port, sourceAddress: sourceAddress, timeout: timeout)
+ return RawSocket.connect(host, port,
+ sourceAddress: sourceAddress, timeout: timeout)
.then((socket) => new _Socket(socket));
}
+
+ @patch
+ static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+ {sourceAddress}) {
+ return RawSocket.startConnect(host, port, sourceAddress: sourceAddress)
+ .then((rawTask) {
+ Future<Socket> socket =
+ rawTask.socket.then((rawSocket) => new _Socket(rawSocket));
+ return new ConnectionTask<Socket>._(
+ socket: socket, onCancel: rawTask._onCancel);
+ });
+ }
}
class _SocketStreamConsumer extends StreamConsumer<List<int>> {
@@ -1774,8 +1816,7 @@
static Future<RawDatagramSocket> bind(host, int port, bool reuseAddress) {
_throwOnBadPort(port);
- return _NativeSocket
- .bindDatagram(host, port, reuseAddress)
+ return _NativeSocket.bindDatagram(host, port, reuseAddress)
.then((socket) => new _RawDatagramSocket(socket));
}
diff --git a/sdk/lib/_http/http_impl.dart b/sdk/lib/_http/http_impl.dart
index 528c2bfb..983a4fb 100644
--- a/sdk/lib/_http/http_impl.dart
+++ b/sdk/lib/_http/http_impl.dart
@@ -1039,8 +1039,9 @@
Future<HttpClientResponse> get done {
if (_response == null) {
- _response = Future.wait([_responseCompleter.future, super.done],
- eagerError: true).then((list) => list[0]);
+ _response =
+ Future.wait([_responseCompleter.future, super.done], eagerError: true)
+ .then((list) => list[0]);
}
return _response;
}
@@ -1254,7 +1255,8 @@
outbound.headers.chunkedTransferEncoding) {
List acceptEncodings =
response._httpRequest.headers[HttpHeaders.acceptEncodingHeader];
- List contentEncoding = outbound.headers[HttpHeaders.contentEncodingHeader];
+ List contentEncoding =
+ outbound.headers[HttpHeaders.contentEncodingHeader];
if (acceptEncodings != null &&
acceptEncodings
.expand((list) => list.split(","))
@@ -1657,8 +1659,8 @@
if (proxy.isAuthenticated) {
// If the proxy configuration contains user information use that
// for proxy basic authorization.
- String auth = _CryptoUtils
- .bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
+ String auth = _CryptoUtils.bytesToBase64(
+ utf8.encode("${proxy.username}:${proxy.password}"));
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
} else if (!proxy.isDirect && _httpClient._proxyCredentials.length > 0) {
proxyCreds = _httpClient._findProxyCredentials(proxy);
@@ -1766,7 +1768,8 @@
void close() {
closed = true;
_httpClient._connectionClosed(this);
- _streamFuture.timeout(_httpClient.idleTimeout)
+ _streamFuture
+ .timeout(_httpClient.idleTimeout)
.then((_) => _socket.destroy());
}
@@ -1777,8 +1780,8 @@
if (proxy.isAuthenticated) {
// If the proxy configuration contains user information use that
// for proxy basic authorization.
- String auth = _CryptoUtils
- .bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
+ String auth = _CryptoUtils.bytesToBase64(
+ utf8.encode("${proxy.username}:${proxy.password}"));
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
}
return request.close().then((response) {
@@ -1837,6 +1840,7 @@
final SecurityContext context;
final Set<_HttpClientConnection> _idle = new HashSet();
final Set<_HttpClientConnection> _active = new HashSet();
+ final Set<ConnectionTask> _socketTasks = new HashSet();
final Queue _pending = new ListQueue();
int _connecting = 0;
@@ -1884,6 +1888,14 @@
}
void close(bool force) {
+ // Always cancel pending socket connections.
+ for (var t in _socketTasks.toList()) {
+ // Make sure the socket is destroyed if the ConnectionTask is cancelled.
+ t.socket.then((s) {
+ s.destroy();
+ }, onError: (e) {});
+ t.cancel();
+ }
if (force) {
for (var c in _idle.toList()) {
c.destroy();
@@ -1920,35 +1932,48 @@
return currentBadCertificateCallback(certificate, uriHost, uriPort);
}
- Future socketFuture = (isSecure && proxy.isDirect
- ? SecureSocket.connect(host, port,
- context: context, onBadCertificate: callback,
- timeout: client.connectionTimeout)
- : Socket.connect(host, port, timeout: client.connectionTimeout));
+ Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
+ ? SecureSocket.startConnect(host, port,
+ context: context, onBadCertificate: callback)
+ : Socket.startConnect(host, port));
_connecting++;
- return socketFuture.then((socket) {
- _connecting--;
- socket.setOption(SocketOption.tcpNoDelay, true);
- var connection =
- new _HttpClientConnection(key, socket, client, false, context);
- if (isSecure && !proxy.isDirect) {
- connection._dispose = true;
- return connection
- .createProxyTunnel(uriHost, uriPort, proxy, callback)
- .then((tunnel) {
- client
- ._getConnectionTarget(uriHost, uriPort, true)
- .addNewActive(tunnel);
- return new _ConnectionInfo(tunnel, proxy);
+ return connectionTask.then((ConnectionTask task) {
+ _socketTasks.add(task);
+ Future socketFuture = task.socket;
+ if (client.connectionTimeout != null) {
+ socketFuture =
+ socketFuture.timeout(client.connectionTimeout, onTimeout: () {
+ _socketTasks.remove(task);
+ task.cancel();
});
- } else {
- addNewActive(connection);
- return new _ConnectionInfo(connection, proxy);
}
- }, onError: (error) {
- _connecting--;
- _checkPending();
- throw error;
+ return socketFuture.then((socket) {
+ _connecting--;
+ socket.setOption(SocketOption.tcpNoDelay, true);
+ var connection =
+ new _HttpClientConnection(key, socket, client, false, context);
+ if (isSecure && !proxy.isDirect) {
+ connection._dispose = true;
+ return connection
+ .createProxyTunnel(uriHost, uriPort, proxy, callback)
+ .then((tunnel) {
+ client
+ ._getConnectionTarget(uriHost, uriPort, true)
+ .addNewActive(tunnel);
+ _socketTasks.remove(task);
+ return new _ConnectionInfo(tunnel, proxy);
+ });
+ } else {
+ addNewActive(connection);
+ _socketTasks.remove(task);
+ return new _ConnectionInfo(connection, proxy);
+ }
+ }, onError: (error) {
+ _connecting--;
+ _socketTasks.remove(task);
+ _checkPending();
+ throw error;
+ });
});
}
}
@@ -2103,9 +2128,8 @@
bool isSecure = (uri.scheme == "https");
int port = uri.port;
if (port == 0) {
- port = isSecure
- ? HttpClient.defaultHttpsPort
- : HttpClient.defaultHttpPort;
+ port =
+ isSecure ? HttpClient.defaultHttpsPort : HttpClient.defaultHttpPort;
}
// Check to see if a proxy server should be used for this connection.
var proxyConf = const _ProxyConfiguration.direct();
@@ -2502,8 +2526,8 @@
static Future<HttpServer> bind(
address, int port, int backlog, bool v6Only, bool shared) {
- return ServerSocket
- .bind(address, port, backlog: backlog, v6Only: v6Only, shared: shared)
+ return ServerSocket.bind(address, port,
+ backlog: backlog, v6Only: v6Only, shared: shared)
.then<HttpServer>((socket) {
return new _HttpServer._(socket, true);
});
@@ -2517,8 +2541,7 @@
bool v6Only,
bool requestClientCertificate,
bool shared) {
- return SecureServerSocket
- .bind(address, port, context,
+ return SecureServerSocket.bind(address, port, context,
backlog: backlog,
v6Only: v6Only,
requestClientCertificate: requestClientCertificate,
@@ -2977,8 +3000,8 @@
if (scheme != null && credentials.scheme != scheme) return false;
if (uri.host != this.uri.host) return false;
int thisPort =
- this.uri.port == 0 ? HttpClient.defaultHttpPort: this.uri.port;
- int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort: uri.port;
+ this.uri.port == 0 ? HttpClient.defaultHttpPort : this.uri.port;
+ int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort : uri.port;
if (otherPort != thisPort) return false;
return uri.path.startsWith(this.uri.path);
}
@@ -3116,14 +3139,14 @@
}
void authorize(_Credentials credentials, HttpClientRequest request) {
- request.headers
- .set(HttpHeaders.authorizationHeader, authorization(credentials, request));
+ request.headers.set(
+ HttpHeaders.authorizationHeader, authorization(credentials, request));
}
void authorizeProxy(
_ProxyCredentials credentials, HttpClientRequest request) {
- request.headers.set(
- HttpHeaders.proxyAuthorizationHeader, authorization(credentials, request));
+ request.headers.set(HttpHeaders.proxyAuthorizationHeader,
+ authorization(credentials, request));
}
}
diff --git a/sdk/lib/_internal/js_runtime/lib/io_patch.dart b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
index 9cfb7ef..8a4c7bb 100644
--- a/sdk/lib/_internal/js_runtime/lib/io_patch.dart
+++ b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
@@ -461,6 +461,12 @@
{sourceAddress, Duration timeout}) {
throw new UnsupportedError("RawSocket constructor");
}
+
+ @patch
+ static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+ {sourceAddress}) {
+ throw new UnsupportedError("RawSocket constructor");
+ }
}
@patch
@@ -470,6 +476,12 @@
{sourceAddress, Duration timeout}) {
throw new UnsupportedError("Socket constructor");
}
+
+ @patch
+ static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+ {sourceAddress}) {
+ throw new UnsupportedError("Socket constructor");
+ }
}
@patch
diff --git a/sdk/lib/io/overrides.dart b/sdk/lib/io/overrides.dart
index f68d417..1a71881 100644
--- a/sdk/lib/io/overrides.dart
+++ b/sdk/lib/io/overrides.dart
@@ -83,6 +83,9 @@
Future<Socket> Function(dynamic, int,
{dynamic sourceAddress, Duration timeout})
socketConnect,
+ Future<ConnectionTask<Socket>> Function(dynamic, int,
+ {dynamic sourceAddress})
+ socketStartConnect,
// Optional Zone parameters
ZoneSpecification zoneSpecification,
@@ -116,6 +119,7 @@
// Socket
socketConnect,
+ socketStartConnect,
);
return _asyncRunZoned<R>(body,
zoneValues: {_ioOverridesToken: overrides},
@@ -254,12 +258,22 @@
/// Asynchronously returns a [Socket] connected to the given host and port.
///
/// When this override is installed, this functions overrides the behavior of
- /// `Socet.connect(...)`.
+ /// `Socket.connect(...)`.
Future<Socket> socketConnect(host, int port,
{sourceAddress, Duration timeout}) {
return Socket._connect(host, port,
sourceAddress: sourceAddress, timeout: timeout);
}
+
+ /// Asynchronously returns a [ConnectionTask] that connects to the given host
+ /// and port when successful.
+ ///
+ /// When this override is installed, this functions overrides the behavior of
+ /// `Socket.startConnect(...)`.
+ Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+ {sourceAddress}) {
+ return Socket._startConnect(host, port, sourceAddress: sourceAddress);
+ }
}
class _IOOverridesScope extends IOOverrides {
@@ -294,6 +308,8 @@
// Socket
Future<Socket> Function(dynamic, int,
{dynamic sourceAddress, Duration timeout}) _socketConnect;
+ Future<ConnectionTask<Socket>> Function(dynamic, int, {dynamic sourceAddress})
+ _socketStartConnect;
_IOOverridesScope(
// Directory
@@ -324,6 +340,7 @@
// Socket
this._socketConnect,
+ this._socketStartConnect,
);
// Directory
@@ -448,4 +465,17 @@
return super.socketConnect(host, port,
sourceAddress: sourceAddress, timeout: timeout);
}
+
+ @override
+ Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+ {sourceAddress}) {
+ if (_socketStartConnect != null) {
+ return _socketStartConnect(host, port, sourceAddress: sourceAddress);
+ }
+ if (_previous != null) {
+ return _previous.socketStartConnect(host, port,
+ sourceAddress: sourceAddress);
+ }
+ return super.socketStartConnect(host, port, sourceAddress: sourceAddress);
+ }
}
diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart
index 52e41d5..4b3f28f 100644
--- a/sdk/lib/io/secure_socket.dart
+++ b/sdk/lib/io/secure_socket.dart
@@ -46,8 +46,7 @@
bool onBadCertificate(X509Certificate certificate),
List<String> supportedProtocols,
Duration timeout}) {
- return RawSecureSocket
- .connect(host, port,
+ return RawSecureSocket.connect(host, port,
context: context,
onBadCertificate: onBadCertificate,
supportedProtocols: supportedProtocols,
@@ -55,6 +54,25 @@
.then((rawSocket) => new SecureSocket._(rawSocket));
}
+ /// Like [connect], but returns a [Future] that completes with a
+ /// [ConnectionTask] that can be cancelled if the [SecureSocket] is no
+ /// longer needed.
+ static Future<ConnectionTask<SecureSocket>> startConnect(host, int port,
+ {SecurityContext context,
+ bool onBadCertificate(X509Certificate certificate),
+ List<String> supportedProtocols}) {
+ return RawSecureSocket.startConnect(host, port,
+ context: context,
+ onBadCertificate: onBadCertificate,
+ supportedProtocols: supportedProtocols)
+ .then((rawState) {
+ Future<SecureSocket> socket =
+ rawState.socket.then((rawSocket) => new SecureSocket._(rawSocket));
+ return new ConnectionTask<SecureSocket>._(
+ socket: socket, onCancel: rawState._onCancel);
+ });
+ }
+
/**
* Takes an already connected [socket] and starts client side TLS
* handshake to make the communication secure. When the returned
@@ -215,6 +233,26 @@
});
}
+ /// Like [connect], but returns a [Future] that completes with a
+ /// [ConnectionTask] that can be cancelled if the [RawSecureSocket] is no
+ /// longer needed.
+ static Future<ConnectionTask<RawSecureSocket>> startConnect(host, int port,
+ {SecurityContext context,
+ bool onBadCertificate(X509Certificate certificate),
+ List<String> supportedProtocols}) {
+ return RawSocket.startConnect(host, port)
+ .then((ConnectionTask<RawSocket> rawState) {
+ Future<RawSecureSocket> socket = rawState.socket.then((rawSocket) {
+ return secure(rawSocket,
+ context: context,
+ onBadCertificate: onBadCertificate,
+ supportedProtocols: supportedProtocols);
+ });
+ return new ConnectionTask<RawSecureSocket>._(
+ socket: socket, onCancel: rawState._onCancel);
+ });
+ }
+
/**
* Takes an already connected [socket] and starts client side TLS
* handshake to make the communication secure. When the returned
@@ -990,8 +1028,7 @@
args[2 * i + 3] = bufs[i].end;
}
- return _IOService
- ._dispatch(_IOService.sslProcessFilter, args)
+ return _IOService._dispatch(_IOService.sslProcessFilter, args)
.then((response) {
if (response.length == 2) {
if (wasInHandshake) {
diff --git a/sdk/lib/io/socket.dart b/sdk/lib/io/socket.dart
index 0ab315e..1d8a263 100644
--- a/sdk/lib/io/socket.dart
+++ b/sdk/lib/io/socket.dart
@@ -426,6 +426,32 @@
}
}
+/// Returned by the `startConnect` methods on client-side socket types `S`,
+/// `ConnectionTask<S>` allows cancelling an attempt to connect to a host.
+class ConnectionTask<S> {
+ /// A `Future` that completes with value that `S.connect()` would return
+ /// unless [cancel] is called on this [ConnectionTask].
+ ///
+ /// If [cancel] is called, the `Future` completes with a [SocketException]
+ /// error whose message indicates that the connection attempt was cancelled.
+ final Future<S> socket;
+ final void Function() _onCancel;
+
+ ConnectionTask._({Future<S> socket, void Function() onCancel})
+ : assert(socket != null),
+ assert(onCancel != null),
+ this.socket = socket,
+ this._onCancel = onCancel;
+
+ /// Cancels the connection attempt.
+ ///
+ /// This also causes the [socket] `Future` to complete with a
+ /// [SocketException] error.
+ void cancel() {
+ _onCancel();
+ }
+}
+
/**
* The [RawSocket] is a low-level interface to a socket, exposing the raw
* events signaled by the system. It's a [Stream] of [RawSocketEvent]s.
@@ -470,6 +496,12 @@
external static Future<RawSocket> connect(host, int port,
{sourceAddress, Duration timeout});
+ /// Like [connect], but returns a [Future] that completes with a
+ /// [ConnectionTask] that can be cancelled if the [RawSocket] is no
+ /// longer needed.
+ external static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
+ {sourceAddress});
+
/**
* Returns the number of received and non-read bytes in the socket that
* can be read.
@@ -583,9 +615,25 @@
sourceAddress: sourceAddress, timeout: timeout);
}
+ /// Like [connect], but returns a [Future] that completes with a
+ /// [ConnectionTask] that can be cancelled if the [Socket] is no
+ /// longer needed.
+ static Future<ConnectionTask<Socket>> startConnect(host, int port,
+ {sourceAddress}) {
+ final IOOverrides overrides = IOOverrides.current;
+ if (overrides == null) {
+ return Socket._startConnect(host, port, sourceAddress: sourceAddress);
+ }
+ return overrides.socketStartConnect(host, port,
+ sourceAddress: sourceAddress);
+ }
+
external static Future<Socket> _connect(host, int port,
{sourceAddress, Duration timeout});
+ external static Future<ConnectionTask<Socket>> _startConnect(host, int port,
+ {sourceAddress});
+
/**
* Destroy the socket in both directions. Calling [destroy] will make the
* send a close event on the stream and will no longer react on data being
diff --git a/tests/standalone/io/io_override_test.dart b/tests/standalone/io/io_override_test.dart
index 012e7ff..1e5983a 100644
--- a/tests/standalone/io/io_override_test.dart
+++ b/tests/standalone/io/io_override_test.dart
@@ -162,6 +162,11 @@
return null;
}
+Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
+ {sourceAddress}) {
+ return null;
+}
+
Future<Null> ioOverridesRunTest() async {
Future<Null> f = IOOverrides.runZoned(
() async {
@@ -181,6 +186,7 @@
Expect.isNull(new Directory("directory").watch());
Expect.isTrue(new Link("link") is LinkMock);
Expect.isNull(Socket.connect(null, 0));
+ Expect.isNull(Socket.startConnect(null, 0));
},
createDirectory: DirectoryMock.createDirectory,
getCurrentDirectory: DirectoryMock.getCurrent,
@@ -197,6 +203,7 @@
fsWatchIsSupported: FileSystemWatcherMock.watchSupported,
createLink: LinkMock.createLink,
socketConnect: socketConnect,
+ socketStartConnect: socketStartConnect,
);
Expect.isFalse(new Directory("directory") is DirectoryMock);
Expect.isTrue(new Directory("directory") is Directory);
diff --git a/tests/standalone_2/io/http_shutdown_test.dart b/tests/standalone_2/io/http_shutdown_test.dart
index a3410db..c04a537 100644
--- a/tests/standalone_2/io/http_shutdown_test.dart
+++ b/tests/standalone_2/io/http_shutdown_test.dart
@@ -160,7 +160,8 @@
return request.close();
})
.then((response) {})
- .catchError((e) {}, test: (e) => e is HttpException);
+ .catchError((e) {},
+ test: (e) => e is HttpException || e is SocketException);
}
bool clientClosed = false;
new Timer.periodic(new Duration(milliseconds: 100), (timer) {
diff --git a/tests/standalone_2/io/raw_socket_test.dart b/tests/standalone_2/io/raw_socket_test.dart
index f5506cc..d46d1d6 100644
--- a/tests/standalone_2/io/raw_socket_test.dart
+++ b/tests/standalone_2/io/raw_socket_test.dart
@@ -74,6 +74,21 @@
});
}
+void testCancelConnect() {
+ asyncStart();
+ RawSocket.startConnect(InternetAddress.loopbackIPv4, 0)
+ .then((ConnectionTask<RawSocket> task) {
+ task.cancel();
+ task.socket.catchError((error) {
+ Expect.isTrue(error is SocketException);
+ asyncEnd();
+ });
+ task.socket.then((s) {
+ Expect.fail("Unreachable");
+ });
+ });
+}
+
void testCloseOneEnd(String toClose) {
asyncStart();
Completer serverDone = new Completer();
@@ -467,6 +482,7 @@
testCloseOneEnd("server");
testInvalidBind();
testSimpleConnect();
+ testCancelConnect();
testServerListenAfterConnect();
testSimpleReadWrite(dropReads: false);
testSimpleReadWrite(dropReads: true);
diff --git a/tests/standalone_2/io/socket_cancel_connect_test.dart b/tests/standalone_2/io/socket_cancel_connect_test.dart
new file mode 100644
index 0000000..7a2e112
--- /dev/null
+++ b/tests/standalone_2/io/socket_cancel_connect_test.dart
@@ -0,0 +1,32 @@
+// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+//
+// VMOptions=
+// VMOptions=--short_socket_read
+// VMOptions=--short_socket_write
+// VMOptions=--short_socket_read --short_socket_write
+
+import "dart:async";
+import "dart:io";
+
+import "package:async_helper/async_helper.dart";
+import "package:expect/expect.dart";
+
+void main() {
+ asyncStart();
+ Duration timeout = new Duration(milliseconds: 20);
+ Socket.startConnect("8.8.8.7", 80).then((task) {
+ task.socket.timeout(timeout, onTimeout: () {
+ task.cancel();
+ });
+ task.socket.then((socket) {
+ Expect.fail("Unexpected connection made.");
+ asyncEnd();
+ }).catchError((e) {
+ print(e);
+ Expect.isTrue(e is SocketException);
+ asyncEnd();
+ });
+ });
+}