Migrate to null safety (WIP)
diff --git a/lib/client/sse_client.dart b/lib/client/sse_client.dart
index 00def10..e847f4b 100644
--- a/lib/client/sse_client.dart
+++ b/lib/client/sse_client.dart
@@ -26,11 +26,11 @@
int _lastMessageId = -1;
- EventSource _eventSource;
+ late final EventSource _eventSource;
- String _serverUrl;
+ late final String _serverUrl;
- Timer _errorTimer;
+ Timer? _errorTimer;
/// [serverUrl] is the URL under which the server is listening for
/// incoming bi-directional SSE connections.
@@ -114,7 +114,7 @@
}
void _onOutgoingMessage(String message) async {
- String encodedMessage;
+ String? encodedMessage;
try {
encodedMessage = jsonEncode(message);
} on JsonUnsupportedObjectError catch (e) {
diff --git a/lib/src/server/sse_handler.dart b/lib/src/server/sse_handler.dart
index 8dec99e..ab3f938 100644
--- a/lib/src/server/sse_handler.dart
+++ b/lib/src/server/sse_handler.dart
@@ -24,6 +24,7 @@
class _SseMessage {
final int id;
final String message;
+
_SseMessage(this.id, this.message);
}
@@ -38,10 +39,10 @@
Sink _sink;
/// How long to wait after a connection drops before considering it closed.
- final Duration _keepAlive;
+ final Duration? _keepAlive;
/// A timer counting down the KeepAlive period (null if hasn't disconnected).
- Timer _keepAliveTimer;
+ Timer? _keepAliveTimer;
/// Whether this connection is currently in the KeepAlive timeout period.
bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
@@ -57,7 +58,7 @@
/// Wraps the `_outgoingController.stream` to buffer events to enable keep
/// alive.
- StreamQueue _outgoingStreamQueue;
+ late final StreamQueue _outgoingStreamQueue;
/// Creates an [SseConnection] for the supplied [_sink].
///
@@ -67,7 +68,7 @@
///
/// If [keepAlive] is not supplied, the connection will be closed immediately
/// after a disconnect.
- SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
+ SseConnection(this._sink, {Duration? keepAlive}) : _keepAlive = keepAlive {
_outgoingStreamQueue = StreamQueue(_outgoingController.stream);
unawaited(_setUpListener());
_outgoingController.onCancel = _close;
@@ -154,7 +155,7 @@
// been completely closed, set a timer to close after the timeout period.
// If the connection comes back, this will be cancelled and all messages left
// in the queue tried again.
- _keepAliveTimer = Timer(_keepAlive, _close);
+ _keepAliveTimer = Timer(_keepAlive!, _close);
}
}
@@ -187,11 +188,11 @@
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
- final Duration _keepAlive;
+ final Duration? _keepAlive;
final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();
- StreamQueue<SseConnection> _connectionsStream;
+ StreamQueue<SseConnection>? _connectionsStream;
/// [_uri] is the URL under which the server is listening for
/// incoming bi-directional SSE connections.
@@ -203,7 +204,7 @@
///
/// If [keepAlive] is not supplied, connections will be closed immediately
/// after a disconnect.
- SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive;
+ SseHandler(this._uri, {Duration? keepAlive}) : _keepAlive = keepAlive;
StreamQueue<SseConnection> get connections =>
_connectionsStream ??= StreamQueue(_connectionController.stream);
@@ -215,14 +216,14 @@
shelf.Response _createSseConnection(shelf.Request req, String path) {
req.hijack((channel) async {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
- sink.add(_sseHeaders(req.headers['origin']));
- var clientId = req.url.queryParameters['sseClientId'];
+ sink.add(_sseHeaders(_originFor(req)));
+ var clientId = req.url.queryParameters['sseClientId']!;
// Check if we already have a connection for this ID that is in the process
// of timing out (in which case we can reconnect it transparently).
if (_connections[clientId] != null &&
- _connections[clientId].isInKeepAlivePeriod) {
- _connections[clientId]._acceptReconnection(sink);
+ _connections[clientId]!.isInKeepAlivePeriod) {
+ _connections[clientId]!._acceptReconnection(sink);
} else {
var connection = SseConnection(sink, keepAlive: _keepAlive);
_connections[clientId] = connection;
@@ -280,7 +281,7 @@
String _originFor(shelf.Request req) =>
// Firefox does not set header "origin".
// https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
- req.headers['origin'] ?? req.headers['host'];
+ req.headers['origin'] ?? req.headers['host']!;
/// Immediately close all connections, ignoring any keepAlive periods.
void shutdown() {
diff --git a/pubspec.yaml b/pubspec.yaml
index 0c67e5c..df34fd4 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -7,17 +7,17 @@
requests.
environment:
- sdk: ">=2.2.0 <3.0.0"
+ sdk: ">=2.12.0 <3.0.0"
dependencies:
async: ^2.0.8
collection: ^1.0.0
- logging: '>=0.11.3+2 <2.0.0'
+ logging: ^1.0.0
pedantic: ^1.4.0
- stream_channel: '>=1.6.8 <3.0.0'
+ stream_channel: ^2.0.0
shelf: ^1.1.0
dev_dependencies:
- shelf_static: '>=0.2.8 <2.0.0'
+ shelf_static: ^1.0.0
test: ^1.5.3
webdriver: ^3.0.0
diff --git a/test/sse_test.dart b/test/sse_test.dart
index cbd8e6d..64f809e 100644
--- a/test/sse_test.dart
+++ b/test/sse_test.dart
@@ -16,25 +16,25 @@
import 'package:webdriver/io.dart';
void main() {
- HttpServer server;
- WebDriver webdriver;
- SseHandler handler;
+ late HttpServer server;
+ late WebDriver webdriver;
+ late SseHandler handler;
Process chromeDriver;
setUpAll(() async {
try {
chromeDriver = await Process.start(
'chromedriver', ['--port=4444', '--url-base=wd/hub']);
+
+ addTearDown(() {
+ chromeDriver.kill();
+ });
} catch (e) {
throw StateError(
'Could not start ChromeDriver. Is it installed?\nError: $e');
}
});
- tearDownAll(() {
- chromeDriver.kill();
- });
-
group('SSE', () {
setUp(() async {
handler = SseHandler(Uri.parse('/test'));
diff --git a/test/web/index.dart b/test/web/index.dart
index 7d00757..e149372 100644
--- a/test/web/index.dart
+++ b/test/web/index.dart
@@ -9,7 +9,7 @@
void main() {
var channel = SseClient('/test');
- document.querySelector('button').onClick.listen((_) {
+ document.querySelector('button')!.onClick.listen((_) {
channel.sink.close();
});