blob: 8e5db6ffb5d2686f007723e47f4d10cd49d8fb45 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:html';
import 'package:http/browser_client.dart';
import 'package:logging/logging.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:uuid/uuid.dart';
/// A client for bi-directional SSE communication.
///
/// The client can send any JSON-encodable messages to the server by adding
/// them to the [sink] and listen to messages from the server on the [stream].
class SseClient extends StreamChannelMixin<String> {
  /// Carries messages received from the server; exposed through [stream].
  final _incomingController = StreamController<String>();

  /// Carries messages added to [sink]; each one is POSTed to the server.
  final _outgoingController = StreamController<String>();

  /// HTTP client used for the client-to-server direction.
  final _client = BrowserClient()..withCredentials = true;

  final _logger = Logger('SseClient');

  /// The server-to-client event source.
  EventSource _eventSource;

  /// The server URL including this client's `sseClientId` query parameter.
  String _serverUrl;

  /// [serverUrl] is the URL under which the server is listening for
  /// incoming bi-directional SSE connections.
  ///
  /// A fresh `sseClientId` query parameter is appended to [serverUrl]; the
  /// resulting URL is used both for the [EventSource] connection and for the
  /// POST requests that carry outgoing messages.
  SseClient(String serverUrl) {
    var clientId = Uuid().v1();
    // Use `&` when the caller's URL already carries a query string, so the
    // client id does not get fused into the value of an existing parameter.
    var separator = serverUrl.contains('?') ? '&' : '?';
    _serverUrl = '$serverUrl${separator}sseClientId=$clientId';
    _eventSource = EventSource(_serverUrl, withCredentials: true);

    _outgoingController.stream
        .listen(_onOutgoingMessage, onDone: _onOutgoingDone);
    _eventSource.addEventListener('message', _onIncomingMessage);
    _eventSource.addEventListener('control', _onIncomingControlMessage);
    // Surface event source errors to listeners of [stream].
    _eventSource.onError.listen(_incomingController.addError);
  }

  /// The `open` events of the underlying [EventSource].
  Stream<Event> get onOpen => _eventSource.onOpen;

  /// Add messages to this [StreamSink] to send them to the server.
  ///
  /// The message added to the sink has to be JSON encodable. Messages that fail
  /// to encode will be logged through a [Logger].
  @override
  StreamSink<String> get sink => _outgoingController.sink;

  /// [Stream] of messages sent from the server to this client.
  ///
  /// Each event is the JSON-decoded payload of a server `message` event, which
  /// is expected to decode to a [String].
  @override
  Stream<String> get stream => _incomingController.stream;

  /// Closes the event source, both message controllers and the HTTP client.
  void close() {
    _eventSource.close();
    _incomingController.close();
    _outgoingController.close();
    _client.close();
  }

  /// Handles a `control` event from the server.
  ///
  /// The only supported payload is `close`, which closes this client; any
  /// other payload throws an [UnsupportedError].
  void _onIncomingControlMessage(Event message) {
    var data = (message as MessageEvent).data;
    if (data == 'close') {
      close();
    } else {
      throw UnsupportedError('Illegal Control Message "$data"');
    }
  }

  /// Decodes a `message` event from the server and adds it to [stream].
  void _onIncomingMessage(Event message) {
    var decoded =
        jsonDecode((message as MessageEvent).data as String) as String;
    _incomingController.add(decoded);
  }

  /// Closes the client once the outgoing [sink] has been closed.
  void _onOutgoingDone() {
    close();
  }

  /// JSON-encodes [message] and POSTs it to the server.
  ///
  /// Failures are logged rather than thrown: this runs as a stream listener,
  /// so there is no caller that could handle the error.
  void _onOutgoingMessage(dynamic message) async {
    String encoded;
    try {
      encoded = jsonEncode(message);
    } on JsonUnsupportedObjectError catch (e) {
      _logger.warning('Unable to encode outgoing message: $e');
      // Nothing to send when the message could not be encoded.
      return;
    }
    try {
      await _client.post(_serverUrl, body: encoded);
    } catch (e) {
      _logger.warning('Unable to send outgoing message: $e');
    }
  }
}