blob: caf18e1856e502c44c1b3610b014155f324638ae [file] [log] [blame]
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// A single variable with a stream emitting the variable's changing values.
///
/// Contains a single [value] which can be read and written as any other
/// variable.
///
/// Whenever the value is set, the new value is emitted on the
/// [updates] stream.
/// Further, any new listener on the [updates] stream is initially sent the
/// current [value].
///
/// Consider using immutable types for [value] since listeners will not
/// be notified if the internal state of [value] changes, only if
/// the [value] variable is written to.
///
/// Example:
/// ```dart
/// class ToggleWidget {
/// final ObservableValue<bool> _isVisible = ObservableValue(true);
///
/// // ...
///
/// Stream<bool> get visibilityChanges => _isVisible.updates;
///
/// void toggleVisibility() {
/// _isVisible.value ^= true; // XOR with true toggles value.
/// }
/// }
/// ```
abstract class ObservableValue<T> {
/// The current value.
///
/// The [updates] stream emits the current value when listened to,
/// and the new value when `value` is set to a new value.
abstract T value;
/// A stream emitting the current value of [value].
///
/// New listeners will receive an event containing the current value
/// of [value] as soon as possible after listening.
/// Every time [value] changes, they will then receive an event
/// with the new value.
///
/// This stream can be listened to multiple times.
/// It will never emit an error event.
/// _It's not a broadcast stream, because cancelling a subscription
/// on the stream, and then quickly listening again, may cause the
/// same event (the current value) to be seen twice._
abstract final Stream<T> updates;
/// Creates an observable value with the given initial value.
factory ObservableValue(T initialValue) = _ObservableValue<T>;
// Possible method to add:
//
// /// Prevents further updates to [value].
// ///
// /// The [value] can no longer be set.
// ///
// /// The [updates] stream will close current listeners,
// /// and close new listeners after sending them the current value.
// void freeze();
}
/// Default implementation of [ObservableValue].
class _ObservableValue<T> implements ObservableValue<T> {
/// The stored value.
T _value;
/// Listeners waiting for updates.
final List<MultiStreamController<T>> _listeners = [];
/// The update stream, created when the first listener is added.
Stream<T>? _stream;
_ObservableValue(this._value);
@override
T get value => _value;
@override
set value(T value) {
_value = value;
_notify(value);
}
@override
Stream<T> get updates => _stream ??= Stream<T>.multi((controller) {
controller.add(_value);
_listeners.add(controller);
});
void _notify(T value) {
// Sends event to active listeners, and compacts listener list
// to only contain listeners which are still active.
// Since `add` is asynchronous, this loop will not call any user code.
var activeCount = 0;
for (var i = 0; i < _listeners.length; i++) {
var controller = _listeners[i];
if (controller.hasListener) {
controller.add(value); // Async event, so no synchronous side effects.
if (activeCount < i) {
_listeners[activeCount] = controller;
}
activeCount++;
}
}
if (activeCount < _listeners.length) _listeners.length = activeCount;
}
}