blob: bd696985eabc843b74d94e8af6ceb0f5c7e4be37 [file] [log] [blame]
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:ffi';
import 'dart:typed_data';
import 'package:ffi/ffi.dart';
import 'primitives.dart';
import 'sendable.dart';
final class _MailboxRepr extends Struct {
external Pointer<Uint8> buffer;
@Int32()
external int bufferLength;
@Int32()
external int state;
}
class _SendableMailbox {
final int address;
final Sendable<Mutex> mutex;
final Sendable<ConditionVariable> condVar;
_SendableMailbox(
{required this.address, required this.mutex, required this.condVar});
}
/// Mailbox communication primitive.
///
/// This synchronization primitive allows a single producer to send messages
/// to one or more consumers. Producer uses [put] to place a message into
/// a mailbox which consumers can then [take] out.
///
/// [Mailbox] object can not be directly sent to other isolates via a
/// `SendPort`, but it can be converted to a `Sendable<Mailbox>` via
/// `asSendable` getter.
///
/// [Mailbox] object is owned by an isolate which created them.
class Mailbox {
final Pointer<_MailboxRepr> _mailbox;
final Mutex _mutex;
final ConditionVariable _condVar;
static const _stateEmpty = 0;
static const _stateFull = 1;
static const _stateClosed = 2;
static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
calloc.free(mailbox);
});
Mailbox()
: _mailbox = calloc.allocate(sizeOf<_MailboxRepr>()),
_mutex = Mutex(),
_condVar = ConditionVariable() {
finalizer.attach(this, _mailbox);
}
Mailbox._fromSendable(_SendableMailbox sendable)
: _mailbox = Pointer.fromAddress(sendable.address),
_mutex = sendable.mutex.materialize(),
_condVar = sendable.condVar.materialize();
/// Place a message into the mailbox if has space for it.
///
/// If mailbox already contains a message or mailbox is closed then [put] will
/// throw [StateError].
void put(Uint8List message) {
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
_mutex.runLocked(() {
if (_mailbox.ref.state != _stateEmpty) {
throw StateError('Mailbox is closed or full');
}
_mailbox.ref.state = _stateFull;
_mailbox.ref.buffer = buffer;
_mailbox.ref.bufferLength = message.length;
_condVar.notify();
});
}
/// Close a mailbox.
///
/// If mailbox already contains a message then [close] will drop the message.
void close() => _mutex.runLocked(() {
if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) {
malloc.free(_mailbox.ref.buffer);
}
_mailbox.ref.state = _stateClosed;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;
_condVar.notify();
});
/// Take a message from the mailbox.
///
/// If mailbox is empty then [take] will synchronously block until message
/// is available or mailbox is closed. If mailbox is closed then [take] will
/// throw [StateError].
Uint8List take() => _mutex.runLocked(() {
while (_mailbox.ref.state == _stateEmpty) {
_condVar.wait(_mutex);
}
if (_mailbox.ref.state == _stateClosed) {
throw StateError('Mailbox is closed');
}
final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);
_mailbox.ref.state = _stateEmpty;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;
return result;
});
static final _emptyResponse = Uint8List(0);
static Uint8List _toList(Pointer<Uint8> buffer, int length) {
if (length == 0) {
return _emptyResponse;
}
// TODO: remove feature detection once 3.1 becomes stable.
// ignore: omit_local_variable_types
final Uint8List Function(int) asTypedList = buffer.asTypedList;
if (asTypedList is Uint8List Function(int,
{Pointer<NativeFinalizerFunction> finalizer})) {
return asTypedList(length, finalizer: malloc.nativeFree);
}
final result = Uint8List(length);
result.setRange(0, length, buffer.asTypedList(length));
malloc.free(buffer);
return result;
}
static Pointer<Uint8> _toBuffer(Uint8List list) {
final buffer = malloc.allocate<Uint8>(list.length);
buffer.asTypedList(list.length).setRange(0, list.length, list);
return buffer;
}
Sendable<Mailbox> get asSendable => Sendable.wrap(
Mailbox._fromSendable,
_SendableMailbox(
address: _mailbox.address,
mutex: _mutex.asSendable,
condVar: _condVar.asSendable));
}