blob: bfd5a9ae610e03c69497dd28555483d2530d3060 [file] [log] [blame] [edit]
import 'dart:async';
import 'dart:ffi';
import 'dart:isolate';
import 'ns_data.dart';
import 'ns_string.dart';
import 'objective_c_bindings_generated.dart';
extension NSInputStreamStreamExtension on Stream<List<int>> {
/// Return a [NSInputStream] that, when read, will contain the contents of
/// the [Stream].
///
/// > [!IMPORTANT]
/// > [NSInputStream.read] must be called from a different thread or [Isolate]
/// > than the one that calls [toNSInputStream]. Otherwise,
/// > [NSInputStream.read] will deadlock waiting for data to be added from the
/// > [Stream].
NSInputStream toNSInputStream() {
// Eagerly add data until `maxReadAheadSize` is buffered.
const maxReadAheadSize = 4096;
final port = ReceivePort();
final inputStream =
DartInputStreamAdapter.inputStreamWithPort(port.sendPort.nativePort);
late final StreamSubscription<dynamic> dataSubscription;
dataSubscription = listen((data) {
if (inputStream.addData(data.toNSData()) > maxReadAheadSize) {
dataSubscription.pause();
}
}, onError: (Object e) {
final d = NSMutableDictionary();
d[NSLocalizedDescriptionKey] = e.toString().toNSString();
inputStream.setError(NSError.errorWithDomain('DartError'.toNSString(),
code: 0, userInfo: d));
port.close();
}, onDone: () {
inputStream.setDone();
port.close();
}, cancelOnError: true);
dataSubscription.pause();
port.listen((count) {
// -1 indicates that the `NSInputStream` is closed. All other values
// indicate that the `NSInputStream` needs more data.
//
// If [DartInputStreamAdapter.setError] or
// [DartInputStreamAdapter.setDone] is called then the close message (-1)
// will not be sent when the input stream is closed.
if (count == -1) {
port.close();
} else {
dataSubscription.resume();
}
}, onDone: () {
dataSubscription.cancel();
});
return inputStream;
}
}