Implement asBroadcast using a _MultiplexStreamController.
R=floitsch@google.com
Review URL: https://codereview.chromium.org//15673006
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@23342 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 528dee0..07378b5 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -158,7 +158,7 @@
*/
Stream<T> asBroadcastStream() {
if (isBroadcast) return this;
- return new _SingleStreamMultiplexer<T>(this);
+ return new _AsBroadcastStream<T>(this);
}
/**
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index a313f2a..b057565 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -448,6 +448,9 @@
/** Whether there are currently a subscriber on the [Stream]. */
bool get hasListener => !_isEmpty;
+ /** Whether an event is being fired (sent to some, but not all, listeners). */
+ bool get _isFiring => (_state & _STATE_FIRING) != 0;
+
// Linked list helpers
bool get _isEmpty => identical(_next, this);
@@ -488,7 +491,7 @@
// If we are currently firing an event, the empty-check is performed at
// the end of the listener loop instead of here.
if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
- _runGuarded(_onCancel);
+ _callOnCancel();
}
}
}
@@ -499,24 +502,45 @@
// EventSink interface.
void add(T data) {
- assert(!isClosed);
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ _sendData(data);
+ }
+
+ void addError(Object error, [Object stackTrace]) {
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ if (stackTrace != null) _attachStackTrace(error, stackTrace);
+ _sendError(error);
+ }
+
+ void close() {
+ if (isClosed) {
+ throw new StateError("Cannot add new events after calling close()");
+ }
+ _state |= _STATE_CLOSED;
+ _sendDone();
+ }
+
+ // EventDispatch interface.
+
+ void _sendData(T data) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._add(data);
});
}
- void addError(Object error, [Object stackTrace]) {
- assert(!isClosed);
+ void _sendError(Object error) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._addError(error);
});
}
- void close() {
- assert(!isClosed);
- _state |= _STATE_CLOSED;
+ void _sendDone() {
if (_isEmpty) return;
_forEachListener((_MultiplexSubscription<T> subscription) {
subscription._close();
@@ -527,7 +551,7 @@
void _forEachListener(
void action(_BufferingStreamSubscription<T> subscription)) {
- if ((_state & _STATE_FIRING) != 0) {
+ if (_isFiring) {
throw new StateError(
"Cannot fire new event. Controller is already firing an event");
}
@@ -561,7 +585,70 @@
_state &= ~_STATE_FIRING;
if (_isEmpty) {
- _runGuarded(_onCancel);
+ _callOnCancel();
}
}
+
+ void _callOnCancel() {
+ _runGuarded(_onCancel);
+ }
+}
+
+class _BufferingMultiplexStreamController<T>
+ extends _MultiplexStreamController<T>
+ implements _EventDispatch<T> {
+ _StreamImplEvents _pending;
+
+ _BufferingMultiplexStreamController(void onListen(), void onCancel())
+ : super(onListen, onCancel);
+
+ bool get _hasPending => _pending != null && ! _pending.isEmpty;
+
+ void _addPendingEvent(_DelayedEvent event) {
+ if (_pending == null) {
+ _pending = new _StreamImplEvents();
+ }
+ _pending.add(event);
+ }
+
+ void add(T data) {
+ if (_isFiring) {
+ _addPendingEvent(new _DelayedData<T>(data));
+ return;
+ }
+ super.add(data);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) {
+ if (_isFiring) {
+ _addPendingEvent(new _DelayedError(error));
+ return;
+ }
+ super.addError(error, stackTrace);
+ while (_hasPending) {
+ _pending.handleNext(this);
+ }
+ }
+
+ void close() {
+ if (_isFiring) {
+ _addPendingEvent(const _DelayedDone());
+ _state |= _STATE_CLOSED;
+ return;
+ }
+ super.close();
+ assert(!_hasPending);
+ }
+
+ void _callOnCancel() {
+ if (_hasPending) {
+ _pending.clear();
+ _pending = null;
+ }
+ super._callOnCancel();
+
+ }
}
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 5987050..375844c 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -598,86 +598,6 @@
}
}
-/**
- * Simple internal doubly-linked list implementation.
- *
- * In an internal linked list, the links are in the data objects themselves,
- * instead of in a separate object. That means each element can be in at most
- * one list at a time.
- *
- * All links are always members of an element cycle. At creation it's a
- * singleton cycle.
- */
-abstract class _InternalLink {
- _InternalLink _nextLink;
- _InternalLink _previousLink;
-
- _InternalLink() {
- this._previousLink = this._nextLink = this;
- }
-
- /* Removes a link from any list it may be part of, and links it to itself. */
- static void unlink(_InternalLink element) {
- _InternalLink next = element._nextLink;
- _InternalLink previous = element._previousLink;
- next._previousLink = previous;
- previous._nextLink = next;
- element._nextLink = element._previousLink = element;
- }
-
- /** Check whether an element is unattached to other elements. */
- static bool isUnlinked(_InternalLink element) {
- return identical(element, element._nextLink);
- }
-}
-
-/**
- * Marker interface for "list" links.
- *
- * An "InternalLinkList" is an abstraction on top of a link cycle, where the
- * "list" object itself is not considered an element (it's just a header link
- * created to avoid edge cases).
- * An element is considered part of a list if it is in the list's cycle.
- * There should never be more than one "list" object in a cycle.
- */
-abstract class _InternalLinkList extends _InternalLink {
- /**
- * Adds an element to a list, just before the header link.
- *
- * This effectively adds it at the end of the list.
- */
- static void add(_InternalLinkList list, _InternalLink element) {
- if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
- _InternalLink listEnd = list._previousLink;
- listEnd._nextLink = element;
- list._previousLink = element;
- element._previousLink = listEnd;
- element._nextLink = list;
- }
-
- /** Removes an element from its list. */
- static void remove(_InternalLink element) {
- _InternalLink.unlink(element);
- }
-
- /** Check whether a list contains no elements, only the header link. */
- static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
-
- /** Moves all elements from the list [other] to [list]. */
- static void addAll(_InternalLinkList list, _InternalLinkList other) {
- if (isEmpty(other)) return;
- _InternalLink listLast = list._previousLink;
- _InternalLink otherNext = other._nextLink;
- listLast._nextLink = otherNext;
- otherNext._previousLink = listLast;
- _InternalLink otherLast = other._previousLink;
- list._previousLink = otherLast;
- otherLast._nextLink = list;
- // Clean up [other].
- other._nextLink = other._previousLink = other;
- }
-}
-
/** Superclass for provider of pending events. */
abstract class _PendingEvents {
// No async event has been scheduled.
@@ -793,283 +713,42 @@
}
}
-/**
- * A subscription used by [_SingleStreamMultiplexer].
- *
- * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
- * listeners at a time. It is used to implement [Stream.asBroadcastStream].
- *
- * It is itself listening to another stream for events, and it forwards all
- * events to all of its simultanous listeners.
- *
- * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
- */
-// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
-// are implemented.
-class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
- implements _MultiplexerLinkedList {
- static const int _STATE_NOT_LISTENING = 0;
- // Bit that alternates between event firings. If bit matches the one currently
- // firing, the subscription will not be notified.
- static const int _STATE_EVENT_ID_BIT = 1;
- // Whether the subscription is listening at all. This should be set while
- // it is part of the linked list of listeners of a multiplexer stream.
- static const int _STATE_LISTENING = 2;
- // State bit set while firing an event.
- static const int _STATE_IS_FIRING = 4;
- // Bit set if a subscription is canceled while it's firing (the
- // [_STATE_IS_FIRING] bit is set).
- // If the subscription is canceled while firing, it is not removed from the
- // linked list immediately (to avoid breaking iteration), but is instead
- // removed after it is done firing.
- static const int _STATE_REMOVE_AFTER_FIRING = 8;
-
- // Firing state.
- int _multiplexState;
-
- _SingleStreamMultiplexer _source;
-
- _MultiplexerSubscription(this._source,
- void onData(T data),
- void onError(Object error),
- void onDone(),
- bool cancelOnError,
- int nextEventId)
- : _multiplexState = _STATE_LISTENING | nextEventId,
- super(onData, onError, onDone, cancelOnError) {
- _next = _previous = this;
- }
-
- // Mixin workaround.
- _MultiplexerLinkedList _next;
- _MultiplexerLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
- }
-
- void _insertBefore(_MultiplexerLinkedList newNext) {
- _MultiplexerLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
- // End mixin.
-
- bool get _isListening => _multiplexState >= _STATE_LISTENING;
- bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
- bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
- int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
-
- void _setRemoveAfterFiring() {
- assert(_isFiring);
- _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
- }
-
- void _startFiring() {
- assert(!_isFiring);
- _multiplexState |= _STATE_IS_FIRING;
- }
-
- /// Marks listener as no longer firing, and toggles its event id.
- void _endFiring() {
- assert(_isFiring);
- _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
- }
-
- void _setNotListening() {
- assert(_isListening);
- _multiplexState = _STATE_NOT_LISTENING;
- }
-
- void _onCancel() {
- assert(_isListening);
- _source._removeListener(this);
- }
-}
-
-/**
- * A stream that sends events from another stream to multiple listeners.
- *
- * This is used to implement [Stream.asBroadcastStream].
- *
- * This stream allows listening more than once.
- * When the first listener is added, it starts listening on its source
- * stream for events. All events from the source stream are sent to all
- * active listeners. The listeners handle their own buffering.
- * When the last listener cancels, the source stream subscription is also
- * canceled, and no further listening is possible.
- */
-// TODO(lrn): change "implements" to "with" when the VM supports it.
-class _SingleStreamMultiplexer<T> extends Stream<T>
- implements _MultiplexerLinkedList,
- _EventDispatch<T> {
+class _AsBroadcastStream<T> extends Stream<T> {
final Stream<T> _source;
+ _BufferingMultiplexStreamController<T> _controller;
StreamSubscription<T> _subscription;
- // Alternates between zero and one for each event fired.
- // Listeners are initialized with the next event's id, and will
- // only be notified if they match the event being fired.
- // That way listeners added during event firing will not receive
- // the current event.
- int _eventId = 0;
- bool _isFiring = false;
-
- // Remember events added while firing.
- _StreamImplEvents _pending;
-
- _SingleStreamMultiplexer(this._source) {
- _next = _previous = this;
+ _AsBroadcastStream(this._source) {
+ _controller = new _BufferingMultiplexStreamController<T>(null, _onCancel);
}
- bool get _hasPending => _pending != null && !_pending.isEmpty;
-
- // Should be mixin.
- _MultiplexerLinkedList _next;
- _MultiplexerLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
- }
-
- void _insertBefore(_MultiplexerLinkedList newNext) {
- _MultiplexerLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
- // End of mixin.
+ bool get isBroadcast => true;
StreamSubscription<T> listen(void onData(T data),
{ void onError(Object error),
void onDone(),
- bool cancelOnError }) {
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
- cancelOnError = identical(true, cancelOnError);
- _MultiplexerSubscription subscription =
- new _MultiplexerSubscription(this, onData, onError, onDone,
- cancelOnError, _eventId);
+ bool cancelOnError}) {
+ if (_controller == null) {
+ throw new StateError("Source stream has been closed.");
+ }
if (_subscription == null) {
- _subscription = _source.listen(_add, onError: _addError, onDone: _close);
+ _subscription = _source.listen(_controller.add,
+ onError: _controller.addError,
+ onDone: _controller.close);
}
- subscription._insertBefore(this);
- return subscription;
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone,
+ cancelOnError: cancelOnError);
}
- /** Called by [_MultiplexerSubscription.remove]. */
- void _removeListener(_MultiplexerSubscription listener) {
- if (listener._isFiring) {
- listener._setRemoveAfterFiring();
- } else {
- _unlinkListener(listener);
- }
- }
-
- /** Remove a listener and close the subscription after the last one. */
- void _unlinkListener(_MultiplexerSubscription listener) {
- listener._setNotListening();
- listener._unlink();
- if (identical(_next, this)) {
- // Last listener removed.
- _cancel();
- }
- }
-
- void _cancel() {
+ void _onCancel() {
+ // Called by [_controller] when it has no subscribers left.
StreamSubscription subscription = _subscription;
_subscription = null;
+ _controller = null; // Marks the stream as no longer listenable.
subscription.cancel();
- if (_pending != null) _pending.cancelSchedule();
- }
-
- void _forEachListener(void action(_MultiplexerSubscription listener)) {
- int eventId = _eventId;
- _eventId ^= 1;
- _isFiring = true;
- _MultiplexerLinkedList entry = _next;
- // Call each listener in order. A listener can be removed during any
- // other listener's event. During its own event it will only be marked
- // as "to be removed", and it will be handled after the event is done.
- while (!identical(entry, this)) {
- _MultiplexerSubscription listener = entry;
- if (listener._eventId == eventId) {
- listener._startFiring();
- action(listener);
- listener._endFiring(); // Also toggles the event id.
- }
- entry = listener._next;
- if (listener._removeAfterFiring) {
- _unlinkListener(listener);
- }
- }
- _isFiring = false;
- }
-
- void _add(T data) {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(new _DelayedData(data));
- } else {
- _sendData(data);
- }
- }
-
- void _addError(Object error) {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(new _DelayedError(error));
- } else {
- _sendError(error);
- }
- }
-
- void _close() {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(const _DelayedDone());
- } else {
- _sendDone();
- }
- }
-
- void _sendData(T data) {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._add(data);
- });
- if (_hasPending) {
- _pending.schedule(this);
- }
- }
-
- void _sendError(Object error) {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._addError(error);
- });
- if (_hasPending) {
- _pending.schedule(this);
- }
- }
-
- void _sendDone() {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._setRemoveAfterFiring();
- listener._close();
- });
}
}
-
/**
* Simple implementation of [StreamIterator].
*/
diff --git a/tests/lib/async/stream_controller_async_test.dart b/tests/lib/async/stream_controller_async_test.dart
index 1ed5143..65e658f 100644
--- a/tests/lib/async/stream_controller_async_test.dart
+++ b/tests/lib/async/stream_controller_async_test.dart
@@ -482,6 +482,25 @@
c.add(42);
c.close();
});
+
+ test("broadcast-controller-add-in-callback", () {
+ StreamController<int> c;
+ c = new StreamController(
+ onListen: expectAsync0(() {}),
+ onCancel: expectAsync0(() {
+ c.add(42);
+ })
+ );
+ var sub;
+ sub = c.stream.asBroadcastStream().listen(expectAsync1((v) {
+ Expect.equals(37, v);
+ c.add(21);
+ sub.cancel();
+ }));
+ c.add(37); // Triggers listener, which adds 21 and removes itself.
+ // Removing listener triggers onCancel which adds another 42.
+ // Both 21 and 42 are lost because there are no listeners.
+ });
}
main() {