dartfmt
diff --git a/pkgs/stream_channel/lib/src/disconnector.dart b/pkgs/stream_channel/lib/src/disconnector.dart index b23813e..37a376c 100644 --- a/pkgs/stream_channel/lib/src/disconnector.dart +++ b/pkgs/stream_channel/lib/src/disconnector.dart
@@ -34,10 +34,10 @@ /// futures have completed. Note that a [StreamController]'s sink won't close /// until the corresponding stream has a listener. Future disconnect() => _disconnectMemo.runOnce(() { - var futures = _sinks.map((sink) => sink._disconnect()).toList(); - _sinks.clear(); - return Future.wait(futures, eagerError: true); - }); + var futures = _sinks.map((sink) => sink._disconnect()).toList(); + _sinks.clear(); + return Future.wait(futures, eagerError: true); + }); final _disconnectMemo = new AsyncMemoizer(); StreamChannel<T> bind(StreamChannel<T> channel) { @@ -111,10 +111,8 @@ if (_isDisconnected) return new Future.value(); _addStreamCompleter = new Completer.sync(); - _addStreamSubscription = stream.listen( - _inner.add, - onError: _inner.addError, - onDone: _addStreamCompleter.complete); + _addStreamSubscription = stream.listen(_inner.add, + onError: _inner.addError, onDone: _addStreamCompleter.complete); return _addStreamCompleter.future.then((_) { _addStreamCompleter = null; _addStreamSubscription = null;
diff --git a/pkgs/stream_channel/lib/src/guarantee_channel.dart b/pkgs/stream_channel/lib/src/guarantee_channel.dart index a874799..f18cbb9 100644 --- a/pkgs/stream_channel/lib/src/guarantee_channel.dart +++ b/pkgs/stream_channel/lib/src/guarantee_channel.dart
@@ -32,28 +32,29 @@ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, {bool allowSinkErrors: true}) { - _sink = new _GuaranteeSink<T>(innerSink, this, - allowErrors: allowSinkErrors); + _sink = + new _GuaranteeSink<T>(innerSink, this, allowErrors: allowSinkErrors); // Enforce the single-subscription guarantee by changing a broadcast stream // to single-subscription. if (innerStream.isBroadcast) { - innerStream = innerStream.transform( - const SingleSubscriptionTransformer()); + innerStream = + innerStream.transform(const SingleSubscriptionTransformer()); } - _streamController = new StreamController<T>(onListen: () { - // If the sink has disconnected, we've already called - // [_streamController.close]. - if (_disconnected) return; + _streamController = new StreamController<T>( + onListen: () { + // If the sink has disconnected, we've already called + // [_streamController.close]. + if (_disconnected) return; - _subscription = innerStream.listen(_streamController.add, - onError: _streamController.addError, - onDone: () { + _subscription = innerStream.listen(_streamController.add, + onError: _streamController.addError, onDone: () { _sink._onStreamDisconnected(); _streamController.close(); }); - }, sync: true); + }, + sync: true); } /// Called by [_GuaranteeSink] when the user closes it. @@ -159,10 +160,8 @@ if (_disconnected) return new Future.value(); _addStreamCompleter = new Completer.sync(); - _addStreamSubscription = stream.listen( - _inner.add, - onError: _addError, - onDone: _addStreamCompleter.complete); + _addStreamSubscription = stream.listen(_inner.add, + onError: _addError, onDone: _addStreamCompleter.complete); return _addStreamCompleter.future.then((_) { _addStreamCompleter = null; _addStreamSubscription = null;
diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart index c725fef..f1328c0 100644 --- a/pkgs/stream_channel/lib/src/isolate_channel.dart +++ b/pkgs/stream_channel/lib/src/isolate_channel.dart
@@ -44,8 +44,8 @@ // value to be an [IsolateChannel]. var streamCompleter = new StreamCompleter<T>(); var sinkCompleter = new StreamSinkCompleter<T>(); - var channel = new IsolateChannel<T>._( - streamCompleter.stream, sinkCompleter.sink); + var channel = + new IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink); // The first message across the ReceivePort should be a SendPort pointing to // the remote end. If it's not, we'll make the stream emit an error @@ -56,9 +56,8 @@ var controller = new StreamChannelController<T>( allowForeignErrors: false, sync: true); new SubscriptionStream(subscription).pipe(controller.local.sink); - controller.local.stream.listen( - (data) => message.send(data), - onDone: receivePort.close); + controller.local.stream + .listen((data) => message.send(data), onDone: receivePort.close); streamCompleter.setSourceStream(controller.foreign.stream); sinkCompleter.setDestinationSink(controller.foreign.sink); @@ -93,12 +92,11 @@ /// Creates a stream channel that receives messages from [receivePort] and /// sends them over [sendPort]. factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) { - var controller = new StreamChannelController<T>( - allowForeignErrors: false, sync: true); + var controller = + new StreamChannelController<T>(allowForeignErrors: false, sync: true); receivePort.pipe(controller.local.sink); - controller.local.stream.listen( - (data) => sendPort.send(data), - onDone: receivePort.close); + controller.local.stream + .listen((data) => sendPort.send(data), onDone: receivePort.close); return new IsolateChannel._( controller.foreign.stream, controller.foreign.sink); }
diff --git a/pkgs/stream_channel/lib/src/stream_channel_completer.dart b/pkgs/stream_channel/lib/src/stream_channel_completer.dart index d15adcf..b9881b9 100644 --- a/pkgs/stream_channel/lib/src/stream_channel_completer.dart +++ b/pkgs/stream_channel/lib/src/stream_channel_completer.dart
@@ -42,8 +42,8 @@ } StreamChannelCompleter() { - _channel = new StreamChannel<T>( - _streamCompleter.stream, _sinkCompleter.sink); + _channel = + new StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink); } /// Set a channel as the source and destination for [channel].
diff --git a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart index d6d10ff..1a4afca 100644 --- a/pkgs/stream_channel/lib/src/stream_channel_transformer.dart +++ b/pkgs/stream_channel/lib/src/stream_channel_transformer.dart
@@ -58,9 +58,10 @@ /// and all output from its stream is decoded using [Codec.decoder]. StreamChannelTransformer.fromCodec(Codec<S, T> codec) : this( - typedStreamTransformer(codec.decoder), - StreamSinkTransformer.typed( - new StreamSinkTransformer.fromStreamTransformer(codec.encoder))); + typedStreamTransformer(codec.decoder), + StreamSinkTransformer.typed( + new StreamSinkTransformer.fromStreamTransformer( + codec.encoder))); /// Transforms the events sent to and emitted by [channel]. ///
diff --git a/pkgs/stream_channel/lib/src/transformer/typed.dart b/pkgs/stream_channel/lib/src/transformer/typed.dart index f35e01c..9c2d72b 100644 --- a/pkgs/stream_channel/lib/src/transformer/typed.dart +++ b/pkgs/stream_channel/lib/src/transformer/typed.dart
@@ -8,7 +8,7 @@ /// transformer to `S`. class TypeSafeStreamChannelTransformer<S, T> implements StreamChannelTransformer<S, T> { - final StreamChannelTransformer _inner; + final StreamChannelTransformer _inner; TypeSafeStreamChannelTransformer(this._inner);
diff --git a/pkgs/stream_channel/lib/stream_channel.dart b/pkgs/stream_channel/lib/stream_channel.dart index dcbc9a0..8d0e604 100644 --- a/pkgs/stream_channel/lib/stream_channel.dart +++ b/pkgs/stream_channel/lib/stream_channel.dart
@@ -97,8 +97,8 @@ /// transforming it with a [StreamTransformer]. This is a lighter-weight way /// of preserving that guarantee in particular than /// [StreamChannel.withGuarantees]. - factory StreamChannel.withCloseGuarantee(Stream<T> stream, - StreamSink<T> sink) => + factory StreamChannel.withCloseGuarantee( + Stream<T> stream, StreamSink<T> sink) => new CloseGuaranteeChannel(stream, sink); /// Connects [this] to [other], so that any values emitted by either are sent @@ -108,8 +108,7 @@ /// Transforms [this] using [transformer]. /// /// This is identical to calling `transformer.bind(channel)`. - StreamChannel<S> transform<S>( - StreamChannelTransformer<S, T> transformer); + StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer); /// Transforms only the [stream] component of [this] using [transformer]. StreamChannel<T> transformStream(StreamTransformer<T, T> transformer); @@ -153,8 +152,7 @@ other.stream.pipe(sink); } - StreamChannel<S> transform<S>( - StreamChannelTransformer<S, T> transformer) => + StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) => transformer.bind(this); StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
diff --git a/pkgs/stream_channel/test/disconnector_test.dart b/pkgs/stream_channel/test/disconnector_test.dart index 311a41c..e194ee3 100644 --- a/pkgs/stream_channel/test/disconnector_test.dart +++ b/pkgs/stream_channel/test/disconnector_test.dart
@@ -84,14 +84,16 @@ var sinkController = new StreamController(); var disconnector = new Disconnector(); var sink = new _CloseCompleterSink(sinkController.sink); - var channel = new StreamChannel.withGuarantees( - streamController.stream, sink) - .transform(disconnector); + var channel = + new StreamChannel.withGuarantees(streamController.stream, sink) + .transform(disconnector); var disconnectFutureFired = false; - expect(disconnector.disconnect().then((_) { - disconnectFutureFired = true; - }), completes); + expect( + disconnector.disconnect().then((_) { + disconnectFutureFired = true; + }), + completes); // Give the future time to fire early if it's going to. await pumpEventQueue(); @@ -114,7 +116,7 @@ channel.sink.add(2); channel.sink.add(3); channel.sink.close(); - + expect(sinkController.stream.toList(), completion(isEmpty)); }); @@ -126,7 +128,7 @@ sinkController.stream.listen(null); // Work around sdk#19095. expect(channel.sink.done, completes); }); - + test("still emits state errors after explicit close", () { sinkController.stream.listen(null); // Work around sdk#19095. expect(channel.sink.close(), completes);
diff --git a/pkgs/stream_channel/test/isolate_channel_test.dart b/pkgs/stream_channel/test/isolate_channel_test.dart index fa4d8d5..7316c6a 100644 --- a/pkgs/stream_channel/test/isolate_channel_test.dart +++ b/pkgs/stream_channel/test/isolate_channel_test.dart
@@ -64,7 +64,8 @@ }); group("stream channel rules", () { - test("closing the sink causes the stream to close before it emits any more " + test( + "closing the sink causes the stream to close before it emits any more " "events", () { sendPort.send(1); sendPort.send(2);
diff --git a/pkgs/stream_channel/test/json_document_transformer_test.dart b/pkgs/stream_channel/test/json_document_transformer_test.dart index 6f18e36..22bb830 100644 --- a/pkgs/stream_channel/test/json_document_transformer_test.dart +++ b/pkgs/stream_channel/test/json_document_transformer_test.dart
@@ -15,8 +15,8 @@ setUp(() { streamController = new StreamController(); sinkController = new StreamController(); - channel = new StreamChannel<String>( - streamController.stream, sinkController.sink); + channel = + new StreamChannel<String>(streamController.stream, sinkController.sink); }); test("decodes JSON emitted by the channel", () {
diff --git a/pkgs/stream_channel/test/multi_channel_test.dart b/pkgs/stream_channel/test/multi_channel_test.dart index 0e58d38..69059b6 100644 --- a/pkgs/stream_channel/test/multi_channel_test.dart +++ b/pkgs/stream_channel/test/multi_channel_test.dart
@@ -47,7 +47,8 @@ channel1.sink.close(); }); - test("doesn't closes the local virtual channel when the stream " + test( + "doesn't closes the local virtual channel when the stream " "subscription is canceled", () { channel1.sink.done.then(expectAsync((_) {}, count: 0)); @@ -58,7 +59,8 @@ return pumpEventQueue(); }); - test("closes the underlying channel when it closes without any other " + test( + "closes the underlying channel when it closes without any other " "virtual channels", () { expect(controller.local.sink.done, completes); expect(controller.foreign.sink.done, completes); @@ -66,7 +68,8 @@ channel1.sink.close(); }); - test("doesn't close the underlying channel when it closes with other " + test( + "doesn't close the underlying channel when it closes with other " "virtual channels", () { controller.local.sink.done.then(expectAsync((_) {}, count: 0)); controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); @@ -126,7 +129,8 @@ virtual1.sink.close(); }); - test("doesn't closes the local virtual channel when the stream " + test( + "doesn't closes the local virtual channel when the stream " "subscription is canceled", () { virtual1.sink.done.then(expectAsync((_) {}, count: 0)); virtual1.stream.listen((_) {}).cancel(); @@ -136,7 +140,8 @@ return pumpEventQueue(); }); - test("closes the underlying channel when it closes without any other " + test( + "closes the underlying channel when it closes without any other " "virtual channels", () async { // First close the default channel so we can test the new channel as the // last living virtual channel. @@ -149,7 +154,8 @@ virtual1.sink.close(); }); - test("doesn't close the underlying channel when it closes with other " + test( + "doesn't close the underlying channel when it closes with other " "virtual channels", () { controller.local.sink.done.then(expectAsync((_) {}, count: 0)); controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); @@ -169,10 +175,10 @@ // we're properly testing two channels with the same id. expect(virtual1.id, equals(virtual3.id)); - virtual2.stream.listen( - expectAsync((message) => expect(message, equals("hello")))); - virtual4.stream.listen( - expectAsync((message) => expect(message, equals("goodbye")))); + virtual2.stream + .listen(expectAsync((message) => expect(message, equals("hello")))); + virtual4.stream + .listen(expectAsync((message) => expect(message, equals("goodbye")))); virtual1.sink.add("hello"); virtual3.sink.add("goodbye"); @@ -223,7 +229,8 @@ virtual2.sink.close(); }); - test("doesn't closes the local virtual channel when the stream " + test( + "doesn't closes the local virtual channel when the stream " "subscription is canceled", () { virtual2.sink.done.then(expectAsync((_) {}, count: 0)); virtual2.stream.listen((_) {}).cancel(); @@ -233,7 +240,8 @@ return pumpEventQueue(); }); - test("closes the underlying channel when it closes without any other " + test( + "closes the underlying channel when it closes without any other " "virtual channels", () async { // First close the default channel so we can test the new channel as the // last living virtual channel. @@ -246,7 +254,8 @@ virtual2.sink.close(); }); - test("doesn't close the underlying channel when it closes with other " + test( + "doesn't close the underlying channel when it closes with other " "virtual channels", () { controller.local.sink.done.then(expectAsync((_) {}, count: 0)); controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); @@ -259,8 +268,7 @@ }); test("doesn't allow another virtual channel with the same id", () { - expect(() => channel2.virtualChannel(virtual1.id), - throwsArgumentError); + expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError); }); }); @@ -314,7 +322,8 @@ group("stream channel rules", () { group("for the main stream:", () { - test("closing the sink causes the stream to close before it emits any more " + test( + "closing the sink causes the stream to close before it emits any more " "events", () { channel1.sink.add(1); channel1.sink.add(2); @@ -380,7 +389,8 @@ virtual2 = channel2.virtualChannel(virtual1.id); }); - test("closing the sink causes the stream to close before it emits any more " + test( + "closing the sink causes the stream to close before it emits any more " "events", () { virtual1.sink.add(1); virtual1.sink.add(2);
diff --git a/pkgs/stream_channel/test/stream_channel_completer_test.dart b/pkgs/stream_channel/test/stream_channel_completer_test.dart index 1ee40a5..e08dd8a 100644 --- a/pkgs/stream_channel/test/stream_channel_completer_test.dart +++ b/pkgs/stream_channel/test/stream_channel_completer_test.dart
@@ -18,8 +18,8 @@ completer = new StreamChannelCompleter(); streamController = new StreamController(); sinkController = new StreamController(); - innerChannel = new StreamChannel( - streamController.stream, sinkController.sink); + innerChannel = + new StreamChannel(streamController.stream, sinkController.sink); }); group("when a channel is set before accessing", () { @@ -98,8 +98,8 @@ group("forFuture", () { test("forwards a StreamChannel", () { - var channel = StreamChannelCompleter.fromFuture( - new Future.value(innerChannel)); + var channel = + StreamChannelCompleter.fromFuture(new Future.value(innerChannel)); channel.sink.add(1); channel.sink.close(); streamController.sink.add(2); @@ -110,8 +110,8 @@ }); test("forwards an error", () { - var channel = StreamChannelCompleter.fromFuture( - new Future.error("oh no")); + var channel = + StreamChannelCompleter.fromFuture(new Future.error("oh no")); expect(channel.stream.toList(), throwsA("oh no")); }); });
diff --git a/pkgs/stream_channel/test/stream_channel_controller_test.dart b/pkgs/stream_channel/test/stream_channel_controller_test.dart index 7503387..8162a17 100644 --- a/pkgs/stream_channel/test/stream_channel_controller_test.dart +++ b/pkgs/stream_channel/test/stream_channel_controller_test.dart
@@ -13,16 +13,25 @@ }); test("forwards events from the local sink to the foreign stream", () { - controller.local.sink..add(1)..add(2)..add(3)..close(); + controller.local.sink + ..add(1) + ..add(2) + ..add(3) + ..close(); expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3]))); }); test("forwards events from the foreign sink to the local stream", () { - controller.foreign.sink..add(1)..add(2)..add(3)..close(); + controller.foreign.sink + ..add(1) + ..add(2) + ..add(3) + ..close(); expect(controller.local.stream.toList(), completion(equals([1, 2, 3]))); }); - test("with allowForeignErrors: false, shuts down the connection if an " + test( + "with allowForeignErrors: false, shuts down the connection if an " "error is added to the foreign channel", () { controller = new StreamChannelController(allowForeignErrors: false); @@ -40,7 +49,8 @@ controller = new StreamChannelController(sync: true); }); - test("synchronously forwards events from the local sink to the foreign " + test( + "synchronously forwards events from the local sink to the foreign " "stream", () { var receivedEvent = false; var receivedError = false; @@ -65,7 +75,8 @@ expect(receivedDone, isTrue); }); - test("synchronously forwards events from the foreign sink to the local " + test( + "synchronously forwards events from the foreign sink to the local " "stream", () { var receivedEvent = false; var receivedError = false;
diff --git a/pkgs/stream_channel/test/stream_channel_test.dart b/pkgs/stream_channel/test/stream_channel_test.dart index 41f4c32..f0a8383 100644 --- a/pkgs/stream_channel/test/stream_channel_test.dart +++ b/pkgs/stream_channel/test/stream_channel_test.dart
@@ -16,8 +16,7 @@ setUp(() { streamController = new StreamController(); sinkController = new StreamController(); - channel = new StreamChannel( - streamController.stream, sinkController.sink); + channel = new StreamChannel(streamController.stream, sinkController.sink); }); test("pipe() pipes data from each channel's stream into the other's sink", @@ -42,8 +41,8 @@ }); test("transform() transforms the channel", () async { - var transformed = channel.transform( - new StreamChannelTransformer.fromCodec(UTF8)); + var transformed = + channel.transform(new StreamChannelTransformer.fromCodec(UTF8)); streamController.add([102, 111, 111, 98, 97, 114]); streamController.close(); @@ -51,8 +50,11 @@ transformed.sink.add("fblthp"); transformed.sink.close(); - expect(sinkController.stream.toList(), - completion(equals([[102, 98, 108, 116, 104, 112]]))); + expect( + sinkController.stream.toList(), + completion(equals([ + [102, 98, 108, 116, 104, 112] + ]))); }); test("transformStream() transforms only the stream", () async { @@ -64,8 +66,7 @@ transformed.sink.add("fblthp"); transformed.sink.close(); - expect(sinkController.stream.toList(), - completion(equals(["fblthp"]))); + expect(sinkController.stream.toList(), completion(equals(["fblthp"]))); }); test("transformSink() transforms only the sink", () async { @@ -74,13 +75,19 @@ streamController.add([102, 111, 111, 98, 97, 114]); streamController.close(); - expect(await transformed.stream.toList(), - equals([[102, 111, 111, 98, 97, 114]])); + expect( + await transformed.stream.toList(), + equals([ + [102, 111, 111, 98, 97, 114] + ])); transformed.sink.add("fblthp"); transformed.sink.close(); - expect(sinkController.stream.toList(), - completion(equals([[102, 98, 108, 116, 104, 112]]))); + expect( + sinkController.stream.toList(), + completion(equals([ + [102, 98, 108, 116, 104, 112] + ]))); }); test("changeStream() changes the stream", () {
diff --git a/pkgs/stream_channel/test/utils.dart b/pkgs/stream_channel/test/utils.dart index 130a3e1..e533895 100644 --- a/pkgs/stream_channel/test/utils.dart +++ b/pkgs/stream_channel/test/utils.dart
@@ -9,7 +9,7 @@ /// /// By default, this should pump the event queue enough times to allow any code /// to run, as long as it's not waiting on some external event. -Future pumpEventQueue([int times=20]) { +Future pumpEventQueue([int times = 20]) { if (times == 0) return new Future.value(); // Use [new Future] future to allow microtask events to finish. The [new // Future.value] constructor uses scheduleMicrotask itself and would therefore @@ -17,4 +17,3 @@ // method. return new Future(() => pumpEventQueue(times - 1)); } -
diff --git a/pkgs/stream_channel/test/with_close_guarantee_test.dart b/pkgs/stream_channel/test/with_close_guarantee_test.dart index caf48cf..dafbfef 100644 --- a/pkgs/stream_channel/test/with_close_guarantee_test.dart +++ b/pkgs/stream_channel/test/with_close_guarantee_test.dart
@@ -35,26 +35,30 @@ channel = new StreamChannel.withCloseGuarantee(stream, sink); }); - test("closing the event sink causes the stream to close before it emits any " + test( + "closing the event sink causes the stream to close before it emits any " "more events", () async { controller.local.sink.add(1); controller.local.sink.add(2); controller.local.sink.add(3); - expect(channel.stream.listen(expectAsync((event) { - if (event == 2) channel.sink.close(); - }, count: 2)).asFuture(), completes); + expect( + channel.stream + .listen(expectAsync((event) { + if (event == 2) channel.sink.close(); + }, count: 2)) + .asFuture(), + completes); await pumpEventQueue(); }); - test("closing the event sink before events are emitted causes the stream to " + test( + "closing the event sink before events are emitted causes the stream to " "close immediately", () async { channel.sink.close(); - channel.stream.listen( - expectAsync((_) {}, count: 0), - onError: expectAsync((_, __) {}, count: 0), - onDone: expectAsync(() {})); + channel.stream.listen(expectAsync((_) {}, count: 0), + onError: expectAsync((_, __) {}, count: 0), onDone: expectAsync(() {})); controller.local.sink.add(1); controller.local.sink.add(2);
diff --git a/pkgs/stream_channel/test/with_guarantees_test.dart b/pkgs/stream_channel/test/with_guarantees_test.dart index 409b28f..b8156f2 100644 --- a/pkgs/stream_channel/test/with_guarantees_test.dart +++ b/pkgs/stream_channel/test/with_guarantees_test.dart
@@ -43,15 +43,20 @@ }); }); - test("closing the event sink causes the stream to close before it emits any " + test( + "closing the event sink causes the stream to close before it emits any " "more events", () { streamController.add(1); streamController.add(2); streamController.add(3); - expect(channel.stream.listen(expectAsync((event) { - if (event == 2) channel.sink.close(); - }, count: 2)).asFuture(), completes); + expect( + channel.stream + .listen(expectAsync((event) { + if (event == 2) channel.sink.close(); + }, count: 2)) + .asFuture(), + completes); }); test("after the stream closes, the sink ignores events", () async { @@ -65,8 +70,7 @@ channel.sink.close(); // None of our channel.sink additions should make it to the other endpoint. - sinkController.stream.listen( - expectAsync((_) {}, count: 0), + sinkController.stream.listen(expectAsync((_) {}, count: 0), onDone: expectAsync(() {}, count: 0)); await pumpEventQueue(); }); @@ -97,8 +101,7 @@ channel.sink.close(); // The sink should be ignoring events because the stream closed. - sinkController.stream.listen( - expectAsync((_) {}, count: 0), + sinkController.stream.listen(expectAsync((_) {}, count: 0), onDone: expectAsync(() {}, count: 0)); await pumpEventQueue(); }); @@ -142,14 +145,15 @@ streamController = new StreamController(); sinkController = new StreamController(); channel = new StreamChannel.withGuarantees( - streamController.stream, sinkController.sink, allowSinkErrors: false); + streamController.stream, sinkController.sink, + allowSinkErrors: false); }); test("forwards errors to Sink.done but not the stream", () { channel.sink.addError("oh no"); expect(channel.sink.done, throwsA("oh no")); - sinkController.stream.listen(null, - onError: expectAsync((_) {}, count: 0)); + sinkController.stream + .listen(null, onError: expectAsync((_) {}, count: 0)); }); test("adding an error causes the stream to emit a done event", () { @@ -159,9 +163,13 @@ streamController.add(2); streamController.add(3); - expect(channel.stream.listen(expectAsync((event) { - if (event == 2) channel.sink.addError("oh no"); - }, count: 2)).asFuture(), completes); + expect( + channel.stream + .listen(expectAsync((event) { + if (event == 2) channel.sink.addError("oh no"); + }, count: 2)) + .asFuture(), + completes); }); test("adding an error closes the inner sink", () { @@ -170,7 +178,8 @@ expect(sinkController.stream.toList(), completion(isEmpty)); }); - test("adding an error via via addStream causes the stream to emit a done " + test( + "adding an error via via addStream causes the stream to emit a done " "event", () async { var canceled = false; var controller = new StreamController(onCancel: () {