Merge pull request #4 from dart-lang/close-cleanup

Close cleanup
diff --git a/.travis.yml b/.travis.yml
index 21c6fe6..3b66d0d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,6 +4,7 @@
 sudo: required
 dist: trusty
 addons:
+  chrome: stable
   apt:
     sources:
       - google-chrome
@@ -29,7 +30,7 @@
   - sh -e /etc/init.d/xvfb start
 
 before_script:
-  - wget http://chromedriver.storage.googleapis.com/2.35/chromedriver_linux64.zip
+  - wget http://chromedriver.storage.googleapis.com/2.46/chromedriver_linux64.zip
   - unzip chromedriver_linux64.zip
   - export PATH=$PATH:$PWD
   - ./tool/travis-setup.sh
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77c5538..ba13a26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 2.0.0
+
+- No longer expose `close` and `onClose` on an `SseConnection`. Closing is now
+  handled through the underlying `stream` / `sink`.
+- Fix a bug where resources of the `SseConnection` were not properly closed.
+
 ## 1.0.0
 
 - Internal cleanup.
diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart
index a683a2f..a3067f6 100644
--- a/lib/server/sse_handler.dart
+++ b/lib/server/sse_handler.dart
@@ -21,24 +21,28 @@
 
 /// A bi-directional SSE connection between server and browser.
 class SseConnection extends StreamChannelMixin<String> {
+  /// Incoming messages from the browser client.
   final _incomingController = StreamController<String>();
-  final _outgoingController = StreamController<String>();
-  final _closeCompleter = Completer<Null>();
-  final Sink _sink;
-  final String _clientId;
 
-  SseConnection(this._sink, this._clientId) {
+  /// Outgoing messages to the browser client.
+  final _outgoingController = StreamController<String>();
+
+  final Sink _sink;
+
+  final _closedCompleter = Completer<void>();
+
+  SseConnection(this._sink) {
     _outgoingController.stream.listen((data) {
-      if (!_closeCompleter.isCompleted) {
+      if (!_closedCompleter.isCompleted) {
         // JSON encode the message to escape new lines.
         _sink.add('data: ${json.encode(data)}\n');
         _sink.add('\n');
       }
     });
+    _outgoingController.onCancel = _close;
+    _incomingController.onCancel = _close;
   }
 
-  Future get onClose => _closeCompleter.future;
-
   /// The message added to the sink has to be JSON encodable.
   @override
   StreamSink<String> get sink => _outgoingController.sink;
@@ -50,8 +54,13 @@
   @override
   Stream<String> get stream => _incomingController.stream;
 
-  void close() {
-    if (!_closeCompleter.isCompleted) _closeCompleter.complete();
+  void _close() {
+    if (!_closedCompleter.isCompleted) {
+      _closedCompleter.complete();
+      _sink.close();
+      if (!_outgoingController.isClosed) _outgoingController.close();
+      if (!_incomingController.isClosed) _incomingController.close();
+    }
   }
 }
 
@@ -63,15 +72,15 @@
 class SseHandler {
   final _logger = Logger('SseHandler');
   final Uri _uri;
-
-  final Set<SseConnection> _connections = Set<SseConnection>();
-
+  final _connections = <String, SseConnection>{};
   final _connectionController = StreamController<SseConnection>();
 
+  StreamQueue<SseConnection> _connectionsStream;
+
   SseHandler(this._uri);
 
   StreamQueue<SseConnection> get connections =>
-      StreamQueue(_connectionController.stream);
+      _connectionsStream ??= StreamQueue(_connectionController.stream);
 
   shelf.Handler get handler => _handle;
 
@@ -82,19 +91,22 @@
       var sink = utf8.encoder.startChunkedConversion(channel.sink);
       sink.add(_sseHeaders(req.headers['origin']));
       var clientId = req.url.queryParameters['sseClientId'];
-      var connection = SseConnection(sink, clientId);
-      _connections.add(connection);
-      unawaited(connection.onClose.then((_) {
-        _connections.remove(connection);
+      var connection = SseConnection(sink);
+      _connections[clientId] = connection;
+      unawaited(connection._closedCompleter.future.then((_) {
+        _connections.remove(clientId);
       }));
+      // Remove connection when it is remotely closed or the stream is
+      // cancelled.
       channel.stream.listen((_) {
         // SSE is unidirectional. Responses are handled through POST requests.
       }, onDone: () {
-        connection.close();
+        connection._close();
       });
+
       _connectionController.add(connection);
     });
-    return null;
+    return shelf.Response.notFound('');
   }
 
   String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
@@ -122,11 +134,7 @@
       var clientId = req.url.queryParameters['sseClientId'];
       var message = await req.readAsString();
       var jsonObject = json.decode(message) as String;
-      for (var connection in _connections) {
-        if (connection._clientId == clientId) {
-          connection._incomingController.add(jsonObject);
-        }
-      }
+      _connections[clientId]?._incomingController?.add(jsonObject);
     } catch (e, st) {
       _logger.fine('Failed to handle incoming message. $e $st');
     }
diff --git a/pubspec.yaml b/pubspec.yaml
index a86543f..054332b 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: sse
-version: 1.0.0
+version: 2.0.0
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/sse
 description: >-
diff --git a/test/sse_test.dart b/test/sse_test.dart
index d6e0445..16a9a0c 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -17,6 +17,21 @@
   HttpServer server;
   WebDriver webdriver;
   SseHandler handler;
+  Process chromeDriver;
+
+  setUpAll(() async {
+    try {
+      chromeDriver = await Process.start(
+          'chromedriver', ['--port=4444', '--url-base=wd/hub']);
+    } catch (e) {
+      throw StateError(
+          'Could not start ChromeDriver. Is it installed?\nError: $e');
+    }
+  });
+
+  tearDownAll(() {
+    chromeDriver.kill();
+  });
 
   setUp(() async {
     handler = SseHandler(Uri.parse('/test'));
@@ -28,7 +43,11 @@
             listDirectories: true, defaultDocument: 'index.html'));
 
     server = await io.serve(cascade.handler, 'localhost', 0);
-    webdriver = await createDriver();
+    webdriver = await createDriver(desired: {
+      'chromeOptions': {
+        'args': ['--headless']
+      }
+    });
   });
 
   tearDown(() async {
@@ -55,12 +74,12 @@
     var connections = handler.connections;
     await webdriver.get('http://localhost:${server.port}');
     var connectionA = await connections.next;
+    connectionA.sink.add('foo');
+    expect(await connectionA.stream.first, 'foo');
+
     await webdriver.get('http://localhost:${server.port}');
     var connectionB = await connections.next;
-
-    connectionA.sink.add('foo');
     connectionB.sink.add('bar');
-    await connectionA.onClose;
     expect(await connectionB.stream.first, 'bar');
   });
 
@@ -69,8 +88,8 @@
     await webdriver.get('http://localhost:${server.port}');
     var connection = await handler.connections.next;
     expect(handler.numberOfClients, 1);
-    connection.close();
-    await connection.onClose;
+    await connection.sink.close();
+    await pumpEventQueue();
     expect(handler.numberOfClients, 0);
   });
 
@@ -83,7 +102,20 @@
     var closeButton = await webdriver.findElement(const By.tagName('button'));
     await closeButton.click();
 
-    await connection.onClose;
+    // Should complete since the connection is closed.
+    await connection.stream.toList();
+    expect(handler.numberOfClients, 0);
+  });
+
+  test('Cancelling the listener closes the connection', () async {
+    expect(handler.numberOfClients, 0);
+    await webdriver.get('http://localhost:${server.port}');
+    var connection = await handler.connections.next;
+    expect(handler.numberOfClients, 1);
+
+    var sub = connection.stream.listen((_) {});
+    await sub.cancel();
+    await pumpEventQueue();
     expect(handler.numberOfClients, 0);
   });
 
diff --git a/tool/travis.sh b/tool/travis.sh
index b9c9c9f..9ebd3d4 100755
--- a/tool/travis.sh
+++ b/tool/travis.sh
@@ -23,10 +23,6 @@
   STATUS=$ANALYSIS_STATUS
 fi
 
-# Start chromedriver.
-chromedriver --port=4444 --url-base=wd/hub &
-PIDC=$!
-
 # Run tests.
 pub run test -r expanded -p vm -j 1
 TEST_STATUS=$?