blob: e6a3179511c0435c63411ab25363c2ebc5c4aa30 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This file tests both |RemoteProducerDataPipeImpl| and
// |RemoteConsumerDataPipeImpl|.
#include <stdint.h>
#include "base/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/test/test_io_thread.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/simple_platform_support.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/data_pipe.h"
#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
#include "mojo/edk/system/memory.h"
#include "mojo/edk/system/message_pipe.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/test_utils.h"
#include "mojo/edk/system/waiter.h"
#include "mojo/public/cpp/system/macros.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace system {
namespace {
const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED;
class RemoteDataPipeImplTest : public testing::Test {
public:
RemoteDataPipeImplTest() : io_thread_(base::TestIOThread::kAutoStart) {}
~RemoteDataPipeImplTest() override {}
void SetUp() override {
scoped_refptr<ChannelEndpoint> ep[2];
message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]);
message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]);
io_thread_.PostTaskAndWait(
FROM_HERE, base::Bind(&RemoteDataPipeImplTest::SetUpOnIOThread,
base::Unretained(this), ep[0], ep[1]));
}
void TearDown() override {
EnsureMessagePipeClosed(0);
EnsureMessagePipeClosed(1);
io_thread_.PostTaskAndWait(
FROM_HERE, base::Bind(&RemoteDataPipeImplTest::TearDownOnIOThread,
base::Unretained(this)));
}
protected:
static DataPipe* CreateLocal(size_t element_size, size_t num_elements) {
const MojoCreateDataPipeOptions options = {
static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
static_cast<uint32_t>(element_size),
static_cast<uint32_t>(num_elements * element_size)};
MojoCreateDataPipeOptions validated_options = {};
CHECK_EQ(DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
&validated_options),
MOJO_RESULT_OK);
return DataPipe::CreateLocal(validated_options);
}
scoped_refptr<MessagePipe> message_pipe(size_t i) {
return message_pipes_[i];
}
void EnsureMessagePipeClosed(size_t i) {
if (!message_pipes_[i])
return;
message_pipes_[i]->Close(0);
message_pipes_[i] = nullptr;
}
private:
void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
scoped_refptr<ChannelEndpoint> ep1) {
CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
embedder::PlatformChannelPair channel_pair;
channels_[0] = new Channel(&platform_support_);
channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle()));
channels_[0]->SetBootstrapEndpoint(ep0);
channels_[1] = new Channel(&platform_support_);
channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle()));
channels_[1]->SetBootstrapEndpoint(ep1);
}
void TearDownOnIOThread() {
CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
if (channels_[0]) {
channels_[0]->Shutdown();
channels_[0] = nullptr;
}
if (channels_[1]) {
channels_[1]->Shutdown();
channels_[1] = nullptr;
}
}
embedder::SimplePlatformSupport platform_support_;
base::TestIOThread io_thread_;
scoped_refptr<Channel> channels_[2];
scoped_refptr<MessagePipe> message_pipes_[2];
MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTest);
};
// These tests are heavier-weight than ideal. They test remote data pipes by
// passing data pipe (producer/consumer) dispatchers over remote message pipes.
// Make sure that the test fixture works properly (i.e., that the message pipe
// works properly, and that things are shut down correctly).
// TODO(vtl): Make lighter-weight tests. Ideally, we'd have tests for remote
// data pipes which don't involve message pipes (or even data pipe dispatchers).
TEST_F(RemoteDataPipeImplTest, Sanity) {
static const char kHello[] = "hello";
char read_buffer[100] = {};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
// Write on MP 0 (port 0). Wait and receive on MP 1 (port 0). (Add the waiter
// first, to avoid any handling the case where it's already readable.)
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
message_pipe(1)->AddAwakable(
0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
EXPECT_EQ(MOJO_RESULT_OK,
message_pipe(0)->WriteMessage(0, UserPointer<const void>(kHello),
sizeof(kHello), nullptr,
MOJO_WRITE_MESSAGE_FLAG_NONE));
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
EXPECT_EQ(MOJO_RESULT_OK, message_pipe(1)->ReadMessage(
0, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), nullptr,
nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
EXPECT_STREQ(kHello, read_buffer);
}
// TODO(vtl): This test doesn't have an obvious analogue in
// |LocalDataPipeImplTest|.
TEST_F(RemoteDataPipeImplTest, SendConsumerWithClosedProducer) {
char read_buffer[100] = {};
uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
DispatcherVector read_dispatchers;
uint32_t read_num_dispatchers = 10; // Maximum to get.
Waiter waiter;
HandleSignalsState hss;
uint32_t context = 0;
scoped_refptr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000));
// This is the consumer dispatcher we'll send.
scoped_refptr<DataPipeConsumerDispatcher> consumer =
DataPipeConsumerDispatcher::Create();
consumer->Init(dp);
// Write to the producer and close it, before sending the consumer.
int32_t elements[10] = {123};
uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
dp->ProducerWriteData(UserPointer<const void>(elements),
MakeUserPointer(&num_bytes), false));
EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
dp->ProducerClose();
// Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0).
// (Add the waiter first, to avoid any handling the case where it's already
// readable.)
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
message_pipe(1)->AddAwakable(
0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
{
DispatcherTransport transport(
test::DispatcherTryStartTransport(consumer.get()));
EXPECT_TRUE(transport.is_valid());
std::vector<DispatcherTransport> transports;
transports.push_back(transport);
EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage(
0, NullUserPointer(), 0, &transports,
MOJO_WRITE_MESSAGE_FLAG_NONE));
transport.End();
// |consumer| should have been closed. This is |DCHECK()|ed when it is
// destroyed.
EXPECT_TRUE(consumer->HasOneRef());
consumer = nullptr;
}
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
hss.satisfied_signals);
EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
EXPECT_EQ(MOJO_RESULT_OK,
message_pipe(1)->ReadMessage(
0, UserPointer<void>(read_buffer),
MakeUserPointer(&read_buffer_size), &read_dispatchers,
&read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE));
EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
EXPECT_EQ(1u, read_dispatchers.size());
EXPECT_EQ(1u, read_num_dispatchers);
ASSERT_TRUE(read_dispatchers[0]);
EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER,
read_dispatchers[0]->GetType());
consumer =
static_cast<DataPipeConsumerDispatcher*>(read_dispatchers[0].get());
read_dispatchers.clear();
waiter.Init();
hss = HandleSignalsState();
MojoResult result =
consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss);
if (result == MOJO_RESULT_OK) {
context = 0;
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
EXPECT_EQ(456u, context);
consumer->RemoveAwakable(&waiter, &hss);
} else {
ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
}
// We don't know if the fact that the producer has been closed is known yet.
EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
// Read one element.
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements),
MakeUserPointer(&num_bytes),
MOJO_READ_DATA_FLAG_NONE));
EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
EXPECT_EQ(123, elements[0]);
EXPECT_EQ(-1, elements[1]);
waiter.Init();
hss = HandleSignalsState();
result =
consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
if (result == MOJO_RESULT_OK) {
context = 0;
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
waiter.Wait(test::ActionDeadline(), &context));
EXPECT_EQ(789u, context);
consumer->RemoveAwakable(&waiter, &hss);
} else {
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
}
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
consumer->Close();
}
} // namespace
} // namespace system
} // namespace mojo