| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import "dart:async"; |
| |
| import "package:async/async.dart"; |
| import "package:test/test.dart"; |
| |
| import "utils.dart"; |
| |
| main() { |
| StreamSinkCompleter completer; |
| setUp(() { |
| completer = new StreamSinkCompleter(); |
| }); |
| |
| group("when a stream is linked before events are added", () { |
| test("data events are forwarded", () { |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| completer.sink..add(1)..add(2)..add(3)..add(4); |
| |
| expect(sink.results[0].asValue.value, equals(1)); |
| expect(sink.results[1].asValue.value, equals(2)); |
| expect(sink.results[2].asValue.value, equals(3)); |
| expect(sink.results[3].asValue.value, equals(4)); |
| }); |
| |
| test("error events are forwarded", () { |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| completer.sink..addError("oh no")..addError("that's bad"); |
| |
| expect(sink.results[0].asError.error, equals("oh no")); |
| expect(sink.results[1].asError.error, equals("that's bad")); |
| }); |
| |
| test("addStream is forwarded", () async { |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| |
| var controller = new StreamController(); |
| completer.sink.addStream(controller.stream); |
| |
| controller.add(1); |
| controller.addError("oh no"); |
| controller.add(2); |
| controller.addError("that's bad"); |
| await flushMicrotasks(); |
| |
| expect(sink.results[0].asValue.value, equals(1)); |
| expect(sink.results[1].asError.error, equals("oh no")); |
| expect(sink.results[2].asValue.value, equals(2)); |
| expect(sink.results[3].asError.error, equals("that's bad")); |
| expect(sink.isClosed, isFalse); |
| |
| controller.close(); |
| await flushMicrotasks(); |
| expect(sink.isClosed, isFalse); |
| }); |
| |
| test("close() is forwarded", () { |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| completer.sink.close(); |
| expect(sink.isClosed, isTrue); |
| }); |
| |
| test("the future from the inner close() is returned", () async { |
| var closeCompleter = new Completer(); |
| var sink = new TestSink(onDone: () => closeCompleter.future); |
| completer.setDestinationSink(sink); |
| |
| var closeCompleted = false; |
| completer.sink.close().then(expectAsync1((_) { |
| closeCompleted = true; |
| })); |
| |
| await flushMicrotasks(); |
| expect(closeCompleted, isFalse); |
| |
| closeCompleter.complete(); |
| await flushMicrotasks(); |
| expect(closeCompleted, isTrue); |
| }); |
| |
| test("errors are forwarded from the inner close()", () { |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| expect(completer.sink.done, throwsA("oh no")); |
| expect(completer.sink.close(), throwsA("oh no")); |
| }); |
| |
| test("errors aren't top-leveled if only close() is listened to", () async { |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| expect(completer.sink.close(), throwsA("oh no")); |
| |
| // Give the event loop a chance to top-level errors if it's going to. |
| await flushMicrotasks(); |
| }); |
| |
| test("errors aren't top-leveled if only done is listened to", () async { |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| completer.sink.close(); |
| expect(completer.sink.done, throwsA("oh no")); |
| |
| // Give the event loop a chance to top-level errors if it's going to. |
| await flushMicrotasks(); |
| }); |
| }); |
| |
| group("when a stream is linked after events are added", () { |
| test("data events are forwarded", () async { |
| completer.sink..add(1)..add(2)..add(3)..add(4); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| await flushMicrotasks(); |
| |
| expect(sink.results[0].asValue.value, equals(1)); |
| expect(sink.results[1].asValue.value, equals(2)); |
| expect(sink.results[2].asValue.value, equals(3)); |
| expect(sink.results[3].asValue.value, equals(4)); |
| }); |
| |
| test("error events are forwarded", () async { |
| completer.sink..addError("oh no")..addError("that's bad"); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| await flushMicrotasks(); |
| |
| expect(sink.results[0].asError.error, equals("oh no")); |
| expect(sink.results[1].asError.error, equals("that's bad")); |
| }); |
| |
| test("addStream is forwarded", () async { |
| var controller = new StreamController(); |
| completer.sink.addStream(controller.stream); |
| |
| controller.add(1); |
| controller.addError("oh no"); |
| controller.add(2); |
| controller.addError("that's bad"); |
| controller.close(); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| await flushMicrotasks(); |
| |
| expect(sink.results[0].asValue.value, equals(1)); |
| expect(sink.results[1].asError.error, equals("oh no")); |
| expect(sink.results[2].asValue.value, equals(2)); |
| expect(sink.results[3].asError.error, equals("that's bad")); |
| expect(sink.isClosed, isFalse); |
| }); |
| |
| test("close() is forwarded", () async { |
| completer.sink.close(); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(); |
| completer.setDestinationSink(sink); |
| await flushMicrotasks(); |
| |
| expect(sink.isClosed, isTrue); |
| }); |
| |
| test("the future from the inner close() is returned", () async { |
| var closeCompleted = false; |
| completer.sink.close().then(expectAsync1((_) { |
| closeCompleted = true; |
| })); |
| await flushMicrotasks(); |
| |
| var closeCompleter = new Completer(); |
| var sink = new TestSink(onDone: () => closeCompleter.future); |
| completer.setDestinationSink(sink); |
| await flushMicrotasks(); |
| expect(closeCompleted, isFalse); |
| |
| closeCompleter.complete(); |
| await flushMicrotasks(); |
| expect(closeCompleted, isTrue); |
| }); |
| |
| test("errors are forwarded from the inner close()", () async { |
| expect(completer.sink.done, throwsA("oh no")); |
| expect(completer.sink.close(), throwsA("oh no")); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| }); |
| |
| test("errors aren't top-leveled if only close() is listened to", () async { |
| expect(completer.sink.close(), throwsA("oh no")); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| |
| // Give the event loop a chance to top-level errors if it's going to. |
| await flushMicrotasks(); |
| }); |
| |
| test("errors aren't top-leveled if only done is listened to", () async { |
| completer.sink.close(); |
| expect(completer.sink.done, throwsA("oh no")); |
| await flushMicrotasks(); |
| |
| var sink = new TestSink(onDone: () => throw "oh no"); |
| completer.setDestinationSink(sink); |
| |
| // Give the event loop a chance to top-level errors if it's going to. |
| await flushMicrotasks(); |
| }); |
| }); |
| |
| test("the sink is closed, the destination is set, then done is read", |
| () async { |
| expect(completer.sink.close(), completes); |
| await flushMicrotasks(); |
| |
| completer.setDestinationSink(new TestSink()); |
| await flushMicrotasks(); |
| |
| expect(completer.sink.done, completes); |
| }); |
| |
| test("done is read, the destination is set, then the sink is closed", |
| () async { |
| expect(completer.sink.done, completes); |
| await flushMicrotasks(); |
| |
| completer.setDestinationSink(new TestSink()); |
| await flushMicrotasks(); |
| |
| expect(completer.sink.close(), completes); |
| }); |
| |
| group("fromFuture()", () { |
| test("with a successful completion", () async { |
| var futureCompleter = new Completer<StreamSink>(); |
| var sink = StreamSinkCompleter.fromFuture(futureCompleter.future); |
| sink.add(1); |
| sink.add(2); |
| sink.add(3); |
| sink.close(); |
| |
| var testSink = new TestSink(); |
| futureCompleter.complete(testSink); |
| await testSink.done; |
| |
| expect(testSink.results[0].asValue.value, equals(1)); |
| expect(testSink.results[1].asValue.value, equals(2)); |
| expect(testSink.results[2].asValue.value, equals(3)); |
| }); |
| |
| test("with an error", () async { |
| var futureCompleter = new Completer<StreamSink>(); |
| var sink = StreamSinkCompleter.fromFuture(futureCompleter.future); |
| expect(sink.done, throwsA("oh no")); |
| futureCompleter.completeError("oh no"); |
| }); |
| }); |
| |
| group("setError()", () { |
| test("produces a closed sink with the error", () { |
| completer.setError("oh no"); |
| expect(completer.sink.done, throwsA("oh no")); |
| expect(completer.sink.close(), throwsA("oh no")); |
| }); |
| |
| test("produces an error even if done was accessed earlier", () async { |
| expect(completer.sink.done, throwsA("oh no")); |
| expect(completer.sink.close(), throwsA("oh no")); |
| await flushMicrotasks(); |
| |
| completer.setError("oh no"); |
| }); |
| }); |
| |
| test("doesn't allow the destination sink to be set multiple times", () { |
| completer.setDestinationSink(new TestSink()); |
| expect( |
| () => completer.setDestinationSink(new TestSink()), throwsStateError); |
| expect( |
| () => completer.setDestinationSink(new TestSink()), throwsStateError); |
| }); |
| } |