// Copyright (c) 2011, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// Test the basic StreamController and StreamController.singleSubscription.
library stream_controller_test;

import "package:expect/async_helper.dart";
import "package:expect/expect.dart";
import 'dart:async';
import 'event_helper.dart';

const MS = const Duration(milliseconds: 1);

fail(e) {
  Expect.fail("Unexepected error: $e");
}

void testMultiController() {
  // Test normal flow.
  {
    var c = new StreamController(sync: true);
    Events expectedEvents = new Events()
      ..add(42)
      ..add("dibs")
      ..error("error!")
      ..error("error too!")
      ..close();
    CaptureEvents actualEvents =
        new Events.capture(c.stream.asBroadcastStream()) as CaptureEvents;
    expectedEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test automatic unsubscription on error.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add(42)
      ..error("error");
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream(),
      cancelOnError: true,
    );
    Events sentEvents = new Events()
      ..add(42)
      ..error("error")
      ..add("Are you there?");
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test manual unsubscription.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add(42)
      ..error("error")
      ..add(37);
    dynamic actualEvents = new Events.capture(
      c.stream.asBroadcastStream(),
      cancelOnError: false,
    );
    expectedEvents.replay(c);
    actualEvents.subscription.cancel();
    c.add("Are you there"); // Not sent to actualEvents.
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test filter.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("a string")
      ..add("another string")
      ..close();
    var sentEvents = new Events()
      ..add("a string")
      ..add(42)
      ..add("another string")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream().where((v) => v is String),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test map.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("abab")
      ..error("error")
      ..close();
    var sentEvents = new Events()
      ..add("ab")
      ..error("error")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream().map((v) => "$v$v"),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test handleError.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("ab")
      ..error("[foo]");
    var sentEvents = new Events()
      ..add("ab")
      ..error("foo")
      ..add("ab")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream().handleError((error) {
        if (error is String) {
          // TODO(floitsch): this test originally changed the stacktrace.
          throw "[${error}]";
        }
      }),
      cancelOnError: true,
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // reduce is tested asynchronously and therefore not in this file.

  // Test expand
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add(3)
      ..add(2)
      ..add(4)
      ..close();
    var expectedEvents = new Events()
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(1)
      ..add(2)
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(4)
      ..close();
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream().expand((v) {
        var l = [];
        for (int i = 0; i < v; i++) l.add(i + 1);
        return l;
      }),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test transform.
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add("a")
      ..error(42)
      ..add("b")
      ..close();
    var expectedEvents = new Events()
      ..error("a")
      ..add(42)
      ..error("b")
      ..add("foo")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.asBroadcastStream().transform(
        new StreamTransformer.fromHandlers(
          handleData: (v, s) {
            s.addError(v);
          },
          handleError: (e, st, s) {
            s.add(e);
          },
          handleDone: (s) {
            s.add("foo");
            s.close();
          },
        ),
      ),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test multiple filters.
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add(42)
      ..add("snugglefluffy")
      ..add(7)
      ..add("42")
      ..error("not FormatException") // Unsubscribes.
      ..close();
    var expectedEvents = new Events()
      ..add(42)
      ..error("not FormatException");
    var actualEvents = new Events.capture(
      c.stream
          .asBroadcastStream()
          .where((v) => v is String)
          .map((v) => int.parse(v))
          .handleError((error) {
            if (error is! FormatException) throw error;
          })
          .where((v) => v > 10),
      cancelOnError: true,
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test subscription changes while firing.
  {
    var c = new StreamController<int>(sync: true);
    var sink = c.sink;
    var stream = c.stream.asBroadcastStream();
    var counter = 0;
    var subscription = stream.listen(null);
    subscription.onData((data) {
      counter += data;
      subscription.cancel();
      stream.listen((data) {
        counter += 10 * data;
      });
      var subscription2 = stream.listen(null);
      subscription2.onData((data) {
        counter += 100 * data;
        if (data == 4) subscription2.cancel();
      });
    });
    sink.add(1); // seen by stream 1
    sink.add(2); // seen by stream 10 and 100
    sink.add(3); // -"-
    sink.add(4); // -"-
    sink.add(5); // seen by stream 10
    Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
  }
}

testSingleController() {
  // Test normal flow.
  {
    var c = new StreamController(sync: true);
    Events expectedEvents = new Events()
      ..add(42)
      ..add("dibs")
      ..error("error!")
      ..error("error too!")
      ..close();
    CaptureEvents actualEvents = new Events.capture(c.stream) as CaptureEvents;
    expectedEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test automatic unsubscription on error.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add(42)
      ..error("error");
    var actualEvents = new Events.capture(c.stream, cancelOnError: true);
    Events sentEvents = new Events()
      ..add(42)
      ..error("error")
      ..add("Are you there?");
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test manual unsubscription.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add(42)
      ..error("error")
      ..add(37);
    dynamic actualEvents = new Events.capture(c.stream, cancelOnError: false);
    expectedEvents.replay(c);
    actualEvents.subscription.cancel();
    c.add("Are you there"); // Not sent to actualEvents.
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test filter.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("a string")
      ..add("another string")
      ..close();
    var sentEvents = new Events()
      ..add("a string")
      ..add(42)
      ..add("another string")
      ..close();
    var actualEvents = new Events.capture(c.stream.where((v) => v is String));
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test map.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("abab")
      ..error("error")
      ..close();
    var sentEvents = new Events()
      ..add("ab")
      ..error("error")
      ..close();
    var actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test handleError.
  {
    var c = new StreamController(sync: true);
    var expectedEvents = new Events()
      ..add("ab")
      ..error("[foo]");
    var sentEvents = new Events()
      ..add("ab")
      ..error("foo")
      ..add("ab")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.handleError((error) {
        if (error is String) {
          // TODO(floitsch): this error originally changed the stack trace.
          throw "[${error}]";
        }
      }),
      cancelOnError: true,
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // reduce is tested asynchronously and therefore not in this file.

  // Test expand
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add(3)
      ..add(2)
      ..add(4)
      ..close();
    var expectedEvents = new Events()
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(1)
      ..add(2)
      ..add(1)
      ..add(2)
      ..add(3)
      ..add(4)
      ..close();
    var actualEvents = new Events.capture(
      c.stream.expand((v) {
        var l = [];
        for (int i = 0; i < v; i++) l.add(i + 1);
        return l;
      }),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // test contains.
  {
    var c = new StreamController(sync: true);
    // Error after match is not important.
    var sentEvents = new Events()
      ..add("a")
      ..add("x")
      ..error("FAIL")
      ..close();
    Future<bool> contains = c.stream.contains("x");
    contains.then((var c) {
      Expect.isTrue(c);
    });
    sentEvents.replay(c);
  }

  {
    var c = new StreamController(sync: true);
    // Not matching is ok.
    var sentEvents = new Events()
      ..add("a")
      ..add("x")
      ..add("b")
      ..close();
    Future<bool> contains = c.stream.contains("y");
    contains.then((var c) {
      Expect.isFalse(c);
    });
    sentEvents.replay(c);
  }

  {
    var c = new StreamController(sync: true);
    // Error before match makes future err.
    var sentEvents = new Events()
      ..add("a")
      ..error("FAIL")
      ..add("b")
      ..close();
    Future<bool> contains = c.stream.contains("b");
    contains
        .then((var c) {
          Expect.fail("no value expected");
        })
        .catchError((error) {
          Expect.equals("FAIL", error);
        });
    sentEvents.replay(c);
  }

  // Test transform.
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add("a")
      ..error(42)
      ..add("b")
      ..close();
    var expectedEvents = new Events()
      ..error("a")
      ..add(42)
      ..error("b")
      ..add("foo")
      ..close();
    var actualEvents = new Events.capture(
      c.stream.transform(
        new StreamTransformer.fromHandlers(
          handleData: (v, s) {
            s.addError(v);
          },
          handleError: (e, st, s) {
            s.add(e);
          },
          handleDone: (s) {
            s.add("foo");
            s.close();
          },
        ),
      ),
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test multiple filters.
  {
    var c = new StreamController(sync: true);
    var sentEvents = new Events()
      ..add(42)
      ..add("snugglefluffy")
      ..add(7)
      ..add("42")
      ..error("not FormatException") // Unsubscribes.
      ..close();
    var expectedEvents = new Events()
      ..add(42)
      ..error("not FormatException");
    var actualEvents = new Events.capture(
      c.stream
          .where((v) => v is String)
          .map((v) => int.parse(v))
          .handleError((error) {
            if (error is! FormatException) throw error;
          })
          .where((v) => v > 10),
      cancelOnError: true,
    );
    sentEvents.replay(c);
    Expect.listEquals(expectedEvents.events, actualEvents.events);
  }

  // Test that only one subscription is allowed.
  {
    var c = new StreamController(sync: true);
    var sink = c.sink;
    var stream = c.stream;
    var counter = 0;
    var subscription = stream.listen((data) {
      counter += data as int;
    });
    Expect.throwsStateError(() => stream.listen(null));
    sink.add(1);
    Expect.equals(1, counter);
    c.close();
  }
}

testExtraMethods() {
  Events sentEvents = new Events()
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();

  var c = new StreamController(sync: true);
  Events expectedEvents = new Events()
    ..add(3)
    ..close();
  Events actualEvents = new Events.capture(c.stream.skip(2));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()..close();
  actualEvents = new Events.capture(c.stream.skip(3));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()..close();
  actualEvents = new Events.capture(c.stream.skip(7));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = sentEvents;
  actualEvents = new Events.capture(c.stream.skip(0));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()
    ..add(3)
    ..close();
  actualEvents = new Events.capture(c.stream.skipWhile((x) => x <= 2));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()
    ..add(2)
    ..add(3)
    ..close();
  actualEvents = new Events.capture(c.stream.skipWhile((x) => x <= 1));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();
  actualEvents = new Events.capture(c.stream.skipWhile((x) => false));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()
    ..add(1)
    ..add(2)
    ..close();
  actualEvents = new Events.capture(c.stream.take(2));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  expectedEvents = new Events()
    ..add(1)
    ..add(2)
    ..close();
  actualEvents = new Events.capture(c.stream.takeWhile((x) => x <= 2));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  sentEvents = new Events()
    ..add(1)
    ..add(1)
    ..add(2)
    ..add(1)
    ..add(2)
    ..add(2)
    ..add(2)
    ..close();
  expectedEvents = new Events()
    ..add(1)
    ..add(2)
    ..add(1)
    ..add(2)
    ..close();
  actualEvents = new Events.capture(c.stream.distinct());
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);

  c = new StreamController(sync: true);
  sentEvents = new Events()
    ..add(5)
    ..add(6)
    ..add(4)
    ..add(6)
    ..add(8)
    ..add(3)
    ..add(4)
    ..add(1)
    ..close();
  expectedEvents = new Events()
    ..add(5)
    ..add(4)
    ..add(3)
    ..add(1)
    ..close();
  // Use 'distinct' as a filter with access to the previously emitted event.
  actualEvents = new Events.capture(c.stream.distinct((a, b) => a < b));
  sentEvents.replay(c);
  Expect.listEquals(expectedEvents.events, actualEvents.events);
}

void testClosed() {
  StreamController c = new StreamController(sync: true);
  Expect.isFalse(c.isClosed);
  c.add(42);
  Expect.isFalse(c.isClosed);
  c.addError("bad");
  Expect.isFalse(c.isClosed);
  c.close();
  Expect.isTrue(c.isClosed);
}

void testCloseFuture() {
  asyncStart();
  asyncStart();
  var c = new StreamController();
  var f = c.close();
  Expect.isTrue(c.isClosed);
  bool doneSeen = false;
  f.then((_) {
    Expect.isTrue(doneSeen);
    asyncEnd();
  });
  // Only listen after a while.
  new Timer(MS * 250, () {
    c.stream.listen(
      null,
      onDone: () {
        asyncEnd();
        doneSeen = true;
      },
    );
  });
}

void testCloseFuture2() {
  asyncStart();
  asyncStart();
  var c = new StreamController.broadcast();
  var f = c.close();
  Expect.isTrue(c.isClosed);
  bool doneSeen = false;
  f.then((_) {
    // Done future on broadcast stream can happen
    // before a listener is added.
    Expect.isFalse(doneSeen);
    asyncEnd();
  });
  // Only listen after a while.
  new Timer(MS * 250, () {
    c.stream.listen(
      null,
      onDone: () {
        doneSeen = true;
        asyncEnd();
      },
    );
  });
}

void testCloseFuture3() {
  asyncStart();
  var c = new StreamController.broadcast();
  c
    ..add(1)
    ..add(2)
    ..add(3)
    ..add(4);
  c.stream.listen(null).cancel();
  var f = c.close();
  Expect.isTrue(c.isClosed);
  f.then((_) {
    asyncEnd();
  });
}

void testStreamEquals() {
  StreamController c;
  c = new StreamController(sync: false);
  Expect.equals(c.stream, c.stream);
  c = new StreamController(sync: true);
  Expect.equals(c.stream, c.stream);
  c = new StreamController(sync: false, onListen: () {});
  Expect.equals(c.stream, c.stream);
  c = new StreamController(sync: true, onListen: () {});
  Expect.equals(c.stream, c.stream);
  c = new StreamController.broadcast(sync: false);
  Expect.equals(c.stream, c.stream);
  c = new StreamController.broadcast(sync: true);
  Expect.equals(c.stream, c.stream);
  c = new StreamController.broadcast(sync: false, onListen: () {});
  Expect.equals(c.stream, c.stream);
  c = new StreamController.broadcast(sync: true, onListen: () {});
  Expect.equals(c.stream, c.stream);
}

void testCancelThrow() {
  asyncStart();
  asyncStart();
  asyncStart();
  StreamController c = new StreamController(
    onCancel: () {
      asyncEnd();
      throw "ERROR";
    },
  );
  c.add(1);
  c.add(2);
  c.add(3);
  Future done = c.close();
  late StreamSubscription sub;
  sub = c.stream.listen((v) {
    Expect.equals(1, v);
    Future f = sub.cancel();
    f.catchError((e) {
      // Must complete with error from onCancel.
      Expect.equals("ERROR", e);
      asyncEnd();
    });
  });
  done.catchError(fail).whenComplete(asyncEnd); // Must complete without error.
}

void testCancelThrow2() {
  asyncStart();
  asyncStart();
  asyncStart();
  asyncStart();
  asyncStart();
  StreamController c2 = new StreamController(
    onCancel: () {
      asyncEnd();
      throw "ERROR";
    },
  );
  c2.add(1);
  c2.add(2);
  Future done2 = c2.close();
  done2.catchError(fail).whenComplete(asyncEnd); // Should not get error;

  StreamController c = new StreamController();
  var sub;
  sub = c.stream.listen((v) {
    Expect.equals(1, v);
    Future f = sub.cancel();
    f.catchError((e) {
      // Error from addStream stream's cancel must go only here.
      asyncEnd();
      Expect.equals("ERROR", e);
    });
  });
  var addDone = c.addStream(c2.stream);
  addDone.catchError(fail).whenComplete(asyncEnd); // Should not get error.
  var done = c.done;
  done.catchError(fail).whenComplete(asyncEnd); // Should not get error.
}

void testCancelThrow3() {
  asyncStart();
  asyncStart();
  asyncStart();
  asyncStart();
  asyncStart();
  asyncStart();
  StreamController c2 = new StreamController(
    onCancel: () {
      asyncEnd();
      throw "ERROR2";
    },
  );
  c2.add(1);
  c2.add(2);
  var done2 = c2.close();
  done2.catchError(fail).whenComplete(asyncEnd); // Should not get error;

  StreamController c = new StreamController(
    onCancel: () {
      asyncEnd();
      throw "ERROR1";
    },
  );
  var sub;
  sub = c.stream.listen((v) {
    Expect.equals(1, v);
    Future f = sub.cancel();
    f.catchError((e) {
      // Only the last error ends up here.
      Expect.equals("ERROR1", e);
      asyncEnd();
    });
  });
  var addDone = c.addStream(c2.stream);
  addDone.catchError(fail).whenComplete(asyncEnd); // Error must not go here.
  c.done.catchError(fail).whenComplete(asyncEnd); // Error must not go here.
}

void testBroadcastListenAfterClose() {
  asyncStart();
  StreamController c = new StreamController.broadcast();
  var f = c.close();
  f.then((_) {
    // Listening after close is allowed. The listener gets a done event.
    c.stream.listen(null, onDone: asyncEnd);
  });
}

void testBroadcastListenAfterClosePaused() {
  asyncStart();
  StreamController c = new StreamController.broadcast();
  var f = c.close();
  f.then((_) {
    // Listening after close is allowed. The listener gets a done event.
    var sub = c.stream.listen(
      null,
      onDone: () {
        Expect.fail("wrong done");
      },
    );
    sub.pause();
    sub.pause();
    new Timer(MS * 100, () {
      sub.asFuture().whenComplete(() {
        Expect.fail("Bad complete");
      });
      sub.resume();
      new Timer(MS * 100, () {
        sub.onDone(asyncEnd);
        sub.resume();
      });
    });
  });
}

void testAsBroadcastListenAfterClose() {
  asyncStart();
  asyncStart();
  StreamController c = new StreamController();
  Stream s = c.stream.asBroadcastStream();
  s.listen(null, onDone: asyncEnd);
  var f = c.close();
  f.then((_) {
    // Listening after close is allowed. The listener gets a done event.
    s.listen(null, onDone: asyncEnd);
  });
}

void testAsBroadcastListenAfterClosePaused() {
  asyncStart();
  asyncStart();
  StreamController c = new StreamController();
  Stream s = c.stream.asBroadcastStream();
  s.listen(null, onDone: asyncEnd);
  var f = c.close();
  f.then((_) {
    // Listening after close is allowed. The listener gets a done event.
    var sub = s.listen(
      null,
      onDone: () {
        Expect.fail("wrong done");
      },
    );
    sub.pause();
    sub.pause();
    new Timer(MS * 100, () {
      sub.asFuture().whenComplete(() {
        Expect.fail("Bad complete");
      });
      sub.resume();
      new Timer(MS * 100, () {
        sub.onDone(asyncEnd);
        sub.resume();
      });
    });
  });
}

void testEventInListen() {
  asyncStart();
  // Regression test for http://dartbug.com/19722
  var c;
  void send() {
    c.add(1);
  }

  int i = 1;
  c = new StreamController.broadcast(onListen: send, sync: true);
  c.stream.listen((v) {
    Expect.equals(i++, v);
  }, onDone: asyncEnd);
  c.add(2);
  c.close();
}

void testSyncControllerNotReentrant() {
  Stream emptyStream = (new StreamController.broadcast()..close()).stream;
  asyncStart();
  for (int listenerCount = 1; listenerCount <= 2; listenerCount++) {
    StreamController c = new StreamController.broadcast(sync: true);
    for (int i = 0; i < listenerCount; i++) {
      asyncStart();
      asyncStart();
      c.stream.listen(
        (v) {
          Expect.equals(42, v);
          Expect.throws(() {
            c.add(37);
          });
          Expect.throws(() {
            c.addError(37);
          });
          Expect.throws(() {
            c.addStream(emptyStream);
          });
          Expect.throws(() {
            c.close();
          });
          asyncEnd();
        },
        onError: (e, s) {
          Expect.equals(87, e);
          Expect.throws(() {
            c.add(37);
          });
          Expect.throws(() {
            c.addError(37);
          });
          Expect.throws(() {
            c.addStream(emptyStream);
          });
          Expect.throws(() {
            c.close();
          });
          asyncEnd();
        },
      );
    }
    c.add(42);
    c.addError(87);
  }
  asyncEnd();
}

void testSettingCallbacks() {
  const int initial = 0;
  const int running = 1;
  const int paused = 2;
  const int canceled = 3;

  var controller = new StreamController();
  var stream = controller.stream;
  var state = initial;

  var onListen = () {
    state = running;
  };
  var onPause = () {
    state = paused;
  };
  var onResume = () {
    state = running;
  };
  var onCancel = () {
    state = canceled;
  };

  Expect.isNull(controller.onListen);
  Expect.isNull(controller.onPause);
  Expect.isNull(controller.onResume);
  Expect.isNull(controller.onCancel);

  controller
    ..onListen = onListen
    ..onPause = onPause
    ..onResume = onResume
    ..onCancel = onCancel;

  Expect.equals(onListen, controller.onListen);
  Expect.equals(onPause, controller.onPause);
  Expect.equals(onResume, controller.onResume);
  Expect.equals(onCancel, controller.onCancel);

  Expect.equals(initial, state);
  var sub = stream.listen(null);
  Expect.equals(running, state);
  sub.pause();
  Expect.equals(paused, state);
  Expect.isTrue(controller.isPaused);
  sub.resume();
  Expect.equals(running, state);
  Expect.isFalse(controller.isPaused);

  var onListen2 = () {
    state = -running;
  };
  var onPause2 = () {
    state = -paused;
  };
  var onResume2 = () {
    state = -running;
  };
  var onCancel2 = () {
    state = -canceled;
  };
  // Changing them later does make a difference.
  controller
    ..onListen = onListen2
    ..onPause = onPause2
    ..onResume = onResume2
    ..onCancel = onCancel2;

  Expect.equals(onListen2, controller.onListen);
  Expect.equals(onPause2, controller.onPause);
  Expect.equals(onResume2, controller.onResume);
  Expect.equals(onCancel2, controller.onCancel);

  Expect.equals(running, state);
  sub.pause();
  Expect.equals(-paused, state);
  Expect.isTrue(controller.isPaused);
  sub.resume();
  Expect.equals(-running, state);
  Expect.isFalse(controller.isPaused);
  sub.cancel();
  Expect.equals(-canceled, state);
}

void testSettingNullCallbacks() {
  failCallback() => fail("Callback should not be called");
  var controller = new StreamController(
    onListen: failCallback,
    onPause: failCallback,
    onResume: failCallback,
    onCancel: failCallback,
  );

  var stream = controller.stream;

  Expect.isFalse(controller.hasListener);
  Expect.isTrue(controller.isPaused);

  Expect.isNotNull(controller.onListen);
  controller.onListen = null;
  Expect.isNull(controller.onListen);

  var sub = stream.listen(null);

  Expect.isTrue(controller.hasListener);
  Expect.isFalse(controller.isPaused);

  Expect.isNotNull(controller.onPause);
  controller.onPause = null;
  Expect.isNull(controller.onPause);

  sub.pause();

  Expect.isTrue(controller.hasListener);
  Expect.isTrue(controller.isPaused);

  Expect.isNotNull(controller.onResume);
  controller.onResume = null;
  Expect.isNull(controller.onResume);

  sub.resume();

  Expect.isTrue(controller.hasListener);
  Expect.isFalse(controller.isPaused);

  Expect.isNotNull(controller.onCancel);
  controller.onCancel = null;
  Expect.isNull(controller.onCancel);

  sub.cancel();

  Expect.isFalse(controller.hasListener);
  Expect.isFalse(controller.isPaused);
}

void testBroadcastSettingCallbacks() {
  const int initial = 0;
  const int running = 1;
  const int canceled = 2;

  var controller = new StreamController.broadcast();
  var stream = controller.stream;
  var state = initial;

  Expect.throwsUnsupportedError(() => controller.onPause = () {});
  Expect.throwsUnsupportedError(() => controller.onResume = () {});

  controller
    ..onListen = () {
      state = running;
    }
    ..onCancel = () {
      state = canceled;
    };

  Expect.equals(initial, state);
  var sub = stream.listen(null);
  Expect.equals(running, state);
  sub.cancel();
  Expect.equals(canceled, state);

  // Changing them later does make a difference.
  controller
    ..onListen = () {
      state = -running;
    }
    ..onCancel = () {
      state = -canceled;
    };

  var sub2 = stream.listen(null);
  Expect.equals(-running, state);
  sub2.cancel();
  Expect.equals(-canceled, state);
}

void testBroadcastSettingNullCallbacks() {
  failCallback() => fail("Callback should not be called");
  var controller = new StreamController.broadcast(
    onListen: failCallback,
    onCancel: failCallback,
  );

  var stream = controller.stream;

  Expect.isFalse(controller.hasListener);

  controller.onListen = null;

  var sub = stream.listen(null);

  Expect.isTrue(controller.hasListener);

  controller.onCancel = null;

  sub.cancel();

  Expect.isFalse(controller.hasListener);
}

main() {
  asyncStart();
  testMultiController();
  testSingleController();
  testExtraMethods();
  testClosed();
  testCloseFuture();
  testCloseFuture2();
  testCloseFuture3();
  testStreamEquals();
  testCancelThrow();
  testCancelThrow2();
  testCancelThrow3();
  testBroadcastListenAfterClose();
  testBroadcastListenAfterClosePaused();
  testAsBroadcastListenAfterClose();
  testAsBroadcastListenAfterClosePaused();
  testEventInListen();
  testSyncControllerNotReentrant();
  testSettingCallbacks();
  testSettingNullCallbacks();
  testBroadcastSettingCallbacks();
  testBroadcastSettingNullCallbacks();
  asyncEnd();
}
