Updates migrated from sdk repo (#67)
diff --git a/.travis.yml b/.travis.yml
index 99e217e..c1d0163 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,10 +1,10 @@
language: dart
-sudo: false
+sudo: true
before_script:
# Add an IPv6 config - see the corresponding Travis issue
# https://github.com/travis-ci/travis-ci/issues/8361
- - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then
+ - if [ "${TRAVIS_OS_NAME}" == "linux" ]; then
sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6';
fi
diff --git a/lib/http_io.dart b/lib/http_io.dart
index 11540f7..074c9c6 100644
--- a/lib/http_io.dart
+++ b/lib/http_io.dart
@@ -10,6 +10,7 @@
import 'dart:convert';
import 'dart:io'
show
+ ConnectionTask,
BytesBuilder,
gzip,
HandshakeException,
@@ -1263,6 +1264,16 @@
/// The default value is 15 seconds.
Duration idleTimeout;
+ /// Gets and sets the connection timeout.
+ ///
+ /// When connecting to a new host exceeds this timeout, a [SocketException]
+ /// is thrown. The timeout applies only to connections initiated after the
+ /// timeout is set.
+ ///
+ /// When this is `null`, the OS default timeout is used. The default is
+ /// `null`.
+ Duration connectionTimeout;
+
/**
* Gets and sets the maximum number of live connections, to a single host.
*
diff --git a/lib/src/http_impl.dart b/lib/src/http_impl.dart
index 43b5f72..79a890b 100644
--- a/lib/src/http_impl.dart
+++ b/lib/src/http_impl.dart
@@ -1697,7 +1697,7 @@
closed = true;
_httpClient._connectionClosed(this);
_streamFuture
- // TODO(ajohnsen): Add timeout.
+ .timeout(_httpClient.idleTimeout)
.then((_) => _socket.destroy());
}
@@ -1768,6 +1768,7 @@
final SecurityContext context;
final Set<_HttpClientConnection> _idle = new HashSet();
final Set<_HttpClientConnection> _active = new HashSet();
+ final Set<ConnectionTask> _socketTasks = new HashSet();
final Queue _pending = new ListQueue();
int _connecting = 0;
@@ -1815,13 +1816,25 @@
}
void close(bool force) {
- for (var c in _idle.toList()) {
- c.close();
+ // Always cancel pending socket connections.
+ for (var t in _socketTasks.toList()) {
+ // Make sure the socket is destroyed if the ConnectionTask is cancelled.
+ t.socket.then((s) {
+ s.destroy();
+ }, onError: (e) {});
+ t.cancel();
}
if (force) {
+ for (var c in _idle.toList()) {
+ c.destroy();
+ }
for (var c in _active.toList()) {
c.destroy();
}
+ } else {
+ for (var c in _idle.toList()) {
+ c.close();
+ }
}
}
@@ -1847,34 +1860,48 @@
return currentBadCertificateCallback(certificate, uriHost, uriPort);
}
- Future<Socket> socketFuture = (isSecure && proxy.isDirect
- ? SecureSocket.connect(host, port,
+ Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
+ ? SecureSocket.startConnect(host, port,
context: context, onBadCertificate: callback)
- : Socket.connect(host, port));
+ : Socket.startConnect(host, port));
_connecting++;
- return socketFuture.then((socket) {
- _connecting--;
- socket.setOption(SocketOption.tcpNoDelay, true);
- var connection =
- new _HttpClientConnection(key, socket, client, false, context);
- if (isSecure && !proxy.isDirect) {
- connection._dispose = true;
- return connection
- .createProxyTunnel(uriHost, uriPort, proxy, callback)
- .then((tunnel) {
- client
- ._getConnectionTarget(uriHost, uriPort, true)
- .addNewActive(tunnel);
- return new _ConnectionInfo(tunnel, proxy);
+ return connectionTask.then((ConnectionTask task) {
+ _socketTasks.add(task);
+ Future socketFuture = task.socket;
+ if (client.connectionTimeout != null) {
+ socketFuture =
+ socketFuture.timeout(client.connectionTimeout, onTimeout: () {
+ _socketTasks.remove(task);
+ task.cancel();
});
- } else {
- addNewActive(connection);
- return new _ConnectionInfo(connection, proxy);
}
- }, onError: (error) {
- _connecting--;
- _checkPending();
- throw error;
+ return socketFuture.then((socket) {
+ _connecting--;
+ socket.setOption(SocketOption.tcpNoDelay, true);
+ var connection =
+ new _HttpClientConnection(key, socket, client, false, context);
+ if (isSecure && !proxy.isDirect) {
+ connection._dispose = true;
+ return connection
+ .createProxyTunnel(uriHost, uriPort, proxy, callback)
+ .then((tunnel) {
+ client
+ ._getConnectionTarget(uriHost, uriPort, true)
+ .addNewActive(tunnel);
+ _socketTasks.remove(task);
+ return new _ConnectionInfo(tunnel, proxy);
+ });
+ } else {
+ addNewActive(connection);
+ _socketTasks.remove(task);
+ return new _ConnectionInfo(connection, proxy);
+ }
+ }, onError: (error) {
+ _connecting--;
+ _socketTasks.remove(task);
+ _checkPending();
+ throw error;
+ });
});
}
}
@@ -1897,6 +1924,8 @@
Duration get idleTimeout => _idleTimeout;
+ Duration connectionTimeout;
+
int maxConnectionsPerHost;
bool autoUncompress = true;
diff --git a/test/http_shutdown_test.dart b/test/http_shutdown_test.dart
index cb32e85..a07502e 100644
--- a/test/http_shutdown_test.dart
+++ b/test/http_shutdown_test.dart
@@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
+import "dart:io" show SocketException;
import "package:http_io/http_io.dart";
import "package:test/test.dart";
@@ -170,7 +171,8 @@
return request.close();
})
.then((response) {})
- .catchError((e) {}, test: (e) => e is HttpException);
+ .catchError((e) {},
+ test: (e) => e is HttpException || e is SocketException);
}
bool clientClosed = false;
new Timer.periodic(new Duration(milliseconds: 100), (timer) {