blob: a3067f6da17fec35b84d7d331effe6bb8c433643 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:stream_channel/stream_channel.dart';
String _sseHeaders(String origin) => 'HTTP/1.1 200 OK\r\n'
'Content-Type: text/event-stream\r\n'
'Cache-Control: no-cache\r\n'
'Connection: keep-alive\r\n'
'Access-Control-Allow-Credentials: true\r\n'
'Access-Control-Allow-Origin: $origin\r\n'
/// A bi-directional SSE connection between server and browser.
class SseConnection extends StreamChannelMixin<String> {
/// Incoming messages from the Browser client.
final _incomingController = StreamController<String>();
/// Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();
final Sink _sink;
final _closedCompleter = Completer<void>();
SseConnection(this._sink) { {
if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_outgoingController.onCancel = _close;
_incomingController.onCancel = _close;
/// The message added to the sink has to be JSON encodable.
StreamSink<String> get sink => _outgoingController.sink;
// Add messages to this [StreamSink] to send them to the server.
/// [Stream] of messages sent from the server to this client.
/// A message is a decoded JSON object.
Stream<String> get stream =>;
void _close() {
if (!_closedCompleter.isCompleted) {
if (!_outgoingController.isClosed) _outgoingController.close();
if (!_incomingController.isClosed) _incomingController.close();
/// [SseHandler] handles requests on a user defined path to create
/// two-way communications of JSON encodable data between server and clients.
/// A server sends messages to a client through an SSE channel, while
/// a client sends message to a server through HTTP POST requests.
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();
StreamQueue<SseConnection> _connectionsStream;
StreamQueue<SseConnection> get connections =>
_connectionsStream ??= StreamQueue(;
shelf.Handler get handler => _handle;
int get numberOfClients => _connections.length;
shelf.Response _createSseConnection(shelf.Request req, String path) {
req.hijack((channel) async {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
// Remove connection when it is remotely closed or the stream is
// cancelled. {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
return shelf.Response.notFound('');
String _getOriginalPath(shelf.Request req) => req.requestedUri.path;
Future<shelf.Response> _handle(shelf.Request req) async {
var path = _getOriginalPath(req);
if (_uri.path != path) {
return shelf.Response.notFound('');
if (req.headers['accept'] == 'text/event-stream' && req.method == 'GET') {
return _createSseConnection(req, path);
if (req.headers['accept'] != 'text/event-stream' && req.method == 'POST') {
return _handleIncomingMessage(req, path);
return shelf.Response.notFound('');
Future<shelf.Response> _handleIncomingMessage(
shelf.Request req, String path) async {
try {
var clientId = req.url.queryParameters['sseClientId'];
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
return shelf.Response.ok('', headers: {
'access-control-allow-credentials': 'true',
'access-control-allow-origin': req.headers['origin']