// Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// Test the basic StreamController and StreamController.broadcast.
library stream_controller_async_test;

import 'dart:async';
import 'dart:isolate';
import "package:expect/expect.dart";
import 'package:unittest/unittest.dart';
import 'event_helper.dart';
import 'stream_state_helper.dart';

// Cancels [sub]. Passed as the `onCancel` callback of `asBroadcastStream`
// by the fold tests in this file.
void cancelSub(StreamSubscription sub) {
  sub.cancel();
}

// Registers the fold tests that listen through a broadcast view
// (`asBroadcastStream`) of a single-subscription controller.
testController() {
  // fold over two data events followed by close yields their sum.
  test("StreamController.fold", () {
    StreamController controller = new StreamController();
    Stream broadcast =
        controller.stream.asBroadcastStream(onCancel: cancelSub);
    Future sum = broadcast.fold(0, (total, value) => total + value);
    sum.then(expectAsync((int result) {
      Expect.equals(42, result);
    }));
    controller..add(10)..add(32)..close();
  });

  // An exception thrown by the combine function becomes the future's error.
  test("StreamController.fold throws", () {
    StreamController controller = new StreamController();
    Stream broadcast =
        controller.stream.asBroadcastStream(onCancel: cancelSub);
    Future failing = broadcast.fold(0, (total, value) { throw "Fnyf!"; });
    failing.catchError(expectAsync((error) {
      Expect.equals("Fnyf!", error);
    }));
    controller.add(42);
  });
}

// Registers tests that listen directly on a single-subscription
// controller's stream.
testSingleController() {
  // fold over two data events followed by close yields their sum.
  test("Single-subscription StreamController.fold", () {
    StreamController controller = new StreamController();
    Future sum = controller.stream.fold(0, (total, value) => total + value);
    sum.then(expectAsync((int result) {
      Expect.equals(42, result);
    }));
    controller..add(10)..add(32)..close();
  });

  // An exception thrown by the combine function becomes the future's error.
  test("Single-subscription StreamController.fold throws", () {
    StreamController controller = new StreamController();
    Future failing =
        controller.stream.fold(0, (total, value) { throw "Fnyf!"; });
    failing.catchError(expectAsync((error) {
      Expect.equals("Fnyf!", error);
    }));
    controller.add(42);
  });

  // Events added (and the close) before anyone listens must still reach a
  // listener that subscribes afterwards.
  test("Single-subscription StreamController events are buffered when"
       " there is no subscriber",
       () {
    StreamController controller = new StreamController();
    EventSink sink = controller.sink;
    Stream stream = controller.stream;
    int total = 0;
    sink..add(1)..add(2)..close();

    void accumulate(data) {
      total += data;
    }

    void checkTotal() {
      Expect.equals(3, total);
    }

    stream.listen(accumulate, onDone: expectAsync(checkTotal));
  });
}

// Registers tests for the Stream methods that reduce a stream to a Future:
// forEach, firstWhere, lastWhere, singleWhere, first, last, elementAt and
// drain.
//
// Unless a test builds its own event list, it replays [sentEvents]
// (7, 9, 13, 87, done) into a fresh single-subscription controller.
testExtraMethods() {
  Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close();

  // forEach sees every data event and completes on done.
  test("forEach", () {
    StreamController c = new StreamController();
    Events actualEvents = new Events();
    Future f = c.stream.forEach(actualEvents.add);
    f.then(expectAsync((_) {
      actualEvents.close();
      Expect.listEquals(sentEvents.events, actualEvents.events);
    }));
    sentEvents.replay(c);
  });

  // An error event on the stream ends forEach; only the data before it is
  // seen.
  test("forEachError", () {
    Events sentEvents = new Events()..add(7)..error("bad")..add(87)..close();
    StreamController c = new StreamController();
    Events actualEvents = new Events();
    Future f = c.stream.forEach(actualEvents.add);
    f.catchError(expectAsync((error) {
      Expect.equals("bad", error);
      Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
    }));
    sentEvents.replay(c);
  });

  // An exception thrown by the forEach action ends forEach the same way.
  test("forEachError2", () {
    Events sentEvents = new Events()..add(7)..add(9)..add(87)..close();
    StreamController c = new StreamController();
    Events actualEvents = new Events();
    Future f = c.stream.forEach((x) {
      if (x == 9) throw "bad";
      actualEvents.add(x);
    });
    f.catchError(expectAsync((error) {
      Expect.equals("bad", error);
      Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
    }));
    sentEvents.replay(c);
  });

  test("firstWhere", () {
    StreamController c = new StreamController();
    Future f = c.stream.firstWhere((x) => (x % 3) == 0);
    f.then(expectAsync((v) { Expect.equals(9, v); }));
    sentEvents.replay(c);
  });

  // No event is divisible by 4 and no default is given, so the future must
  // fail. Check the error type so an unrelated failure cannot pass the test.
  test("firstWhere 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.firstWhere((x) => (x % 4) == 0);
    f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
    sentEvents.replay(c);
  });

  // No match, but a default value is supplied.
  test("firstWhere 3", () {
    StreamController c = new StreamController();
    Future f = c.stream.firstWhere((x) => (x % 4) == 0, defaultValue: () => 999);
    f.then(expectAsync((v) { Expect.equals(999, v); }));
    sentEvents.replay(c);
  });


  test("lastWhere", () {
    StreamController c = new StreamController();
    Future f = c.stream.lastWhere((x) => (x % 3) == 0);
    f.then(expectAsync((v) { Expect.equals(87, v); }));
    sentEvents.replay(c);
  });

  // No event is divisible by 4 and no default is given, so the future must
  // fail. Check the error type so an unrelated failure cannot pass the test.
  test("lastWhere 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.lastWhere((x) => (x % 4) == 0);
    f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
    sentEvents.replay(c);
  });

  // No match, but a default value is supplied.
  test("lastWhere 3", () {
    StreamController c = new StreamController();
    Future f = c.stream.lastWhere((x) => (x % 4) == 0, defaultValue: () => 999);
    f.then(expectAsync((v) { Expect.equals(999, v); }));
    sentEvents.replay(c);
  });

  test("singleWhere", () {
    StreamController c = new StreamController();
    Future f = c.stream.singleWhere((x) => (x % 9) == 0);
    f.then(expectAsync((v) { Expect.equals(9, v); }));
    sentEvents.replay(c);
  });

  test("singleWhere 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.singleWhere((x) => (x % 3) == 0);  // Matches 9 and 87.
    f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
    sentEvents.replay(c);
  });

  test("first", () {
    StreamController c = new StreamController();
    Future f = c.stream.first;
    f.then(expectAsync((v) { Expect.equals(7, v);}));
    sentEvents.replay(c);
  });

  test("first empty", () {
    StreamController c = new StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
    Events emptyEvents = new Events()..close();
    emptyEvents.replay(c);
  });

  test("first error", () {
    StreamController c = new StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) { Expect.equals("error", error); }));
    Events errorEvents = new Events()..error("error")..close();
    errorEvents.replay(c);
  });

  // Only the first of two errors is reported.
  test("first error 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) { Expect.equals("error", error); }));
    Events errorEvents = new Events()..error("error")..error("error2")..close();
    errorEvents.replay(c);
  });

  test("last", () {
    StreamController c = new StreamController();
    Future f = c.stream.last;
    f.then(expectAsync((v) { Expect.equals(87, v);}));
    sentEvents.replay(c);
  });

  test("last empty", () {
    StreamController c = new StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
    Events emptyEvents = new Events()..close();
    emptyEvents.replay(c);
  });

  test("last error", () {
    StreamController c = new StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) { Expect.equals("error", error); }));
    Events errorEvents = new Events()..error("error")..close();
    errorEvents.replay(c);
  });

  // Only the first of two errors is reported.
  test("last error 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) { Expect.equals("error", error); }));
    Events errorEvents = new Events()..error("error")..error("error2")..close();
    errorEvents.replay(c);
  });

  // Index 2 of (7, 9, 13, 87) is 13.
  test("elementAt", () {
    StreamController c = new StreamController();
    Future f = c.stream.elementAt(2);
    f.then(expectAsync((v) { Expect.equals(13, v);}));
    sentEvents.replay(c);
  });

  // Index 20 is past the end of the four replayed events.
  test("elementAt 2", () {
    StreamController c = new StreamController();
    Future f = c.stream.elementAt(20);
    f.catchError(expectAsync((error) { Expect.isTrue(error is RangeError); }));
    sentEvents.replay(c);
  });

  test("drain", () {
    StreamController c = new StreamController();
    Future f = c.stream.drain();
    f.then(expectAsync((v) { Expect.equals(null, v);}));
    sentEvents.replay(c);
  });

  // Only the first of two errors is reported.
  test("drain error", () {
    StreamController c = new StreamController();
    Future f = c.stream.drain();
    f.catchError(expectAsync((error) { Expect.equals("error", error); }));
    Events errorEvents = new Events()..error("error")..error("error2")..close();
    errorEvents.replay(c);
  });

}

// Registers tests that pause a subscription from inside its first data
// callback and resume it in different ways: through a resume-signal future,
// through direct resume() calls, or through a mix of both.
//
// Each test first queues the expected sequence of callbacks on a
// StreamProtocolTest and then drives the controller.
testPause() {
  // One pause with a resume-signal future, completed in the pause callback.
  test("pause event-unpause", () {
    StreamProtocolTest protocol = new StreamProtocolTest();
    Completer resumeSignal = new Completer();
    protocol.expectListen();
    protocol.expectData(42, () {
      protocol.pause(resumeSignal.future);
    });
    protocol.expectPause(() {
      resumeSignal.complete(null);
    });
    protocol.expectData(43);
    protocol.expectData(44);
    protocol.expectCancel();
    protocol.expectDone(protocol.terminate);
    protocol..listen()..add(42)..add(43)..add(44)..close();
  });

  // Two pauses, each with its own resume-signal future; completing the
  // first future also completes the second.
  test("pause twice event-unpause", () {
    StreamProtocolTest protocol = new StreamProtocolTest();
    Completer firstSignal = new Completer();
    Completer secondSignal = new Completer();
    protocol.expectListen();
    protocol.expectData(42, () {
      protocol.pause(firstSignal.future);
      protocol.pause(secondSignal.future);
    });
    protocol.expectPause(() {
      firstSignal.future.then(secondSignal.complete);
      firstSignal.complete(null);
    });
    protocol.expectData(43);
    protocol.expectData(44);
    protocol.expectCancel();
    protocol.expectDone(protocol.terminate);
    protocol.listen();
    protocol.add(42);
    protocol.add(43);
    protocol.add(44);
    protocol.close();
  });

  // Two plain pauses undone by two direct resume() calls.
  test("pause twice direct-unpause", () {
    StreamProtocolTest protocol = new StreamProtocolTest();
    protocol.expectListen();
    protocol.expectData(42, () {
      protocol.pause();
      protocol.pause();
    });
    protocol.expectPause(() {
      protocol.resume();
      protocol.resume();
    });
    protocol.expectData(43);
    protocol.expectData(44);
    protocol.expectCancel();
    protocol.expectDone(protocol.terminate);
    protocol.listen();
    protocol.add(42);
    protocol.add(43);
    protocol.add(44);
    protocol.close();
  });

  // One plain pause plus one pause with a resume-signal future; the
  // remaining events and the close are added while paused.
  test("pause twice direct-event-unpause", () {
    StreamProtocolTest protocol = new StreamProtocolTest();
    Completer resumeSignal = new Completer();
    protocol.expectListen();
    protocol.expectData(42, () {
      protocol.pause();
      protocol.pause(resumeSignal.future);
      protocol.add(43);
      protocol.add(44);
      protocol.close();
    });
    protocol.expectPause(() {
      // The plain pause is undone explicitly once the signal has fired.
      resumeSignal.future.then((v) => protocol.resume());
      resumeSignal.complete(null);
    });
    protocol.expectData(43);
    protocol.expectData(44);
    protocol.expectCancel();
    protocol.expectDone(protocol.terminate);
    protocol.listen();
    protocol.add(42);
  });
}

// Marker object thrown by the rethrow tests. The constructor is const, so
// every `const TestError()` expression denotes the same canonical instance.
class TestError {
  const TestError();
}

// Registers tests checking that an error thrown by a user callback reaches
// the error handler of the resulting stream or future as the identical
// object.
//
// Each stream method is registered exactly once, which keeps the generated
// test names ("rethrow-<name>-value" / "rethrow-<name>-error") unique.
testRethrow() {
  TestError error = const TestError();

  // Registers a test for a stream-to-stream transformation whose callback
  // throws when it sees a data event. [streamValueTransform] receives the
  // source stream and the throwing callback and returns the new stream.
  testStream(name, streamValueTransform) {
    test("rethrow-$name-value", () {
      StreamController c = new StreamController();
      Stream s = streamValueTransform(c.stream, (v) { throw error; });
      s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync(
          (e) { Expect.identical(error, e); }));
      c.add(null);
      c.close();
    });
  }

  // Registers a test for a stream-to-stream transformation whose callback
  // throws when it sees an error event.
  testStreamError(name, streamErrorTransform) {
    test("rethrow-$name-error", () {
      StreamController c = new StreamController();
      Stream s = streamErrorTransform(c.stream, (e) { throw error; });
      s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync(
          (e) { Expect.identical(error, e); }));
      c.addError("SOME ERROR");
      c.close();
    });
  }

  // Registers a test for a stream-to-future method whose callback throws.
  // [streamValueTransform] receives the source stream and the throwing
  // callback and returns the future under test.
  testFuture(name, streamValueTransform) {
    test("rethrow-$name-value", () {
      StreamController c = new StreamController();
      Future f = streamValueTransform(c.stream, (v) { throw error; });
      f.then((v) { Expect.fail("unreachable"); },
             onError: expectAsync((e) { Expect.identical(error, e); }));
      // Need two values to trigger compare for reduce.
      c.add(0);
      c.add(1);
      c.close();
    });
  }

  testStream("where", (s, act) => s.where(act));
  testStream("map", (s, act) => s.map(act));
  testStream("expand", (s, act) => s.expand(act));
  testStreamError("handleError", (s, act) => s.handleError(act));
  testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act));
  testFuture("forEach", (s, act) => s.forEach(act));
  testFuture("every", (s, act) => s.every(act));
  testFuture("any", (s, act) => s.any(act));
  testFuture("reduce", (s, act) => s.reduce((a,b) => act(b)));
  testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b)));
  testFuture("drain", (s, act) => s.drain().then(act));
}

// Registers tests that drive a `StreamProtocolTest.broadcast()` through
// listen / add / cancel / close sequences.
//
// Each test queues the callbacks it expects on the protocol object and then
// triggers them; the optional closure passed to an `expect*` call is the
// action the test performs at that step.
// NOTE(review): the exact matching rules live in stream_state_helper.dart,
// which is not visible here - confirm there before reordering anything.
void testBroadcastController() {
  // One listener, one data event, then close.
  test("broadcast-controller-basic", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test..expectListen()
        ..expectData(42)
        ..expectCancel()
        ..expectDone(test.terminate);
    test..listen()
        ..add(42)
        ..close();
  });

  // A second listener is added from inside the first listener's data
  // callback; 37 and the done event are then each expected twice.
  test("broadcast-controller-listen-twice", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test..expectListen()
        ..expectData(42, () {
            test.listen();
            test.add(37);
            test.close();
          })
      // Order is not guaranteed between subscriptions if not sync.
        ..expectData(37)
        ..expectData(37)
        ..expectDone()
        ..expectCancel()
        ..expectDone(test.terminate);
    test.listen();
    test.add(42);
  });

  // The first listener cancels before the second one listens; a listen
  // callback is expected for each of the two listen() calls.
  test("broadcast-controller-listen-twice-non-overlap", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test
    ..expectListen(() {
      test.add(42);
    })
    ..expectData(42, () {
      test.cancel();
    })
    ..expectCancel(() {
      test.listen();
    })..expectListen(() {
      test.add(37);
    })
    ..expectData(37, () {
      test.close();
    })
    ..expectCancel()
    ..expectDone(test.terminate);
    test.listen();
  });

  // Two listeners; the first subscription (sub1) is paused in the callback
  // of the second expected 42, and only one 43 is expected afterwards.
  // In the 43 callback sub1 is cancelled, a new listener is added, and 44
  // is sent and expected twice.
  test("broadcast-controller-individual-pause", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    var sub1;
    test..expectListen()
        ..expectData(42)
        ..expectData(42, () { sub1.pause(); })
        ..expectData(43, () {
      sub1.cancel();
      test.listen();
      test.add(44);
      test.expectData(44);
      test.expectData(44, test.terminate);
    });
    sub1 = test.listen();
    test.listen();
    test.add(42);
    test.add(43);
  });

  // Adds events from inside a data callback and from inside the cancel
  // callback. No data expectation is queued for 87 or 37.
  // NOTE(review): `expectData` is called on the object returned by
  // `test.listen()`; presumably that scopes the expectation to this
  // subscription - confirm in stream_state_helper.dart.
  test("broadcast-controller-add-in-callback", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test.expectListen();
    var sub = test.listen();
    test.add(42);
    sub.expectData(42, () {
      test.add(87);
      sub.cancel();
    });
    test.expectCancel(() {
      test.add(37);
      test.terminate();
    });
  });
}

// Registers tests that drive a `StreamProtocolTest.asBroadcast()` through
// listen / cancel / pause sequences.
//
// `expectBroadcastListen` and `expectBroadcastCancel` take a one-argument
// callback. Where the argument is used it is named `originalSub` and is
// cancelled, paused or resumed.
// NOTE(review): presumably it is the subscription on the underlying
// (non-broadcast) stream - confirm in stream_state_helper.dart.
void testAsBroadcast() {
  // The listener cancels but the broadcast-cancel callback listens again
  // instead of cancelling its argument; a second broadcast-listen callback
  // is expected, with no further `expectListen`.
  test("asBroadcast-not-canceled", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test..expectListen()
        ..expectBroadcastListen((_) {
            test.add(42);
          })
        ..expectData(42, () {
            sub.cancel();
          })
        ..expectBroadcastCancel((_) {
            sub = test.listen();
          })
        ..expectBroadcastListen((_) {
            test.terminate();
          });
    sub = test.listen();
  });

  // The broadcast-cancel callback cancels `originalSub`; a cancel callback
  // is then expected.
  test("asBroadcast-canceled", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test..expectListen()
        ..expectBroadcastListen((_) {
            test.add(42);
          })
        ..expectData(42, () {
            sub.cancel();
          })
        ..expectBroadcastCancel((originalSub) {
            originalSub.cancel();
          })
        ..expectCancel(test.terminate);
    sub = test.listen();
  });

  // `originalSub` is paused while no listener is attached and resumed once
  // a new listener arrives; 43 is expected to be delivered after that.
  test("asBroadcast-pause-original", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test..expectListen()
        ..expectBroadcastListen((_) {
            test.add(42);
            test.add(43);
          })
        ..expectData(42, () {
            sub.cancel();
          })
        ..expectBroadcastCancel((originalSub) {
            originalSub.pause();  // Pause before sending 43 from original sub.
          })
        ..expectPause(() {
            sub = test.listen();
          })
        ..expectBroadcastListen((originalSub) {
            originalSub.resume();
          })
        ..expectData(43)
        ..expectResume(() {
            test.close();
          })
        ..expectCancel()
        ..expectDone()
        ..expectBroadcastCancel((_) => test.terminate());
    sub = test.listen();
  });
}

// Registers the sink and addStream tests for one controller configuration.
//
// [sync] is forwarded as the `sync` argument of the controller constructor,
// [broadcast] selects `StreamController.broadcast` over `StreamController`,
// and [asBroadcast] makes the tests listen through `asBroadcastStream()`
// instead of on the controller's stream directly. Every test name starts
// with a tag built from the three flags (for example "SB" or "ASaB").
void testSink({bool sync, bool broadcast, bool asBroadcast}) {
  String syncTag = sync ? "S" : "A";
  String kindTag = broadcast ? "B" : "S";
  String viewTag = asBroadcast ? "aB" : "";
  String type = "$syncTag$kindTag$viewTag";

  // Creates the controller kind selected by [broadcast] and [sync].
  StreamController createController() {
    if (broadcast) return new StreamController.broadcast(sync: sync);
    return new StreamController(sync: sync);
  }

  // Returns the stream the test listens on for [controller].
  Stream streamUnderTest(StreamController controller) {
    if (asBroadcast) return controller.stream.asBroadcastStream();
    return controller.stream;
  }

  // add, addError, addStream and close through the sink, in order.
  test("$type-controller-sink", () {
    var finished = expectAsync((){});
    var controller = createController();
    var expected = new Events();
    expected..add(42)..error("error");
    expected..add(1)..add(2)..add(3)..add(4)..add(5);
    expected..add(43)..close();
    var actual = new Events.capture(streamUnderTest(controller));
    var sink = controller.sink;
    sink.add(42);
    sink.addError("error");
    Future added = sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]));
    added.then((_) {
      sink.add(43);
      return sink.close();
    }).then((_) {
      Expect.listEquals(expected.events, actual.events);
      finished();
    });
  });

  // The listener cancels in the middle of the added stream.
  test("$type-controller-sink-canceled", () {
    var finished = expectAsync((){});
    var controller = createController();
    var expected = new Events();
    expected..add(42)..error("error");
    expected..add(1)..add(2)..add(3);
    var stream = streamUnderTest(controller);
    var actual = new Events();
    var subscription;

    // Cancel subscription after receiving "3" event.
    void onData(v) {
      if (v == 3) subscription.cancel();
      actual.add(v);
    }

    subscription = stream.listen(onData, onError: actual.error);
    var sink = controller.sink;
    sink.add(42);
    sink.addError("error");
    Future added = sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]));
    added.then((_) {
      Expect.listEquals(expected.events, actual.events);
      // Close controller as well. It has no listener. If it is a broadcast
      // stream, it will still be open, and we read the "done" future before
      // closing. A normal stream is already done when its listener cancels.
      Future doneFuture = sink.done;
      sink.close();
      return doneFuture;
    }).then((_) {
      // No change in events.
      Expect.listEquals(expected.events, actual.events);
      finished();
    });
  });

  // The listener pauses for 15 ms in the middle of the added stream; all
  // events must still arrive, in order.
  test("$type-controller-sink-paused", () {
    var finished = expectAsync((){});
    var controller = createController();
    var expected = new Events();
    expected..add(42)..error("error");
    expected..add(1)..add(2)..add(3);
    expected..add(4)..add(5)..add(43)..close();
    var stream = streamUnderTest(controller);
    var actual = new Events();
    var subscription;
    // Set when the pause future fires; nothing in this test reads it.
    var pauseIsDone = false;

    void onData(v) {
      if (v == 3) {
        Future resumeSignal = new Future.delayed(
            const Duration(milliseconds: 15), () { pauseIsDone = true; });
        subscription.pause(resumeSignal);
      }
      actual.add(v);
    }

    subscription = stream.listen(onData,
                                 onError: actual.error,
                                 onDone: actual.close);
    var sink = controller.sink;
    sink.add(42);
    sink.addError("error");
    Future added = sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]));
    added.then((_) {
      sink.add(43);
      return sink.close();
    }).then((_) {
      void verify() {
        Expect.listEquals(expected.events, actual.events);
        finished();
      }

      if (asBroadcast || broadcast) {
        // The done-future of the sink completes when it passes
        // the done event to the asBroadcastStream controller, which is
        // before the final listener gets the event.
        // Wait for the done event to be *delivered* before testing the
        // events.
        actual.onDone(verify);
      } else {
        verify();
      }
    });
  });

  test("$type-controller-addstream-error-stop", () {
    // Check that addStream defaults to ending after the first error.
    var finished = expectAsync((){});
    StreamController controller = createController();
    Stream stream = streamUnderTest(controller);
    var actual = new Events.capture(stream);

    var source = new Events();
    source..add(1)..add(2)..error("BAD");
    source..add(3)..error("FAIL")..close();

    var expected = new Events();
    expected..add(1)..add(2)..error("BAD")..close();

    StreamController sourceController = new StreamController();
    controller.addStream(sourceController.stream).then((_) {
      controller.close().then((_) {
        Expect.listEquals(expected.events, actual.events);
        finished();
      });
    });

    source.replay(sourceController);
  });

  test("$type-controller-addstream-error-forward", () {
    // Check that addStream with cancelOnError:false passes all data and errors
    // to the controller.
    var finished = expectAsync((){});
    StreamController controller = createController();
    Stream stream = streamUnderTest(controller);
    var actual = new Events.capture(stream);

    var source = new Events();
    source..add(1)..add(2)..addError("BAD");
    source..add(3)..addError("FAIL")..close();

    StreamController sourceController = new StreamController();
    Future added =
        controller.addStream(sourceController.stream, cancelOnError: false);
    added.then((_) {
      controller.close().then((_) {
        Expect.listEquals(source.events, actual.events);
        finished();
      });
    });

    source.replay(sourceController);
  });

  test("$type-controller-addstream-twice", () {
    // Using addStream twice on the same stream
    var finished = expectAsync((){});
    StreamController controller = createController();
    Stream stream = streamUnderTest(controller);
    var actual = new Events.capture(stream);

    // Stream of five events, throws on 3.
    Stream throwingOnThree() {
      return new Stream.fromIterable([1,2,3,4,5])
          .map((x) => (x == 3 ? throw x : x));
    }

    Stream s1 = throwingOnThree();
    Stream s2 = throwingOnThree();

    // The first addStream stops at its error; the second one, added with
    // cancelOnError: false, is expected to deliver all five events.
    Events expected = new Events()
        ..add(1)..add(2)..error(3)
        ..add(1)..add(2)..error(3)..add(4)..add(5)
        ..close();

    controller.addStream(s1).then((_) {
      controller.addStream(s2, cancelOnError: false).then((_) {
        controller.close().then((_) {
          Expect.listEquals(expected.events, actual.events);
          finished();
        });
      });
    });
  });
}

// Entry point: registers every test group, then the sink tests for each
// combination of sync/async controller and stream kind that is exercised
// (sync first, then async).
main() {
  testController();
  testSingleController();
  testExtraMethods();
  testPause();
  testRethrow();
  testBroadcastController();
  testAsBroadcast();
  for (bool sync in [true, false]) {
    testSink(sync: sync, broadcast: false, asBroadcast: false);
    testSink(sync: sync, broadcast: false, asBroadcast: true);
    testSink(sync: sync, broadcast: true, asBroadcast: false);
  }
}
