// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// Test that transformations like `map` and `where` preserve broadcast flag.
library stream_join_test;

import 'dart:async';
import 'event_helper.dart';
import 'package:unittest/unittest.dart';
import "package:expect/expect.dart";

main() {
  testStream("singlesub", () => new StreamController(), (c) => c.stream);
  testStream(
      "broadcast", () => new StreamController.broadcast(), (c) => c.stream);
  testStream("asBroadcast", () => new StreamController(),
      (c) => c.stream.asBroadcastStream());
  testStream("broadcast.asBroadcast", () => new StreamController.broadcast(),
      (c) => c.stream.asBroadcastStream());
}

void testStream(
    String name, StreamController create(), Stream getStream(controller)) {
  test("$name-map", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.map((x) => x + 1);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(43, v);
    }));
    c.add(42);
    c.close();
  });
  test("$name-where", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.where((x) => x.isEven);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(37);
    c.add(42);
    c.add(87);
    c.close();
  });
  test("$name-handleError", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.handleError((x, s) {});
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.addError("BAD1");
    c.add(42);
    c.addError("BAD2");
    c.close();
  });
  test("$name-expand", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.expand((x) => x.isEven ? [x] : []);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(37);
    c.add(42);
    c.add(87);
    c.close();
  });
  test("$name-transform", () {
    var c = create();
    var s = getStream(c);
    // TODO: find name of default transformer
    var t =
        new StreamTransformer.fromHandlers(handleData: (value, EventSink sink) {
      sink.add(value);
    });
    Stream newStream = s.transform(t);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(42);
    c.close();
  });
  test("$name-take", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.take(1);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(42);
    c.add(37);
    c.close();
  });
  test("$name-takeWhile", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.takeWhile((x) => x.isEven);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(42);
    c.add(37);
    c.close();
  });
  test("$name-skip", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.skip(1);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(37);
    c.add(42);
    c.close();
  });
  test("$name-skipWhile", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.skipWhile((x) => x.isOdd);
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(37);
    c.add(42);
    c.close();
  });
  test("$name-distinct", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.distinct();
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(42);
    c.add(42);
    c.close();
  });
  test("$name-timeout", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.timeout(const Duration(seconds: 1));
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(42, v);
    }));
    c.add(42);
    c.close();
  });
  test("$name-asyncMap", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.asyncMap((x) => new Future.value(x + 1));
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(43, v);
    }));
    c.add(42);
    c.close();
  });
  test("$name-asyncExpand", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.asyncExpand((x) => new Stream.fromIterable([x + 1]));
    Expect.equals(s.isBroadcast, newStream.isBroadcast);
    newStream.single.then(expectAsync((v) {
      Expect.equals(43, v);
    }));
    c.add(42);
    c.close();
  });

  // The following tests are only on broadcast streams, they require listening
  // more than once.
  if (name.startsWith("singlesub")) return;

  test("$name-skip-multilisten", () {
    if (name.startsWith("singlesub") || name.startsWith("asBroadcast")) return;
    var c = create();
    var s = getStream(c);
    Stream newStream = s.skip(5);
    // Listen immediately, to ensure that an asBroadcast stream is started.
    var sub = newStream.listen((_) {});
    int i = 0;
    var expect1 = 11;
    var expect2 = 21;
    var handler2 = expectAsync((v) {
      expect(v, expect2);
      expect2++;
    }, count: 5);
    var handler1 = expectAsync((v) {
      expect(v, expect1);
      expect1++;
    }, count: 15);
    var loop;
    loop = expectAsync(() {
      i++;
      c.add(i);
      if (i == 5) {
        scheduleMicrotask(() {
          newStream.listen(handler1);
        });
      }
      if (i == 15) {
        scheduleMicrotask(() {
          newStream.listen(handler2);
        });
      }
      if (i < 25) {
        scheduleMicrotask(loop);
      } else {
        sub.cancel();
        c.close();
      }
    }, count: 25);
    scheduleMicrotask(loop);
  });

  test("$name-take-multilisten", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.take(10);
    // Listen immediately, to ensure that an asBroadcast stream is started.
    var sub = newStream.listen((_) {});
    int i = 0;
    var expect1 = 6;
    var expect2 = 11;
    var handler2 = expectAsync((v) {
      expect(v, expect2);
      expect(v <= 20, isTrue);
      expect2++;
    }, count: 10);
    var handler1 = expectAsync((v) {
      expect(v, expect1);
      expect(v <= 15, isTrue);
      expect1++;
    }, count: 10);
    var loop;
    loop = expectAsync(() {
      i++;
      c.add(i);
      if (i == 5) {
        scheduleMicrotask(() {
          newStream.listen(handler1);
        });
      }
      if (i == 10) {
        scheduleMicrotask(() {
          newStream.listen(handler2);
        });
      }
      if (i < 25) {
        scheduleMicrotask(loop);
      } else {
        sub.cancel();
        c.close();
      }
    }, count: 25);
    scheduleMicrotask(loop);
  });

  test("$name-skipWhile-multilisten", () {
    if (name.startsWith("singlesub") || name.startsWith("asBroadcast")) return;
    var c = create();
    var s = getStream(c);
    Stream newStream = s.skipWhile((x) => (x % 10) != 1);
    // Listen immediately, to ensure that an asBroadcast stream is started.
    var sub = newStream.listen((_) {});
    int i = 0;
    var expect1 = 11;
    var expect2 = 21;
    var handler2 = expectAsync((v) {
      expect(v, expect2);
      expect2++;
    }, count: 5);
    var handler1 = expectAsync((v) {
      expect(v, expect1);
      expect1++;
    }, count: 15);
    var loop;
    loop = expectAsync(() {
      i++;
      c.add(i);
      if (i == 5) {
        scheduleMicrotask(() {
          newStream.listen(handler1);
        });
      }
      if (i == 15) {
        scheduleMicrotask(() {
          newStream.listen(handler2);
        });
      }
      if (i < 25) {
        scheduleMicrotask(loop);
      } else {
        sub.cancel();
        c.close();
      }
    }, count: 25);
    scheduleMicrotask(loop);
  });

  test("$name-takeWhile-multilisten", () {
    var c = create();
    var s = getStream(c);
    Stream newStream = s.takeWhile((x) => (x % 10) != 5);
    // Listen immediately, to ensure that an asBroadcast stream is started.
    var sub = newStream.listen((_) {});
    int i = 0;
    // Non-overlapping ranges means the test must not remember its first
    // failure.
    var expect1 = 6;
    var expect2 = 16;
    var handler2 = expectAsync((v) {
      expect(v, expect2);
      expect(v <= 25, isTrue);
      expect2++;
    }, count: 9);
    var handler1 = expectAsync((v) {
      expect(v, expect1);
      expect(v <= 15, isTrue);
      expect1++;
    }, count: 9);
    var loop;
    loop = expectAsync(() {
      i++;
      c.add(i);
      if (i == 5) {
        scheduleMicrotask(() {
          newStream.listen(handler1);
        });
      }
      if (i == 15) {
        scheduleMicrotask(() {
          newStream.listen(handler2);
        });
      }
      if (i < 25) {
        scheduleMicrotask(loop);
      } else {
        sub.cancel();
        c.close();
      }
    }, count: 25);
    scheduleMicrotask(loop);
  });
}
