// Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// @dart = 2.9

// Test the basic StreamController and StreamController.broadcast.
library stream_controller_async_test;

import 'dart:async';

import 'package:expect/expect.dart';
import 'package:async_helper/async_minitest.dart';

import 'event_helper.dart';
import 'stream_state_helper.dart';

/// Cancels [sub] and ignores the future returned by the cancellation.
///
/// Has the shape of an `asBroadcastStream` `onCancel` callback, which is how
/// the fold tests in this file use it.
void cancelSub(StreamSubscription sub) => sub.cancel();

/// Registers the `fold` tests that listen through a broadcast view
/// (`asBroadcastStream`) of a single-subscription controller.
testController() {
  // The fold result must be the sum of the two data events.
  test("StreamController.fold", () {
    var controller = StreamController();
    var broadcast = controller.stream.asBroadcastStream(onCancel: cancelSub);
    var sum = broadcast.fold(0, (a, b) => a + b);
    sum.then(expectAsync((int v) {
      Expect.equals(42, v);
    }));
    controller
      ..add(10)
      ..add(32)
      ..close();
  });

  // An object thrown by the combine callback must become the error of the
  // future returned by fold.
  test("StreamController.fold throws", () {
    var controller = StreamController();
    var broadcast = controller.stream.asBroadcastStream(onCancel: cancelSub);
    var failing = broadcast.fold(0, (a, b) {
      throw "Fnyf!";
    });
    failing.catchError(expectAsync((error) {
      Expect.equals("Fnyf!", error);
    }));
    controller.add(42);
  });
}

/// Registers tests that listen directly on a single-subscription controller's
/// stream: `fold`, a throwing `fold`, and events added before anyone listens.
testSingleController() {
  // The fold result must be the sum of the two data events.
  test("Single-subscription StreamController.fold", () {
    var controller = StreamController();
    var sum = controller.stream.fold(0, (a, b) => a + b);
    sum.then(expectAsync((int v) {
      Expect.equals(42, v);
    }));
    controller
      ..add(10)
      ..add(32)
      ..close();
  });

  // An object thrown by the combine callback must become the error of the
  // future returned by fold.
  test("Single-subscription StreamController.fold throws", () {
    var controller = StreamController();
    var failing = controller.stream.fold(0, (a, b) {
      throw "Fnyf!";
    });
    failing.catchError(expectAsync((e) {
      Expect.equals("Fnyf!", e);
    }));
    controller.add(42);
  });

  // Everything is added and the sink is closed before the listener exists;
  // the listener must still observe both values before the done event.
  test(
      "Single-subscription StreamController events are buffered when"
      " there is no subscriber", () {
    var controller = StreamController();
    EventSink sink = controller.sink;
    var total = 0;
    sink
      ..add(1)
      ..add(2)
      ..close();
    controller.stream.listen((data) {
      total += data;
    }, onDone: expectAsync(() {
      Expect.equals(3, total);
    }));
  });
}

/// Registers tests for the future-returning helper methods on a stream:
/// `forEach`, `firstWhere`, `lastWhere`, `singleWhere`, `first`, `last`,
/// `elementAt` and `drain`.
///
/// Every test creates its own controller, requests the future under test and
/// only then replays a recorded [Events] sequence into the controller.
testExtraMethods() {
  // Shared input for most tests: the values 7, 9, 13, 87 and then a close.
  Events sentEvents = Events()
    ..add(7)
    ..add(9)
    ..add(13)
    ..add(87)
    ..close();

  test("forEach", () {
    StreamController c = StreamController();
    Events actualEvents = Events();
    Future f = c.stream.forEach(actualEvents.add);
    f.then(expectAsync((_) {
      // forEach only records data events, so close the recording by hand
      // before comparing it with the full input.
      actualEvents.close();
      Expect.listEquals(sentEvents.events, actualEvents.events);
    }));
    sentEvents.replay(c);
  });

  // An error event in the source must fail the forEach future; only the
  // value before the error has been recorded at that point.
  test("forEachError", () {
    Events eventsWithError = Events()
      ..add(7)
      ..error("bad")
      ..add(87)
      ..close();
    StreamController c = StreamController();
    Events actualEvents = Events();
    Future f = c.stream.forEach(actualEvents.add);
    f.catchError(expectAsync((error) {
      Expect.equals("bad", error);
      Expect.listEquals((Events()..add(7)).events, actualEvents.events);
    }));
    eventsWithError.replay(c);
  });

  // An error thrown by the forEach action itself must fail the future too.
  test("forEachError2", () {
    Events plainEvents = Events()
      ..add(7)
      ..add(9)
      ..add(87)
      ..close();
    StreamController c = StreamController();
    Events actualEvents = Events();
    Future f = c.stream.forEach((x) {
      if (x == 9) throw "bad";
      actualEvents.add(x);
    });
    f.catchError(expectAsync((error) {
      Expect.equals("bad", error);
      Expect.listEquals((Events()..add(7)).events, actualEvents.events);
    }));
    plainEvents.replay(c);
  });

  test("firstWhere", () {
    StreamController c = StreamController();
    Future f = c.stream.firstWhere((x) => (x % 3) == 0);
    f.then(expectAsync((v) {
      Expect.equals(9, v);
    }));
    sentEvents.replay(c);
  });

  // No value is divisible by 4 and there is no orElse, so the future must
  // fail. The error type is checked like in the "singleWhere 2" and
  // "first empty" tests instead of accepting any error.
  test("firstWhere 2", () {
    StreamController c = StreamController();
    Future f = c.stream.firstWhere((x) => (x % 4) == 0);
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is StateError);
    }));
    sentEvents.replay(c);
  });

  // No match, but orElse supplies the result.
  test("firstWhere 3", () {
    StreamController c = StreamController();
    Future f = c.stream.firstWhere((x) => (x % 4) == 0, orElse: () => 999);
    f.then(expectAsync((v) {
      Expect.equals(999, v);
    }));
    sentEvents.replay(c);
  });

  test("lastWhere", () {
    StreamController c = StreamController();
    Future f = c.stream.lastWhere((x) => (x % 3) == 0);
    f.then(expectAsync((v) {
      Expect.equals(87, v);
    }));
    sentEvents.replay(c);
  });

  // No value is divisible by 4 and there is no orElse, so the future must
  // fail; the error type is checked rather than accepting any error.
  test("lastWhere 2", () {
    StreamController c = StreamController();
    Future f = c.stream.lastWhere((x) => (x % 4) == 0);
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is StateError);
    }));
    sentEvents.replay(c);
  });

  // No match, but orElse supplies the result.
  test("lastWhere 3", () {
    StreamController c = StreamController();
    Future f = c.stream.lastWhere((x) => (x % 4) == 0, orElse: () => 999);
    f.then(expectAsync((v) {
      Expect.equals(999, v);
    }));
    sentEvents.replay(c);
  });

  test("singleWhere", () {
    StreamController c = StreamController();
    Future f = c.stream.singleWhere((x) => (x % 9) == 0);
    f.then(expectAsync((v) {
      Expect.equals(9, v);
    }));
    sentEvents.replay(c);
  });

  test("singleWhere 2", () {
    StreamController c = StreamController();
    Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87..
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is StateError);
    }));
    sentEvents.replay(c);
  });

  test("first", () {
    StreamController c = StreamController();
    Future f = c.stream.first;
    f.then(expectAsync((v) {
      Expect.equals(7, v);
    }));
    sentEvents.replay(c);
  });

  test("first empty", () {
    StreamController c = StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is StateError);
    }));
    Events emptyEvents = Events()..close();
    emptyEvents.replay(c);
  });

  test("first error", () {
    StreamController c = StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) {
      Expect.equals("error", error);
    }));
    Events errorEvents = Events()
      ..error("error")
      ..close();
    errorEvents.replay(c);
  });

  // Only the first of two errors may reach the future.
  test("first error 2", () {
    StreamController c = StreamController();
    Future f = c.stream.first;
    f.catchError(expectAsync((error) {
      Expect.equals("error", error);
    }));
    Events errorEvents = Events()
      ..error("error")
      ..error("error2")
      ..close();
    errorEvents.replay(c);
  });

  test("last", () {
    StreamController c = StreamController();
    Future f = c.stream.last;
    f.then(expectAsync((v) {
      Expect.equals(87, v);
    }));
    sentEvents.replay(c);
  });

  test("last empty", () {
    StreamController c = StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is StateError);
    }));
    Events emptyEvents = Events()..close();
    emptyEvents.replay(c);
  });

  test("last error", () {
    StreamController c = StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) {
      Expect.equals("error", error);
    }));
    Events errorEvents = Events()
      ..error("error")
      ..close();
    errorEvents.replay(c);
  });

  // Only the first of two errors may reach the future.
  test("last error 2", () {
    StreamController c = StreamController();
    Future f = c.stream.last;
    f.catchError(expectAsync((error) {
      Expect.equals("error", error);
    }));
    Events errorEvents = Events()
      ..error("error")
      ..error("error2")
      ..close();
    errorEvents.replay(c);
  });

  test("elementAt", () {
    StreamController c = StreamController();
    Future f = c.stream.elementAt(2);
    f.then(expectAsync((v) {
      Expect.equals(13, v);
    }));
    sentEvents.replay(c);
  });

  // Index 20 is past the four available values.
  test("elementAt 2", () {
    StreamController c = StreamController();
    Future f = c.stream.elementAt(20);
    f.catchError(expectAsync((error) {
      Expect.isTrue(error is RangeError);
    }));
    sentEvents.replay(c);
  });

  test("drain", () {
    StreamController c = StreamController();
    Future f = c.stream.drain();
    f.then(expectAsync((v) {
      Expect.equals(null, v);
    }));
    sentEvents.replay(c);
  });

  // Only the first of two errors may reach the future.
  test("drain error", () {
    StreamController c = StreamController();
    Future f = c.stream.drain();
    f.catchError(expectAsync((error) {
      Expect.equals("error", error);
    }));
    Events errorEvents = Events()
      ..error("error")
      ..error("error2")
      ..close();
    errorEvents.replay(c);
  });
}

/// Registers the pause/resume tests.
///
/// Each test scripts a sequence of `expect*` calls on a [StreamProtocolTest]
/// and then drives it with `listen`, `add` and `close`.
/// NOTE(review): [StreamProtocolTest] is not declared in this file
/// (presumably in stream_state_helper.dart); the expectations look like an
/// ordered script of controller callbacks and listener events — confirm there.
testPause() {
  // Pauses with a future from the data handler for 42 and completes that
  // future from the pause expectation; 43 and 44 are expected afterwards.
  test("pause event-unpause", () {
    StreamProtocolTest test = new StreamProtocolTest();
    Completer completer = new Completer();
    test
      ..expectListen()
      ..expectData(42, () {
        test.pause(completer.future);
      })
      ..expectPause(() {
        completer.complete(null);
      })
      ..expectData(43)
      ..expectData(44)
      ..expectCancel()
      ..expectDone(test.terminate);
    test.listen();
    test.add(42);
    test.add(43);
    test.add(44);
    test.close();
  });

  // Pauses twice, each time with its own future. The second completer is
  // completed from a `then` callback on the first future, so both complete
  // once the first completer is completed in the pause expectation.
  test("pause twice event-unpause", () {
    StreamProtocolTest test = new StreamProtocolTest();
    Completer completer = new Completer();
    Completer completer2 = new Completer();
    test
      ..expectListen()
      ..expectData(42, () {
        test.pause(completer.future);
        test.pause(completer2.future);
      })
      ..expectPause(() {
        completer.future.then(completer2.complete);
        completer.complete(null);
      })
      ..expectData(43)
      ..expectData(44)
      ..expectCancel()
      ..expectDone(test.terminate);
    test
      ..listen()
      ..add(42)
      ..add(43)
      ..add(44)
      ..close();
  });

  // Pauses twice without futures and calls resume twice from the pause
  // expectation.
  test("pause twice direct-unpause", () {
    StreamProtocolTest test = new StreamProtocolTest();
    test
      ..expectListen()
      ..expectData(42, () {
        test.pause();
        test.pause();
      })
      ..expectPause(() {
        test.resume();
        test.resume();
      })
      ..expectData(43)
      ..expectData(44)
      ..expectCancel()
      ..expectDone(test.terminate);
    test
      ..listen()
      ..add(42)
      ..add(43)
      ..add(44)
      ..close();
  });

  // Mixes one pause without a future and one with a future. The remaining
  // events and the close are added from inside the data handler, after both
  // pause calls; the explicit resume is issued once the future has completed.
  test("pause twice direct-event-unpause", () {
    StreamProtocolTest test = new StreamProtocolTest();
    Completer completer = new Completer();
    test
      ..expectListen()
      ..expectData(42, () {
        test.pause();
        test.pause(completer.future);
        test.add(43);
        test.add(44);
        test.close();
      })
      ..expectPause(() {
        completer.future.then((v) => test.resume());
        completer.complete(null);
      })
      ..expectData(43)
      ..expectData(44)
      ..expectCancel()
      ..expectDone(test.terminate);
    test
      ..listen()
      ..add(42);
  });
}

/// Marker object thrown by the callbacks in `testRethrow`.
///
/// It carries no state; `testRethrow` creates it with `const` and compares
/// the received error against it with `Expect.identical`.
class TestError {
  const TestError();
}

/// Registers tests checking that an object thrown from a user callback is
/// delivered, as the identical object, to the stream listener's error handler
/// or to the error handler of the returned future.
testRethrow() {
  TestError error = const TestError();

  // Registers "rethrow-<name>-value": [streamValueTransform] receives the
  // controller's stream and a data callback that always throws [error], and
  // must return the stream to listen on.
  testStream(name, streamValueTransform) {
    test("rethrow-$name-value", () {
      StreamController c = StreamController();
      Stream s = streamValueTransform(c.stream, (v) {
        throw error;
      });
      s.listen((_) {
        Expect.fail("unexpected value");
      }, onError: expectAsync((e) {
        Expect.identical(error, e);
      }));
      c.add(null);
      c.close();
    });
  }

  // Registers "rethrow-<name>-error": like [testStream], but the throwing
  // callback is handed to an error-handling transform and the source emits an
  // error event instead of a data event.
  testStreamError(name, streamErrorTransform) {
    test("rethrow-$name-error", () {
      StreamController c = StreamController();
      Stream s = streamErrorTransform(c.stream, (e) {
        throw error;
      });
      s.listen((_) {
        Expect.fail("unexpected value");
      }, onError: expectAsync((e) {
        Expect.identical(error, e);
      }));
      c.addError("SOME ERROR");
      c.close();
    });
  }

  // Registers "rethrow-<name>-value" for operations that return a future:
  // the future must complete with [error] and never with a value.
  testFuture(name, streamValueTransform) {
    test("rethrow-$name-value", () {
      StreamController c = StreamController();
      Future f = streamValueTransform(c.stream, (v) {
        throw error;
      });
      f.then((v) {
        Expect.fail("unreachable");
      }, onError: expectAsync((e) {
        Expect.identical(error, e);
      }));
      // Need two values to trigger compare for reduce.
      c.add(0);
      c.add(1);
      c.close();
    });
  }

  // Each transform is registered exactly once, so every test name is unique.
  testStream("where", (s, act) => s.where(act));
  testStream("map", (s, act) => s.map(act));
  testStream("expand", (s, act) => s.expand(act));
  testStreamError("handleError", (s, act) => s.handleError(act));
  testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act));
  testFuture("forEach", (s, act) => s.forEach(act));
  testFuture("every", (s, act) => s.every(act));
  testFuture("any", (s, act) => s.any(act));
  testFuture("reduce", (s, act) => s.reduce((a, b) => act(b)));
  testFuture("fold", (s, act) => s.fold(0, (a, b) => act(b)));
  testFuture("drain", (s, act) => s.drain().then(act));
}

/// Registers tests for controllers created with
/// `StreamProtocolTest.broadcast()`.
///
/// NOTE(review): [StreamProtocolTest] is not declared in this file; the
/// `expect*` calls look like an ordered script of controller callbacks and
/// listener events — confirm against its declaration.
void testBroadcastController() {
  // One listener, one value, then close.
  test("broadcast-controller-basic", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test
      ..expectListen()
      ..expectData(42)
      ..expectCancel()
      ..expectDone(test.terminate);
    test
      ..listen()
      ..add(42)
      ..close();
  });

  // A second listener is added from inside the first listener's data handler
  // for 42; the value 37 added afterwards is expected twice.
  test("broadcast-controller-listen-twice", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test
      ..expectListen()
      ..expectData(42, () {
        test.listen();
        test.add(37);
        test.close();
      })
      // Order is not guaranteed between subscriptions if not sync.
      ..expectData(37)
      ..expectData(37)
      ..expectDone()
      ..expectCancel()
      ..expectDone(test.terminate);
    test.listen();
    test.add(42);
  });

  // The first subscription is cancelled from its data handler and a new one
  // is started from the cancel expectation, so the listen/cancel callbacks
  // are expected a second time.
  test("broadcast-controller-listen-twice-non-overlap", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test
      ..expectListen(() {
        test.add(42);
      })
      ..expectData(42, () {
        test.cancel();
      })
      ..expectCancel(() {
        test.listen();
      })
      ..expectListen(() {
        test.add(37);
      })
      ..expectData(37, () {
        test.close();
      })
      ..expectCancel()
      ..expectDone(test.terminate);
    test.listen();
  });

  // Two listeners; `sub1` is paused in the second 42 expectation and only one
  // 43 is expected. `sub1` is then cancelled and replaced by a fresh
  // listener, after which 44 is expected twice.
  test("broadcast-controller-individual-pause", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    var sub1;
    test
      ..expectListen()
      ..expectData(42)
      ..expectData(42, () {
        sub1.pause();
      })
      ..expectData(43, () {
        sub1.cancel();
        test.listen();
        test.add(44);
        test.expectData(44);
        test.expectData(44, test.terminate);
      });
    sub1 = test.listen();
    test.listen();
    test.add(42);
    test.add(43);
  });

  // Adds events from inside callbacks: 87 from the data handler right before
  // cancelling, and 37 from the cancel expectation. No data expectation is
  // registered for either of them.
  test("broadcast-controller-add-in-callback", () {
    StreamProtocolTest test = new StreamProtocolTest.broadcast();
    test.expectListen();
    var sub = test.listen();
    test.add(42);
    sub.expectData(42, () {
      test.add(87);
      sub.cancel();
    });
    test.expectCancel(() {
      test.add(37);
      test.terminate();
    });
  });
}

/// Registers tests for controllers created with
/// `StreamProtocolTest.asBroadcast()`.
///
/// NOTE(review): the `expectBroadcastListen` / `expectBroadcastCancel`
/// callbacks receive one argument, named `originalSub` where it is used —
/// presumably the subscription on the underlying stream; confirm against the
/// [StreamProtocolTest] declaration.
void testAsBroadcast() {
  // The broadcast-cancel callback does not cancel its argument; it listens
  // again instead, and a second broadcast-listen callback is expected.
  test("asBroadcast-not-canceled", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test
      ..expectListen()
      ..expectBroadcastListen((_) {
        test.add(42);
      })
      ..expectData(42, () {
        sub.cancel();
      })
      ..expectBroadcastCancel((_) {
        sub = test.listen();
      })
      ..expectBroadcastListen((_) {
        test.terminate();
      });
    sub = test.listen();
  });

  // The broadcast-cancel callback cancels its argument, after which a cancel
  // callback is expected.
  test("asBroadcast-canceled", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test
      ..expectListen()
      ..expectBroadcastListen((_) {
        test.add(42);
      })
      ..expectData(42, () {
        sub.cancel();
      })
      ..expectBroadcastCancel((originalSub) {
        originalSub.cancel();
      })
      ..expectCancel(test.terminate);
    sub = test.listen();
  });

  // The broadcast-cancel callback pauses its argument before 43 is delivered;
  // the next broadcast-listen callback resumes it, and 43 is then expected.
  test("asBroadcast-pause-original", () {
    StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
    var sub;
    test
      ..expectListen()
      ..expectBroadcastListen((_) {
        test.add(42);
        test.add(43);
      })
      ..expectData(42, () {
        sub.cancel();
      })
      ..expectBroadcastCancel((originalSub) {
        originalSub.pause(); // Pause before sending 43 from original sub.
      })
      ..expectPause(() {
        sub = test.listen();
      })
      ..expectBroadcastListen((originalSub) {
        originalSub.resume();
      })
      ..expectData(43)
      ..expectResume(() {
        test.close();
      })
      ..expectCancel()
      ..expectDone()
      ..expectBroadcastCancel((_) => test.terminate());
    sub = test.listen();
  });
}

/// Registers the sink and `addStream` tests for one controller configuration.
///
/// [sync] selects a synchronous controller, [broadcast] a broadcast
/// controller, and [asBroadcast] makes the tests listen through
/// `asBroadcastStream()` instead of on the controller's stream directly. The
/// three flags are encoded in the prefix of every registered test name.
void testSink({bool sync, bool broadcast, bool asBroadcast}) {
  String type =
      "${sync ? "S" : "A"}${broadcast ? "B" : "S"}${asBroadcast ? "aB" : ""}";

  // Builds a fresh controller of the configured flavour.
  StreamController createController() => broadcast
      ? StreamController.broadcast(sync: sync)
      : StreamController(sync: sync);

  // Picks the stream a test listens on for [controller].
  Stream observedStream(StreamController controller) => asBroadcast
      ? controller.stream.asBroadcastStream()
      : controller.stream;

  // Data, an error and an added stream all go through the sink; the listener
  // must have seen everything by the time the close future completes.
  test("$type-controller-sink", () {
    var done = expectAsync(() {});
    var controller = createController();
    var expected = Events()
      ..add(42)
      ..error("error")
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(4)
      ..add(5)
      ..add(43)
      ..close();
    var actual = Events.capture(observedStream(controller));
    var sink = controller.sink;
    sink
      ..add(42)
      ..addError("error");
    sink.addStream(Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
      sink.add(43);
      return sink.close();
    }).then((_) {
      Expect.listEquals(expected.events, actual.events);
      done();
    });
  });

  // The listener cancels in the middle of the added stream; nothing after the
  // value 3 may be recorded.
  test("$type-controller-sink-canceled", () {
    var done = expectAsync(() {});
    var controller = createController();
    var expected = Events()
      ..add(42)
      ..error("error")
      ..add(1)
      ..add(2)
      ..add(3);
    var stream = observedStream(controller);
    var actual = Events();
    var sub;
    // Cancel subscription after receiving "3" event.
    sub = stream.listen((v) {
      if (v == 3) sub.cancel();
      actual.add(v);
    }, onError: actual.error);
    var sink = controller.sink;
    sink
      ..add(42)
      ..addError("error");
    sink.addStream(Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
      Expect.listEquals(expected.events, actual.events);
      // Close controller as well. It has no listener. If it is a broadcast
      // stream, it will still be open, and we read the "done" future before
      // closing. A normal stream is already done when its listener cancels.
      Future doneFuture = sink.done;
      sink.close();
      return doneFuture;
    }).then((_) {
      // No change in events.
      Expect.listEquals(expected.events, actual.events);
      done();
    });
  });

  // The listener pauses for 15 ms in the middle of the added stream; every
  // event must still arrive, in order.
  test("$type-controller-sink-paused", () {
    var done = expectAsync(() {});
    var controller = createController();
    var expected = Events()
      ..add(42)
      ..error("error")
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(4)
      ..add(5)
      ..add(43)
      ..close();
    var stream = observedStream(controller);
    var actual = Events();
    var sub;
    var pauseIsDone = false;
    sub = stream.listen((v) {
      if (v == 3) {
        sub.pause(Future.delayed(const Duration(milliseconds: 15), () {
          pauseIsDone = true;
        }));
      }
      actual.add(v);
    }, onError: actual.error, onDone: actual.close);
    var sink = controller.sink;
    sink
      ..add(42)
      ..addError("error");
    sink.addStream(Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
      sink.add(43);
      return sink.close();
    }).then((_) {
      void verify() {
        Expect.listEquals(expected.events, actual.events);
        done();
      }

      if (asBroadcast || broadcast) {
        // The done-future of the sink completes when it passes
        // the done event to the asBroadcastStream controller, which is
        // before the final listener gets the event.
        // Wait for the done event to be *delivered* before testing the
        // events.
        actual.onDone(verify);
      } else {
        verify();
      }
    });
  });

  test("$type-controller-addstream-error-stop", () {
    // Check that addStream with cancelOnError: true ends after the first
    // error of the source and forwards nothing that follows it.
    var done = expectAsync(() {});
    var controller = createController();
    var actual = Events.capture(observedStream(controller));

    var source = Events()
      ..add(1)
      ..add(2)
      ..error("BAD")
      ..add(3)
      ..error("FAIL")
      ..close();

    var expected = Events()
      ..add(1)
      ..add(2)
      ..error("BAD")
      ..close();
    var sourceController = StreamController();
    controller
        .addStream(sourceController.stream, cancelOnError: true)
        .then((_) {
      controller.close().then((_) {
        Expect.listEquals(expected.events, actual.events);
        done();
      });
    });

    source.replay(sourceController);
  });

  test("$type-controller-addstream-error-forward", () {
    // Check that addStream called without cancelOnError passes all data and
    // errors of the source on to the controller.
    var done = expectAsync(() {});
    var controller = createController();
    var actual = Events.capture(observedStream(controller));

    var source = Events()
      ..add(1)
      ..add(2)
      ..addError("BAD")
      ..add(3)
      ..addError("FAIL")
      ..close();

    var sourceController = StreamController();
    controller.addStream(sourceController.stream).then((_) {
      controller.close().then((_) {
        Expect.listEquals(source.events, actual.events);
        done();
      });
    });

    source.replay(sourceController);
  });

  test("$type-controller-addstream-twice", () {
    // Two addStream calls in a row on the same controller: the first stops at
    // its error, the second forwards the error and continues.
    var done = expectAsync(() {});
    var controller = createController();
    var actual = Events.capture(observedStream(controller));

    // Streams of five events, throws on 3.
    Stream s1 =
        Stream.fromIterable([1, 2, 3, 4, 5]).map((x) => (x == 3 ? throw x : x));
    Stream s2 =
        Stream.fromIterable([1, 2, 3, 4, 5]).map((x) => (x == 3 ? throw x : x));

    var expected = Events()
      // From s1, which is cut off at its error.
      ..add(1)
      ..add(2)
      ..error(3)
      // From s2, which keeps going after its error.
      ..add(1)
      ..add(2)
      ..error(3)
      ..add(4)
      ..add(5)
      ..close();

    controller.addStream(s1, cancelOnError: true).then((_) {
      controller.addStream(s2, cancelOnError: false).then((_) {
        controller.close().then((_) {
          Expect.listEquals(expected.events, actual.events);
          done();
        });
      });
    });
  });
}

/// Registers every test group in this file.
main() {
  testController();
  testSingleController();
  testExtraMethods();
  testPause();
  testRethrow();
  testBroadcastController();
  testAsBroadcast();

  // Sink tests: all sync configurations first, then the async ones. Each
  // pass covers a plain controller, a plain controller observed through
  // asBroadcastStream, and a broadcast controller.
  for (var sync in [true, false]) {
    testSink(sync: sync, broadcast: false, asBroadcast: false);
    testSink(sync: sync, broadcast: false, asBroadcast: true);
    testSink(sync: sync, broadcast: true, asBroadcast: false);
  }
}
