Support Request hijacking in Shelf, using a similar API to Rack.

R=kevmoo@google.com

Review URL: https://codereview.chromium.org//260933004

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/shelf@36324 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cbf2905..d83e4cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,9 @@
 * Add a `Cascade` helper that runs handlers in sequence until one returns a
   response that's neither a 404 nor a 405.
 
+* Add a `Request.hijack` method that allows handlers to gain access to the
+  underlying HTTP socket.
+
 ## 0.5.1+1
 
 * Capture all asynchronous errors thrown by handlers if they would otherwise be
diff --git a/lib/shelf.dart b/lib/shelf.dart
index afb077e..98d7815 100644
--- a/lib/shelf.dart
+++ b/lib/shelf.dart
@@ -7,6 +7,7 @@
 export 'src/cascade.dart';
 export 'src/handler.dart';
 export 'src/handlers/logger.dart';
+export 'src/hijack_exception.dart';
 export 'src/middleware.dart';
 export 'src/pipeline.dart';
 export 'src/request.dart';
diff --git a/lib/shelf_io.dart b/lib/shelf_io.dart
index 1bd4f51..d8d4d9c 100644
--- a/lib/shelf_io.dart
+++ b/lib/shelf_io.dart
@@ -6,6 +6,8 @@
 ///
 /// One can provide an instance of [HttpServer] as the `requests` parameter in
 /// [serveRequests].
+///
+/// The `dart:io` adapter supports request hijacking; see [Request.hijack].
 library shelf.io;
 
 import 'dart:async';
@@ -53,15 +55,38 @@
 Future handleRequest(HttpRequest request, Handler handler) {
   var shelfRequest = _fromHttpRequest(request);
 
+  // TODO(nweiz): abstract out hijack handling to make it easier to implement an
+  // adapter.
   return syncFuture(() => handler(shelfRequest))
       .catchError((error, stackTrace) {
-    return _logError('Error thrown by handler\n$error', stackTrace);
+    if (error is HijackException) {
+      // A HijackException should bypass the response-writing logic entirely.
+      if (!shelfRequest.canHijack) throw error;
+
+      // If the request wasn't hijacked, we shouldn't be seeing this exception.
+      return _logError(
+          "Caught HijackException, but the request wasn't hijacked.",
+          stackTrace);
+    }
+
+    return _logError('Error thrown by handler.\n$error', stackTrace);
   }).then((response) {
     if (response == null) {
-      response = _logError('null response from handler');
+      response = _logError('null response from handler.');
+    } else if (!shelfRequest.canHijack) {
+      var message = new StringBuffer()
+          ..writeln("Got a response for hijacked request "
+              "${shelfRequest.method} ${shelfRequest.requestedUri}:")
+          ..writeln(response.statusCode);
+      response.headers.forEach((key, value) =>
+          message.writeln("${key}: ${value}"));
+      throw new Exception(message.toString().trim());
     }
 
     return _writeResponse(response, request.response);
+  }).catchError((error, stackTrace) {
+    // Ignore HijackExceptions.
+    if (error is! HijackException) throw error;
   });
 }
 
@@ -74,9 +99,14 @@
     headers[k] = v.join(',');
   });
 
+  onHijack(callback) {
+    return request.response.detachSocket(writeHeaders: false)
+        .then((socket) => callback(socket, socket));
+  }
+
   return new Request(request.method, request.requestedUri,
       protocolVersion: request.protocolVersion, headers: headers,
-      body: request);
+      body: request, onHijack: onHijack);
 }
 
 Future _writeResponse(Response response, HttpResponse httpResponse) {
diff --git a/lib/src/handlers/logger.dart b/lib/src/handlers/logger.dart
index 2e95cf9..e949c9b 100644
--- a/lib/src/handlers/logger.dart
+++ b/lib/src/handlers/logger.dart
@@ -6,6 +6,7 @@
 
 import 'package:stack_trace/stack_trace.dart';
 
+import '../hijack_exception.dart';
 import '../middleware.dart';
 import '../util.dart';
 
@@ -37,6 +38,8 @@
 
       return response;
     }, onError: (error, stackTrace) {
+      if (error is HijackException) throw error;
+
       var msg = _getErrorMessage(startTime, request.url, request.method,
           watch.elapsed, error, stackTrace);
 
diff --git a/lib/src/hijack_exception.dart b/lib/src/hijack_exception.dart
new file mode 100644
index 0000000..13c5054
--- /dev/null
+++ b/lib/src/hijack_exception.dart
@@ -0,0 +1,21 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library shelf.hijack_exception;
+
+/// An exception used to indicate that a request has been hijacked.
+///
+/// This shouldn't be captured by any code other than the Shelf adapter that
+/// created the hijackable request. Middleware that captures exceptions should
+/// make sure to pass on HijackExceptions.
+///
+/// See also [Request.hijack].
+class HijackException {
+  const HijackException();
+
+  String toString() =>
+      "A shelf request's underlying data stream was hijacked.\n"
+      "This exception is used for control flow and should only be handled by a "
+        "Shelf adapter.";
+}
diff --git a/lib/src/middleware.dart b/lib/src/middleware.dart
index e1a9f18..228761d 100644
--- a/lib/src/middleware.dart
+++ b/lib/src/middleware.dart
@@ -7,6 +7,7 @@
 import 'request.dart';
 import 'response.dart';
 import 'handler.dart';
+import 'hijack_exception.dart';
 import 'util.dart';
 
 /// A function which creates a new [Handler] by wrapping a [Handler].
@@ -21,6 +22,9 @@
 ///
 /// Common uses for middleware include caching, logging, and authentication.
 ///
+/// Middleware that captures exceptions should be sure to pass
+/// [HijackException]s on without modification.
+///
 /// A simple [Middleware] can be created using [createMiddleware].
 typedef Handler Middleware(Handler innerHandler);
 
@@ -40,8 +44,9 @@
 /// create a new response object.
 ///
 /// If provided, [errorHandler] receives errors thrown by the inner handler. It
-/// does not receive errors thrown by [requestHandler] or [responseHandler]. It
-/// can either return a new response or throw an error.
+/// does not receive errors thrown by [requestHandler] or [responseHandler], nor
+/// does it receive [HijackException]s. It can either return a new response or
+/// throw an error.
 Middleware createMiddleware({requestHandler(Request request),
     responseHandler(Response response),
     errorHandler(error, StackTrace stackTrace)}) {
@@ -56,7 +61,10 @@
 
         return syncFuture(() => innerHandler(request))
             .then((response) => responseHandler(response),
-                onError: errorHandler);
+                onError: (error, stackTrace) {
+          if (error is HijackException) throw error;
+          return errorHandler(error, stackTrace);
+        });
       });
     };
   };
diff --git a/lib/src/request.dart b/lib/src/request.dart
index 192a782..4a08468 100644
--- a/lib/src/request.dart
+++ b/lib/src/request.dart
@@ -8,9 +8,18 @@
 
 import 'package:http_parser/http_parser.dart';
 
+import 'hijack_exception.dart';
 import 'message.dart';
 import 'util.dart';
 
+/// A callback provided by a Shelf handler that's passed to [Request.hijack].
+typedef void HijackCallback(
+    Stream<List<int>> stream, StreamSink<List<int>> sink);
+
+/// A callback provided by a Shelf adapter that's used by [Request.hijack] to
+/// provide a [HijackCallback] with a socket.
+typedef void OnHijackCallback(HijackCallback callback);
+
 /// Represents an HTTP request to be processed by a Shelf application.
 class Request extends Message {
   /// The remainder of the [requestedUri] path and query designating the virtual
@@ -46,6 +55,17 @@
   /// The original [Uri] for the request.
   final Uri requestedUri;
 
+  /// The callback wrapper for hijacking this request.
+  ///
+  /// This will be `null` if this request can't be hijacked.
+  final _OnHijack _onHijack;
+
+  /// Whether this request can be hijacked.
+  ///
+  /// This will be `false` either if the adapter doesn't support hijacking, or
+  /// if the request has already been hijacked.
+  bool get canHijack => _onHijack != null && !_onHijack.called;
+
   /// If this is non-`null` and the requested resource hasn't been modified
   /// since this date and time, the server should return a 304 Not Modified
   /// response.
@@ -69,15 +89,43 @@
   /// [ArgumentError].
   ///
   /// The default value for [protocolVersion] is '1.1'.
+  ///
+  /// ## `onHijack`
+  ///
+  /// [onHijack] allows handlers to take control of the underlying socket for
+  /// the request. It should be passed by adapters that can provide access to
+  /// the bidirectional socket underlying the HTTP connection stream.
+  ///
+  /// The [onHijack] callback will only be called once per request. It will be
+  /// passed another callback which takes a byte stream and a byte sink.
+  /// [onHijack] must pass the stream and sink for the connection stream to this
+  /// callback, although it may do so asynchronously. Both parameters may be the
+  /// same object. If the user closes the sink, the adapter should ensure that
+  /// the stream is closed as well.
+  ///
+  /// If a request is hijacked, the adapter should expect to receive a
+  /// [HijackException] from the handler. This is a special exception used to
+  /// indicate that hijacking has occurred. The adapter should avoid either
+  /// sending a response or notifying the user of an error if a
+  /// [HijackException] is caught.
+  ///
+  /// An adapter can check whether a request was hijacked using [canHijack],
+  /// which will be `false` for a hijacked request. The adapter may throw an
+  /// error if a [HijackException] is received for a non-hijacked request, or if
+  /// no [HijackException] is received for a hijacked request.
+  ///
+  /// See also [hijack].
   // TODO(kevmoo) finish documenting the rest of the arguments.
   Request(this.method, Uri requestedUri, {String protocolVersion,
     Map<String, String> headers, Uri url, String scriptName,
-    Stream<List<int>> body, Map<String, Object> context})
+    Stream<List<int>> body, Map<String, Object> context,
+    OnHijackCallback onHijack})
       : this.requestedUri = requestedUri,
         this.protocolVersion = protocolVersion == null ?
             '1.1' : protocolVersion,
         this.url = _computeUrl(requestedUri, url, scriptName),
         this.scriptName = _computeScriptName(requestedUri, url, scriptName),
+        this._onHijack = onHijack == null ? null : new _OnHijack(onHijack),
         super(body == null ? new Stream.fromIterable([]) : body,
             headers: headers, context: context) {
     if (method.isEmpty) throw new ArgumentError('method cannot be empty.');
@@ -128,6 +176,52 @@
         protocolVersion: this.protocolVersion, headers: headers, url: this.url,
         scriptName: this.scriptName, body: this.read(), context: context);
   }
+
+  /// Takes control of the underlying request socket.
+  ///
+  /// Synchronously, this throws a [HijackException] that indicates to the
+  /// adapter that it shouldn't emit a response itself. Asynchronously,
+  /// [callback] is called with a [Stream<List<int>>] and
+  /// [StreamSink<List<int>>], respectively, that provide access to the
+  /// underlying request socket.
+  ///
+  /// If the sink is closed, the stream will be closed as well. The stream and
+  /// sink may be the same object, as in the case of a `dart:io` `Socket`
+  /// object.
+  ///
+  /// This may only be called when using a Shelf adapter that supports
+  /// hijacking, such as the `dart:io` adapter. In addition, a given request may
+  /// only be hijacked once. [canHijack] can be used to detect whether this
+  /// request can be hijacked.
+  void hijack(HijackCallback callback) {
+    if (_onHijack == null) {
+      throw new StateError("This request can't be hijacked.");
+    }
+
+    _onHijack.run(callback);
+    throw const HijackException();
+  }
+}
+
+/// A class containing a callback for [Request.hijack] that also tracks whether
+/// the callback has been called.
+class _OnHijack {
+  /// The callback.
+  final OnHijackCallback _callback;
+
+  /// Whether [this] has been called.
+  bool called = false;
+
+  _OnHijack(this._callback);
+
+  /// Calls [this].
+  ///
+  /// Throws a [StateError] if [this] has already been called.
+  void run(HijackCallback callback) {
+    if (called) throw new StateError("This request has already been hijacked.");
+    called = true;
+    newFuture(() => _callback(callback));
+  }
 }
 
 /// Computes `url` from the provided [Request] constructor arguments.
diff --git a/lib/src/util.dart b/lib/src/util.dart
index bb4847c..b668177 100644
--- a/lib/src/util.dart
+++ b/lib/src/util.dart
@@ -8,6 +8,10 @@
 
 import 'package:stack_trace/stack_trace.dart';
 
+/// Like [new Future], but avoids around issue 11911 by using [new Future.value]
+/// under the covers.
+Future newFuture(callback()) => new Future.value().then((_) => callback());
+
 /// Like [Future.sync], but wraps the Future in [Chain.track] as well.
 Future syncFuture(callback()) => Chain.track(new Future.sync(callback));
 
diff --git a/pubspec.yaml b/pubspec.yaml
index bd09077..1493c76 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -4,7 +4,7 @@
 description: Web Server Middleware for Dart
 homepage: http://www.dartlang.org
 environment:
-  sdk: '>=1.2.0 <2.0.0'
+  sdk: '>=1.4.0-dev.5.0 <2.0.0'
 documentation: https://api.dartlang.org/apidocs/channels/be/dartdoc-viewer/shelf
 dependencies:
   collection: '>=0.9.1 <0.10.0'
diff --git a/test/create_middleware_test.dart b/test/create_middleware_test.dart
index 5c35f4d..ee01d07 100644
--- a/test/create_middleware_test.dart
+++ b/test/create_middleware_test.dart
@@ -202,6 +202,18 @@
 
       expect(makeSimpleRequest(handler), throwsA('bad handler'));
     });
+
+    test("doesn't handle HijackException", () {
+      var middleware = createMiddleware(errorHandler: (error, stack) {
+        expect(false, "error handler shouldn't be called");
+      });
+
+      var handler = const Pipeline().addMiddleware(middleware)
+          .addHandler((request) => throw const HijackException());
+
+      expect(makeSimpleRequest(handler),
+          throwsA(new isInstanceOf<HijackException>()));
+    });
   });
 }
 
diff --git a/test/hijack_test.dart b/test/hijack_test.dart
new file mode 100644
index 0000000..fa67a6d
--- /dev/null
+++ b/test/hijack_test.dart
@@ -0,0 +1,51 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library shelf.hijack_test;
+
+import 'dart:async';
+
+import 'package:unittest/unittest.dart';
+import 'package:shelf/shelf.dart';
+
+import 'test_util.dart';
+
+void main() {
+  test('hijacking a non-hijackable request throws a StateError', () {
+    expect(() => new Request('GET', LOCALHOST_URI).hijack((_, __) => null),
+        throwsStateError);
+  });
+
+  test('hijacking a hijackable request throws a HijackException and calls '
+      'onHijack', () {
+    var request = new Request('GET', LOCALHOST_URI,
+        onHijack: expectAsync((callback) {
+      var streamController = new StreamController();
+      streamController.add([1, 2, 3]);
+      streamController.close();
+
+      var sinkController = new StreamController();
+      expect(sinkController.stream.first, completion(equals([4, 5, 6])));
+
+      callback(streamController.stream, sinkController);
+    }));
+
+    expect(() => request.hijack(expectAsync((stream, sink) {
+      expect(stream.first, completion(equals([1, 2, 3])));
+      sink.add([4, 5, 6]);
+      sink.close();
+    })), throwsA(new isInstanceOf<HijackException>()));
+  });
+
+  test('hijacking a hijackable request twice throws a StateError', () {
+    // Assert that the [onHijack] callback is only called once.
+    var request = new Request('GET', LOCALHOST_URI,
+        onHijack: expectAsync((_) => null, count: 1));
+
+    expect(() => request.hijack((_, __) => null),
+        throwsA(new isInstanceOf<HijackException>()));
+
+    expect(() => request.hijack((_, __) => null), throwsStateError);
+  });
+}
diff --git a/test/log_middleware_test.dart b/test/log_middleware_test.dart
index b9a8304..e056504 100644
--- a/test/log_middleware_test.dart
+++ b/test/log_middleware_test.dart
@@ -58,4 +58,14 @@
 
     expect(makeSimpleRequest(handler), throwsA('testing logging throw'));
   });
+
+  test("doesn't log a HijackException", () {
+    var handler = const Pipeline()
+        .addMiddleware(logRequests(logger: logger))
+        .addHandler((request) => throw const HijackException());
+
+    expect(makeSimpleRequest(handler).whenComplete(() {
+      expect(gotLog, isFalse);
+    }), throwsA(new isInstanceOf<HijackException>()));
+  });
 }
diff --git a/test/shelf_io_test.dart b/test/shelf_io_test.dart
index bdece66..5205658 100644
--- a/test/shelf_io_test.dart
+++ b/test/shelf_io_test.dart
@@ -185,6 +185,39 @@
     });
   });
 
+  test('supports request hijacking', () {
+    _scheduleServer((request) {
+      expect(request.method, 'POST');
+
+      request.hijack(expectAsync((stream, sink) {
+        expect(stream.first, completion(equals("Hello".codeUnits)));
+
+        sink.add((
+            "HTTP/1.1 404 Not Found\r\n"
+            "Date: Mon, 23 May 2005 22:38:34 GMT\r\n"
+            "Content-Length: 13\r\n"
+            "\r\n"
+            "Hello, world!").codeUnits);
+        sink.close();
+      }));
+    });
+
+    return _schedulePost(body: "Hello").then((response) {
+      expect(response.statusCode, HttpStatus.NOT_FOUND);
+      expect(response.headers["date"], "Mon, 23 May 2005 22:38:34 GMT");
+      expect(response.stream.bytesToString(),
+          completion(equals("Hello, world!")));
+    });
+  });
+
+  test('reports an error if a HijackException is thrown without hijacking', () {
+    _scheduleServer((request) => throw const HijackException());
+
+    return _scheduleGet().then((response) {
+      expect(response.statusCode, HttpStatus.INTERNAL_SERVER_ERROR);
+    });
+  });
+
   test('passes asynchronous exceptions to the parent error zone', () {
     return runZoned(() {
       return shelf_io.serve((request) {