Allow a type on MultiChannel (#26)
- Add generic type arguments to the classes and relevant fields.
- Explicitly indicate which arguments take a `StreamChannel<dynamic>`.
- Update the test for MultiChannel to use a consistent type.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index eafdb81..b284e5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
## 1.6.7
* Update SDK version to 2.0.0-dev.17.0.
+* Add a type argument to `MultiChannel`.
## 1.6.6
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index e541543..e7e2584 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -41,24 +41,25 @@
///
/// Each virtual channel may be closed individually. When all of them are
/// closed, the underlying [StreamSink] is closed automatically.
-abstract class MultiChannel implements StreamChannel {
+abstract class MultiChannel<T> implements StreamChannel<T> {
/// The default input stream.
///
/// This connects to the remote [sink].
- Stream get stream;
+ Stream<T> get stream;
/// The default output stream.
///
/// This connects to the remote [stream]. If this is closed, the remote
/// [stream] will close, but other virtual channels will remain open and new
/// virtual channels may be opened.
- StreamSink get sink;
+ StreamSink<T> get sink;
/// Creates a new [MultiChannel] that sends and receives messages over
/// [inner].
///
/// The inner channel must take JSON-like objects.
- factory MultiChannel(StreamChannel inner) => new _MultiChannel(inner);
+ factory MultiChannel(StreamChannel<dynamic> inner) =>
+ new _MultiChannel<T>(inner);
/// Creates a new virtual channel.
///
@@ -71,31 +72,32 @@
///
/// Throws an [ArgumentError] if a virtual channel already exists for [id].
/// Throws a [StateError] if the underlying channel is closed.
- VirtualChannel virtualChannel([id]);
+ VirtualChannel<T> virtualChannel([id]);
}
/// The implementation of [MultiChannel].
///
/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
/// without having to implement all the private members.
-class _MultiChannel extends StreamChannelMixin implements MultiChannel {
+class _MultiChannel<T> extends StreamChannelMixin<T>
+ implements MultiChannel<T> {
/// The inner channel over which all communication is conducted.
///
/// This will be `null` if the underlying communication channel is closed.
- StreamChannel _inner;
+ StreamChannel<dynamic> _inner;
/// The subscription to [_inner.stream].
- StreamSubscription _innerStreamSubscription;
+ StreamSubscription<dynamic> _innerStreamSubscription;
- Stream get stream => _mainController.foreign.stream;
- StreamSink get sink => _mainController.foreign.sink;
+ Stream<T> get stream => _mainController.foreign.stream;
+ StreamSink<T> get sink => _mainController.foreign.sink;
/// The controller for this channel.
- final _mainController = new StreamChannelController(sync: true);
+ final _mainController = new StreamChannelController<T>(sync: true);
/// A map from input IDs to [StreamChannelController]s that should be used to
/// communicate over those channels.
- final _controllers = <int, StreamChannelController>{};
+ final _controllers = <int, StreamChannelController<T>>{};
/// Input IDs of controllers in [_controllers] that we've received messages
/// for but that have not yet had a local [virtualChannel] created.
@@ -164,7 +166,7 @@
onError: _mainController.local.sink.addError);
}
- VirtualChannel virtualChannel([id]) {
+ VirtualChannel<T> virtualChannel([id]) {
var inputId;
var outputId;
if (id != null) {
@@ -189,7 +191,7 @@
this, inputId, new Stream.empty(), new NullStreamSink());
}
- StreamChannelController controller;
+ StreamChannelController<T> controller;
if (_pendingIds.remove(inputId)) {
// If we've already received messages for this channel, use the controller
// where those messages are buffered.
@@ -244,9 +246,10 @@
/// This implements [MultiChannel] for convenience.
/// [VirtualChannel.virtualChannel] is semantically identical to the parent's
/// [MultiChannel.virtualChannel].
-class VirtualChannel extends StreamChannelMixin implements MultiChannel {
+class VirtualChannel<T> extends StreamChannelMixin<T>
+ implements MultiChannel<T> {
/// The [MultiChannel] that created this.
- final MultiChannel _parent;
+ final MultiChannel<T> _parent;
/// The identifier for this channel.
///
@@ -255,10 +258,10 @@
/// except that it will be JSON-serializable.
final id;
- final Stream stream;
- final StreamSink sink;
+ final Stream<T> stream;
+ final StreamSink<T> sink;
VirtualChannel._(this._parent, this.id, this.stream, this.sink);
- VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
+ VirtualChannel<T> virtualChannel([id]) => _parent.virtualChannel(id);
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 47e24c2..bbb29d8 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.6.7-dev
+version: 1.6.7
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index 0c9316a..99a53c3 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -11,8 +11,8 @@
var channel2;
setUp(() {
controller = new StreamChannelController();
- channel1 = new MultiChannel(controller.local);
- channel2 = new MultiChannel(controller.foreign);
+ channel1 = new MultiChannel<int>(controller.local);
+ channel2 = new MultiChannel<int>(controller.foreign);
});
group("the default virtual channel", () {
@@ -20,15 +20,15 @@
var first = true;
channel2.stream.listen(expectAsync1((message) {
if (first) {
- expect(message, equals("hello"));
+ expect(message, equals(1));
first = false;
} else {
- expect(message, equals("world"));
+ expect(message, equals(2));
}
}, count: 2));
- channel1.sink.add("hello");
- channel1.sink.add("world");
+ channel1.sink.add(1);
+ channel1.sink.add(2);
});
test("closes the remote virtual channel when it closes", () {
@@ -95,10 +95,10 @@
var first = true;
virtual2.stream.listen(expectAsync1((message) {
if (first) {
- expect(message, equals("hello"));
+ expect(message, equals(1));
first = false;
} else {
- expect(message, equals("world"));
+ expect(message, equals(2));
}
}, count: 2));
@@ -109,8 +109,8 @@
}
channel2.stream.listen(expectAsync1((_) {}, count: 0));
- virtual1.sink.add("hello");
- virtual1.sink.add("world");
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
});
test("closes the remote virtual channel when it closes", () {
@@ -174,12 +174,12 @@
expect(virtual1.id, equals(virtual3.id));
virtual2.stream
- .listen(expectAsync1((message) => expect(message, equals("hello"))));
- virtual4.stream.listen(
- expectAsync1((message) => expect(message, equals("goodbye"))));
+ .listen(expectAsync1((message) => expect(message, equals(1))));
+ virtual4.stream
+ .listen(expectAsync1((message) => expect(message, equals(2))));
- virtual1.sink.add("hello");
- virtual3.sink.add("goodbye");
+ virtual1.sink.add(1);
+ virtual3.sink.add(2);
});
});
@@ -195,10 +195,10 @@
var first = true;
virtual1.stream.listen(expectAsync1((message) {
if (first) {
- expect(message, equals("hello"));
+ expect(message, equals(1));
first = false;
} else {
- expect(message, equals("world"));
+ expect(message, equals(2));
}
}, count: 2));
@@ -209,8 +209,8 @@
}
channel1.stream.listen(expectAsync1((_) {}, count: 0));
- virtual2.sink.add("hello");
- virtual2.sink.add("world");
+ virtual2.sink.add(1);
+ virtual2.sink.add(2);
});
test("closes the remote virtual channel when it closes", () {
@@ -273,14 +273,13 @@
() async {
virtual1 = channel1.virtualChannel();
- virtual1.sink.add("hello");
+ virtual1.sink.add(1);
await pumpEventQueue();
- virtual1.sink.add("world");
+ virtual1.sink.add(2);
await pumpEventQueue();
- expect(channel2.virtualChannel(virtual1.id).stream,
- emitsInOrder(["hello", "world"]));
+ expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
});
test(