// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'future_group.dart';
import 'result.dart';

/// A class that splits a single source stream into an arbitrary number of
/// (single-subscription) streams, called "branches", that emit the same
/// events.
///
/// Each branch will emit all the same values and errors as the source stream,
/// regardless of which values have been emitted on other branches. This means
/// that the splitter stores every event that has been emitted so far, which may
/// consume a lot of memory. The user can call [close] to indicate that no more
/// branches will be created, and this memory will be released.
///
/// The source stream is only listened to once a branch is created *and listened
/// to*. It's paused when all branches are paused *or when all branches are
/// canceled*, and resumed once there's at least one branch that's listening and
/// unpaused. It's not canceled unless no branches are listening and [close] has
/// been called.
class StreamSplitter<T> {
  /// The wrapped stream.
  final Stream<T> _stream;

  /// The subscription to [_stream].
  ///
  /// This will be `null` until a branch has a listener.
  StreamSubscription<T> _subscription;

  /// The buffer of events or errors that have already been emitted by
  /// [_stream].
  ///
  /// Replayed into every new branch by [split]; emptied (and no longer filled)
  /// once [close] has been called.
  final _buffer = <Result<T>>[];

  /// The controllers for branches that are listening for future events from
  /// [_stream].
  ///
  /// A controller is removed from this set when its subscription is canceled,
  /// which also happens when it delivers its done event. Branches created
  /// after [_stream] is done are never added.
  final _controllers = new Set<StreamController<T>>();

  /// A group of futures returned by [close].
  ///
  /// This is used to ensure that [close] doesn't complete until all
  /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
  final _closeGroup = new FutureGroup();

  /// Whether [_stream] is done emitting events.
  var _isDone = false;

  /// Whether [close] has been called.
  var _isClosed = false;

  /// Splits [stream] into [count] identical streams.
  ///
  /// [count] defaults to 2. This is the same as creating [count] branches and
  /// then closing the [StreamSplitter].
  static List<Stream/*<T>*/> splitFrom/*<T>*/(Stream/*<T>*/ stream,
      [int count]) {
    if (count == null) count = 2;
    var splitter = new StreamSplitter/*<T>*/(stream);
    // The list's element type must match the declared return type; a plain
    // `List<Stream>` is not a `List<Stream<T>>` under strong mode.
    var streams = new List<Stream/*<T>*/>.generate(
        count, (_) => splitter.split());
    splitter.close();
    return streams;
  }

  /// Creates a splitter for [_stream].
  ///
  /// The stream isn't listened to until a branch created by [split] gets a
  /// listener.
  StreamSplitter(this._stream);

  /// Returns a single-subscription stream that's a copy of the input stream.
  ///
  /// Every event buffered so far is replayed into the new branch. If the input
  /// stream is already done, the branch is closed right after the replay.
  ///
  /// This will throw a [StateError] if [close] has been called.
  Stream<T> split() {
    if (_isClosed) {
      throw new StateError("Can't call split() on a closed StreamSplitter.");
    }

    var controller = new StreamController<T>(
        onListen: _onListen,
        onPause: _onPause,
        onResume: _onResume);
    // Assigned separately because the callback needs to refer to [controller].
    controller.onCancel = () => _onCancel(controller);

    for (var result in _buffer) {
      result.addTo(controller);
    }

    if (_isDone) {
      // No further events will arrive, so the branch is complete as soon as
      // the buffered events have been delivered. It's not tracked in
      // [_controllers].
      _closeGroup.add(controller.close());
    } else {
      _controllers.add(controller);
    }

    return controller.stream;
  }

  /// Indicates that no more branches will be requested via [split].
  ///
  /// This clears the internal buffer of events. If there are no branches or all
  /// branches have been canceled, this cancels the subscription to the input
  /// stream.
  ///
  /// Returns a [Future] that completes once all events have been processed by
  /// all branches and (if applicable) the subscription to the input stream has
  /// been canceled. Calling this more than once returns the same future.
  Future close() {
    if (_isClosed) return _closeGroup.future;
    _isClosed = true;

    _buffer.clear();
    if (_controllers.isEmpty) _cancelSubscription();

    return _closeGroup.future;
  }

  /// Cancel [_subscription] and close [_closeGroup].
  ///
  /// This should be called after all the branches' subscriptions have been
  /// canceled and the splitter has been closed. In that case, we won't use the
  /// events from [_subscription] any more, since there's nothing to pipe them
  /// to and no more branches will be created. If [_subscription] is done,
  /// canceling it will be a no-op.
  ///
  /// This may also be called before any branches have been created, in which
  /// case [_subscription] will be `null`.
  void _cancelSubscription() {
    assert(_controllers.isEmpty);
    assert(_isClosed);

    if (_subscription != null) {
      // `cancel()` may return `null` when there's nothing to wait for.
      var future = _subscription.cancel();
      if (future != null) _closeGroup.add(future);
    }
    _closeGroup.close();
  }

  /// Pauses [_subscription] unless it's already paused.
  ///
  /// Pauses on a [StreamSubscription] nest, while [_onListen] and [_onResume]
  /// each undo only a single pause. Keeping the pause count at one at most
  /// guarantees that a single resume really restarts the source stream.
  void _pauseSubscription() {
    if (!_subscription.isPaused) _subscription.pause();
  }

  // StreamController events

  /// Subscribe to [_stream] if we haven't yet done so, and resume the
  /// subscription if we have.
  void _onListen() {
    if (_isDone) return;

    if (_subscription != null) {
      // Resume the subscription in case it was paused, either because all the
      // controllers were paused or because the last one was canceled. If it
      // wasn't paused, this will be a no-op.
      _subscription.resume();
    } else {
      _subscription = _stream.listen(
          _onData, onError: _onError, onDone: _onDone);
    }
  }

  /// Pauses [_subscription] if every controller is paused.
  void _onPause() {
    if (!_controllers.every((controller) => controller.isPaused)) return;
    _pauseSubscription();
  }

  /// Resumes [_subscription].
  ///
  /// If [_subscription] wasn't paused, this is a no-op.
  void _onResume() {
    _subscription.resume();
  }

  /// Removes [controller] from [_controllers] and cancels or pauses
  /// [_subscription] as appropriate.
  ///
  /// Since the controller emitting a done event will cause it to register as
  /// canceled, this is the only way that a controller is ever removed from
  /// [_controllers].
  void _onCancel(StreamController controller) {
    // A branch created after [_stream] was done is never tracked, so there's
    // no bookkeeping to undo for it. Returning here also keeps such a branch
    // from invoking [_cancelSubscription] a second time after [close], which
    // would add a future to [_closeGroup] after it was closed (presumably an
    // error; FutureGroup isn't defined in this file).
    if (!_controllers.remove(controller)) return;
    if (_controllers.isNotEmpty) return;

    if (_isClosed) {
      _cancelSubscription();
    } else {
      // Nobody is listening right now, but more branches may still be
      // created, so keep the subscription around without consuming events.
      _pauseSubscription();
    }
  }

  // Stream events

  /// Buffers [data] and passes it to [_controllers].
  ///
  /// Nothing is buffered once the splitter is closed, since no new branch can
  /// be created to replay it to.
  void _onData(T data) {
    if (!_isClosed) _buffer.add(new Result.value(data));
    for (var controller in _controllers) {
      controller.add(data);
    }
  }

  /// Buffers [error] and passes it to [_controllers].
  ///
  /// Nothing is buffered once the splitter is closed, since no new branch can
  /// be created to replay it to.
  void _onError(Object error, StackTrace stackTrace) {
    if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
    for (var controller in _controllers) {
      controller.addError(error, stackTrace);
    }
  }

  /// Marks [_controllers] as done.
  ///
  /// Each controller's close future is added to [_closeGroup] so that [close]
  /// waits for it.
  void _onDone() {
    _isDone = true;
    for (var controller in _controllers) {
      _closeGroup.add(controller.close());
    }
  }
}
