blob: aabd5c41a8c0363eaaf5c25e9928df949ac7bae2 [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include <utility>
#include "vm/message_handler.h"
#include "vm/port.h"
#include "vm/unit_test.h"
namespace dart {
// Test-only peer for a MessageHandler. Every member simply forwards to the
// wrapped handler, giving the tests in this file access to PostMessage,
// ClosePort, CloseAllPorts, the live-port counters and the two message
// queues. The peer stores the pointer it is given and never deletes it.
class MessageHandlerTestPeer {
 public:
  explicit MessageHandlerTestPeer(MessageHandler* handler)
      : handler_(handler) {}

  // Direct access to the handler's regular and out-of-band queues.
  MessageQueue* queue() const { return handler_->queue_; }
  MessageQueue* oob_queue() const { return handler_->oob_queue_; }

  // Live-port bookkeeping, forwarded unchanged.
  void increment_live_ports() { handler_->increment_live_ports(); }
  void decrement_live_ports() { handler_->decrement_live_ports(); }

  // Hands the message over to the wrapped handler.
  void PostMessage(std::unique_ptr<Message> message) {
    handler_->PostMessage(std::move(message));
  }

  // Port shutdown, forwarded unchanged.
  void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
  void CloseAllPorts() { handler_->CloseAllPorts(); }

 private:
  MessageHandler* handler_;

  DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer);
};
class TestMessageHandler : public MessageHandler {
public:
TestMessageHandler()
: port_buffer_(NULL),
port_buffer_size_(0),
notify_count_(0),
message_count_(0),
start_called_(false),
end_called_(false),
results_(NULL),
monitor_() {}
~TestMessageHandler() {
PortMap::ClosePorts(this);
delete[] port_buffer_;
}
void MessageNotify(Message::Priority priority) {
MonitorLocker ml(&monitor_);
notify_count_++;
ml.Notify();
}
MessageStatus HandleMessage(std::unique_ptr<Message> message) {
// For testing purposes, keep a list of the ports
// for all messages we receive.
MonitorLocker ml(&monitor_);
AddPortToBuffer(message->dest_port());
message_count_++;
MessageStatus status = kOK;
if (results_ != NULL) {
status = results_[0];
results_++;
}
ml.Notify();
return status;
}
MessageStatus Start() {
start_called_ = true;
return kOK;
}
void End() {
MonitorLocker ml(&monitor_);
end_called_ = true;
AddPortToBuffer(-2);
ml.Notify();
}
Dart_Port* port_buffer() const { return port_buffer_; }
int notify_count() const { return notify_count_; }
int message_count() const { return message_count_; }
bool start_called() const { return start_called_; }
bool end_called() const { return end_called_; }
void set_results(MessageStatus* results) { results_ = results; }
Monitor* monitor() { return &monitor_; }
private:
void AddPortToBuffer(Dart_Port port) {
if (port_buffer_ == NULL) {
port_buffer_ = new Dart_Port[10];
port_buffer_size_ = 10;
} else if (message_count_ == port_buffer_size_) {
int new_port_buffer_size_ = 2 * port_buffer_size_;
Dart_Port* new_port_buffer_ = new Dart_Port[new_port_buffer_size_];
for (int i = 0; i < port_buffer_size_; i++) {
new_port_buffer_[i] = port_buffer_[i];
}
delete[] port_buffer_;
port_buffer_ = new_port_buffer_;
port_buffer_size_ = new_port_buffer_size_;
}
port_buffer_[message_count_] = port;
}
Dart_Port* port_buffer_;
int port_buffer_size_;
int notify_count_;
int message_count_;
bool start_called_;
bool end_called_;
MessageStatus* results_;
Monitor monitor_;
DISALLOW_COPY_AND_ASSIGN(TestMessageHandler);
};
// Start callback handed to MessageHandler::Run by the tests. |data| is the
// address of a TestMessageHandler; the call is forwarded to its Start().
MessageHandler::MessageStatus TestStartFunction(uword data) {
  TestMessageHandler* test_handler =
      reinterpret_cast<TestMessageHandler*>(data);
  return test_handler->Start();
}
// End callback handed to MessageHandler::Run by the tests. |data| is the
// address of a TestMessageHandler; the call is forwarded to its End().
void TestEndFunction(uword data) {
  TestMessageHandler* test_handler =
      reinterpret_cast<TestMessageHandler*>(data);
  test_handler->End();
}
// Builds a message addressed to |dest| with the given |priority|. The payload
// is a one-byte malloc'd buffer whose contents are left uninitialized; it is
// passed to Message::New together with a length of 1 and a nullptr fourth
// argument.
// NOTE(review): presumably the Message takes ownership of the buffer -
// confirm against Message::New.
static std::unique_ptr<Message> BlankMessage(Dart_Port dest,
                                             Message::Priority priority) {
  uint8_t* payload = reinterpret_cast<uint8_t*>(malloc(1));
  return Message::New(dest, payload, 1, nullptr, priority);
}
// Posting a message notifies the handler once and places the message on the
// queue that matches its priority.
VM_UNIT_TEST_CASE(MessageHandler_PostMessage) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);
  EXPECT_EQ(0, handler.notify_count());

  // Normal priority: one notification, message sits on the regular queue and
  // the oob queue stays empty.
  std::unique_ptr<Message> normal_message =
      BlankMessage(1, Message::kNormalPriority);
  Message* raw_normal = normal_message.get();
  handler_peer.PostMessage(std::move(normal_message));
  EXPECT_EQ(1, handler.notify_count());
  EXPECT(raw_normal == handler_peer.queue()->Dequeue().get());
  EXPECT(nullptr == handler_peer.oob_queue()->Dequeue());

  // OOB priority: a second notification, message sits on the oob queue and
  // the regular queue stays empty.
  std::unique_ptr<Message> oob_message =
      BlankMessage(1, Message::kOOBPriority);
  Message* raw_oob = oob_message.get();
  handler_peer.PostMessage(std::move(oob_message));
  EXPECT_EQ(2, handler.notify_count());
  EXPECT(raw_oob == handler_peer.oob_queue()->Dequeue().get());
  EXPECT(nullptr == handler_peer.queue()->Dequeue());
}
// HasOOBMessages() only reports true once an oob-priority message has been
// posted; normal-priority messages do not affect it.
VM_UNIT_TEST_CASE(MessageHandler_HasOOBMessages) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);
  EXPECT(!handler.HasOOBMessages());

  // A normal message alone leaves HasOOBMessages() false.
  handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority));
  EXPECT(!handler.HasOOBMessages());
  {
    // While holding the handler's queues: exactly one regular message.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.queue()->Length() == 1);
  }

  // Adding an oob message flips HasOOBMessages() to true.
  handler_peer.PostMessage(BlankMessage(1, Message::kOOBPriority));
  EXPECT(handler.HasOOBMessages());
  {
    // While holding the handler's queues: one regular and one oob message.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.queue()->Length() == 1);
    EXPECT(aq.oob_queue()->Length() == 1);
  }

  // Drop everything that is still pending.
  handler_peer.CloseAllPorts();
}
// Closing a single port leaves already-queued messages in place.
VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  std::unique_ptr<Message> first = BlankMessage(1, Message::kNormalPriority);
  Message* raw_first = first.get();
  handler_peer.PostMessage(std::move(first));

  std::unique_ptr<Message> second = BlankMessage(2, Message::kNormalPriority);
  Message* raw_second = second.get();
  handler_peer.PostMessage(std::move(second));

  handler_peer.ClosePort(1);

  // Both messages, including the one for the closed port, are still queued
  // in posting order.
  EXPECT(raw_first == handler_peer.queue()->Dequeue().get());
  EXPECT(raw_second == handler_peer.queue()->Dequeue().get());
}
// Closing all ports empties the regular queue.
VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  std::unique_ptr<Message> first = BlankMessage(1, Message::kNormalPriority);
  handler_peer.PostMessage(std::move(first));
  std::unique_ptr<Message> second = BlankMessage(2, Message::kNormalPriority);
  handler_peer.PostMessage(std::move(second));

  handler_peer.CloseAllPorts();

  // Nothing is left to dequeue.
  EXPECT(nullptr == handler_peer.queue()->Dequeue());
}
// HandleNextMessage() drains the oob queue first and then handles exactly one
// normal message.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);

  // Posting order: normal(port1), oob(port2), normal(port2), oob(port3).
  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));

  // Both oob messages plus a single normal message get handled.
  EXPECT_EQ(MessageHandler::kOK, handler.HandleNextMessage());
  EXPECT_EQ(3, handler.message_count());

  // Handling order: the two oob messages, then the first normal one.
  Dart_Port* handled_ports = handler.port_buffer();
  EXPECT_EQ(port2, handled_ports[0]);
  EXPECT_EQ(port3, handled_ports[1]);
  EXPECT_EQ(port1, handled_ports[2]);
}
// After an oob message reports kError, the remaining oob messages are still
// handled but normal messages are not.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_ProcessOOBAfterError) {
  TestMessageHandler handler;
  // Statuses HandleMessage returns, one per handled message.
  MessageHandler::MessageStatus scripted_statuses[] = {
      MessageHandler::kError,  // oob_message1
      MessageHandler::kOK,     // oob_message2
      MessageHandler::kOK,     // unused
  };
  handler.set_results(scripted_statuses);
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);

  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));

  // The error is reported, both oob messages were handled and the normal
  // message was skipped.
  EXPECT_EQ(MessageHandler::kError, handler.HandleNextMessage());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled_ports = handler.port_buffer();
  EXPECT_EQ(port2, handled_ports[0]);  // oob_message1, error
  EXPECT_EQ(port3, handled_ports[1]);  // oob_message2, ok

  handler_peer.CloseAllPorts();
}
// Once an oob message reports kShutdown, no further messages are handled and
// the oob queue ends up empty.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
  TestMessageHandler handler;
  // Statuses HandleMessage returns, one per handled message.
  MessageHandler::MessageStatus scripted_statuses[] = {
      MessageHandler::kOK,        // oob_message1
      MessageHandler::kShutdown,  // oob_message2
      MessageHandler::kOK,        // unused
      MessageHandler::kOK,        // unused
  };
  handler.set_results(scripted_statuses);
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);
  Dart_Port port4 = PortMap::CreatePort(&handler);

  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));

  // Shutdown is reported after the second oob message; the third oob message
  // and the normal message are never handled.
  EXPECT_EQ(MessageHandler::kShutdown, handler.HandleNextMessage());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled_ports = handler.port_buffer();
  EXPECT_EQ(port2, handled_ports[0]);  // oob_message1, ok
  EXPECT_EQ(port3, handled_ports[1]);  // oob_message2, shutdown

  {
    // The oob queue has been cleared. oob_message3 is gone.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.oob_queue()->Length() == 0);
  }

  handler_peer.CloseAllPorts();
}
// HandleOOBMessages() handles every oob message and leaves normal messages
// untouched.
VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);
  Dart_Port port4 = PortMap::CreatePort(&handler);

  // Two normal messages followed by two oob messages.
  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));

  // Only the two oob messages are handled, in posting order.
  EXPECT_EQ(MessageHandler::kOK, handler.HandleOOBMessages());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled_ports = handler.port_buffer();
  EXPECT_EQ(port3, handled_ports[0]);
  EXPECT_EQ(port4, handled_ports[1]);

  handler_peer.CloseAllPorts();
}
// Parameter block passed (by address) to SendMessages.
struct ThreadStartInfo {
  // Handler the messages are posted to.
  MessageHandler* handler;
  // Destination ports; SendMessages reads the first |count| entries.
  Dart_Port* ports;
  // Number of messages to post.
  int count;
  // Written by SendMessages with the sending thread's join id.
  ThreadJoinId join_id;
};
// Thread entry point. |param| is the address of a ThreadStartInfo. Stores the
// current thread's join id into the info block, then posts one
// normal-priority blank message to info->handler for each of the first
// info->count entries of info->ports.
static void SendMessages(uword param) {
  ThreadStartInfo* start_info = reinterpret_cast<ThreadStartInfo*>(param);
  start_info->join_id = OSThread::GetCurrentThreadJoinId(OSThread::Current());

  MessageHandlerTestPeer handler_peer(start_info->handler);
  for (int index = 0; index < start_info->count; index++) {
    Dart_Port destination = start_info->ports[index];
    handler_peer.PostMessage(
        BlankMessage(destination, Message::kNormalPriority));
  }
}
// Runs the handler on a thread pool, posts one message from this thread and
// ten more from a second thread, and checks that all eleven are handled in
// posting order with Start called and End not yet called.
VM_UNIT_TEST_CASE(MessageHandler_Run) {
  // Messages posted by the helper thread, and the overall total once the
  // single message posted from this thread is included.
  const int kSenderMessageCount = 10;
  const int kTotalMessageCount = 1 + kSenderMessageCount;

  TestMessageHandler handler;
  ThreadPool pool;
  MessageHandlerTestPeer handler_peer(&handler);

  EXPECT(!handler.HasLivePorts());
  handler_peer.increment_live_ports();

  handler.Run(&pool, TestStartFunction, TestEndFunction,
              reinterpret_cast<uword>(&handler));

  Dart_Port first_port = PortMap::CreatePort(&handler);
  handler_peer.PostMessage(BlankMessage(first_port, Message::kNormalPriority));

  // Wait for the first message to be handled.
  {
    MonitorLocker ml(handler.monitor());
    while (handler.message_count() < 1) {
      ml.Wait();
    }
    EXPECT_EQ(1, handler.message_count());
    EXPECT(handler.start_called());
    EXPECT(!handler.end_called());
    Dart_Port* handled_ports = handler.port_buffer();
    EXPECT_EQ(first_port, handled_ports[0]);
  }

  // Start a thread which sends more messages.
  Dart_Port sender_ports[kSenderMessageCount];
  for (int index = 0; index < kSenderMessageCount; index++) {
    sender_ports[index] = PortMap::CreatePort(&handler);
  }
  ThreadStartInfo info;
  info.handler = &handler;
  info.ports = sender_ports;
  info.count = kSenderMessageCount;
  info.join_id = OSThread::kInvalidThreadJoinId;
  OSThread::Start("SendMessages", SendMessages, reinterpret_cast<uword>(&info));

  // Wait for the messages to be handled.
  {
    MonitorLocker ml(handler.monitor());
    while (handler.message_count() < kTotalMessageCount) {
      ml.Wait();
    }
    Dart_Port* handled_ports = handler.port_buffer();
    EXPECT_EQ(kTotalMessageCount, handler.message_count());
    EXPECT(handler.start_called());
    EXPECT(!handler.end_called());
    EXPECT_EQ(first_port, handled_ports[0]);
    // The helper thread's messages follow the first one, in posting order.
    for (int index = 0; index < kSenderMessageCount; index++) {
      EXPECT_EQ(sender_ports[index], handled_ports[index + 1]);
    }
    handler_peer.decrement_live_ports();
    EXPECT(!handler.HasLivePorts());
  }

  // Must join the thread or the VM shutdown is racing with any VM state the
  // thread touched.
  ASSERT(info.join_id != OSThread::kInvalidThreadJoinId);
  OSThread::Join(info.join_id);
}
} // namespace dart