// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import "dart:async";

import "package:async/async.dart";
import "package:test/test.dart";

import "utils.dart";

/// Tests for [StreamSinkCompleter].
///
/// The first two groups run the same scenarios twice: once with the
/// destination sink set before anything is written to the completer's sink,
/// and once with it set afterwards. The remaining tests cover the ordering of
/// `close()` and `done`, `fromFuture()`, `setError()`, and setting the
/// destination more than once.
main() {
  // Replaced with a fresh instance before every test by [setUp].
  var sinkCompleter;

  // Asserts that the first events recorded by [sink] are data events carrying
  // [expected], in order.
  void expectValues(TestSink sink, List expected) {
    for (var i = 0; i < expected.length; i++) {
      expect(sink.results[i].asValue.value, equals(expected[i]));
    }
  }

  // Asserts that the first events recorded by [sink] are error events
  // carrying [expected], in order.
  void expectErrors(TestSink sink, List expected) {
    for (var i = 0; i < expected.length; i++) {
      expect(sink.results[i].asError.error, equals(expected[i]));
    }
  }

  // Asserts that [sink] recorded, in order: the value 1, the error "oh no",
  // the value 2, and the error "that's bad".
  void expectInterleavedEvents(TestSink sink) {
    expect(sink.results[0].asValue.value, equals(1));
    expect(sink.results[1].asError.error, equals("oh no"));
    expect(sink.results[2].asValue.value, equals(2));
    expect(sink.results[3].asError.error, equals("that's bad"));
  }

  // Creates a destination whose `onDone` callback throws "oh no".
  TestSink sinkFailingOnClose() => new TestSink(onDone: () => throw "oh no");

  setUp(() {
    sinkCompleter = new StreamSinkCompleter();
  });

  group("when a stream is linked before events are added", () {
    test("data events are forwarded", () {
      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);

      var input = sinkCompleter.sink;
      for (var value in [1, 2, 3, 4]) {
        input.add(value);
      }

      expectValues(destination, [1, 2, 3, 4]);
    });

    test("error events are forwarded", () {
      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);

      var input = sinkCompleter.sink;
      input.addError("oh no");
      input.addError("that's bad");

      expectErrors(destination, ["oh no", "that's bad"]);
    });

    test("addStream is forwarded", () async {
      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);

      var source = new StreamController();
      sinkCompleter.sink.addStream(source.stream);

      source
        ..add(1)
        ..addError("oh no")
        ..add(2)
        ..addError("that's bad");
      await flushMicrotasks();

      expectInterleavedEvents(destination);
      expect(destination.isClosed, isFalse);

      // The end of the added stream is not expected to close the destination.
      source.close();
      await flushMicrotasks();
      expect(destination.isClosed, isFalse);
    });

    test("close() is forwarded", () {
      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);

      sinkCompleter.sink.close();

      expect(destination.isClosed, isTrue);
    });

    test("the future from the inner close() is returned", () async {
      // Controls when the destination's onDone future completes.
      var innerClose = new Completer();
      var destination = new TestSink(onDone: () => innerClose.future);
      sinkCompleter.setDestinationSink(destination);

      var outerCloseFinished = false;
      sinkCompleter.sink.close().then(expectAsync1((_) {
        outerCloseFinished = true;
      }));

      // The outer close() must still be pending while the inner one is.
      await flushMicrotasks();
      expect(outerCloseFinished, isFalse);

      innerClose.complete();
      await flushMicrotasks();
      expect(outerCloseFinished, isTrue);
    });

    test("errors are forwarded from the inner close()", () {
      sinkCompleter.setDestinationSink(sinkFailingOnClose());

      expect(sinkCompleter.sink.done, throwsA("oh no"));
      expect(sinkCompleter.sink.close(), throwsA("oh no"));
    });

    test("errors aren't top-leveled if only close() is listened to", () async {
      sinkCompleter.setDestinationSink(sinkFailingOnClose());
      expect(sinkCompleter.sink.close(), throwsA("oh no"));

      // Let any unhandled error reach the top level before the test ends.
      await flushMicrotasks();
    });

    test("errors aren't top-leveled if only done is listened to", () async {
      sinkCompleter.setDestinationSink(sinkFailingOnClose());
      sinkCompleter.sink.close();
      expect(sinkCompleter.sink.done, throwsA("oh no"));

      // Let any unhandled error reach the top level before the test ends.
      await flushMicrotasks();
    });
  });

  group("when a stream is linked after events are added", () {
    test("data events are forwarded", () async {
      var input = sinkCompleter.sink;
      for (var value in [1, 2, 3, 4]) {
        input.add(value);
      }
      await flushMicrotasks();

      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);
      await flushMicrotasks();

      expectValues(destination, [1, 2, 3, 4]);
    });

    test("error events are forwarded", () async {
      var input = sinkCompleter.sink;
      input.addError("oh no");
      input.addError("that's bad");
      await flushMicrotasks();

      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);
      await flushMicrotasks();

      expectErrors(destination, ["oh no", "that's bad"]);
    });

    test("addStream is forwarded", () async {
      var source = new StreamController();
      sinkCompleter.sink.addStream(source.stream);

      source
        ..add(1)
        ..addError("oh no")
        ..add(2)
        ..addError("that's bad")
        ..close();
      await flushMicrotasks();

      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);
      await flushMicrotasks();

      expectInterleavedEvents(destination);
      expect(destination.isClosed, isFalse);
    });

    test("close() is forwarded", () async {
      sinkCompleter.sink.close();
      await flushMicrotasks();

      var destination = new TestSink();
      sinkCompleter.setDestinationSink(destination);
      await flushMicrotasks();

      expect(destination.isClosed, isTrue);
    });

    test("the future from the inner close() is returned", () async {
      var outerCloseFinished = false;
      sinkCompleter.sink.close().then(expectAsync1((_) {
        outerCloseFinished = true;
      }));
      await flushMicrotasks();

      // Controls when the destination's onDone future completes.
      var innerClose = new Completer();
      var destination = new TestSink(onDone: () => innerClose.future);
      sinkCompleter.setDestinationSink(destination);
      await flushMicrotasks();
      expect(outerCloseFinished, isFalse);

      innerClose.complete();
      await flushMicrotasks();
      expect(outerCloseFinished, isTrue);
    });

    test("errors are forwarded from the inner close()", () async {
      // Both expectations are registered before any destination exists.
      expect(sinkCompleter.sink.done, throwsA("oh no"));
      expect(sinkCompleter.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      sinkCompleter.setDestinationSink(sinkFailingOnClose());
    });

    test("errors aren't top-leveled if only close() is listened to", () async {
      expect(sinkCompleter.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      sinkCompleter.setDestinationSink(sinkFailingOnClose());

      // Let any unhandled error reach the top level before the test ends.
      await flushMicrotasks();
    });

    test("errors aren't top-leveled if only done is listened to", () async {
      sinkCompleter.sink.close();
      expect(sinkCompleter.sink.done, throwsA("oh no"));
      await flushMicrotasks();

      sinkCompleter.setDestinationSink(sinkFailingOnClose());

      // Let any unhandled error reach the top level before the test ends.
      await flushMicrotasks();
    });
  });

  test("the sink is closed, the destination is set, then done is read",
      () async {
    expect(sinkCompleter.sink.close(), completes);
    await flushMicrotasks();

    var destination = new TestSink();
    sinkCompleter.setDestinationSink(destination);
    await flushMicrotasks();

    expect(sinkCompleter.sink.done, completes);
  });

  test("done is read, the destination is set, then the sink is closed",
      () async {
    expect(sinkCompleter.sink.done, completes);
    await flushMicrotasks();

    var destination = new TestSink();
    sinkCompleter.setDestinationSink(destination);
    await flushMicrotasks();

    expect(sinkCompleter.sink.close(), completes);
  });

  group("fromFuture()", () {
    test("with a successful completion", () async {
      var pending = new Completer<StreamSink>();
      var input = StreamSinkCompleter.fromFuture(pending.future);

      // Everything is written and closed before the future completes.
      for (var value in [1, 2, 3]) {
        input.add(value);
      }
      input.close();

      var destination = new TestSink();
      pending.complete(destination);
      await destination.done;

      expectValues(destination, [1, 2, 3]);
    });

    test("with an error", () async {
      var pending = new Completer<StreamSink>();
      var input = StreamSinkCompleter.fromFuture(pending.future);

      expect(input.done, throwsA("oh no"));
      pending.completeError("oh no");
    });
  });

  group("setError()", () {
    test("produces a closed sink with the error", () {
      sinkCompleter.setError("oh no");

      expect(sinkCompleter.sink.done, throwsA("oh no"));
      expect(sinkCompleter.sink.close(), throwsA("oh no"));
    });

    test("produces an error even if done was accessed earlier", () async {
      // Both expectations are registered before the error is set.
      expect(sinkCompleter.sink.done, throwsA("oh no"));
      expect(sinkCompleter.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      sinkCompleter.setError("oh no");
    });
  });

  test("doesn't allow the destination sink to be set multiple times", () {
    sinkCompleter.setDestinationSink(new TestSink());

    // Every attempt after the first must fail, not just the second one.
    setAgain() => sinkCompleter.setDestinationSink(new TestSink());
    expect(setAgain, throwsStateError);
    expect(setAgain, throwsStateError);
  });
}
