blob: 72cc91900daddb7adb0a0da0db333977249b67a7 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:async/async.dart';
import 'package:collection/collection.dart';
import 'package:logging/logging.dart';
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:stream_channel/stream_channel.dart';
// RFC 2616 requires carriage return delimiters.
String _sseHeaders(String? origin) => 'HTTP/1.1 200 OK\r\n'
'Content-Type: text/event-stream\r\n'
'Cache-Control: no-cache\r\n'
'Connection: keep-alive\r\n'
'Access-Control-Allow-Credentials: true\r\n'
"${origin != null ? 'Access-Control-Allow-Origin: $origin\r\n' : ''}"
'\r\n\r\n';
class _SseMessage {
final int id;
final String message;
_SseMessage(this.id, this.message);
}
/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String?> {
/// Incoming messages from the Browser client.
final _incomingController = StreamController<String>();
/// Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();
Sink _sink;
/// How long to wait after a connection drops before considering it closed.
final Duration? _keepAlive;
/// A timer counting down the KeepAlive period (null if hasn't disconnected).
Timer? _keepAliveTimer;
/// Whether this connection is currently in the KeepAlive timeout period.
bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
/// The id of the last processed incoming message.
int _lastProcessedId = -1;
/// Incoming messages that have yet to be processed.
final _pendingMessages =
HeapPriorityQueue<_SseMessage>((a, b) => a.id.compareTo(b.id));
final _closedCompleter = Completer<void>();
/// Wraps the `_outgoingController.stream` to buffer events to enable keep
/// alive.
late StreamQueue _outgoingStreamQueue;
/// Creates an [SseConnection] for the supplied [_sink].
///
/// If [keepAlive] is supplied, the connection will remain active for this
/// period after a disconnect and can be reconnected transparently. If there
/// is no reconnect within that period, the connection will be closed normally.
///
/// If [keepAlive] is not supplied, the connection will be closed immediately
/// after a disconnect.
SseConnection(this._sink, {Duration? keepAlive}) : _keepAlive = keepAlive {
_outgoingStreamQueue = StreamQueue(_outgoingController.stream);
unawaited(_setUpListener());
_outgoingController.onCancel = _close;
_incomingController.onCancel = _close;
}
Future<void> _setUpListener() async {
while (
!_outgoingController.isClosed && await _outgoingStreamQueue.hasNext) {
// If we're in a KeepAlive timeout, there's nowhere to send messages so
// wait a short period and check again.
if (isInKeepAlivePeriod) {
await Future.delayed(const Duration(milliseconds: 200));
continue;
}
// Peek the data so we don't remove it from the stream if we're unable to
// send it.
final data = await _outgoingStreamQueue.peek;
// Ignore outgoing messages since the connection may have closed while
// waiting for the keep alive.
if (_closedCompleter.isCompleted) break;
try {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
await _outgoingStreamQueue.next; // Consume from stream if no errors.
} catch (e) {
if ((e is StateError || e is SocketException) &&
(_keepAlive != null && !_closedCompleter.isCompleted)) {
// If we got here then the sink may have closed but the stream.onDone
// hasn't fired yet, so pause the subscription and skip calling
// `next` so the message remains in the queue to try again.
_handleDisconnect();
} else {
rethrow;
}
}
}
}
/// The message added to the sink has to be JSON encodable.
@override
StreamSink<String> get sink => _outgoingController.sink;
// Add messages to this [StreamSink] to send them to the server.
/// [Stream] of messages sent from the server to this client.
///
/// A message is a decoded JSON object.
@override
Stream<String> get stream => _incomingController.stream;
/// Adds an incoming [message] to the [stream].
///
/// This will buffer messages to guarantee order.
void _addIncomingMessage(int id, String message) {
_pendingMessages.add(_SseMessage(id, message));
while (_pendingMessages.isNotEmpty) {
var pendingMessage = _pendingMessages.first;
// Only process the next incremental message.
if (pendingMessage.id - _lastProcessedId <= 1) {
_incomingController.sink.add(pendingMessage.message);
_lastProcessedId = pendingMessage.id;
_pendingMessages.removeFirst();
} else {
// A message came out of order. Wait until we receive the previous
// messages to process.
break;
}
}
}
void _acceptReconnection(Sink sink) {
_keepAliveTimer?.cancel();
_sink = sink;
}
void _handleDisconnect() {
if (_keepAlive == null) {
// Close immediately if we're not keeping alive.
_close();
} else if (!isInKeepAlivePeriod && !_closedCompleter.isCompleted) {
// Otherwise if we didn't already have an active timer and we've not already
// been completely closed, set a timer to close after the timeout period.
// If the connection comes back, this will be cancelled and all messages left
// in the queue tried again.
_keepAliveTimer = Timer(_keepAlive!, _close);
}
}
void _close() {
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
// Cancel any existing timer in case we were told to explicitly shut down
// to avoid keeping the process alive.
_keepAliveTimer?.cancel();
_sink.close();
if (!_outgoingController.isClosed) {
_outgoingStreamQueue.cancel(immediate: true);
_outgoingController.close();
}
if (!_incomingController.isClosed) _incomingController.close();
}
}
/// Immediately close the connection, ignoring any keepAlive period.
void shutdown() {
_close();
}
}
/// [SseHandler] handles requests on a user defined path to create
/// two-way communications of JSON encodable data between server and clients.
///
/// A server sends messages to a client through an SSE channel, while
/// a client sends message to a server through HTTP POST requests.
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
final Duration? _keepAlive;
final _connections = <String?, SseConnection>{};
final _connectionController = StreamController<SseConnection>();
StreamQueue<SseConnection>? _connectionsStream;
/// [_uri] is the URL under which the server is listening for
/// incoming bi-directional SSE connections.
///
/// If [keepAlive] is supplied, connections will remain active for this
/// period after a disconnect and can be reconnected transparently. If there
/// is no reconnect within that period, the connection will be closed
/// normally.
///
/// If [keepAlive] is not supplied, connections will be closed immediately
/// after a disconnect.
SseHandler(this._uri, {Duration? keepAlive}) : _keepAlive = keepAlive;
StreamQueue<SseConnection> get connections =>
_connectionsStream ??= StreamQueue(_connectionController.stream);
shelf.Handler get handler => _handle;
int get numberOfClients => _connections.length;
shelf.Response _createSseConnection(shelf.Request req, String path) {
req.hijack((channel) async {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
// Check if we already have a connection for this ID that is in the process
// of timing out (in which case we can reconnect it transparently).
if (_connections[clientId] != null &&
_connections[clientId]!.isInKeepAlivePeriod) {
_connections[clientId]!._acceptReconnection(sink);
} else {
var connection = SseConnection(sink, keepAlive: _keepAlive);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
_connections.remove(clientId);
}));
_connectionController.add(connection);
}
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
_connections[clientId]?._handleDisconnect();
});
});
}
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
Future<shelf.Response> _handle(shelf.Request req) async {
var path = _getOriginalPath(req);
if (_uri.path != path) {
return shelf.Response.notFound('');
}
if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
return _createSseConnection(req, path);
}
if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
return _handleIncomingMessage(req, path);
}
return shelf.Response.notFound('');
}
Future<shelf.Response> _handleIncomingMessage(
shelf.Request req, String path) async {
try {
var clientId = req.url.queryParameters['sseClientId'];
var messageId = int.parse(req.url.queryParameters['messageId'] ?? '0');
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
_connections[clientId]?._addIncomingMessage(messageId, jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
}
return shelf.Response.ok('', headers: {
'access-control-allow-credentials': 'true',
'access-control-allow-origin': _originFor(req),
});
}
String _originFor(shelf.Request req) =>
// Firefox does not set header "origin".
// https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
req.headers['origin'] ?? req.headers['host']!;
/// Immediately close all connections, ignoring any keepAlive periods.
void shutdown() {
for (final connection in _connections.values) {
connection.shutdown();
}
}
}
void closeSink(SseConnection connection) => connection._sink.close();