Implement asBroadcast using a _MultiplexStreamController.

R=floitsch@google.com

Review URL: https://codereview.chromium.org//15673006

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@23342 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 528dee0..07378b5 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -158,7 +158,7 @@
    */
   Stream<T> asBroadcastStream() {
     if (isBroadcast) return this;
-    return new _SingleStreamMultiplexer<T>(this);
+    return new _AsBroadcastStream<T>(this);
   }
 
   /**
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index a313f2a..b057565 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -448,6 +448,9 @@
   /** Whether there are currently a subscriber on the [Stream]. */
   bool get hasListener => !_isEmpty;
 
+  /** Whether an event is being fired (sent to some, but not all, listeners). */
+  bool get _isFiring => (_state & _STATE_FIRING) != 0;
+
   // Linked list helpers
 
   bool get _isEmpty => identical(_next, this);
@@ -488,7 +491,7 @@
       // If we are currently firing an event, the empty-check is performed at
       // the end of the listener loop instead of here.
       if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
-        _runGuarded(_onCancel);
+        _callOnCancel();
       }
     }
   }
@@ -499,24 +502,45 @@
   // EventSink interface.
 
   void add(T data) {
-    assert(!isClosed);
+    if (isClosed) {
+      throw new StateError("Cannot add new events after calling close()");
+    }
+    _sendData(data);
+  }
+
+  void addError(Object error, [Object stackTrace]) {
+    if (isClosed) {
+      throw new StateError("Cannot add new events after calling close()");
+    }
+    if (stackTrace != null) _attachStackTrace(error, stackTrace);
+    _sendError(error);
+  }
+
+  void close() {
+    if (isClosed) {
+      throw new StateError("Cannot add new events after calling close()");
+    }
+    _state |= _STATE_CLOSED;
+    _sendDone();
+  }
+
+  // EventDispatch interface.
+
+  void _sendData(T data) {
     if (_isEmpty) return;
     _forEachListener((_BufferingStreamSubscription<T> subscription) {
       subscription._add(data);
     });
   }
 
-  void addError(Object error, [Object stackTrace]) {
-    assert(!isClosed);
+  void _sendError(Object error) {
     if (_isEmpty) return;
     _forEachListener((_BufferingStreamSubscription<T> subscription) {
       subscription._addError(error);
     });
   }
 
-  void close() {
-    assert(!isClosed);
-    _state |= _STATE_CLOSED;
+  void _sendDone() {
     if (_isEmpty) return;
     _forEachListener((_MultiplexSubscription<T> subscription) {
       subscription._close();
@@ -527,7 +551,7 @@
 
   void _forEachListener(
       void action(_BufferingStreamSubscription<T> subscription)) {
-    if ((_state & _STATE_FIRING) != 0) {
+    if (_isFiring) {
       throw new StateError(
           "Cannot fire new event. Controller is already firing an event");
     }
@@ -561,7 +585,70 @@
     _state &= ~_STATE_FIRING;
 
     if (_isEmpty) {
-      _runGuarded(_onCancel);
+      _callOnCancel();
     }
   }
+
+  void _callOnCancel() {
+    _runGuarded(_onCancel);
+  }
+}
+
+class _BufferingMultiplexStreamController<T>
+    extends _MultiplexStreamController<T>
+    implements _EventDispatch<T> {
+  _StreamImplEvents _pending;
+
+  _BufferingMultiplexStreamController(void onListen(), void onCancel())
+      : super(onListen, onCancel);
+
+  bool get _hasPending => _pending != null && ! _pending.isEmpty;
+
+  void _addPendingEvent(_DelayedEvent event) {
+    if (_pending == null) {
+      _pending = new _StreamImplEvents();
+    }
+    _pending.add(event);
+  }
+
+  void add(T data) {
+    if (_isFiring) {
+      _addPendingEvent(new _DelayedData<T>(data));
+      return;
+    }
+    super.add(data);
+    while (_hasPending) {
+      _pending.handleNext(this);
+    }
+  }
+
+  void addError(Object error, [StackTrace stackTrace]) {
+    if (_isFiring) {
+      _addPendingEvent(new _DelayedError(error));
+      return;
+    }
+    super.addError(error, stackTrace);
+    while (_hasPending) {
+      _pending.handleNext(this);
+    }
+  }
+
+  void close() {
+    if (_isFiring) {
+      _addPendingEvent(const _DelayedDone());
+      _state |= _STATE_CLOSED;
+      return;
+    }
+    super.close();
+    assert(!_hasPending);
+  }
+
+  void _callOnCancel() {
+    if (_hasPending) {
+      _pending.clear();
+      _pending = null;
+    }
+    super._callOnCancel();
+
+  }
 }
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 5987050..375844c 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -598,86 +598,6 @@
   }
 }
 
-/**
- * Simple internal doubly-linked list implementation.
- *
- * In an internal linked list, the links are in the data objects themselves,
- * instead of in a separate object. That means each element can be in at most
- * one list at a time.
- *
- * All links are always members of an element cycle. At creation it's a
- * singleton cycle.
- */
-abstract class _InternalLink {
-  _InternalLink _nextLink;
-  _InternalLink _previousLink;
-
-  _InternalLink() {
-    this._previousLink = this._nextLink = this;
-  }
-
-  /* Removes a link from any list it may be part of, and links it to itself. */
-  static void unlink(_InternalLink element) {
-    _InternalLink next = element._nextLink;
-    _InternalLink previous = element._previousLink;
-    next._previousLink = previous;
-    previous._nextLink = next;
-    element._nextLink = element._previousLink = element;
-  }
-
-  /** Check whether an element is unattached to other elements. */
-  static bool isUnlinked(_InternalLink element) {
-    return identical(element, element._nextLink);
-  }
-}
-
-/**
- * Marker interface for "list" links.
- *
- * An "InternalLinkList" is an abstraction on top of a link cycle, where the
- * "list" object itself is not considered an element (it's just a header link
- * created to avoid edge cases).
- * An element is considered part of a list if it is in the list's cycle.
- * There should never be more than one "list" object in a cycle.
- */
-abstract class _InternalLinkList extends _InternalLink {
-  /**
-   * Adds an element to a list, just before the header link.
-   *
-   * This effectively adds it at the end of the list.
-   */
-  static void add(_InternalLinkList list, _InternalLink element) {
-    if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
-    _InternalLink listEnd = list._previousLink;
-    listEnd._nextLink = element;
-    list._previousLink = element;
-    element._previousLink = listEnd;
-    element._nextLink = list;
-  }
-
-  /** Removes an element from its list. */
-  static void remove(_InternalLink element) {
-    _InternalLink.unlink(element);
-  }
-
-  /** Check whether a list contains no elements, only the header link. */
-  static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
-
-  /** Moves all elements from the list [other] to [list]. */
-  static void addAll(_InternalLinkList list, _InternalLinkList other) {
-    if (isEmpty(other)) return;
-    _InternalLink listLast = list._previousLink;
-    _InternalLink otherNext = other._nextLink;
-    listLast._nextLink = otherNext;
-    otherNext._previousLink = listLast;
-    _InternalLink otherLast = other._previousLink;
-    list._previousLink = otherLast;
-    otherLast._nextLink = list;
-    // Clean up [other].
-    other._nextLink = other._previousLink = other;
-  }
-}
-
 /** Superclass for provider of pending events. */
 abstract class _PendingEvents {
   // No async event has been scheduled.
@@ -793,283 +713,42 @@
   }
 }
 
-/**
- * A subscription used by [_SingleStreamMultiplexer].
- *
- * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
- * listeners at a time. It is used to implement [Stream.asBroadcastStream].
- *
- * It is itself listening to another stream for events, and it forwards all
- * events to all of its simultanous listeners.
- *
- * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
- */
-// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
-//            are implemented.
-class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
-                                  implements _MultiplexerLinkedList {
-  static const int _STATE_NOT_LISTENING = 0;
-  // Bit that alternates between event firings. If bit matches the one currently
-  // firing, the subscription will not be notified.
-  static const int _STATE_EVENT_ID_BIT = 1;
-  // Whether the subscription is listening at all. This should be set while
-  // it is part of the linked list of listeners of a multiplexer stream.
-  static const int _STATE_LISTENING = 2;
-  // State bit set while firing an event.
-  static const int _STATE_IS_FIRING = 4;
-  // Bit set if a subscription is canceled while it's firing (the
-  // [_STATE_IS_FIRING] bit is set).
-  // If the subscription is canceled while firing, it is not removed from the
-  // linked list immediately (to avoid breaking iteration), but is instead
-  // removed after it is done firing.
-  static const int _STATE_REMOVE_AFTER_FIRING = 8;
-
-  // Firing state.
-  int _multiplexState;
-
-  _SingleStreamMultiplexer _source;
-
-  _MultiplexerSubscription(this._source,
-                           void onData(T data),
-                           void onError(Object error),
-                           void onDone(),
-                           bool cancelOnError,
-                           int nextEventId)
-      : _multiplexState = _STATE_LISTENING | nextEventId,
-        super(onData, onError, onDone, cancelOnError) {
-    _next = _previous = this;
-  }
-
-  // Mixin workaround.
-  _MultiplexerLinkedList _next;
-  _MultiplexerLinkedList _previous;
-
-  void _unlink() {
-    _previous._next = _next;
-    _next._previous = _previous;
-    _next = _previous = this;
-  }
-
-  void _insertBefore(_MultiplexerLinkedList newNext) {
-    _MultiplexerLinkedList newPrevious = newNext._previous;
-    newPrevious._next = this;
-    newNext._previous = _previous;
-    _previous._next = newNext;
-    _previous = newPrevious;
-  }
-  // End mixin.
-
-  bool get _isListening => _multiplexState >= _STATE_LISTENING;
-  bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
-  bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
-  int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
-
-  void _setRemoveAfterFiring() {
-    assert(_isFiring);
-    _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
-  }
-
-  void _startFiring() {
-    assert(!_isFiring);
-    _multiplexState |= _STATE_IS_FIRING;
-  }
-
-  /// Marks listener as no longer firing, and toggles its event id.
-  void _endFiring() {
-    assert(_isFiring);
-    _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
-  }
-
-  void _setNotListening() {
-    assert(_isListening);
-    _multiplexState = _STATE_NOT_LISTENING;
-  }
-
-  void _onCancel() {
-    assert(_isListening);
-    _source._removeListener(this);
-  }
-}
-
-/**
- * A stream that sends events from another stream to multiple listeners.
- *
- * This is used to implement [Stream.asBroadcastStream].
- *
- * This stream allows listening more than once.
- * When the first listener is added, it starts listening on its source
- * stream for events. All events from the source stream are sent to all
- * active listeners. The listeners handle their own buffering.
- * When the last listener cancels, the source stream subscription is also
- * canceled, and no further listening is possible.
- */
-// TODO(lrn): change "implements" to "with" when the VM supports it.
-class _SingleStreamMultiplexer<T> extends Stream<T>
-                                  implements _MultiplexerLinkedList,
-                                             _EventDispatch<T> {
+class _AsBroadcastStream<T> extends Stream<T> {
   final Stream<T> _source;
+  _BufferingMultiplexStreamController<T> _controller;
   StreamSubscription<T> _subscription;
-  // Alternates between zero and one for each event fired.
-  // Listeners are initialized with the next event's id, and will
-  // only be notified if they match the event being fired.
-  // That way listeners added during event firing will not receive
-  // the current event.
-  int _eventId = 0;
 
-  bool _isFiring = false;
-
-  // Remember events added while firing.
-  _StreamImplEvents _pending;
-
-  _SingleStreamMultiplexer(this._source) {
-    _next = _previous = this;
+  _AsBroadcastStream(this._source) {
+    _controller = new _BufferingMultiplexStreamController<T>(null, _onCancel);
   }
 
-  bool get _hasPending => _pending != null && !_pending.isEmpty;
-
-  // Should be mixin.
-  _MultiplexerLinkedList _next;
-  _MultiplexerLinkedList _previous;
-
-  void _unlink() {
-    _previous._next = _next;
-    _next._previous = _previous;
-    _next = _previous = this;
-  }
-
-  void _insertBefore(_MultiplexerLinkedList newNext) {
-    _MultiplexerLinkedList newPrevious = newNext._previous;
-    newPrevious._next = this;
-    newNext._previous = _previous;
-    _previous._next = newNext;
-    _previous = newPrevious;
-  }
-  // End of mixin.
+  bool get isBroadcast => true;
 
   StreamSubscription<T> listen(void onData(T data),
                                { void onError(Object error),
                                  void onDone(),
-                                 bool cancelOnError }) {
-    if (onData == null) onData = _nullDataHandler;
-    if (onError == null) onError = _nullErrorHandler;
-    if (onDone == null) onDone = _nullDoneHandler;
-    cancelOnError = identical(true, cancelOnError);
-    _MultiplexerSubscription subscription =
-        new _MultiplexerSubscription(this, onData, onError, onDone,
-                                     cancelOnError, _eventId);
+                                 bool cancelOnError}) {
+    if (_controller == null) {
+      throw new StateError("Source stream has been closed.");
+    }
     if (_subscription == null) {
-      _subscription = _source.listen(_add, onError: _addError, onDone: _close);
+      _subscription = _source.listen(_controller.add,
+                                     onError: _controller.addError,
+                                     onDone: _controller.close);
     }
-    subscription._insertBefore(this);
-    return subscription;
+    return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+                                     cancelOnError: cancelOnError);
   }
 
-  /** Called by [_MultiplexerSubscription.remove]. */
-  void _removeListener(_MultiplexerSubscription listener) {
-    if (listener._isFiring) {
-      listener._setRemoveAfterFiring();
-    } else {
-      _unlinkListener(listener);
-    }
-  }
-
-  /** Remove a listener and close the subscription after the last one. */
-  void _unlinkListener(_MultiplexerSubscription listener) {
-    listener._setNotListening();
-    listener._unlink();
-    if (identical(_next, this)) {
-      // Last listener removed.
-      _cancel();
-    }
-  }
-
-  void _cancel() {
+  void _onCancel() {
+    // Called by [_controller] when it has no subscribers left.
     StreamSubscription subscription = _subscription;
     _subscription = null;
+    _controller = null;  // Marks the stream as no longer listenable.
     subscription.cancel();
-    if (_pending != null) _pending.cancelSchedule();
-  }
-
-  void _forEachListener(void action(_MultiplexerSubscription listener)) {
-    int eventId = _eventId;
-    _eventId ^= 1;
-    _isFiring = true;
-    _MultiplexerLinkedList entry = _next;
-    // Call each listener in order. A listener can be removed during any
-    // other listener's event. During its own event it will only be marked
-    // as "to be removed", and it will be handled after the event is done.
-    while (!identical(entry, this)) {
-      _MultiplexerSubscription listener = entry;
-      if (listener._eventId == eventId) {
-        listener._startFiring();
-        action(listener);
-        listener._endFiring(); // Also toggles the event id.
-      }
-      entry = listener._next;
-      if (listener._removeAfterFiring) {
-        _unlinkListener(listener);
-      }
-    }
-    _isFiring = false;
-  }
-
-  void _add(T data) {
-    if (_isFiring || _hasPending) {
-      _StreamImplEvents pending = _pending;
-      if (pending == null) pending = _pending = new _StreamImplEvents();
-      pending.add(new _DelayedData(data));
-    } else {
-      _sendData(data);
-    }
-  }
-
-  void _addError(Object error) {
-    if (_isFiring || _hasPending) {
-      _StreamImplEvents pending = _pending;
-      if (pending == null) pending = _pending = new _StreamImplEvents();
-      pending.add(new _DelayedError(error));
-    } else {
-      _sendError(error);
-    }
-  }
-
-  void _close() {
-    if (_isFiring || _hasPending) {
-      _StreamImplEvents pending = _pending;
-      if (pending == null) pending = _pending = new _StreamImplEvents();
-      pending.add(const _DelayedDone());
-    } else {
-      _sendDone();
-    }
-  }
-
-  void _sendData(T data) {
-    _forEachListener((_MultiplexerSubscription listener) {
-      listener._add(data);
-    });
-    if (_hasPending) {
-      _pending.schedule(this);
-    }
-  }
-
-  void _sendError(Object error) {
-    _forEachListener((_MultiplexerSubscription listener) {
-      listener._addError(error);
-    });
-    if (_hasPending) {
-      _pending.schedule(this);
-    }
-  }
-
-  void _sendDone() {
-    _forEachListener((_MultiplexerSubscription listener) {
-      listener._setRemoveAfterFiring();
-      listener._close();
-    });
   }
 }
 
-
 /**
  * Simple implementation of [StreamIterator].
  */
diff --git a/tests/lib/async/stream_controller_async_test.dart b/tests/lib/async/stream_controller_async_test.dart
index 1ed5143..65e658f 100644
--- a/tests/lib/async/stream_controller_async_test.dart
+++ b/tests/lib/async/stream_controller_async_test.dart
@@ -482,6 +482,25 @@
     c.add(42);
     c.close();
   });
+
+  test("broadcast-controller-add-in-callback", () {
+    StreamController<int> c;
+    c = new StreamController(
+      onListen: expectAsync0(() {}),
+      onCancel: expectAsync0(() {
+        c.add(42);
+      })
+    );
+    var sub;
+    sub = c.stream.asBroadcastStream().listen(expectAsync1((v) {
+      Expect.equals(37, v);
+      c.add(21);
+      sub.cancel();
+    }));
+    c.add(37);  // Triggers listener, which adds 21 and removes itself.
+    // Removing listener triggers onCancel which adds another 42.
+    // Both 21 and 42 are lost because there are no listeners.
+  });
 }
 
 main() {