blob: ace6f92f696f5e1e56ae1c01f18cf24d8269aed7 [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of;
* Helper class to wrap a [StreamConsumer<List<int>, T>] and provide utility
* functions for writing to the StreamConsumer directly. The [IOSink]
* buffers the input given by [add] and [addString] and will delay a [consume]
* or [addStream] until the buffer is flushed.
* When the [IOSink] is bound to a stream (through either [consume]
* or [addStream]) any call to the [IOSink] will throw a
* [StateError].
abstract class IOSink<T> implements StreamConsumer<List<int>, T> {
factory IOSink(StreamConsumer<List<int>, T> target)
=> new _IOSinkImpl(target);
* Provide functionality for piping to the [IOSink].
Future<T> consume(Stream<List<int>> stream);
* Like [consume], but will not close the target when done.
Future<T> addStream(Stream<List<int>> stream);
* Write a list of bytes to the target.
void add(List<int> data);
* Write a String to the target.
void addString(String string, [Encoding encoding = Encoding.UTF_8]);
* Close the target.
void close();
* Get future that will complete when all data has been written to
* the IOSink and it has been closed.
Future<T> get done;
class _IOSinkImpl<T> implements IOSink<T> {
final StreamConsumer<List<int>, T> _target;
StreamController<List<int>> _controllerInstance;
Future<T> _pipeFuture;
StreamSubscription<List<int>> _bindSubscription;
bool _paused = true;
_IOSinkImpl(StreamConsumer<List<int>, T> target) : _target = target;
Future<T> consume(Stream<List<int>> stream) {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
return _fillFromStream(stream);
Future<T> addStream(Stream<List<int>> stream) {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
return _fillFromStream(stream, unbind: true);
void add(List<int> data) {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
void addString(String string, [Encoding encoding = Encoding.UTF_8]) {
add(_encodeString(string, encoding));
void close() {
if (_isBound) {
throw new StateError("IOSink is already bound to a stream");
Future<T> get done {
return _pipeFuture.then((_) => this);
StreamController<List<int>> get _controller {
if (_controllerInstance == null) {
_controllerInstance = new StreamController<List<int>>(
onPauseStateChange: _onPauseStateChange,
onSubscriptionStateChange: _onSubscriptionStateChange);
_pipeFuture = => this);
return _controllerInstance;
bool get _isBound => _bindSubscription != null;
void _onPauseStateChange() {
_paused = _controller.isPaused;
if (_controller.isPaused) {
} else {
void _pause() {
if (_bindSubscription != null) {
try {
// The subscription can be canceled at this point.
} catch (e) {
void _resume() {
if (_bindSubscription != null) {
try {
// The subscription can be canceled at this point.
} catch (e) {
void _onSubscriptionStateChange() {
if (_controller.hasSubscribers) {
_paused = false;
} else {
if (_bindSubscription != null) {
_bindSubscription = null;
Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) {
Completer<T> unbindCompleter;
if (unbind) {
unbindCompleter = new Completer<T>();
completeUnbind([error]) {
if (unbindCompleter == null) return;
var tmp = unbindCompleter;
unbindCompleter = null;
if (error == null) {
_bindSubscription = null;
} else {
_bindSubscription = stream.listen(
onDone: () {
if (unbind) {
} else {
onError: _controller.signalError);
if (_paused) _pause();
if (unbind) {
.then((_) => completeUnbind(),
onError: (error) => completeUnbind(error));
return unbindCompleter.future;
} else {
return _pipeFuture.then((_) => this);