blob: 8bc8a98737ced1e16127d6b81ef75a993564ba21 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.io;
// Incoming side of an HTTP message: the parsed headers and start-line fields
// together with the stream that delivers the body bytes.
class _HttpIncoming
    extends Stream<List<int>> implements StreamSink<List<int>> {
  // Headers of the message; common to requests and responses.
  final _HttpHeaders headers;

  // Backing field for [transferLength].
  final int _transferLength;

  // Underlying stream that every [listen] call is forwarded to.
  Stream<List<int>> _stream;

  // Completed by [close]; backs [dataDone].
  final Completer _dataCompleter = new Completer();

  // Set to true by [close].
  bool fullBodyRead = false;

  bool upgraded = false;

  // Status line fields; only meaningful for a client response.
  int statusCode;
  String reasonPhrase;

  // Request line fields; only meaningful for a server request.
  String method;
  Uri uri;

  _HttpIncoming(_HttpHeaders this.headers,
                int this._transferLength,
                Stream<List<int>> this._stream);

  // The transfer length is the length of the message body as it
  // appears in the message (RFC 2616 section 4.4). This can be -1 if
  // the length of the message body is not known due to transfer
  // codings.
  int get transferLength => _transferLength;

  // Is completed once all data have been received, i.e. when [close] is
  // called.
  Future get dataDone => _dataCompleter.future;

  // Subscribes to the body by delegating to the wrapped stream.
  StreamSubscription<List<int>> listen(void onData(List<int> event),
                                       {void onError(AsyncError error),
                                        void onDone(),
                                        bool unsubscribeOnError}) =>
      _stream.listen(onData,
                     onError: onError,
                     onDone: onDone,
                     unsubscribeOnError: unsubscribeOnError);

  // Marks the body as fully read and completes [dataDone].
  void close() {
    fullBodyRead = true;
    _dataCompleter.complete();
  }
}
// Shared base of inbound messages (server requests and client responses).
// Exposes header-derived properties of the wrapped [_HttpIncoming].
class _HttpInboundMessage extends Stream<List<int>> {
  final _HttpIncoming _incoming;

  // Cache for [cookies]; stays null until the first access.
  List<Cookie> _cookies;

  _HttpInboundMessage(_HttpIncoming this._incoming);

  HttpHeaders get headers => _incoming.headers;

  String get protocolVersion => headers.protocolVersion;

  int get contentLength => headers.contentLength;

  bool get persistentConnection => headers.persistentConnection;

  // Cookies parsed from the headers on first access and cached afterwards.
  List<Cookie> get cookies {
    if (_cookies == null) {
      _cookies = headers._parseCookies();
    }
    return _cookies;
  }
}
// Server-side view of a received HTTP request.
class _HttpRequest extends _HttpInboundMessage implements HttpRequest {
  // The response object paired with this request.
  final HttpResponse response;

  final _HttpServer _httpServer;
  final _HttpConnection _httpConnection;

  // Lazily initialized parsed query parameters; see [queryParameters].
  Map<String, String> _queryParameters;

  // Session mapped to this request; null until found or created.
  HttpSession _session;

  _HttpRequest(_HttpResponse this.response,
               _HttpIncoming _incoming,
               _HttpServer this._httpServer,
               _HttpConnection this._httpConnection)
      : super(_incoming) {
    // The response starts out with the keep-alive setting of the request.
    response.headers.persistentConnection = headers.persistentConnection;

    if (_httpServer._sessionManagerInstance != null) {
      // Map to an existing session: take the value of the first session
      // cookie that carries a non-null value.
      var sessionId = null;
      for (var cookie in cookies) {
        if (cookie.name.toUpperCase() == _DART_SESSION_ID) {
          sessionId = cookie.value;
          if (sessionId != null) break;
        }
      }
      if (sessionId != null) {
        _session = _httpServer._sessionManager.getSession(sessionId);
        if (_session != null) {
          _session._markSeen();
        }
      }
    }
  }

  // Subscribes to the request body by delegating to the incoming message.
  StreamSubscription<List<int>> listen(void onData(List<int> event),
                                       {void onError(AsyncError error),
                                        void onDone(),
                                        bool unsubscribeOnError}) =>
      _incoming.listen(onData,
                       onError: onError,
                       onDone: onDone,
                       unsubscribeOnError: unsubscribeOnError);

  Uri get uri => _incoming.uri;

  String get method => _incoming.method;

  // Query parameters of [uri], split on first access and cached afterwards.
  Map<String, String> get queryParameters {
    if (_queryParameters != null) return _queryParameters;
    _queryParameters = _HttpUtils.splitQueryString(uri.query);
    return _queryParameters;
  }

  // The session of this request. If none is mapped yet, a new one is created
  // through the server's session manager and remembered.
  HttpSession get session {
    if (_session == null) {
      _session = _httpServer._sessionManager.createSession();
    }
    return _session;
  }

  HttpConnectionInfo get connectionInfo => _httpConnection.connectionInfo;

  // The peer certificate when the connection's socket is a [SecureSocket];
  // null otherwise.
  X509Certificate get certificate {
    Socket socket = _httpConnection._socket;
    if (socket is! SecureSocket) return null;
    return socket.peerCertificate;
  }
}
// Client-side view of a received HTTP response. Besides exposing the status
// line and headers it implements redirect following and the retry logic used
// for authentication challenges.
class _HttpClientResponse
    extends _HttpInboundMessage implements HttpClientResponse {
  // Redirects recorded on the request that produced this response.
  List<RedirectInfo> get redirects => _httpRequest._responseRedirects;

  // The HttpClient this response belongs to.
  final _HttpClient _httpClient;

  // The HttpClientRequest of this response.
  final _HttpClientRequest _httpRequest;

  // Cache for [cookies]; stays null until the first access.
  List<Cookie> _cookies;

  _HttpClientResponse(_HttpIncoming _incoming,
                      _HttpClientRequest this._httpRequest,
                      _HttpClient this._httpClient)
      : super(_incoming);

  int get statusCode => _incoming.statusCode;

  String get reasonPhrase => _incoming.reasonPhrase;

  // Cookies built from the "set-cookie" headers on first access and cached
  // afterwards. The list is empty when the header is absent.
  List<Cookie> get cookies {
    if (_cookies != null) return _cookies;
    _cookies = new List<Cookie>();
    List<String> values = headers["set-cookie"];
    if (values != null) {
      values.forEach((value) {
        _cookies.add(new Cookie.fromSetCookieValue(value));
      });
    }
    return _cookies;
  }

  // Whether the status code is one that is followed automatically for the
  // method of the originating request: 301, 302, 303 and 307 for GET/HEAD,
  // only 303 for POST, none for any other method.
  bool get isRedirect {
    if (_httpRequest.method == "GET" || _httpRequest.method == "HEAD") {
      return statusCode == HttpStatus.MOVED_PERMANENTLY ||
             statusCode == HttpStatus.FOUND ||
             statusCode == HttpStatus.SEE_OTHER ||
             statusCode == HttpStatus.TEMPORARY_REDIRECT;
    } else if (_httpRequest.method == "POST") {
      return statusCode == HttpStatus.SEE_OTHER;
    }
    return false;
  }

  // Issues a new request following this response.
  //
  // [method] defaults to the method of the original request, except that a
  // POST answered with 303 is retried as GET. [url] defaults to the parsed
  // Location header. Unless [followLoops] is true, a [url] that is already
  // present in [redirects] completes the returned future with a
  // [RedirectLoopException].
  //
  // Throws a [StateError] when [url] is omitted and the response has no
  // Location header.
  Future<HttpClientResponse> redirect([String method,
                                       Uri url,
                                       bool followLoops]) {
    if (method == null) {
      // Set method as defined by RFC 2616 section 10.3.4.
      if (statusCode == HttpStatus.SEE_OTHER && _httpRequest.method == "POST") {
        method = "GET";
      } else {
        method = _httpRequest.method;
      }
    }
    if (url == null) {
      String location = headers.value(HttpHeaders.LOCATION);
      if (location == null) {
        throw new StateError("Response has no Location header for redirect");
      }
      url = Uri.parse(location);
    }
    if (followLoops != true) {
      for (var redirect in redirects) {
        if (redirect.location == url) {
          return new Future.immediateError(
              new RedirectLoopException(redirects));
        }
      }
    }
    return _httpClient._openUrlFromRequest(method, url, _httpRequest)
        .then((request) {
          // Carry the redirect history over and append this hop.
          request._responseRedirects.addAll(this.redirects);
          request._responseRedirects.add(new _RedirectInfo(statusCode,
                                                           method,
                                                           url));
          return request.close();
        });
  }

  // Subscribes to the response body by delegating to the incoming message.
  StreamSubscription<List<int>> listen(void onData(List<int> event),
                                       {void onError(AsyncError error),
                                        void onDone(),
                                        bool unsubscribeOnError}) {
    return _incoming.listen(onData,
                            onError: onError,
                            onDone: onDone,
                            unsubscribeOnError: unsubscribeOnError);
  }

  // Removes the connection from the client's bookkeeping and detaches its
  // socket.
  Future<Socket> detachSocket() {
    _httpClient._connectionClosed(_httpRequest._httpClientConnection);
    return _httpRequest._httpClientConnection.detachSocket();
  }

  HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;

  // Whether authentication should be attempted: the status is 401 and the
  // response carries exactly one WWW-Authenticate header value.
  bool get _shouldAuthenticate {
    // Only try to authenticate if there is a challenge in the response.
    List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE];
    return statusCode == HttpStatus.UNAUTHORIZED &&
        challenge != null && challenge.length == 1;
  }

  // Handles an authentication challenge. Completes with the response of a
  // retried request when usable credentials are available, and with this
  // response otherwise. Callers are expected to check [_shouldAuthenticate]
  // first.
  Future<HttpClientResponse> _authenticate() {
    // Drains this response and re-sends the original request when [cr] uses
    // a supported scheme; otherwise completes with this response.
    Future<HttpClientResponse> retryWithCredentials(_Credentials cr) {
      if (cr != null) {
        // TODO(sgjesse): Support digest.
        if (cr.scheme == _AuthenticationScheme.BASIC) {
          // Drain body and retry.
          return reduce(null, (x, y) {}).then((_) {
            return _httpClient._openUrlFromRequest(_httpRequest.method,
                                                   _httpRequest.uri,
                                                   _httpRequest)
                .then((request) => request.close());
          });
        }
      }

      // Fall through to here to perform normal response handling if
      // there is no sensible authorization handling.
      return new Future.immediate(this);
    }

    List<String> challenge = headers[HttpHeaders.WWW_AUTHENTICATE];
    // Both conditions must hold. With `||` the assertion dereferenced
    // [challenge] exactly when it was null and accepted any non-null list.
    assert(challenge != null && challenge.length == 1);
    _HeaderValue header =
        new _HeaderValue.fromString(challenge[0], parameterSeparator: ",");
    _AuthenticationScheme scheme =
        new _AuthenticationScheme.fromString(header.value);
    String realm = header.parameters["realm"];

    // See if any credentials are available.
    _Credentials cr = _httpClient._findCredentials(_httpRequest.uri, scheme);

    if (cr != null && !cr.used) {
      // If credentials found prepare for retrying the request.
      return retryWithCredentials(cr);
    }

    // Ask for more credentials if none found or the one found has
    // already been used. If it has already been used it must now be
    // invalid and is removed.
    if (cr != null) {
      _httpClient._removeCredentials(cr);
      cr = null;
    }
    if (_httpClient._authenticate != null) {
      Future authComplete = _httpClient._authenticate(_httpRequest.uri,
                                                      scheme.toString(),
                                                      realm);
      return authComplete.then((credsAvailable) {
        if (credsAvailable) {
          cr = _httpClient._findCredentials(_httpRequest.uri, scheme);
          return retryWithCredentials(cr);
        } else {
          // No credentials available, complete with original response.
          return this;
        }
      });
    }

    // No credentials were found and the callback was not set.
    return new Future.immediate(this);
  }
}
// Shared base of outbound messages (server responses and client requests).
// Makes sure the header block is written before any body data and applies
// chunked transfer encoding to the body when the headers ask for it.
class _HttpOutboundMessage<T> extends IOSink {
  // Used to mark when the body should be written. This is used for HEAD
  // requests and in error handling.
  bool _ignoreBody = false;

  _HttpOutboundMessage(String protocolVersion, _HttpOutgoing outgoing)
      : super(outgoing),
        _outgoing = outgoing,
        headers = new _HttpHeaders(protocolVersion);

  int get contentLength => headers.contentLength;
  void set contentLength(int contentLength) {
    headers.contentLength = contentLength;
  }

  bool get persistentConnection => headers.persistentConnection;
  // A setter has no result; it was previously declared with return type bool.
  void set persistentConnection(bool p) {
    headers.persistentConnection = p;
  }

  // Writes the headers (if not yet written) and then pipes [stream] as the
  // body, chunk-encoding it when chunked transfer encoding is in effect.
  // When the body is ignored, [stream] is not consumed and the returned
  // future completes immediately with this message.
  Future<T> consume(Stream<List<int>> stream) {
    _writeHeaders();
    if (_ignoreBody) return new Future.immediate(this);
    if (_chunked) {
      // Transform when chunked.
      stream = stream.transform(new _ChunkedTransformer());
    }
    return super.consume(stream).then((_) => this);
  }

  // Writes the headers (if not yet written) followed by [data], wrapped in a
  // chunk when chunked transfer encoding is in effect. [data] is dropped
  // when the body is ignored.
  void add(List<int> data) {
    _writeHeaders();
    if (_ignoreBody) return;
    if (_chunked) {
      _ChunkedTransformer._addChunk(data, super.add);
    } else {
      super.add(data);
    }
  }

  // Finishes the message: writes the headers if nothing has been written
  // yet, emits the terminating zero-length chunk for chunked bodies, and
  // closes the underlying sink.
  void close() {
    if (!_headersWritten && !_ignoreBody && headers.chunkedTransferEncoding) {
      // If no body was written, _ignoreBody is false (it's not a HEAD
      // request) and the content-length is unspecified, set contentLength to 0.
      headers.contentLength = 0;
    }
    _writeHeaders();
    if (!_ignoreBody) {
      if (_chunked) {
        _ChunkedTransformer._addChunk([], super.add);
      }
    }
    super.close();
  }

  // Writes the header block exactly once; later calls are no-ops.
  void _writeHeaders() {
    if (_headersWritten) return;
    // [_writeHeader] emits its bytes through [add], which drops data while
    // [_ignoreBody] is set, so the flag is cleared for the duration of the
    // call. [_headersWritten] is set first so that [add] does not re-enter.
    bool _tmpIgnoreBody = _ignoreBody;
    _ignoreBody = false;
    _headersWritten = true;
    _writeHeader();
    _ignoreBody = _tmpIgnoreBody;
    if (_ignoreBody) {
      // Nothing more will be written; close the sink right away.
      super.close();
      return;
    }
    _chunked = headers.chunkedTransferEncoding;
    if (!_chunked) {
      // Let the outgoing side validate the number of body bytes written.
      _outgoing.setTransferLength(headers.contentLength);
    }
  }

  // Writes the start line and the header block. Implemented by subclasses.
  void _writeHeader();  // TODO(ajohnsen): Better name.

  final _HttpHeaders headers;

  final _HttpOutgoing _outgoing;
  // Set once the header block has been written.
  bool _headersWritten = false;
  // Whether the body is sent with chunked transfer encoding; decided when
  // the headers are written.
  bool _chunked = false;
}
// Server-side HTTP response: writes the status line, the session cookie,
// user cookies and headers ahead of the body.
class _HttpResponse extends _HttpOutboundMessage<HttpResponse>
    implements HttpResponse {
  int statusCode = 200;

  // Explicit reason phrase; null means "derive it from [statusCode]".
  String _reasonPhrase;

  // Cookies to send; created lazily by [cookies].
  List<Cookie> _cookies;

  // The request this response answers.
  _HttpRequest _httpRequest;

  _HttpResponse(String protocolVersion,
                _HttpOutgoing _outgoing)
      : super(protocolVersion, _outgoing);

  // The cookies to send with the response; the list is created on first use.
  List<Cookie> get cookies {
    if (_cookies != null) return _cookies;
    _cookies = new List<Cookie>();
    return _cookies;
  }

  String get reasonPhrase => _findReasonPhrase(statusCode);

  // Overrides the reason phrase. Throws a [StateError] once the headers have
  // been written.
  void set reasonPhrase(String reasonPhrase) {
    if (_headersWritten) {
      throw new StateError("Header already sent");
    }
    _reasonPhrase = reasonPhrase;
  }

  // Writes the headers, detaches the socket from the connection and closes
  // this response. Throws a [StateError] if the headers were already
  // written.
  Future<Socket> detachSocket() {
    if (_headersWritten) {
      throw new StateError("Headers already sent");
    }
    _writeHeaders();
    var detached = _httpRequest._httpConnection.detachSocket();
    // Close connection so the socket is 'free'.
    close();
    return detached;
  }

  HttpConnectionInfo get connectionInfo => _httpRequest.connectionInfo;

  // Writes the status line, then the headers (with cookies folded in),
  // followed by the blank line that ends the header block.
  void _writeHeader() {
    // Status line: version SP status-code SP reason-phrase CRLF.
    add(headers.protocolVersion == "1.1" ? _Const.HTTP11 : _Const.HTTP10);
    add([_CharCode.SP]);
    addString(statusCode.toString());
    add([_CharCode.SP]);
    addString(reasonPhrase);
    add([_CharCode.CR, _CharCode.LF]);

    var session = _httpRequest._session;
    if (session != null && !session._destroyed) {
      // Mark as not new.
      session._isNew = false;
      // Make sure we only send the current session id: reuse the first
      // existing session cookie, or add one.
      Cookie sessionCookie = null;
      for (var candidate in cookies) {
        if (candidate.name.toUpperCase() == _DART_SESSION_ID) {
          sessionCookie = candidate;
          break;
        }
      }
      if (sessionCookie == null) {
        cookies.add(new Cookie(_DART_SESSION_ID, session.id)..httpOnly = true);
      } else {
        sessionCookie.value = session.id;
        sessionCookie.httpOnly = true;
      }
    }

    // Add all the cookies set to the headers.
    if (_cookies != null) {
      for (var cookie in _cookies) {
        headers.add("set-cookie", cookie);
      }
    }

    headers._finalize();

    // Write headers, then the empty line terminating them.
    headers._write(this);
    add([_CharCode.CR, _CharCode.LF]);
  }

  // Returns the explicitly set reason phrase if there is one, otherwise the
  // standard phrase for [statusCode], or "Status <code>" for unknown codes.
  String _findReasonPhrase(int statusCode) {
    if (_reasonPhrase != null) return _reasonPhrase;

    switch (statusCode) {
      // 1xx
      case HttpStatus.CONTINUE:
        return "Continue";
      case HttpStatus.SWITCHING_PROTOCOLS:
        return "Switching Protocols";
      // 2xx
      case HttpStatus.OK:
        return "OK";
      case HttpStatus.CREATED:
        return "Created";
      case HttpStatus.ACCEPTED:
        return "Accepted";
      case HttpStatus.NON_AUTHORITATIVE_INFORMATION:
        return "Non-Authoritative Information";
      case HttpStatus.NO_CONTENT:
        return "No Content";
      case HttpStatus.RESET_CONTENT:
        return "Reset Content";
      case HttpStatus.PARTIAL_CONTENT:
        return "Partial Content";
      // 3xx
      case HttpStatus.MULTIPLE_CHOICES:
        return "Multiple Choices";
      case HttpStatus.MOVED_PERMANENTLY:
        return "Moved Permanently";
      case HttpStatus.FOUND:
        return "Found";
      case HttpStatus.SEE_OTHER:
        return "See Other";
      case HttpStatus.NOT_MODIFIED:
        return "Not Modified";
      case HttpStatus.USE_PROXY:
        return "Use Proxy";
      case HttpStatus.TEMPORARY_REDIRECT:
        return "Temporary Redirect";
      // 4xx
      case HttpStatus.BAD_REQUEST:
        return "Bad Request";
      case HttpStatus.UNAUTHORIZED:
        return "Unauthorized";
      case HttpStatus.PAYMENT_REQUIRED:
        return "Payment Required";
      case HttpStatus.FORBIDDEN:
        return "Forbidden";
      case HttpStatus.NOT_FOUND:
        return "Not Found";
      case HttpStatus.METHOD_NOT_ALLOWED:
        return "Method Not Allowed";
      case HttpStatus.NOT_ACCEPTABLE:
        return "Not Acceptable";
      case HttpStatus.PROXY_AUTHENTICATION_REQUIRED:
        return "Proxy Authentication Required";
      case HttpStatus.REQUEST_TIMEOUT:
        return "Request Time-out";
      case HttpStatus.CONFLICT:
        return "Conflict";
      case HttpStatus.GONE:
        return "Gone";
      case HttpStatus.LENGTH_REQUIRED:
        return "Length Required";
      case HttpStatus.PRECONDITION_FAILED:
        return "Precondition Failed";
      case HttpStatus.REQUEST_ENTITY_TOO_LARGE:
        return "Request Entity Too Large";
      case HttpStatus.REQUEST_URI_TOO_LONG:
        return "Request-URI Too Large";
      case HttpStatus.UNSUPPORTED_MEDIA_TYPE:
        return "Unsupported Media Type";
      case HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE:
        return "Requested range not satisfiable";
      case HttpStatus.EXPECTATION_FAILED:
        return "Expectation Failed";
      // 5xx
      case HttpStatus.INTERNAL_SERVER_ERROR:
        return "Internal Server Error";
      case HttpStatus.NOT_IMPLEMENTED:
        return "Not Implemented";
      case HttpStatus.BAD_GATEWAY:
        return "Bad Gateway";
      case HttpStatus.SERVICE_UNAVAILABLE:
        return "Service Unavailable";
      case HttpStatus.GATEWAY_TIMEOUT:
        return "Gateway Time-out";
      case HttpStatus.HTTP_VERSION_NOT_SUPPORTED:
        return "Http Version not supported";
      default:
        return "Status $statusCode";
    }
  }
}
// Client-side HTTP request: writes the request line, cookies and headers,
// and turns the incoming response into the [response] future (following
// redirects and handling authentication on the way).
class _HttpClientRequest extends _HttpOutboundMessage<HttpClientRequest>
    implements HttpClientRequest {
  final String method;
  final Uri uri;

  // Cookies sent with the request in a single "cookie" header.
  final List<Cookie> cookies = new List<Cookie>();

  // The HttpClient this request belongs to.
  final _HttpClient _httpClient;

  // The connection this request is sent on.
  final _HttpClientConnection _httpClientConnection;

  // Completed from [_onIncoming] or [_onError]; backs [response].
  final Completer<HttpClientResponse> _responseCompleter
      = new Completer<HttpClientResponse>();

  // Whether the request goes through a proxy, in which case the request line
  // carries the whole URL rather than just the path.
  final bool _usingProxy;

  // TODO(ajohnsen): Get default value from client?
  bool _followRedirects = true;

  int _maxRedirects = 5;

  // Redirects followed so far to get to this request.
  List<RedirectInfo> _responseRedirects = [];

  _HttpClientRequest(_HttpOutgoing outgoing,
                     Uri this.uri,
                     String this.method,
                     bool this._usingProxy,
                     _HttpClient this._httpClient,
                     _HttpClientConnection this._httpClientConnection)
      : super("1.1", outgoing) {
    // GET and HEAD have 'content-length: 0' by default.
    if (method == "GET" || method == "HEAD") contentLength = 0;
  }

  Future<HttpClientResponse> get response => _responseCompleter.future;

  // Finishes the request and returns the future for its response.
  Future<HttpClientResponse> close() {
    super.close();
    return response;
  }

  int get maxRedirects => _maxRedirects;

  // Throws a [StateError] once the headers have been written.
  void set maxRedirects(int maxRedirects) {
    if (_headersWritten) {
      throw new StateError("Request already sent");
    }
    _maxRedirects = maxRedirects;
  }

  bool get followRedirects => _followRedirects;

  // Throws a [StateError] once the headers have been written.
  void set followRedirects(bool followRedirects) {
    if (_headersWritten) {
      throw new StateError("Request already sent");
    }
    _followRedirects = followRedirects;
  }

  HttpConnectionInfo get connectionInfo => _httpClientConnection.connectionInfo;

  // Called with the parsed response head. Decides between following a
  // redirect, authenticating, or handing the response to the user, and
  // completes [response] with the outcome.
  void _onIncoming(_HttpIncoming incoming) {
    var response = new _HttpClientResponse(incoming, this, _httpClient);
    Future<HttpClientResponse> outcome;
    if (followRedirects && response.isRedirect) {
      bool withinLimit = response.redirects.length < maxRedirects;
      // The body is drained in both cases before moving on.
      var drained = response.reduce(null, (x, y) {});
      if (withinLimit) {
        // Redirect.
        outcome = drained.then((_) => response.redirect());
      } else {
        // End with exception, too many redirects.
        outcome = drained.then((_) => new Future.immediateError(
            new RedirectLimitExceededException(response.redirects)));
      }
    } else if (response._shouldAuthenticate) {
      outcome = response._authenticate();
    } else {
      outcome = new Future<HttpClientResponse>.immediate(response);
    }
    outcome.then(
        (result) => _responseCompleter.complete(result),
        onError: (failure) {
          _responseCompleter.completeError(failure);
        });
  }

  // Completes [response] with [error].
  void _onError(AsyncError error) {
    _responseCompleter.completeError(error);
  }

  // Writes the request line, the "cookie" header (when there are cookies),
  // the remaining headers and the blank line ending the header block.
  void _writeHeader() {
    // Send the path for direct connections and the whole URL for
    // proxy connections.
    String target;
    if (_usingProxy) {
      target = uri.toString();
    } else {
      target = uri.path;
      if (target.length == 0) target = "/";
      // NOTE(review): the fragment is only appended when a query is present;
      // looks unintentional — confirm before changing what goes on the wire.
      if (uri.query != "") {
        target = "${target}?${uri.query}";
        if (uri.fragment != "") {
          target = "${target}#${uri.fragment}";
        }
      }
    }

    // Request line: method SP target SP version CRLF.
    addString(method);
    add([_CharCode.SP]);
    addString(target);
    add([_CharCode.SP]);
    add(_Const.HTTP11);
    add([_CharCode.CR, _CharCode.LF]);

    // Add the cookies to the headers as "name=value" pairs joined by "; ".
    if (!cookies.isEmpty) {
      StringBuffer sb = new StringBuffer();
      bool first = true;
      for (var cookie in cookies) {
        if (!first) sb.add("; ");
        first = false;
        sb.add(cookie.name);
        sb.add("=");
        sb.add(cookie.value);
      }
      headers.add("cookie", sb.toString());
    }

    headers._finalize();

    // Write headers, then the empty line terminating them.
    headers._write(this);
    add([_CharCode.CR, _CharCode.LF]);
  }
}
// Transformer that transforms data to HTTP Chunked Encoding.
//
// Every non-empty event becomes one chunk (hexadecimal length, CRLF, data,
// CRLF). When the source is done, the terminating zero-length chunk is
// emitted and the output is closed. An instance owns a single output
// controller and is meant to be bound once.
class _ChunkedTransformer implements StreamTransformer<List<int>, List<int>> {
  final StreamController<List<int>> _controller
      = new StreamController<List<int>>();

  Stream<List<int>> bind(Stream<List<int>> stream) {
    stream.listen(
        (data) {
          if (data.length == 0) return;  // Avoid close on 0-bytes payload.
          _addChunk(data, _controller.add);
        },
        // Forward source errors to the consumer, as [_DoneTransformer] does.
        // Without a handler the error never reached whoever listens to the
        // transformed stream.
        onError: _controller.signalError,
        onDone: () {
          _addChunk([], _controller.add);
          _controller.close();
        });
    return _controller.stream;
  }

  // Emits [data] as one chunk through [add]: header, payload (omitted when
  // empty) and footer. An empty [data] yields the terminating chunk.
  static void _addChunk(List<int> data, void add(List<int> data)) {
    add(_chunkHeader(data.length));
    if (data.length > 0) add(data);
    add(_chunkFooter);
  }

  // Builds the chunk header for a payload of [length] bytes: the length as
  // upper-case hexadecimal ASCII digits followed by CRLF.
  static List<int> _chunkHeader(int length) {
    const hexDigits = const [0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37,
                             0x38, 0x39, 0x41, 0x42, 0x43, 0x44, 0x45, 0x46];
    var header = [];
    if (length == 0) {
      header.add(hexDigits[length]);
    } else {
      // Produce digits from least to most significant, prepending each one.
      while (length > 0) {
        header.insertRange(0, 1, hexDigits[length % 16]);
        length = length >> 4;
      }
    }
    header.add(_CharCode.CR);
    header.add(_CharCode.LF);
    return header;
  }

  // Footer is just a CRLF.
  static List<int> get _chunkFooter => const [_CharCode.CR, _CharCode.LF];
}
// Pass-through transformer that invokes [_onDone] when the source stream is
// done, just before the output stream is closed. Data and errors are
// forwarded unchanged.
class _DoneTransformer implements StreamTransformer<List<int>, List<int>> {
  final Function _onDone;

  final StreamController<List<int>> _controller
      = new StreamController<List<int>>();

  _DoneTransformer(this._onDone);

  Stream<List<int>> bind(Stream<List<int>> stream) {
    // Run the callback first, then close the output.
    void finish() {
      _onDone();
      _controller.close();
    }

    stream.listen(_controller.add,
                  onError: _controller.signalError,
                  onDone: finish);
    return _controller.stream;
  }
}
// Transformer that validates the data written.
//
// When [expectedTransferLength] is set, the number of bytes passing through
// must match it exactly. The outcome is reported through [validatorFuture]:
// it completes with this transformer on success and with an error when the
// source fails or the byte count is too high or too low.
class _DataValidatorTransformer
    implements StreamTransformer<List<int>, List<int>> {
  final StreamController<List<int>> _controller
      = new StreamController<List<int>>();

  // Bytes seen so far; only counted while [expectedTransferLength] is set.
  int _bytesWritten = 0;

  // Backs [validatorFuture]. Set to null once completed so that it is
  // completed at most once.
  Completer _completer = new Completer();

  // Required body size, or null when the size is not validated.
  int expectedTransferLength;

  _DataValidatorTransformer();

  Future get validatorFuture => _completer.future;

  // Completes [validatorFuture] with [error], unless already completed.
  void _fail(error) {
    if (_completer == null) return;
    _completer.completeError(error);
    _completer = null;
  }

  // Completes [validatorFuture] successfully, unless already completed.
  void _succeed() {
    if (_completer == null) return;
    _completer.complete(this);
    _completer = null;
  }

  Stream<List<int>> bind(Stream<List<int>> stream) {
    var subscription;

    void handleData(data) {
      if (expectedTransferLength != null) {
        _bytesWritten += data.length;
        if (_bytesWritten > expectedTransferLength) {
          // Too much data: stop forwarding and stop listening.
          _controller.close();
          subscription.cancel();
          _fail(new HttpException(
              "Content size exceeds specified contentLength. "
              "$_bytesWritten bytes written while expected "
              "$expectedTransferLength."));
          return;
        }
      }
      _controller.add(data);
    }

    void handleError(error) {
      // The error is reported through the future, not the output stream.
      _controller.close();
      _fail(error);
    }

    void handleDone() {
      _controller.close();
      if (expectedTransferLength != null &&
          _bytesWritten < expectedTransferLength) {
        // Too little data.
        _fail(new HttpException(
            "Content size below specified contentLength. "
            " $_bytesWritten bytes written while expected "
            "$expectedTransferLength."));
        return;
      }
      _succeed();
    }

    subscription = stream.listen(handleData,
                                 onError: handleError,
                                 onDone: handleDone,
                                 unsubscribeOnError: true);
    return _controller.stream;
  }
}
// Implements StreamConsumer as this is an internal type, only used to pipe
// to. It hands the validated outgoing stream to whoever waits on [stream]
// and reports the outcome of the write through [dataDone].
class _HttpOutgoing implements StreamConsumer<List<int>, dynamic> {
  // Checks the written data against the expected transfer length.
  final _DataValidatorTransformer _validator = new _DataValidatorTransformer();

  final Completer _streamCompleter = new Completer();
  final Completer _dataCompleter = new Completer();

  // Future that completes with the Stream, once the _HttpClientConnection is
  // bound to one.
  Future<Stream<List<int>>> get stream => _streamCompleter.future;

  // Future that completes when all data is written.
  Future get dataDone => _dataCompleter.future;

  // Sets the exact number of body bytes the validator should require.
  void setTransferLength(int transferLength) {
    _validator.expectedTransferLength = transferLength;
  }

  // Publishes the validated [stream] through [stream] and returns a future
  // that follows the validator. [dataDone] completes the same way.
  Future consume(Stream<List<int>> stream) {
    var validated = stream.transform(_validator);
    _streamCompleter.complete(validated);

    var outcome = _validator.validatorFuture;
    outcome.catchError((failure) {
      _dataCompleter.completeError(failure);
    });
    return outcome.then((result) {
      _dataCompleter.complete();
      return result;
    });
  }
}
// A client connection: a socket whose incoming bytes are piped into a
// response parser. Requests are sent one at a time through [sendRequest].
class _HttpClientConnection {
// Key under which the owning client pools this connection.
final String key;
final Socket _socket;
final _HttpParser _httpParser;
// Subscription on the parser's stream of incoming responses.
StreamSubscription _subscription;
final _HttpClient _httpClient;
// Completed with the next parsed response; non-null only while a request
// is waiting for its response.
Completer<_HttpIncoming> _nextResponseCompleter;
// Future of the most recent _socket.addStream call; null before the first
// request is sent.
Future _writeDoneFuture;
_HttpClientConnection(String this.key,
Socket this._socket,
_HttpClient this._httpClient)
: _httpParser = new _HttpParser.responseParser() {
_socket.pipe(_httpParser);
// Tear the connection down if the socket reports an error.
_socket.done.catchError((e) { destroy(); });
// Set up handlers on the parser here, so we are sure to get 'onDone' from
// the parser.
_subscription = _httpParser.listen(
(incoming) {
// Only handle one incoming response at a time. Keep the
// stream paused until the response has been processed.
_subscription.pause();
// We assume the response is not here until we have sent the request.
assert(_nextResponseCompleter != null);
_nextResponseCompleter.complete(incoming);
},
onError: (error) {
// Errors are only reported while a request is waiting for its response.
if (_nextResponseCompleter != null) {
_nextResponseCompleter.completeError(error);
}
},
onDone: () {
close();
});
}
// Sends the request carried by [outgoing] once its stream is available and
// returns a future for the parsed response head. A StateError is reported
// as an HttpParserException; on any error the connection is destroyed and
// the error is rethrown.
Future<_HttpIncoming> sendRequest(_HttpOutgoing outgoing) {
return outgoing.stream
.then((stream) {
// Close socket if output data is invalid.
outgoing.dataDone.catchError((e) {
close();
});
// Sending request, set up response completer.
_nextResponseCompleter = new Completer();
_writeDoneFuture = _socket.addStream(stream);
// Listen for response.
return _nextResponseCompleter.future
.whenComplete(() {
_nextResponseCompleter = null;
})
.then((incoming) {
// Once the response body has been received, either close the
// connection or hand it back to the client for reuse.
incoming.dataDone.then((_) {
if (!incoming.headers.persistentConnection) {
close();
} else {
// Wait for the socket to be done with writing, before we
// continue.
_writeDoneFuture.then((_) {
_subscription.resume();
// Return connection, now we are done.
_httpClient._returnConnection(this);
});
}
});
// TODO(ajohnsen): Can there be an error on dataDone?
return incoming;
})
// If we see a state error, we failed to get the 'first' element.
// Transform the error to a HttpParserException, for consistency.
.catchError((error) {
throw new HttpParserException(
"Connection closed before data was received");
}, test: (error) => error is StateError)
.catchError((error) {
// We are done with the socket.
destroy();
throw error;
});
});
}
// Completes, once the pending write is done, with a socket wrapping the
// underlying socket and whatever the parser detaches.
// NOTE(review): assumes a request has been sent, since _writeDoneFuture is
// only assigned in sendRequest — confirm against callers.
Future<Socket> detachSocket() {
return _writeDoneFuture.then((_) =>
new _DetachedSocket(_socket, _httpParser.detachIncoming()));
}
// Destroys the socket immediately and unregisters this connection from the
// client.
void destroy() {
_socket.destroy();
_httpClient._connectionClosed(this);
}
// Unregisters this connection from the client right away, then closes the
// socket once any pending write has finished and destroys it when the
// socket is done.
void close() {
var future = _writeDoneFuture;
if (future == null) future = new Future.immediate(null);
_httpClient._connectionClosed(this);
future.then((_) {
_socket.close();
// TODO(ajohnsen): Add timeout.
// Delay destroy until socket is actually done writing.
_socket.done.then((_) => _socket.destroy(),
onError: (_) => _socket.destroy());
});
}
HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
}
// Pairs a client connection with the proxy entry it was established
// through.
class _ConnnectionInfo {
  final _HttpClientConnection connection;
  final _Proxy proxy;

  _ConnnectionInfo(_HttpClientConnection this.connection, _Proxy this.proxy);
}
class _HttpClient implements HttpClient {
// TODO(ajohnsen): Use eviction timeout.
static const int DEFAULT_EVICTION_TIMEOUT = 60000;
bool _closing = false;
final Map<String, Queue<_HttpClientConnection>> _idleConnections
= new Map<String, Queue<_HttpClientConnection>>();
final Set<_HttpClientConnection> _activeConnections
= new Set<_HttpClientConnection>();
final List<_Credentials> _credentials = [];
Function _authenticate;
Function _findProxy;
Future<HttpClientRequest> open(String method,
String host,
int port,
String path) {
// TODO(sgjesse): The path set here can contain both query and
// fragment. They should be cracked and set correctly.
return _openUrl(method, new Uri.fromComponents(
scheme: "http", domain: host, port: port, path: path));
}
Future<HttpClientRequest> openUrl(String method, Uri url) {
return _openUrl(method, url);
}
Future<HttpClientRequest> get(String host,
int port,
String path) {
return open("get", host, port, path);
}
Future<HttpClientRequest> getUrl(Uri url) {
return _openUrl("get", url);
}
Future<HttpClientRequest> post(String host,
int port,
String path) {
return open("post", host, port, path);
}
Future<HttpClientRequest> postUrl(Uri url) {
return _openUrl("post", url);
}
void close({bool force: false}) {
_closing = true;
// Create flattened copy of _idleConnections, as 'destory' will manipulate
// it.
var idle = _idleConnections.values.reduce(
[],
(l, e) {
l.addAll(e);
return l;
});
idle.forEach((e) {
e.close();
});
assert(_idleConnections.isEmpty);
if (force) {
for (var connection in _activeConnections.toList()) {
connection.destroy();
}
assert(_activeConnections.isEmpty);
_activeConnections.clear();
}
}
set authenticate(Future<bool> f(Uri url, String scheme, String realm)) {
_authenticate = f;
}
void addCredentials(Uri url, String realm, HttpClientCredentials cr) {
_credentials.add(new _Credentials(url, realm, cr));
}
set findProxy(String f(Uri uri)) => _findProxy = f;
Future<HttpClientRequest> _openUrl(String method, Uri uri) {
if (method == null) {
throw new ArgumentError(method);
}
if (uri.domain.isEmpty || (uri.scheme != "http" && uri.scheme != "https")) {
throw new ArgumentError("Unsupported scheme '${uri.scheme}' in $uri");
}
bool isSecure = (uri.scheme == "https");
int port = uri.port;
if (port == 0) {
port = isSecure ?
HttpClient.DEFAULT_HTTPS_PORT :
HttpClient.DEFAULT_HTTP_PORT;
}
// Check to see if a proxy server should be used for this connection.
var proxyConf = const _ProxyConfiguration.direct();
if (_findProxy != null) {
// TODO(sgjesse): Keep a map of these as normally only a few
// configuration strings will be used.
try {
proxyConf = new _ProxyConfiguration(_findProxy(uri));
} catch (error, stackTrace) {
return new Future.immediateError(error, stackTrace);
}
}
return _getConnection(uri.domain, port, proxyConf, isSecure).then((info) {
// Create new internal outgoing connection.
var outgoing = new _HttpOutgoing();
// Create new request object, wrapping the outgoing connection.
var request = new _HttpClientRequest(outgoing,
uri,
method.toUpperCase(),
!info.proxy.isDirect,
this,
info.connection);
request.headers.host = uri.domain;
request.headers.port = port;
if (uri.userInfo != null && !uri.userInfo.isEmpty) {
// If the URL contains user information use that for basic
// authorization
String auth =
CryptoUtils.bytesToBase64(_encodeString(uri.userInfo));
request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
} else {
// Look for credentials.
_Credentials cr = _findCredentials(uri);
if (cr != null) {
cr.authorize(request);
}
}
// Start sending the request (lazy, delayed until the user provides
// data).
info.connection._httpParser.responseToMethod = method;
info.connection.sendRequest(outgoing)
.then((incoming) {
// The full request have been sent and a response is received
// containing status-code, headers and etc.
request._onIncoming(incoming);
})
.catchError((error) {
// An error occoured before the http-header was parsed. This
// could be either a socket-error or parser-error.
request._onError(error);
});
// Return the request to the user. Immediate socket errors are not
// handled, thus forwarded to the user.
return request;
});
}
Future<HttpClientRequest> _openUrlFromRequest(String method,
Uri uri,
_HttpClientRequest previous) {
return openUrl(method, uri).then((request) {
// Only follow redirects if initial request did.
request.followRedirects = previous.followRedirects;
// Allow same number of redirects.
request.maxRedirects = previous.maxRedirects;
// Copy headers
for (var header in previous.headers._headers.keys) {
if (request.headers[header] == null) {
request.headers.set(header, previous.headers[header]);
}
}
request.headers.chunkedTransferEncoding = false;
request.contentLength = 0;
return request;
});
}
// Return a live connection to the idle pool.
void _returnConnection(_HttpClientConnection connection) {
_activeConnections.remove(connection);
if (_closing) {
connection.close();
return;
}
// TODO(ajohnsen): Listen for socket close events.
if (!_idleConnections.containsKey(connection.key)) {
_idleConnections[connection.key] = new Queue();
}
_idleConnections[connection.key].addLast(connection);
}
// Remove a closed connnection from the active set.
void _connectionClosed(_HttpClientConnection connection) {
_activeConnections.remove(connection);
if (_idleConnections.containsKey(connection.key)) {
_idleConnections[connection.key].remove(connection);
if (_idleConnections[connection.key].isEmpty) {
_idleConnections.remove(connection.key);
}
}
}
// Get a new _HttpClientConnection, either from the idle pool or created from
// a new Socket.
Future<_ConnnectionInfo> _getConnection(String uriHost,
int uriPort,
_ProxyConfiguration proxyConf,
bool isSecure) {
Iterator<_Proxy> proxies = proxyConf.proxies.iterator;
Future<_ConnnectionInfo> connect(error) {
if (!proxies.moveNext()) return new Future.immediateError(error);
_Proxy proxy = proxies.current;
String host = proxy.isDirect ? uriHost: proxy.host;
int port = proxy.isDirect ? uriPort: proxy.port;
String key = isSecure ? "ssh:$host:$port" : "$host:$port";
if (_idleConnections.containsKey(key)) {
var connection = _idleConnections[key].removeFirst();
if (_idleConnections[key].isEmpty) {
_idleConnections.remove(key);
}
_activeConnections.add(connection);
return new Future.immediate(new _ConnnectionInfo(connection, proxy));
}
return (isSecure && proxy.isDirect
? SecureSocket.connect(host,
port,
sendClientCertificate: true)
: Socket.connect(host, port))
.then((socket) {
var connection = new _HttpClientConnection(key, socket, this);
_activeConnections.add(connection);
return new _ConnnectionInfo(connection, proxy);
}, onError: (error) {
// Continue with next proxy.
return connect(error.error);
});
}
return connect(new HttpException("No proxies given"));
}
// Returns the stored credentials that apply to [url] (and [scheme], when
// given), or null if none apply.
_Credentials _findCredentials(Uri url, [_AuthenticationScheme scheme]) {
  // Among the applicable credentials the one with the longest URI path wins;
  // on equal length the earlier entry is kept.
  _Credentials best;
  for (_Credentials candidate in _credentials) {
    if (!candidate.applies(url, scheme)) continue;
    if (best == null || candidate.uri.path.length > best.uri.path.length) {
      best = candidate;
    }
  }
  return best;
}
// Removes [cr] from the stored credentials; does nothing if it is not there.
void _removeCredentials(_Credentials cr) {
  int position = _credentials.indexOf(cr);
  if (position == -1) return;
  _credentials.removeAt(position);
}
}
// Server-side state for one accepted socket. The socket is piped into a
// request parser and the parsed requests are handled one at a time.
class _HttpConnection {
// Connection states. A connection starts out idle, becomes active when a
// request is delivered by the parser, and ends up closing or detached.
static const _ACTIVE = 0;
static const _IDLE = 1;
static const _CLOSING = 2;
static const _DETACHED = 3;
int _state = _IDLE;
final Socket _socket;
final _HttpServer _httpServer;
final _HttpParser _httpParser;
// Subscription on the parser's stream of incoming requests.
StreamSubscription _subscription;
// Future of the socket write for the most recent response. It stays null
// until the first request has been received.
Future _writeDoneFuture;
// Wires [_socket] to a new request parser and starts listening for parsed
// requests.
_HttpConnection(Socket this._socket, _HttpServer this._httpServer)
: _httpParser = new _HttpParser.requestParser() {
_socket.pipe(_httpParser);
// An error on the socket's done future tears the connection down.
_socket.done.catchError((e) => destroy());
_subscription = _httpParser.listen(
(incoming) {
// Only handle one incoming request at a time. Keep the
// stream paused until the response has been sent.
_subscription.pause();
_state = _ACTIVE;
var outgoing = new _HttpOutgoing();
// Once the outgoing stream is available it is added to the socket.
_writeDoneFuture = outgoing.stream.then(_socket.addStream);
var response = new _HttpResponse(
incoming.headers.protocolVersion,
outgoing);
var request = new _HttpRequest(response, incoming, _httpServer, this);
response._ignoreBody = request.method == "HEAD";
response._httpRequest = request;
outgoing.dataDone.then((_) {
// A detached connection is no longer managed here.
if (_state == _DETACHED) return;
if (response.headers.persistentConnection &&
incoming.fullBodyRead) {
// Wait for the socket to be done with writing, before we
// continue.
_writeDoneFuture.then((_) {
_state = _IDLE;
// Resume the subscription for incoming requests as the
// request is now processed.
_subscription.resume();
});
} else {
// Close socket, keep-alive not used or body sent before received
// data was handled.
close();
}
}).catchError((e) {
close();
});
_httpServer._handleRequest(request);
},
onDone: () {
close();
},
onError: (error) {
// Parser errors are reported to the server before the socket is destroyed.
_httpServer._handleError(error);
destroy();
});
}
// Destroys the socket immediately and unregisters this connection from the
// server. Does nothing if the connection is already closing or detached.
void destroy() {
if (_state == _CLOSING || _state == _DETACHED) return;
_state = _CLOSING;
_socket.destroy();
_httpServer._connectionClosed(this);
}
// Closes the connection after any pending response write: unregisters from
// the server right away, then closes the socket once the write future
// completes. Does nothing if the connection is already closing or detached.
void close() {
if (_state == _CLOSING || _state == _DETACHED) return;
_state = _CLOSING;
var future = _writeDoneFuture;
// No response has been started, so there is nothing to wait for.
if (future == null) future = new Future.immediate(null);
_httpServer._connectionClosed(this);
future.then((_) {
_socket.close();
// TODO(ajohnsen): Add timeout.
// Delay destroy until socket is actually done writing.
_socket.done.then((_) => _socket.destroy(),
onError: (_) => _socket.destroy());
});
}
// Detaches the socket from this connection. The returned future completes
// with a socket wrapper whose incoming side is the parser's detached
// incoming data, once the pending response write is done.
// NOTE(review): _writeDoneFuture is null until a request has been received;
// presumably this is only called while handling a request - confirm.
Future<Socket> detachSocket() {
_state = _DETACHED;
// Remove connection from server.
_httpServer._connectionClosed(this);
_HttpDetachedIncoming detachedIncoming = _httpParser.detachIncoming();
return _writeDoneFuture.then((_) {
return new _DetachedSocket(_socket, detachedIncoming);
});
}
HttpConnectionInfo get connectionInfo => _HttpConnectionInfo.create(_socket);
// State predicates.
bool get _isActive => _state == _ACTIVE;
bool get _isIdle => _state == _IDLE;
bool get _isClosing => _state == _CLOSING;
bool get _isDetached => _state == _DETACHED;
}
// HTTP server waiting for socket connections.
class _HttpServer extends Stream<HttpRequest> implements HttpServer {
  // Session manager, created on first use by [_sessionManager].
  _HttpSessionManager _sessionManagerInstance;

  // Set to true by [close].
  bool closed = false;

  // The server listen socket.
  final ServerSocket _serverSocket;

  // Whether [close] also closes [_serverSocket].
  final bool _closeServer;

  // Set of currently connected clients.
  final Set<_HttpConnection> _connections = new Set<_HttpConnection>();

  // Delivers requests and errors to whoever listens on this server.
  final StreamController<HttpRequest> _controller =
      new StreamController<HttpRequest>();
  // TODO(ajohnsen): Use close queue?

  _HttpServer._(this._serverSocket, this._closeServer);

  // Serves on an existing server socket, which [close] leaves open.
  _HttpServer.listenOn(ServerSocket this._serverSocket)
      : _closeServer = false;

  // Binds a new server socket and wraps it in a server that owns it.
  static Future<HttpServer> bind(String host, int port, int backlog) {
    return ServerSocket.bind(host, port, backlog)
        .then((socket) => new _HttpServer._(socket, true));
  }

  // Same as [bind], but on a secure server socket.
  static Future<HttpServer> bindSecure(String host,
                                       int port,
                                       int backlog,
                                       String certificate_name,
                                       bool requestClientCertificate) {
    var pendingSocket = SecureServerSocket.bind(
        host,
        port,
        backlog,
        certificate_name,
        requestClientCertificate: requestClientCertificate);
    return pendingSocket.then((socket) => new _HttpServer._(socket, true));
  }

  // Starts accepting sockets and returns a subscription on the request
  // stream.
  StreamSubscription<HttpRequest> listen(void onData(HttpRequest event),
                                         {void onError(AsyncError error),
                                          void onDone(),
                                          bool unsubscribeOnError}) {
    // Accept the client connection and keep track of it.
    void accept(Socket socket) {
      _connections.add(new _HttpConnection(socket, this));
    }

    _serverSocket.listen(accept,
                         onError: _controller.signalError,
                         onDone: _controller.close);
    return _controller.stream.listen(onData,
                                     onError: onError,
                                     onDone: onDone,
                                     unsubscribeOnError: unsubscribeOnError);
  }

  // Marks the server closed, closes the server socket if owned, shuts down
  // the session manager and destroys every tracked connection.
  void close() {
    closed = true;
    if (_closeServer && _serverSocket != null) {
      _serverSocket.close();
    }
    if (_sessionManagerInstance != null) {
      _sessionManagerInstance.close();
      _sessionManagerInstance = null;
    }
    // Iterate over a snapshot: destroying a connection removes it from
    // [_connections].
    var snapshot = _connections.toList();
    for (_HttpConnection connection in snapshot) {
      connection.destroy();
    }
    _connections.clear();
  }

  int get port {
    if (!closed) return _serverSocket.port;
    throw new HttpException("HttpServer is not bound to a socket");
  }

  set sessionTimeout(int timeout) {
    _sessionManager.sessionTimeout = timeout;
  }

  void _handleRequest(HttpRequest request) {
    _controller.add(request);
  }

  // Errors are only forwarded while the server is open.
  void _handleError(AsyncError error) {
    if (closed) return;
    _controller.signalError(error);
  }

  void _connectionClosed(_HttpConnection connection) {
    _connections.remove(connection);
  }

  _HttpSessionManager get _sessionManager {
    // Lazy init.
    if (_sessionManagerInstance == null) {
      _sessionManagerInstance = new _HttpSessionManager();
    }
    return _sessionManagerInstance;
  }

  // Counts the tracked connections by state.
  HttpConnectionsInfo connectionsInfo() {
    HttpConnectionsInfo result = new HttpConnectionsInfo();
    result.total = _connections.length;
    for (_HttpConnection conn in _connections) {
      if (conn._isActive) {
        result.active++;
      } else if (conn._isIdle) {
        result.idle++;
      } else {
        assert(conn._isClosing);
        result.closing++;
      }
    }
    return result;
  }
}
// Parsed form of a proxy configuration string: a semicolon-separated list of
// entries, each either "PROXY host:port" or "DIRECT".
class _ProxyConfiguration {
  static const String PROXY_PREFIX = "PROXY ";
  static const String DIRECT_PREFIX = "DIRECT";

  // Parses [configuration] into [proxies], keeping the order of the entries.
  // Empty entries are skipped.
  //
  // Throws an [HttpException] if [configuration] is null, if an entry is
  // neither a "PROXY host:port" nor a "DIRECT" entry, if the host or the port
  // is missing, or if the port is not an integer.
  _ProxyConfiguration(String configuration) : proxies = new List<_Proxy>() {
    if (configuration == null) {
      throw new HttpException("Invalid proxy configuration $configuration");
    }
    for (String entry in configuration.split(";")) {
      String proxy = entry.trim();
      if (proxy.isEmpty) continue;
      if (proxy.startsWith(PROXY_PREFIX)) {
        // The entry starts with the prefix, which contains no colon, so a
        // colon can never be at index 0; only "missing" and "last character"
        // need to be rejected here.
        int colon = proxy.indexOf(":");
        if (colon == -1 || colon == proxy.length - 1) {
          throw new HttpException(
              "Invalid proxy configuration $configuration");
        }
        // Skip the "PROXY " prefix.
        String host = proxy.substring(PROXY_PREFIX.length, colon).trim();
        // An entry such as "PROXY :8080" has no host and is invalid.
        if (host.isEmpty) {
          throw new HttpException(
              "Invalid proxy configuration $configuration");
        }
        String portString = proxy.substring(colon + 1).trim();
        int port;
        try {
          port = int.parse(portString);
        } on FormatException catch (e) {
          throw new HttpException(
              "Invalid proxy configuration $configuration, "
              "invalid port '$portString'");
        }
        proxies.add(new _Proxy(host, port));
      } else if (proxy == DIRECT_PREFIX) {
        proxies.add(new _Proxy.direct());
      } else {
        throw new HttpException("Invalid proxy configuration $configuration");
      }
    }
  }

  // Configuration holding a single direct (no proxy) entry.
  const _ProxyConfiguration.direct()
      : proxies = const [const _Proxy.direct()];

  // The configured proxies, in the order they were given.
  final List<_Proxy> proxies;
}
// One entry of a proxy configuration: a proxy at [host]:[port], or a direct
// connection.
class _Proxy {
  // Null for a direct entry.
  final String host;

  // Null for a direct entry.
  final int port;

  // True only for entries created with [_Proxy.direct].
  final bool isDirect;

  const _Proxy(this.host, this.port) : isDirect = false;

  const _Proxy.direct() : isDirect = true, host = null, port = null;
}
// Remote host, remote port and local port read from a socket.
class _HttpConnectionInfo implements HttpConnectionInfo {
  String remoteHost;
  int remotePort;
  int localPort;

  _HttpConnectionInfo._();

  // Builds the info for [socket]. Returns null when [socket] is null or when
  // reading any of its properties throws.
  static _HttpConnectionInfo create(Socket socket) {
    if (socket == null) return null;
    try {
      return new _HttpConnectionInfo._()
          ..remoteHost = socket.remoteHost
          ..remotePort = socket.remotePort
          ..localPort = socket.port;
    } catch (e) {
      // Best effort: any failure while reading the socket yields null.
      return null;
    }
  }
}
// Socket wrapper that reads from a separate incoming stream while forwarding
// every write-side operation to the wrapped socket.
class _DetachedSocket implements Socket {
  // Source of the data delivered to listeners.
  final Stream<List<int>> _incoming;

  // Target of all write-side operations.
  final Socket _socket;

  _DetachedSocket(this._socket, this._incoming);

  // Read side: listens on [_incoming], not on the wrapped socket.
  StreamSubscription<List<int>> listen(void onData(List<int> event),
                                       {void onError(AsyncError error),
                                        void onDone(),
                                        bool unsubscribeOnError}) =>
      _incoming.listen(onData,
                       onError: onError,
                       onDone: onDone,
                       unsubscribeOnError: unsubscribeOnError);

  // Write side: plain delegation to [_socket].
  void add(List<int> data) {
    _socket.add(data);
  }

  void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
    _socket.addString(string, encoding);
  }

  Future<Socket> addStream(Stream<List<int>> stream) =>
      _socket.addStream(stream);

  Future<Socket> consume(Stream<List<int>> stream) => _socket.consume(stream);

  Future<Socket> close() => _socket.close();

  void destroy() {
    _socket.destroy();
  }
}
// Authentication scheme constants, with conversion from and to the scheme
// name.
class _AuthenticationScheme {
  final int _scheme;

  const _AuthenticationScheme(this._scheme);

  static const UNKNOWN = const _AuthenticationScheme(-1);
  static const BASIC = const _AuthenticationScheme(0);
  static const DIGEST = const _AuthenticationScheme(1);

  // Maps a scheme name, compared case-insensitively, to its constant.
  // Anything other than "basic" or "digest" yields [UNKNOWN].
  factory _AuthenticationScheme.fromString(String scheme) {
    switch (scheme.toLowerCase()) {
      case "basic":
        return BASIC;
      case "digest":
        return DIGEST;
      default:
        return UNKNOWN;
    }
  }

  // The scheme name, or "Unknown" for anything but [BASIC] and [DIGEST].
  String toString() {
    if (this == DIGEST) return "Digest";
    return this == BASIC ? "Basic" : "Unknown";
  }
}
// Client credentials registered for a URI and realm.
class _Credentials {
  // Set once [authorize] has been called.
  bool used = false;
  Uri uri;
  String realm;
  _HttpClientCredentials credentials;

  // Digest specific fields.
  String nonce;
  String algorithm;
  String qop;

  _Credentials(this.uri, this.realm, this.credentials);

  _AuthenticationScheme get scheme => credentials.scheme;

  // Whether these credentials can be used for [uri]: the scheme must match
  // when one is given, domain and port must be equal, and the path of [uri]
  // must start with the path these credentials were registered for.
  bool applies(Uri uri, _AuthenticationScheme scheme) {
    if (scheme != null && credentials.scheme != scheme) return false;
    if (uri.domain != this.uri.domain) return false;
    if (_portOrDefault(uri) != _portOrDefault(this.uri)) return false;
    return uri.path.startsWith(this.uri.path);
  }

  // Applies the credentials to [request] and records that they were used.
  void authorize(HttpClientRequest request) {
    credentials.authorize(this, request);
    used = true;
  }

  // A port of 0 is treated as the default HTTP port.
  static int _portOrDefault(Uri target) {
    if (target.port == 0) return HttpClient.DEFAULT_HTTP_PORT;
    return target.port;
  }
}
// Base class of the client credential implementations in this file.
abstract class _HttpClientCredentials implements HttpClientCredentials {
  // Applies these credentials to [request]; [credentials] is the stored
  // entry being used.
  void authorize(_Credentials credentials, HttpClientRequest request);

  // The authentication scheme these credentials belong to.
  _AuthenticationScheme get scheme;
}
// Credentials for the Basic authentication scheme.
class _HttpClientBasicCredentials
    extends _HttpClientCredentials
    implements HttpClientBasicCredentials {
  String username;
  String password;

  _HttpClientBasicCredentials(this.username, this.password);

  _AuthenticationScheme get scheme => _AuthenticationScheme.BASIC;

  // Sets the Authorization header of [request] to "Basic " followed by the
  // base64 form of "username:password".
  void authorize(_Credentials _, HttpClientRequest request) {
    // RFC 2617 does not say how username and password are to be encoded.
    // There is an open draft that adds an accept-charset parameter to the
    // WWW-Authenticate and Proxy-Authenticate headers, see
    // http://tools.ietf.org/html/draft-reschke-basicauth-enc-06. For
    // now always use UTF-8 encoding.
    var encodedPair = _encodeString("$username:$password");
    String token = CryptoUtils.bytesToBase64(encodedPair);
    request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $token");
  }
}
// Credentials for the Digest authentication scheme. Authorizing a request is
// not implemented.
class _HttpClientDigestCredentials
    extends _HttpClientCredentials
    implements HttpClientDigestCredentials {
  String username;
  String password;

  _HttpClientDigestCredentials(this.username, this.password);

  _AuthenticationScheme get scheme => _AuthenticationScheme.DIGEST;

  // Always throws an [UnsupportedError].
  void authorize(_Credentials credentials, HttpClientRequest request) {
    // TODO(sgjesse): Implement!!!
    throw new UnsupportedError("Digest authentication not yet supported");
  }
}
// Immutable record of one redirect: status code, method and location.
class _RedirectInfo implements RedirectInfo {
  final int statusCode;
  final String method;
  final Uri location;

  const _RedirectInfo(this.statusCode, this.method, this.location);
}