Merge _StreamQueue into StreamQueue (#73)

The original reason to split into an abstract and concrete class was to
allow other implementations that would allow for the transaction
implementation. Since that was implemented without a separate subclass
merge the implementation back to one class to avoid unnecessary
abstraction and make it easier to debug.
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 0c81aec..61a1654 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -62,7 +62,7 @@
 ///
 /// When you need no further events the `StreamQueue` should be closed
 /// using [cancel]. This releases the underlying stream subscription.
-abstract class StreamQueue<T> {
+class StreamQueue<T> {
   // This class maintains two queues: one of events and one of requests.
   // The active request (the one in front of the queue) is called with
   // the current event queue when it becomes active, every time a
@@ -81,6 +81,14 @@
   // potentially a request that takes either five or zero events, determined
   // by the content of the fifth event.
 
+  final Stream<T> _source;
+
+  /// Subscription on [_source] while listening for events.
+  ///
+  /// Set to subscription when listening, and set to `null` when the
+  /// subscription is done (and [_isDone] is set to true).
+  StreamSubscription<T> _subscription;
+
   /// Whether the event source is done.
   bool _isDone = false;
 
@@ -107,9 +115,10 @@
   final Queue<_EventRequest> _requestQueue = new Queue();
 
   /// Create a `StreamQueue` of the events of [source].
-  factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
+  factory StreamQueue(Stream<T> source) => StreamQueue._(source);
 
-  StreamQueue._();
+  // Private generative constructor to avoid subclasses.
+  StreamQueue._(this._source);
 
   /// Asks if the stream has any more events.
   ///
@@ -432,24 +441,66 @@
   /// Can only be used by the very last request (the stream queue must
   /// be closed by that request).
   /// Only used by [rest].
-  Stream<T> _extractStream();
+  Stream<T> _extractStream() {
+    assert(_isClosed);
+    if (_isDone) {
+      return new Stream<T>.empty();
+    }
+    _isDone = true;
+
+    if (_subscription == null) {
+      return _source;
+    }
+
+    var subscription = _subscription;
+    _subscription = null;
+
+    var wasPaused = subscription.isPaused;
+    var result = new SubscriptionStream<T>(subscription);
+    // Resume after creating stream because that pauses the subscription too.
+    // This way there won't be a short resumption in the middle.
+    if (wasPaused) subscription.resume();
+    return result;
+  }
 
   /// Requests that the event source pauses events.
   ///
   /// This is called automatically when the request queue is empty.
   ///
   /// The event source is restarted by the next call to [_ensureListening].
-  void _pause();
+  void _pause() {
+    _subscription.pause();
+  }
 
   /// Ensures that we are listening on events from the event source.
   ///
   /// Starts listening for the first time or resumes after a [_pause].
   ///
   /// Is called automatically if a request requires more events.
-  void _ensureListening();
+  void _ensureListening() {
+    if (_isDone) return;
+    if (_subscription == null) {
+      _subscription = _source.listen((data) {
+        _addResult(new Result.value(data));
+      }, onError: (error, StackTrace stackTrace) {
+        _addResult(new Result.error(error, stackTrace));
+      }, onDone: () {
+        _subscription = null;
+        this._close();
+      });
+    } else {
+      _subscription.resume();
+    }
+  }
 
   /// Cancels the underlying event source.
-  Future _cancel();
+  Future _cancel() {
+    if (_isDone) return null;
+    _subscription ??= _source.listen(null);
+    var future = _subscription.cancel();
+    _close();
+    return future;
+  }
 
   // ------------------------------------------------------------------
   // Methods called by the event source to add events or say that it's
@@ -494,73 +545,6 @@
   }
 }
 
-/// The default implementation of [StreamQueue].
-///
-/// This queue gets its events from a stream which is listened
-/// to when a request needs events.
-class _StreamQueue<T> extends StreamQueue<T> {
-  /// Source of events.
-  final Stream<T> _sourceStream;
-
-  /// Subscription on [_sourceStream] while listening for events.
-  ///
-  /// Set to subscription when listening, and set to `null` when the
-  /// subscription is done (and [_isDone] is set to true).
-  StreamSubscription<T> _subscription;
-
-  _StreamQueue(this._sourceStream) : super._();
-
-  Future _cancel() {
-    if (_isDone) return null;
-    if (_subscription == null) _subscription = _sourceStream.listen(null);
-    var future = _subscription.cancel();
-    _close();
-    return future;
-  }
-
-  void _ensureListening() {
-    if (_isDone) return;
-    if (_subscription == null) {
-      _subscription = _sourceStream.listen((data) {
-        _addResult(new Result.value(data));
-      }, onError: (error, StackTrace stackTrace) {
-        _addResult(new Result.error(error, stackTrace));
-      }, onDone: () {
-        _subscription = null;
-        this._close();
-      });
-    } else {
-      _subscription.resume();
-    }
-  }
-
-  void _pause() {
-    _subscription.pause();
-  }
-
-  Stream<T> _extractStream() {
-    assert(_isClosed);
-    if (_isDone) {
-      return new Stream<T>.empty();
-    }
-    _isDone = true;
-
-    if (_subscription == null) {
-      return _sourceStream;
-    }
-
-    var subscription = _subscription;
-    _subscription = null;
-
-    var wasPaused = subscription.isPaused;
-    var result = new SubscriptionStream<T>(subscription);
-    // Resume after creating stream because that pauses the subscription too.
-    // This way there won't be a short resumption in the middle.
-    if (wasPaused) subscription.resume();
-    return result;
-  }
-}
-
 /// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
 ///
 /// Copies of the parent queue may be created using [newQueue]. Calling [commit]