Provide initial support of SSE with DevTools (#548)

* Provide initial support of SSE with DevTools
diff --git a/dwds/CHANGELOG.md b/dwds/CHANGELOG.md
index 9adf2ef..f5872a4 100644
--- a/dwds/CHANGELOG.md
+++ b/dwds/CHANGELOG.md
@@ -1,8 +1,11 @@
-## 0.4.1
+## 0.5.0
 
 - Fix an issue where we source map paths were not normalized.
 - Added a check to tests for the variable DWDS_DEBUG_CHROME to run Chrome with a
   UI rather than headless.
+- Add support for an SSE connection with Dart DevTools.
+- Rename `wsUri` to `uri` on `DebugConnection` to reflect that the uri may not
+  be a websocket.
 - Depend on latest `package:vm_service`.
 
 ## 0.4.0
diff --git a/dwds/lib/src/connections/debug_connection.dart b/dwds/lib/src/connections/debug_connection.dart
index b01e681..41fa125 100644
--- a/dwds/lib/src/connections/debug_connection.dart
+++ b/dwds/lib/src/connections/debug_connection.dart
@@ -26,8 +26,8 @@
   /// The port of the host Dart VM Service.
   int get port => _appDebugServices.debugService.port;
 
-  /// The websocket endpoint of the Dart VM Service.
-  String get wsUri => _appDebugServices.debugService.wsUri;
+  /// The endpoint of the Dart VM Service.
+  String get uri => _appDebugServices.debugService.uri;
 
   /// A client of the Dart VM Service with DWDS specific extensions.
   VmService get vmService => _appDebugServices.dwdsVmClient.client;
diff --git a/dwds/lib/src/handlers/dev_handler.dart b/dwds/lib/src/handlers/dev_handler.dart
index eab5c75..aee205c 100644
--- a/dwds/lib/src/handlers/dev_handler.dart
+++ b/dwds/lib/src/handlers/dev_handler.dart
@@ -125,6 +125,8 @@
                   'VmService proxy responded with an error:\n$response');
             }
           : null,
+      // TODO(grouma) - Use SSE for Dart Debug Extension workflow.
+      useSse: false,
     );
   }
 
@@ -218,7 +220,7 @@
             .sendCommand('Target.createTarget', params: {
           'newWindow': true,
           'url': 'http://${_devTools.hostname}:${_devTools.port}'
-              '/?hide=none&uri=${appServices.debugService.wsUri}',
+              '/?hide=none&uri=${appServices.debugService.uri}',
         });
       } else if (message is ConnectRequest) {
         if (appId != null) {
@@ -275,7 +277,7 @@
     _logWriter(
         Level.INFO,
         'Debug service listening on '
-        '${debugService.wsUri}\n');
+        '${debugService.uri}\n');
     var webdevClient = await DwdsVmClient.create(debugService);
     return AppDebugServices(debugService, webdevClient);
   }
@@ -306,7 +308,7 @@
     await _extensionDebugger.sendCommand('Target.createTarget', params: {
       'newWindow': true,
       'url': 'http://${_devTools.hostname}:${_devTools.port}'
-          '/?hide=none&uri=${appServices.debugService.wsUri}',
+          '/?hide=none&uri=${appServices.debugService.uri}',
     });
   }
 }
diff --git a/dwds/lib/src/services/debug_service.dart b/dwds/lib/src/services/debug_service.dart
index 10160c3..7be8ede 100644
--- a/dwds/lib/src/services/debug_service.dart
+++ b/dwds/lib/src/services/debug_service.dart
@@ -10,11 +10,14 @@
 
 import 'package:dwds/src/debugging/remote_debugger.dart';
 import 'package:http_multi_server/http_multi_server.dart';
+import 'package:pedantic/pedantic.dart';
 import 'package:shelf/shelf.dart' as shelf;
+import 'package:shelf/shelf.dart';
 import 'package:shelf/shelf_io.dart';
 import 'package:shelf_web_socket/shelf_web_socket.dart';
 import 'package:vm_service/vm_service.dart';
 import 'package:web_socket_channel/web_socket_channel.dart';
+import 'package:sse/server/sse_handler.dart';
 
 import '../utilities/shared.dart';
 import 'chrome_proxy_service.dart';
@@ -49,6 +52,31 @@
   };
 }
 
+Future<void> _handleSseConnections(
+  SseHandler handler,
+  ChromeProxyService chromeProxyService,
+  ServiceExtensionRegistry serviceExtensionRegistry, {
+  void Function(Map<String, dynamic>) onRequest,
+  void Function(Map<String, dynamic>) onResponse,
+}) async {
+  while (await handler.connections.hasNext) {
+    var connection = await handler.connections.next;
+    var responseController = StreamController<Map<String, Object>>();
+    var sub = responseController.stream.map((response) {
+      if (onResponse != null) onResponse(response);
+      return jsonEncode(response);
+    }).listen(connection.sink.add);
+    var inputStream = connection.stream.map((value) {
+      var request = jsonDecode(value) as Map<String, Object>;
+      if (onRequest != null) onRequest(request);
+      return request;
+    });
+    var vmServerConnection = VmServerConnection(inputStream,
+        responseController.sink, serviceExtensionRegistry, chromeProxyService);
+    unawaited(vmServerConnection.done.whenComplete(sub.cancel));
+  }
+}
+
 /// A Dart Web Debug Service.
 ///
 /// Creates a [ChromeProxyService] from an existing Chrome instance.
@@ -59,15 +87,24 @@
   final int port;
   final HttpServer _server;
   final String _authToken;
+  final bool _useSse;
 
-  DebugService._(this.chromeProxyService, this.hostname, this.port,
-      this._authToken, this.serviceExtensionRegistry, this._server);
+  DebugService._(
+      this.chromeProxyService,
+      this.hostname,
+      this.port,
+      this._authToken,
+      this.serviceExtensionRegistry,
+      this._server,
+      this._useSse);
 
   Future<void> close() async {
     await _server.close();
   }
 
-  String get wsUri => 'ws://$hostname:$port/$_authToken';
+  String get uri => _useSse
+      ? 'sse://$hostname:$port/$_authToken/\$debugHandler'
+      : 'ws://$hostname:$port/$_authToken';
 
   /// [appInstanceId] is a unique String embedded in the instance of the
   /// application available through `window.$dartAppInstanceId`.
@@ -79,27 +116,44 @@
     String appInstanceId, {
     void Function(Map<String, dynamic>) onRequest,
     void Function(Map<String, dynamic>) onResponse,
+    bool useSse,
   }) async {
     var chromeProxyService = await ChromeProxyService.create(
         remoteDebugger, tabUrl, assetHandler, appInstanceId);
-    var serviceExtensionRegistry = ServiceExtensionRegistry();
     var authToken = _makeAuthToken();
-    var innerHandler = webSocketHandler(_createNewConnectionHandler(
-        chromeProxyService, serviceExtensionRegistry,
-        onRequest: onRequest, onResponse: onResponse));
-    var handler = (shelf.Request request) {
-      if (request.url.pathSegments.first != authToken) {
-        return shelf.Response.forbidden('Incorrect auth token');
-      }
-      return innerHandler(request);
-    };
+    var serviceExtensionRegistry = ServiceExtensionRegistry();
+    Handler handler;
+    if (useSse) {
+      var sseHandler = SseHandler(Uri.parse('/$authToken/\$debugHandler'));
+      handler = sseHandler.handler;
+      unawaited(_handleSseConnections(
+          sseHandler, chromeProxyService, serviceExtensionRegistry,
+          onRequest: onRequest, onResponse: onResponse));
+    } else {
+      var innerHandler = webSocketHandler(_createNewConnectionHandler(
+          chromeProxyService, serviceExtensionRegistry,
+          onRequest: onRequest, onResponse: onResponse));
+      handler = (shelf.Request request) {
+        if (request.url.pathSegments.first != authToken) {
+          return shelf.Response.forbidden('Incorrect auth token');
+        }
+        return innerHandler(request);
+      };
+    }
     var port = await findUnusedPort();
     var server = hostname == 'localhost'
         ? await HttpMultiServer.loopback(port)
         : await HttpServer.bind(hostname, port);
     serveRequests(server, handler);
-    return DebugService._(chromeProxyService, hostname, port, authToken,
-        serviceExtensionRegistry, server);
+    return DebugService._(
+      chromeProxyService,
+      hostname,
+      port,
+      authToken,
+      serviceExtensionRegistry,
+      server,
+      useSse,
+    );
   }
 }
 
diff --git a/dwds/test/debug_service_test.dart b/dwds/test/debug_service_test.dart
index 0eb685c..0fb3217 100644
--- a/dwds/test/debug_service_test.dart
+++ b/dwds/test/debug_service_test.dart
@@ -26,6 +26,6 @@
   });
 
   test('Accepts connections with the auth token', () async {
-    expect(WebSocket.connect('${context.debugConnection.wsUri}/ws'), completes);
+    expect(WebSocket.connect('${context.debugConnection.uri}/ws'), completes);
   });
 }
diff --git a/webdev/lib/src/daemon/app_domain.dart b/webdev/lib/src/daemon/app_domain.dart
index f3bd4d2..d55c46d 100644
--- a/webdev/lib/src/daemon/app_domain.dart
+++ b/webdev/lib/src/daemon/app_domain.dart
@@ -101,7 +101,7 @@
       sendEvent('app.debugPort', {
         'appId': _appId,
         'port': _debugConnection.port,
-        'wsUri': _debugConnection.wsUri,
+        'wsUri': _debugConnection.uri,
       });
       _resultSub = server.buildResults.listen(_handleBuildResult);