blob: 964d1da10bf9de1c5c5620494ac33e3bbc98faa0 [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
import 'dart:async';
import 'package:async/async.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
/// An individual (transport-agnostic) bidirectional socket connection.
abstract class SocketConnection {
/// Whether this connection is currently in the KeepAlive timeout period.
bool get isInKeepAlivePeriod;
/// Messages added to the sink must be JSON encodable.
StreamSink<dynamic> get sink;
Stream<String> get stream;
/// Immediately close the connection, ignoring any keepAlive period.
void shutdown();
/// A handler that accepts (transport-agnostic) bidirection socket connections.
abstract class SocketHandler {
StreamQueue<SocketConnection> get connections;
FutureOr<Response> handler(Request request);
void shutdown();
/// An implemenation of [SocketConnection] that users server-sent events (SSE)
/// and HTTP POSTS for bidirectional communication by wrapping an [SseConnection].
class SseSocketConnection extends SocketConnection {
final SseConnection _connection;
bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
StreamSink<dynamic> get sink => _connection.sink;
Stream<String> get stream =>;
void shutdown() => _connection.shutdown();
/// An implemenation of [SocketHandler] that accepts server-sent events (SSE)
/// connections and wraps them in an [SseSocketConnection].
class SseSocketHandler extends SocketHandler {
final SseHandler _sseHandler;
final StreamController<SseSocketConnection> _connectionsStream =
StreamQueue<SseSocketConnection> _connectionsStreamQueue;
SseSocketHandler(this._sseHandler) {
unawaited(() async {
final injectedConnections = _sseHandler.connections;
while (await injectedConnections.hasNext) {
StreamQueue<SseSocketConnection> get connections =>
_connectionsStreamQueue ??= StreamQueue(;
FutureOr<Response> handler(Request request) => _sseHandler.handler(request);
void shutdown() => _sseHandler.shutdown();
/// An implemenation of [SocketConnection] that uses WebSockets for communication
/// by wrapping [WebSocketChannel].
class WebSocketConnection extends SocketConnection {
final WebSocketChannel _channel;
bool get isInKeepAlivePeriod => false;
StreamSink<dynamic> get sink => _channel.sink;
Stream<String> get stream => o) => o?.toString());
void shutdown() => _channel.sink.close();
/// An implemenation of [SocketHandler] that accepts WebSocket connections and
/// wraps them in a [WebSocketConnection].
class WebSocketSocketHandler extends SocketHandler {
Handler _handler;
final StreamController<WebSocketConnection> _connectionsStream =
StreamQueue<WebSocketConnection> _connectionsStreamQueue;
WebSocketSocketHandler() {
_handler = webSocketHandler((WebSocketChannel channel) =>
StreamQueue<WebSocketConnection> get connections =>
_connectionsStreamQueue ??= StreamQueue(;
FutureOr<Response> handler(Request request) => _handler(request);
void shutdown() {}