blob: 33296da433ae6d06904e540c4b09259acb68984a [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import '../stream_channel.dart';
/// A [StreamChannel] that specifically enforces the stream channel guarantee
/// that closing the sink causes the stream to close before it emits any more
/// events.
///
/// This is exposed via [new StreamChannel.withCloseGuarantee].
class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
  /// The wrapper around the inner stream; this is what [stream] returns.
  _CloseGuaranteeStream<T> _stream;

  /// The wrapper around the inner sink; this is what [sink] returns.
  _CloseGuaranteeSink<T> _sink;

  /// The subscription to the inner stream.
  ///
  /// Stays `null` until [_CloseGuaranteeStream.listen] records a subscription
  /// here, which it only does while the channel is not [_disconnected].
  StreamSubscription<T> _subscription;

  /// Whether the sink has closed, causing the underlying channel to disconnect.
  ///
  /// Set to `true` by [_CloseGuaranteeSink.close].
  bool _disconnected = false;

  /// Creates a channel that wraps [innerStream] and [innerSink].
  ///
  /// Both wrappers receive a reference to this channel so that they can share
  /// [_subscription] and [_disconnected].
  CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
    _sink = _CloseGuaranteeSink<T>(innerSink, this);
    _stream = _CloseGuaranteeStream<T>(innerStream, this);
  }

  @override
  Stream<T> get stream => _stream;

  @override
  StreamSink<T> get sink => _sink;
}
/// The stream for [CloseGuaranteeChannel].
///
/// This wraps the inner stream to save the subscription on the channel when
/// [listen] is called.
class _CloseGuaranteeStream<T> extends Stream<T> {
  /// The stream that every [listen] call is forwarded to.
  final Stream<T> _inner;

  /// The owning [CloseGuaranteeChannel], whose state [listen] reads and
  /// updates.
  final CloseGuaranteeChannel<T> _channel;

  _CloseGuaranteeStream(this._inner, this._channel);

  /// Subscribes to the inner stream on behalf of the caller.
  ///
  /// If the channel is already disconnected, the [onData] and [onError]
  /// callbacks are replaced with `null` so that only [onDone] is forwarded.
  /// Otherwise the resulting subscription is stored on the channel, provided
  /// the channel is still connected once the inner `listen` call has returned.
  @override
  StreamSubscription<T> listen(void onData(T event),
      {Function onError, void onDone(), bool cancelOnError}) {
    // Snapshot the flag so both callbacks are filtered consistently.
    final alreadyDisconnected = _channel._disconnected;

    final innerSubscription = _inner.listen(
        alreadyDisconnected ? null : onData,
        onError: alreadyDisconnected ? null : onError,
        onDone: onDone,
        cancelOnError: cancelOnError);

    // The flag is deliberately read again after listening; the subscription
    // is only recorded while the channel is still connected.
    if (_channel._disconnected) return innerSubscription;

    _channel._subscription = innerSubscription;
    return innerSubscription;
  }
}
/// The sink for [CloseGuaranteeChannel].
///
/// This wraps the inner sink so that closing it stops the channel's stream
/// subscription from dispatching any further data or error events.
class _CloseGuaranteeSink<T> extends DelegatingStreamSink<T> {
  /// The owning [CloseGuaranteeChannel], whose state [close] updates.
  final CloseGuaranteeChannel<T> _channel;

  _CloseGuaranteeSink(StreamSink<T> inner, this._channel) : super(inner);

  /// Closes the inner sink and marks the channel as disconnected.
  ///
  /// If a subscription has been recorded on the channel, its data and error
  /// handlers are replaced with `null`. Returns the future produced by the
  /// superclass `close()`.
  @override
  Future<void> close() {
    // Close the inner sink first, then flag the channel as disconnected.
    final closeResult = super.close();
    _channel._disconnected = true;

    final activeSubscription = _channel._subscription;
    if (activeSubscription == null) return closeResult;

    // Don't dispatch anything but a done event.
    activeSubscription
      ..onData(null)
      ..onError(null);
    return closeResult;
  }
}