blob: 4d729941920d6f28fbc749d95b8faf6d4e292175 [file] [log] [blame]
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:jni/jni.dart';
import 'package:web_socket/web_socket.dart';
import 'jni/bindings.dart' as bindings;
/// A [WebSocket] implemented using the OkHttp library's
/// [WebSocket](https://square.github.io/okhttp/5.x/okhttp/okhttp3/-web-socket/index.html)
/// API.
///
/// > [!NOTE]
/// > The [WebSocket] interface is currently experimental and may change in the
/// > future.
///
/// Example usage of [OkHttpWebSocket]:
/// ```dart
/// import 'package:ok_http/ok_http.dart';
/// import 'package:web_socket/web_socket.dart';
///
/// void main() async {
/// final socket = await OkHttpWebSocket.connect(
/// Uri.parse('wss://ws.postman-echo.com/raw'));
///
/// socket.events.listen((e) async {
/// switch (e) {
/// case TextDataReceived(text: final text):
/// print('Received Text: $text');
/// await socket.close();
/// case BinaryDataReceived(data: final data):
/// print('Received Binary: $data');
/// case CloseReceived(code: final code, reason: final reason):
/// print('Connection to server closed: $code [$reason]');
/// }
/// });
/// }
/// ```
///
/// > [!TIP]
/// > [`AdapterWebSocketChannel`](https://pub.dev/documentation/web_socket_channel/latest/adapter_web_socket_channel/AdapterWebSocketChannel-class.html)
/// > can be used to adapt a [OkHttpWebSocket] into a
/// > [`WebSocketChannel`](https://pub.dev/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel-class.html).
class OkHttpWebSocket implements WebSocket {
late bindings.OkHttpClient _client;
late final bindings.WebSocket _webSocket;
final _events = StreamController<WebSocketEvent>();
String? _protocol;
/// Private constructor to prevent direct instantiation.
///
/// Used by [connect] to create a new WebSocket connection, which requires a
/// [bindings.OkHttpClient] instance (see [_connect]), and cannot be accessed
/// statically.
OkHttpWebSocket._() {
// Add the WebSocketInterceptor to prevent response parsing errors.
_client = bindings.WebSocketInterceptor.Companion
.addWSInterceptor(bindings.OkHttpClient_Builder())
.build();
}
/// Create a new WebSocket connection using `OkHttp`'s
/// [WebSocket](https://square.github.io/okhttp/5.x/okhttp/okhttp3/-web-socket/index.html)
/// API.
///
/// The URL supplied in [url] must use the scheme ws or wss.
///
/// If provided, the [protocols] argument indicates the subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
static Future<WebSocket> connect(Uri url,
{Iterable<dynamic>? protocols}) async =>
OkHttpWebSocket._()._connect(url, protocols);
Future<WebSocket> _connect(Uri url, Iterable<dynamic>? protocols) async {
if (!url.isScheme('ws') && !url.isScheme('wss')) {
throw ArgumentError.value(
url, 'url', 'only ws: and wss: schemes are supported');
}
final requestBuilder =
bindings.Request_Builder().url$1(url.toString().toJString());
if (protocols != null) {
requestBuilder.addHeader('Sec-WebSocket-Protocol'.toJString(),
protocols.join(', ').toJString());
}
var openCompleter = Completer<WebSocket>();
_client.newWebSocket(
requestBuilder.build(),
bindings.WebSocketListenerProxy(
bindings.WebSocketListenerProxy_WebSocketListener.implement(
bindings.$WebSocketListenerProxy_WebSocketListener(
onOpen: (webSocket, response) {
_webSocket = webSocket;
var protocolHeader =
response.header$1('sec-websocket-protocol'.toJString());
if (!protocolHeader.isNull) {
_protocol = protocolHeader.toDartString(releaseOriginal: true);
if (!(protocols?.contains(_protocol) ?? true)) {
openCompleter
.completeError(WebSocketException('Protocol mismatch. '
'Expected one of $protocols, but received $_protocol'));
return;
}
}
openCompleter.complete(this);
},
onMessage: (bindings.WebSocket webSocket, JString string) {
if (_events.isClosed) return;
_events.add(TextDataReceived(string.toDartString()));
},
onMessage$1:
(bindings.WebSocket webSocket, bindings.ByteString byteString) {
if (_events.isClosed) return;
_events.add(
BinaryDataReceived(byteString.toByteArray().toUint8List()));
},
onClosing:
(bindings.WebSocket webSocket, int i, JString string) async {
_okHttpClientClose();
if (_events.isClosed) return;
_events.add(CloseReceived(i, string.toDartString()));
await _events.close();
},
onFailure: (bindings.WebSocket webSocket, JObject throwable,
bindings.Response response) {
if (_events.isClosed) return;
var throwableString = throwable.toString();
// If the throwable is:
// - java.net.ProtocolException: Control frames must be final.
// - java.io.EOFException
// - java.net.SocketException: Socket closed
// Then the connection was closed abnormally.
if (throwableString.contains(RegExp(
r'(java\.net\.ProtocolException: Control frames must be final\.|java\.io\.EOFException|java\.net\.SocketException: Socket closed)'))) {
_events.add(CloseReceived(1006, 'abnormal close'));
unawaited(_events.close());
return;
}
var error = WebSocketException(
'Connection ended unexpectedly $throwableString');
if (openCompleter.isCompleted) {
_events.addError(error);
return;
}
openCompleter.completeError(error);
},
))));
return openCompleter.future;
}
@override
Future<void> close([int? code, String? reason]) async {
if (_events.isClosed) {
throw WebSocketConnectionClosed();
}
if (code != null && code != 1000 && !(code >= 3000 && code <= 4999)) {
throw ArgumentError('Invalid argument: $code, close code must be 1000 or '
'in the range 3000-4999');
}
if (reason != null && utf8.encode(reason).length > 123) {
throw ArgumentError.value(reason, 'reason',
'reason must be <= 123 bytes long when encoded as UTF-8');
}
unawaited(_events.close());
// When no code is provided, cause an abnormal closure to send 1005.
if (code == null) {
_webSocket.cancel();
return;
}
_webSocket.close(
code, reason?.toJString() ?? JString.fromReference(jNullReference));
}
@override
Stream<WebSocketEvent> get events => _events.stream;
@override
String get protocol => _protocol ?? '';
@override
void sendBytes(Uint8List b) {
if (_events.isClosed) {
throw WebSocketConnectionClosed();
}
_webSocket.send$1(bindings.ByteString.of(b.toJArray()));
}
@override
void sendText(String s) {
if (_events.isClosed) {
throw WebSocketConnectionClosed();
}
_webSocket.send(s.toJString());
}
/// Closes the OkHttpClient using the recommended shutdown procedure.
///
/// https://square.github.io/okhttp/5.x/okhttp/okhttp3/-ok-http-client/index.html#:~:text=Shutdown
void _okHttpClientClose() {
_client.dispatcher().executorService().shutdown();
_client.connectionPool().evictAll();
var cache = _client.cache();
if (!cache.isNull) {
cache.close();
}
_client.release();
}
}
extension on Uint8List {
JArray<jbyte> toJArray() =>
JArray(jbyte.type, length)..setRange(0, length, this);
}
extension on JArray<jbyte> {
Uint8List toUint8List({int? length}) =>
getRange(0, length ?? this.length).buffer.asUint8List();
}