// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';

import 'src/guarantee_channel.dart';
import 'src/close_guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';

export 'src/delegating_stream_channel.dart';
export 'src/disconnector.dart';
export 'src/isolate_channel.dart';
export 'src/json_document_transformer.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
export 'src/stream_channel_controller.dart';
export 'src/stream_channel_transformer.dart';

/// An abstract class representing a two-way communication channel.
///
/// Users should consider the [stream] emitting a "done" event to be the
/// canonical indicator that the channel has closed. If they wish to close the
/// channel, they should close the [sink]—canceling the stream subscription is
/// not sufficient. Protocol errors may be emitted through the stream or through
/// [sink].done, depending on their underlying cause. Note that the sink may
/// silently drop events if the channel closes before [sink].close is called.
///
/// Implementations are strongly encouraged to mix in or extend
/// [StreamChannelMixin] to get default implementations of the various instance
/// methods. Adding new methods to this interface will not be considered a
/// breaking change if implementations are also added to [StreamChannelMixin].
///
/// Implementations must provide the following guarantees:
///
/// * The stream is single-subscription, and must follow all the guarantees of
///   single-subscription streams.
///
/// * Closing the sink causes the stream to close before it emits any more
///   events.
///
/// * After the stream closes, the sink is automatically closed. If this
///   happens, sink methods should silently drop their arguments until
///   [sink].close is called.
///
/// * If the stream closes before it has a listener, the sink should silently
///   drop events if possible.
///
/// * Canceling the stream's subscription has no effect on the sink. The channel
///   must still be able to respond to the other endpoint closing the channel
///   even after the subscription has been canceled.
///
/// * The sink *either* forwards errors to the other endpoint *or* closes as
///   soon as an error is added and forwards that error to the [sink].done
///   future.
///
/// These guarantees allow users to interact uniformly with all implementations,
/// and ensure that either endpoint closing the stream produces consistent
/// behavior.
abstract class StreamChannel<T> {
  /// The single-subscription stream that emits values from the other endpoint.
  Stream<T> get stream;

  /// The sink for sending values to the other endpoint.
  StreamSink<T> get sink;

  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
  ///
  /// Note that this stream/sink pair must provide the guarantees listed in the
  /// [StreamChannel] documentation. If they don't do so natively,
  /// [StreamChannel.withGuarantees] should be used instead.
  factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
      new _StreamChannel<T>(stream, sink);

  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
  ///
  /// Unlike [new StreamChannel], this enforces the guarantees listed in the
  /// [StreamChannel] documentation. This makes it somewhat less efficient than
  /// just wrapping a stream and a sink directly, so [new StreamChannel] should
  /// be used when the guarantees are provided natively.
  ///
  /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
  /// [sink]. If any are, the connection will close and the error will be
  /// forwarded to [sink].done. [allowSinkErrors] defaults to `true`.
  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
          {bool allowSinkErrors = true}) =>
      new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);

  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
  ///
  /// This specifically enforces the second guarantee: closing the sink causes
  /// the stream to close before it emits any more events. This guarantee is
  /// invalidated when an asynchronous gap is added between the original
  /// stream's event dispatch and the returned stream's, for example by
  /// transforming it with a [StreamTransformer]. This is a lighter-weight way
  /// of preserving that guarantee in particular than
  /// [StreamChannel.withGuarantees].
  factory StreamChannel.withCloseGuarantee(
          Stream<T> stream, StreamSink<T> sink) =>
      new CloseGuaranteeChannel(stream, sink);

  /// Connects this to [other], so that any values emitted by either are sent
  /// directly to the other.
  void pipe(StreamChannel<T> other);

  /// Transforms this using [transformer].
  ///
  /// This is identical to calling `transformer.bind(channel)`.
  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);

  /// Transforms only the [stream] component of this using [transformer].
  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);

  /// Transforms only the [sink] component of this using [transformer].
  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);

  /// Returns a copy of this with [stream] replaced by [change]'s return
  /// value.
  ///
  /// [change] is passed the current [stream].
  StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream));

  /// Returns a copy of this with [sink] replaced by [change]'s return
  /// value.
  ///
  /// [change] is passed the current [sink].
  StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink));

  /// Returns a copy of this with the generic type coerced to [S].
  ///
  /// If any events emitted by [stream] aren't of type [S], they're converted
  /// into [CastError] events. Similarly, if any events are added to [sink] that
  /// aren't of type [S], a [CastError] is thrown.
  StreamChannel<S> cast<S>();
}

/// The plain [StreamChannel] implementation, which simply holds the stream and
/// sink it is constructed with.
///
/// This is a separate class from [StreamChannel] so that it can extend
/// [StreamChannelMixin] and inherit its instance method implementations.
class _StreamChannel<T> extends StreamChannelMixin<T> {
  @override
  final Stream<T> stream;

  @override
  final StreamSink<T> sink;

  /// Wraps [stream] and [sink] as-is, without adding any guarantees.
  _StreamChannel(this.stream, this.sink);
}

/// A mixin that implements the instance methods of [StreamChannel] in terms of
/// [stream] and [sink].
///
/// Classes using this only need to supply [stream] and [sink] themselves.
abstract class StreamChannelMixin<T> implements StreamChannel<T> {
  @override
  void pipe(StreamChannel<T> other) {
    // Wire the two channels together in both directions. The futures returned
    // by [Stream.pipe] are not awaited; this method returns `void`.
    stream.pipe(other.sink);
    other.stream.pipe(sink);
  }

  @override
  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) =>
      transformer.bind(this);

  @override
  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
      changeStream(transformer.bind);

  @override
  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
      changeSink(transformer.bind);

  // The replaced stream is wrapped with
  // [StreamChannel.withCloseGuarantee], which enforces that closing the sink
  // closes the stream before it emits any more events.
  @override
  StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
      new StreamChannel.withCloseGuarantee(change(stream), sink);

  // The replaced sink gets the same [StreamChannel.withCloseGuarantee]
  // wrapping as in [changeStream].
  @override
  StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
      new StreamChannel.withCloseGuarantee(stream, change(sink));

  // Both halves go through the `typed` helpers (presumably from
  // package:async) and are then wrapped in a plain [StreamChannel].
  @override
  StreamChannel<S> cast<S>() => new StreamChannel(
      DelegatingStream.typed(stream), DelegatingStreamSink.typed(sink));
}
