blob: 4f20686b5ff80d08625146f4767395f675eacd4c [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'utils.dart';
/// A class for managing a stream of input messages and a sink for output
/// messages.
///
/// This contains stream logic that's shared between [Server] and [Client].
class TwoWayStream {
/// The name of the component whose streams are being managed (e.g. "Server").
///
/// Used for error reporting.
final String _name;
/// The input stream.
///
/// This is a stream of decoded JSON objects.
final Stream _input;
/// The subscription to [_input].
StreamSubscription _inputSubscription;
/// The output sink.
///
/// This takes decoded JSON objects.
final StreamSink _output;
/// Returns a [Future] that completes when the connection is closed.
///
/// This is the same future that's returned by [listen].
Future get done => _doneCompleter.future;
final _doneCompleter = new Completer();
/// Whether the stream has been closed.
bool get isClosed => _doneCompleter.isCompleted;
/// Creates a two-way stream.
///
/// [input] and [output] should emit and take (respectively) JSON-encoded
/// strings.
///
/// [inputName] is used in error messages as the name of the input parameter.
/// [outputName] is likewise used as the name of the output parameter.
///
/// If [onInvalidInput] is passed, any errors parsing messages from [input]
/// are passed to it. Otherwise, they're ignored and the input is discarded.
factory TwoWayStream(String name, Stream<String> input, String inputName,
StreamSink<String> output, String outputName,
{void onInvalidInput(String message, FormatException error)}) {
if (output == null) {
if (input is! StreamSink) {
throw new ArgumentError("Either `$inputName` must be a StreamSink or "
"`$outputName` must be passed.");
}
output = input as StreamSink;
}
var wrappedOutput = mapStreamSink(output, JSON.encode);
return new TwoWayStream.withoutJson(name, input.expand((message) {
var decodedMessage;
try {
decodedMessage = JSON.decode(message);
} on FormatException catch (error) {
if (onInvalidInput != null) onInvalidInput(message, error);
return [];
}
return [decodedMessage];
}), inputName, wrappedOutput, outputName);
}
/// Creates a two-way stream that reads decoded input and writes decoded
/// responses.
///
/// [input] and [output] should emit and take (respectively) decoded JSON
/// objects.
///
/// [inputName] is used in error messages as the name of the input parameter.
/// [outputName] is likewise used as the name of the output parameter.
TwoWayStream.withoutJson(this._name, Stream input, String inputName,
StreamSink output, String outputName)
: _input = input,
_output = output == null && input is StreamSink ? input : output {
if (_output == null) {
throw new ArgumentError("Either `$inputName` must be a StreamSink or "
"`$outputName` must be passed.");
}
}
/// Starts listening to the input stream.
///
/// The returned Future will complete when the input stream is closed. If the
/// input stream emits an error, that will be piped to the returned Future.
Future listen(void handleInput(input)) {
if (_inputSubscription != null) {
throw new StateError("Can only call $_name.listen once.");
}
_inputSubscription = _input.listen(handleInput,
onError: (error, stackTrace) {
if (_doneCompleter.isCompleted) return;
_doneCompleter.completeError(error, stackTrace);
_output.close();
}, onDone: () {
if (_doneCompleter.isCompleted) return;
_doneCompleter.complete();
_output.close();
}, cancelOnError: true);
return _doneCompleter.future;
}
/// Emit [event] on the output stream.
void add(event) => _output.add(event);
/// Stops listening to the input stream and closes the output stream.
Future close() {
if (_inputSubscription == null) {
throw new StateError("Can't call $_name.close before $_name.listen.");
}
if (!_doneCompleter.isCompleted) _doneCompleter.complete();
var inputFuture = _inputSubscription.cancel();
// TODO(nweiz): include the output future in the return value when issue
// 19095 is fixed.
_output.close();
return inputFuture == null ? new Future.value() : inputFuture;
}
}