blob: 92572122c4d0d32f72aa9b965f07a8b40f2a1365 [file] [log] [blame]
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/edk/system/master_connection_manager.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/synchronization/waitable_event.h"
#include "mojo/edk/embedder/master_process_delegate.h"
#include "mojo/edk/embedder/platform_channel_pair.h"
#include "mojo/edk/embedder/platform_handle.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/system/connection_manager_messages.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/raw_channel.h"
#include "mojo/edk/system/transport_data.h"
#include "mojo/public/cpp/system/macros.h"
namespace mojo {
namespace system {
namespace {
// Identifier given to the first slave process. |MasterConnectionManager|
// initializes |next_process_identifier_| with this value and increments it for
// each slave added in |AddSlave()|.
const ProcessIdentifier kFirstSlaveProcessIdentifier = 2;
// Compile-time sanity checks: the invalid identifier, the master's identifier
// and the first slave's identifier must be pairwise distinct.
static_assert(kMasterProcessIdentifier != kInvalidProcessIdentifier,
"Bad master process identifier");
static_assert(kFirstSlaveProcessIdentifier != kInvalidProcessIdentifier,
"Bad first slave process identifier");
static_assert(kMasterProcessIdentifier != kFirstSlaveProcessIdentifier,
"Master and first slave process identifiers are the same");
// Maps a |ConnectionManager::Result| to the |MessageInTransit| subtype used for
// the corresponding ack message. Each enumerator has its own |case| (there is
// no |default:| label); reaching the end of the switch is treated as a
// programming error and falls back to the "failure" ack.
MessageInTransit::Subtype ConnectionManagerResultToMessageInTransitSubtype(
    ConnectionManager::Result result) {
  using Result = ConnectionManager::Result;
  using Subtype = MessageInTransit::Subtype;

  switch (result) {
    case Result::FAILURE:
      return Subtype::CONNECTION_MANAGER_ACK_FAILURE;
    case Result::SUCCESS:
      return Subtype::CONNECTION_MANAGER_ACK_SUCCESS;
    case Result::SUCCESS_CONNECT_SAME_PROCESS:
      return Subtype::CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_SAME_PROCESS;
    case Result::SUCCESS_CONNECT_NEW_CONNECTION:
      return Subtype::CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_NEW_CONNECTION;
    case Result::SUCCESS_CONNECT_REUSE_CONNECTION:
      return Subtype::CONNECTION_MANAGER_ACK_SUCCESS_CONNECT_REUSE_CONNECTION;
  }

  NOTREACHED();
  return Subtype::CONNECTION_MANAGER_ACK_FAILURE;
}
} // namespace
// MasterConnectionManager::Helper ---------------------------------------------
// |MasterConnectionManager::Helper| is not thread-safe, and must only be used
// on its |owner_|'s private thread.
//
// One |Helper| exists per slave process. It owns the |RawChannel| to that slave,
// receives the slave's connection manager requests as a |RawChannel::Delegate|,
// forwards them to |owner_|, and writes the acks back on the same channel.
class MOJO_SYSTEM_IMPL_EXPORT MasterConnectionManager::Helper final
: public RawChannel::Delegate {
public:
// Creates (but does not initialize) the raw channel from |platform_handle|.
// |owner| is stored as a raw pointer and is not owned.
Helper(MasterConnectionManager* owner,
ProcessIdentifier process_identifier,
embedder::SlaveInfo slave_info,
embedder::ScopedPlatformHandle platform_handle);
// |Shutdown()| must have been called first (the destructor DCHECKs that the
// raw channel is gone).
~Helper() override;
// Initializes the raw channel with this object as its delegate.
void Init();
// Shuts down and destroys the raw channel, and returns the |slave_info| that
// was passed to the constructor.
embedder::SlaveInfo Shutdown();
private:
// |RawChannel::Delegate| methods:
void OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) override;
void OnError(Error error) override;
// Handles an error that's fatal to this object. Note that this probably
// results in |Shutdown()| being called (in the nested context) and then this
// object being destroyed.
void FatalError();
// The manager that created this helper; notified via |OnError()| on fatal
// errors.
MasterConnectionManager* const owner_;
// Identifier of the slave process this helper talks to.
const ProcessIdentifier process_identifier_;
// Opaque value handed back to the caller of |Shutdown()|.
embedder::SlaveInfo const slave_info_;
// Channel to the slave; null after |Shutdown()|.
scoped_ptr<RawChannel> raw_channel_;
MOJO_DISALLOW_COPY_AND_ASSIGN(Helper);
};
// Records the owner and the slave's identity, and wraps |platform_handle| in a
// newly created |RawChannel|. The channel is not started here; that happens in
// |Init()|.
MasterConnectionManager::Helper::Helper(
MasterConnectionManager* owner,
ProcessIdentifier process_identifier,
embedder::SlaveInfo slave_info,
embedder::ScopedPlatformHandle platform_handle)
: owner_(owner),
process_identifier_(process_identifier),
slave_info_(slave_info),
raw_channel_(RawChannel::Create(platform_handle.Pass())) {
}
// |Shutdown()| resets |raw_channel_|, so this check enforces that |Shutdown()|
// was called before destruction.
MasterConnectionManager::Helper::~Helper() {
DCHECK(!raw_channel_);
}
// Initializes the raw channel, passing this object as the delegate that will
// receive |OnReadMessage()| and |OnError()|.
void MasterConnectionManager::Helper::Init() {
raw_channel_->Init(this);
}
// Shuts the raw channel down and then destroys it (leaving |raw_channel_| null,
// as the destructor requires). Returns the |slave_info_| given at construction.
// Dereferences |raw_channel_| unconditionally, so it must not be called a second
// time.
embedder::SlaveInfo MasterConnectionManager::Helper::Shutdown() {
raw_channel_->Shutdown();
raw_channel_.reset();
return slave_info_;
}
// Handles one request read from the slave: validates it, dispatches on its
// subtype to the matching |...Impl()| method on |owner_|, and writes an ack back
// on the same channel. Any malformed request or write failure is fatal for this
// helper (see |FatalError()|).
void MasterConnectionManager::Helper::OnReadMessage(
    const MessageInTransit::View& message_view,
    embedder::ScopedPlatformHandleVectorPtr platform_handles) {
  typedef MessageInTransit::Subtype Subtype;

  // Only connection manager requests are accepted.
  if (message_view.type() != MessageInTransit::Type::CONNECTION_MANAGER) {
    LOG(ERROR) << "Invalid message type " << message_view.type();
    FatalError();  // WARNING: This destroys us.
    return;
  }

  // For now, the payload of every request is exactly one
  // |ConnectionIdentifier|.
  if (message_view.num_bytes() != sizeof(ConnectionIdentifier)) {
    LOG(ERROR) << "Invalid message size " << message_view.num_bytes();
    FatalError();  // WARNING: This destroys us.
    return;
  }

  // Requests must not carry transport data (i.e., no attached handles).
  if (message_view.transport_data_buffer()) {
    LOG(ERROR) << "Invalid message with transport data";
    FatalError();  // WARNING: This destroys us.
    return;
  }

  const ConnectionIdentifier& connection_id =
      *reinterpret_cast<const ConnectionIdentifier*>(message_view.bytes());

  Result result = Result::FAILURE;
  // |ack_data| is sent to another process, so it must be completely zeroed
  // (padding included) before any field is filled in.
  ConnectionManagerAckSuccessConnectData ack_data = {};
  embedder::ScopedPlatformHandle new_channel_handle;
  uint32_t payload_size = 0;
  const void* payload = nullptr;

  switch (message_view.subtype()) {
    case Subtype::CONNECTION_MANAGER_ALLOW_CONNECT:
      if (owner_->AllowConnectImpl(process_identifier_, connection_id))
        result = Result::SUCCESS;
      break;

    case Subtype::CONNECTION_MANAGER_CANCEL_CONNECT:
      if (owner_->CancelConnectImpl(process_identifier_, connection_id))
        result = Result::SUCCESS;
      break;

    case Subtype::CONNECTION_MANAGER_CONNECT: {
      result = owner_->ConnectImpl(process_identifier_, connection_id,
                                   &ack_data.peer_process_identifier,
                                   &ack_data.is_first, &new_channel_handle);
      DCHECK_NE(result, Result::SUCCESS);
      // A successful "connect" ack carries |ack_data| as its payload. (For a
      // "new connection" it also carries a platform handle, attached below.)
      if (result != Result::FAILURE) {
        payload_size = static_cast<uint32_t>(sizeof(ack_data));
        payload = &ack_data;
      }
      break;
    }

    default:
      LOG(ERROR) << "Invalid message subtype " << message_view.subtype();
      FatalError();  // WARNING: This destroys us.
      return;
  }

  scoped_ptr<MessageInTransit> ack(new MessageInTransit(
      MessageInTransit::Type::CONNECTION_MANAGER_ACK,
      ConnectionManagerResultToMessageInTransitSubtype(result), payload_size,
      payload));

  if (result != Result::SUCCESS_CONNECT_NEW_CONNECTION) {
    DCHECK(!new_channel_handle.is_valid());
  } else {
    DCHECK_EQ(message_view.subtype(), Subtype::CONNECTION_MANAGER_CONNECT);
    DCHECK(new_channel_handle.is_valid());
    // Attach the slave's end of the newly created channel to the ack.
    embedder::ScopedPlatformHandleVectorPtr handles_to_send(
        new embedder::PlatformHandleVector());
    handles_to_send->push_back(new_channel_handle.release());
    ack->SetTransportData(make_scoped_ptr(
        new TransportData(handles_to_send.Pass(),
                          raw_channel_->GetSerializedPlatformHandleSize())));
  }

  if (!raw_channel_->WriteMessage(ack.Pass())) {
    LOG(ERROR) << "WriteMessage failed";
    FatalError();  // WARNING: This destroys us.
    return;
  }
}
// |RawChannel::Delegate| error callback. The specific error is ignored; any
// error tears down this helper via |FatalError()|.
void MasterConnectionManager::Helper::OnError(Error /*error*/) {
// Every error (read or write) is fatal (for that particular connection). Read
// errors are fatal since no more commands will be received from that
// connection. Write errors are fatal since it is no longer possible to send
// responses.
FatalError(); // WARNING: This destroys us.
}
// Reports the failure to the owner. |MasterConnectionManager::OnError()| calls
// |Shutdown()| on this helper and then deletes it, so no member may be touched
// after this call returns.
void MasterConnectionManager::Helper::FatalError() {
owner_->OnError(process_identifier_); // WARNING: This destroys us.
}
// MasterConnectionManager::PendingConnectInfo ---------------------------------
// Bookkeeping for one connection ID that has been "allowed" but for which both
// sides have not yet completed "connect".
struct MOJO_SYSTEM_IMPL_EXPORT MasterConnectionManager::PendingConnectInfo {
// States:
// - This is created upon a first "allow connect" (with |first| set
// immediately). We then wait for a second "allow connect".
// - After the second "allow connect" (and |second| is set), we wait for
// "connects" from both |first| and |second|.
// - We may then receive "connect" from either |first| or |second|, at
// which point it remains to wait for "connect" from the other.
// I.e., the valid state transitions are:
// AWAITING_SECOND_ALLOW_CONNECT -> AWAITING_CONNECTS_FROM_BOTH
// -> {AWAITING_CONNECT_FROM_FIRST,AWAITING_CONNECT_FROM_SECOND}
enum class State {
AWAITING_SECOND_ALLOW_CONNECT,
AWAITING_CONNECTS_FROM_BOTH,
AWAITING_CONNECT_FROM_FIRST,
AWAITING_CONNECT_FROM_SECOND
};
// Starts in |AWAITING_SECOND_ALLOW_CONNECT| with only |first| known; |second|
// stays |kInvalidProcessIdentifier| until it is filled in by the caller.
explicit PendingConnectInfo(ProcessIdentifier first)
: state(State::AWAITING_SECOND_ALLOW_CONNECT),
first(first),
second(kInvalidProcessIdentifier) {
DCHECK_NE(first, kInvalidProcessIdentifier);
}
~PendingConnectInfo() {}
// Current position in the state machine described above.
State state;
// Process that issued the first "allow connect".
ProcessIdentifier first;
// Process that issued the second "allow connect" (invalid until then).
ProcessIdentifier second;
};
// MasterConnectionManager::ProcessConnections ---------------------------------
// Tracks, for a single process, the other processes it has a connection to.
// Each entry maps the peer's process identifier to a platform handle:
//   - an invalid handle means the connection is already |RUNNING|;
//   - a valid handle means the connection is |PENDING|, and the handle is the
//     end of the channel that this process has yet to pick up.
class MasterConnectionManager::ProcessConnections {
 public:
  enum class ConnectionStatus { NONE, PENDING, RUNNING };

  ProcessConnections() {}

  ~ProcessConnections() {
    // TODO(vtl): Log a warning if there are connections pending? (This might
    // be very spammy, since the |MasterConnectionManager| may have many
    // |ProcessConnections|.)
    for (auto& entry : process_connections_)
      entry.second.CloseIfNecessary();
  }

  // Returns the status of the connection to |to_process_identifier|.
  //
  // If the status is |PENDING| and |pending_platform_handle| is non-null, the
  // stored handle is passed out through it. Warning: doing so leaves an invalid
  // handle behind, i.e., the connection is |RUNNING| from then on.
  ConnectionStatus GetConnectionStatus(
      ProcessIdentifier to_process_identifier,
      embedder::ScopedPlatformHandle* pending_platform_handle) {
    DCHECK(!pending_platform_handle || !pending_platform_handle->is_valid());

    auto found = process_connections_.find(to_process_identifier);
    if (found == process_connections_.end())
      return ConnectionStatus::NONE;

    embedder::PlatformHandle& stored_handle = found->second;
    if (!stored_handle.is_valid())
      return ConnectionStatus::RUNNING;

    // Still pending: hand the stored handle over if the caller wants it.
    if (pending_platform_handle) {
      pending_platform_handle->reset(stored_handle);
      stored_handle = embedder::PlatformHandle();
    }
    return ConnectionStatus::PENDING;
  }

  // Records a new connection to |to_process_identifier|, which must not already
  // have an entry. |status| must be |RUNNING| (with no handle) or |PENDING|
  // (with a valid handle, of which ownership is taken).
  void AddConnection(ProcessIdentifier to_process_identifier,
                     ConnectionStatus status,
                     embedder::ScopedPlatformHandle pending_platform_handle) {
    DCHECK(process_connections_.find(to_process_identifier) ==
           process_connections_.end());

    switch (status) {
      case ConnectionStatus::RUNNING:
        DCHECK(!pending_platform_handle.is_valid());
        process_connections_[to_process_identifier] =
            embedder::PlatformHandle();
        return;
      case ConnectionStatus::PENDING:
        DCHECK(pending_platform_handle.is_valid());
        process_connections_[to_process_identifier] =
            pending_platform_handle.release();
        return;
      case ConnectionStatus::NONE:
        break;
    }
    NOTREACHED();
  }

 private:
  // "Owns" any valid platform handles (they are closed in the destructor).
  base::hash_map<ProcessIdentifier, embedder::PlatformHandle>
      process_connections_;

  MOJO_DISALLOW_COPY_AND_ASSIGN(ProcessConnections);
};
// MasterConnectionManager -----------------------------------------------------
// Sets up the manager in its pre-|Init()| state: no delegate, a named but not
// yet started private thread, and slave identifiers starting at
// |kFirstSlaveProcessIdentifier|. Also registers a |ProcessConnections| entry
// for the master process itself.
MasterConnectionManager::MasterConnectionManager(
embedder::PlatformSupport* platform_support)
: ConnectionManager(platform_support),
master_process_delegate_(),
private_thread_("MasterConnectionManagerPrivateThread"),
next_process_identifier_(kFirstSlaveProcessIdentifier) {
connections_[kMasterProcessIdentifier] = new ProcessConnections();
}
// Only checks invariants. The conditions asserted here are the ones
// |Shutdown()| leaves behind: delegate and task runner cleared, private thread
// stopped, and no helpers or pending connects remaining.
MasterConnectionManager::~MasterConnectionManager() {
DCHECK(!delegate_thread_task_runner_);
DCHECK(!master_process_delegate_);
DCHECK(!private_thread_.message_loop());
DCHECK(helpers_.empty());
DCHECK(pending_connects_.empty());
}
// Stores the delegate and the task runner on which it is to be notified, then
// starts the private thread with an IO message loop. Must not be called when
// already initialized (DCHECKed). Failure to start the thread is fatal (CHECK).
void MasterConnectionManager::Init(
scoped_refptr<base::TaskRunner> delegate_thread_task_runner,
embedder::MasterProcessDelegate* master_process_delegate) {
DCHECK(delegate_thread_task_runner);
DCHECK(master_process_delegate);
DCHECK(!delegate_thread_task_runner_);
DCHECK(!master_process_delegate_);
DCHECK(!private_thread_.message_loop());
delegate_thread_task_runner_ = delegate_thread_task_runner;
master_process_delegate_ = master_process_delegate;
CHECK(private_thread_.StartWithOptions(
base::Thread::Options(base::MessageLoop::TYPE_IO, 0)));
}
// Registers a new slave process reachable via |platform_handle| and returns
// the process identifier allocated for it. Must not be called on the private
// thread: it blocks until the private thread has created the slave's |Helper|.
ProcessIdentifier MasterConnectionManager::AddSlave(
    embedder::SlaveInfo slave_info,
    embedder::ScopedPlatformHandle platform_handle) {
  // |slave_info| may or may not be null; we don't really care.
  DCHECK(platform_handle.is_valid());
  AssertNotOnPrivateThread();

  // Allocate the identifier and the slave's connection table under the lock.
  ProcessIdentifier new_slave_id;
  {
    MutexLocker locker(&mutex_);
    CHECK_NE(next_process_identifier_, kMasterProcessIdentifier);
    new_slave_id = next_process_identifier_++;
    DCHECK(connections_.find(new_slave_id) == connections_.end());
    connections_[new_slave_id] = new ProcessConnections();
  }

  // Block until the task has actually run, in case someone calls |AddSlave()|
  // followed immediately by |Shutdown()|.
  base::WaitableEvent slave_added(false, false);
  private_thread_.message_loop()->PostTask(
      FROM_HERE,
      base::Bind(&MasterConnectionManager::AddSlaveOnPrivateThread,
                 base::Unretained(this), base::Unretained(slave_info),
                 base::Passed(&platform_handle), new_slave_id,
                 base::Unretained(&slave_added)));
  slave_added.Wait();

  return new_slave_id;
}
// Adds a slave (see |AddSlave()|) and pre-registers |connection_id| as a
// pending connection between the master and that slave, already in the
// "awaiting connects from both" state, so neither side has to call
// "allow connect" for it. |connection_id| must not already be pending.
ProcessIdentifier MasterConnectionManager::AddSlaveAndBootstrap(
    embedder::SlaveInfo slave_info,
    embedder::ScopedPlatformHandle platform_handle,
    const ConnectionIdentifier& connection_id) {
  const ProcessIdentifier slave_id =
      AddSlave(slave_info, platform_handle.Pass());

  MutexLocker locker(&mutex_);
  DCHECK(pending_connects_.find(connection_id) == pending_connects_.end());

  // The master plays the role of |first|, the new slave that of |second|.
  PendingConnectInfo* bootstrap_info =
      new PendingConnectInfo(kMasterProcessIdentifier);
  bootstrap_info->second = slave_id;
  bootstrap_info->state = PendingConnectInfo::State::AWAITING_CONNECTS_FROM_BOTH;
  pending_connects_[connection_id] = bootstrap_info;

  return slave_id;
}
// Tears the manager down: runs |ShutdownOnPrivateThread()| on the private
// thread, stops that thread, and then clears the delegate and its task runner.
// Must be called after |Init()| and not from the private thread.
void MasterConnectionManager::Shutdown() {
AssertNotOnPrivateThread();
DCHECK(master_process_delegate_);
DCHECK(private_thread_.message_loop());
// The |Stop()| will actually finish all posted tasks.
private_thread_.message_loop()->PostTask(
FROM_HERE, base::Bind(&MasterConnectionManager::ShutdownOnPrivateThread,
base::Unretained(this)));
private_thread_.Stop();
// |ShutdownOnPrivateThread()| has run by now and emptied both containers.
DCHECK(helpers_.empty());
DCHECK(pending_connects_.empty());
master_process_delegate_ = nullptr;
delegate_thread_task_runner_ = nullptr;
}
// "Allow connect" issued by the master process itself: forwards to
// |AllowConnectImpl()| with |kMasterProcessIdentifier| as the caller. Must not
// be called on the private thread.
bool MasterConnectionManager::AllowConnect(
const ConnectionIdentifier& connection_id) {
AssertNotOnPrivateThread();
return AllowConnectImpl(kMasterProcessIdentifier, connection_id);
}
// "Cancel connect" issued by the master process itself: forwards to
// |CancelConnectImpl()| with |kMasterProcessIdentifier| as the caller. Must not
// be called on the private thread.
bool MasterConnectionManager::CancelConnect(
const ConnectionIdentifier& connection_id) {
AssertNotOnPrivateThread();
return CancelConnectImpl(kMasterProcessIdentifier, connection_id);
}
// "Connect" issued by the master process itself: forwards to |ConnectImpl()|
// with |kMasterProcessIdentifier| as the caller; the out-parameters are passed
// through unchanged.
// NOTE(review): unlike |AllowConnect()| and |CancelConnect()|, this does not
// call |AssertNotOnPrivateThread()| -- confirm whether that is intentional.
ConnectionManager::Result MasterConnectionManager::Connect(
const ConnectionIdentifier& connection_id,
ProcessIdentifier* peer_process_identifier,
bool* is_first,
embedder::ScopedPlatformHandle* platform_handle) {
return ConnectImpl(kMasterProcessIdentifier, connection_id,
peer_process_identifier, is_first, platform_handle);
}
// Handles "allow connect" from |process_identifier| for |connection_id|.
//   - First call for an ID: creates the pending entry and returns true.
//   - Second call: records the second process, advances the entry to
//     "awaiting connects from both", and returns true.
//   - Any further call: the entry is discarded and false is returned.
bool MasterConnectionManager::AllowConnectImpl(
    ProcessIdentifier process_identifier,
    const ConnectionIdentifier& connection_id) {
  DCHECK_NE(process_identifier, kInvalidProcessIdentifier);

  MutexLocker locker(&mutex_);

  auto found = pending_connects_.find(connection_id);
  if (found == pending_connects_.end()) {
    pending_connects_[connection_id] =
        new PendingConnectInfo(process_identifier);
    // TODO(vtl): Track process identifier -> pending connections also (so
    // these can be removed efficiently if that process disconnects).
    DVLOG(1) << "New pending connection ID " << connection_id.ToString()
             << ": AllowConnect() from first process identifier "
             << process_identifier;
    return true;
  }

  PendingConnectInfo* pending = found->second;
  if (pending->state !=
      PendingConnectInfo::State::AWAITING_SECOND_ALLOW_CONNECT) {
    // Someone's behaving badly, but we don't know who (it might not be the
    // caller).
    LOG(ERROR) << "AllowConnect() from process " << process_identifier
               << " for connection ID " << connection_id.ToString()
               << " already in state " << static_cast<int>(pending->state);
    pending_connects_.erase(found);
    delete pending;
    return false;
  }

  pending->state = PendingConnectInfo::State::AWAITING_CONNECTS_FROM_BOTH;
  pending->second = process_identifier;
  DVLOG(1) << "Pending connection ID " << connection_id.ToString()
           << ": AllowConnect() from second process identifier "
           << process_identifier;
  return true;
}
// Handles "cancel connect" from |process_identifier| for |connection_id|.
// Returns true if the ID is not pending (nothing to do) or if the caller is one
// of the two connectees (the entry is then removed). Returns false, leaving the
// entry in place, if the caller is not a party to the connection.
bool MasterConnectionManager::CancelConnectImpl(
    ProcessIdentifier process_identifier,
    const ConnectionIdentifier& connection_id) {
  DCHECK_NE(process_identifier, kInvalidProcessIdentifier);

  MutexLocker locker(&mutex_);

  auto found = pending_connects_.find(connection_id);
  if (found == pending_connects_.end()) {
    // Not necessarily the caller's fault, and not necessarily an error.
    DVLOG(1) << "CancelConnect() from process " << process_identifier
             << " for connection ID " << connection_id.ToString()
             << " which is not (or no longer) pending";
    return true;
  }

  PendingConnectInfo* pending = found->second;
  const bool caller_is_connectee = process_identifier == pending->first ||
                                   process_identifier == pending->second;
  if (!caller_is_connectee) {
    LOG(ERROR) << "CancelConnect() from process " << process_identifier
               << " for connection ID " << connection_id.ToString()
               << " which is neither connectee";
    return false;
  }

  // Simply drop the entry. If the other side cancels too, it will end up in the
  // "not pending" case above (connection IDs are assumed never to collide, so
  // there is no need to track both sides carefully).
  pending_connects_.erase(found);
  delete pending;
  return true;
}
// Handles "connect" from |process_identifier| for |connection_id|.
//
// On success, |*peer_process_identifier| is set to the other connectee,
// |*is_first| tells whether this was the first of the two "connect"s, and the
// result (and possibly |*platform_handle|) comes from
// |ConnectImplHelperNoLock()|. On failure, |Result::FAILURE| is returned.
//
// The pending entry is kept after the first "connect" and is always removed by
// the second one, whether or not that second "connect" succeeds.
ConnectionManager::Result MasterConnectionManager::ConnectImpl(
    ProcessIdentifier process_identifier,
    const ConnectionIdentifier& connection_id,
    ProcessIdentifier* peer_process_identifier,
    bool* is_first,
    embedder::ScopedPlatformHandle* platform_handle) {
  typedef PendingConnectInfo::State State;

  DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
  DCHECK(peer_process_identifier);
  DCHECK(is_first);
  DCHECK(platform_handle);
  DCHECK(!platform_handle->is_valid());  // Not technically wrong, but unlikely.

  MutexLocker locker(&mutex_);

  auto found = pending_connects_.find(connection_id);
  if (found == pending_connects_.end()) {
    // Not necessarily the caller's fault.
    LOG(ERROR) << "Connect() from process " << process_identifier
               << " for connection ID " << connection_id.ToString()
               << " which is not pending";
    return Result::FAILURE;
  }

  PendingConnectInfo* pending = found->second;

  // First "connect": remember which side is still outstanding and keep the
  // entry around for it.
  if (pending->state == State::AWAITING_CONNECTS_FROM_BOTH) {
    const bool caller_is_first = process_identifier == pending->first;
    const bool caller_is_second = process_identifier == pending->second;
    if (!caller_is_first && !caller_is_second) {
      LOG(ERROR) << "Connect() from process " << process_identifier
                 << " for connection ID " << connection_id.ToString()
                 << " which is neither connectee";
      return Result::FAILURE;
    }

    // If both connectees are the same process, it is treated as |first|.
    pending->state = caller_is_first ? State::AWAITING_CONNECT_FROM_SECOND
                                     : State::AWAITING_CONNECT_FROM_FIRST;
    const ProcessIdentifier peer =
        caller_is_first ? pending->second : pending->first;

    DVLOG(1) << "Connection ID " << connection_id.ToString()
             << ": first Connect() from process identifier "
             << process_identifier;
    *peer_process_identifier = peer;
    *is_first = true;
    return ConnectImplHelperNoLock(process_identifier, peer, platform_handle);
  }

  // Every remaining path removes the entry from |pending_connects_| and
  // destroys it.
  pending_connects_.erase(found);
  scoped_ptr<PendingConnectInfo> owned_info(pending);

  // |expected_connectee| is who we are still waiting for; it should be the
  // caller.
  ProcessIdentifier expected_connectee = kInvalidProcessIdentifier;
  ProcessIdentifier peer = kInvalidProcessIdentifier;
  switch (owned_info->state) {
    case State::AWAITING_CONNECT_FROM_FIRST:
      expected_connectee = owned_info->first;
      peer = owned_info->second;
      break;
    case State::AWAITING_CONNECT_FROM_SECOND:
      expected_connectee = owned_info->second;
      peer = owned_info->first;
      break;
    default:
      // Someone's behaving badly, but we don't know who (it might not be the
      // caller).
      LOG(ERROR) << "Connect() from process " << process_identifier
                 << " for connection ID " << connection_id.ToString()
                 << " in state " << static_cast<int>(owned_info->state);
      return Result::FAILURE;
  }

  if (process_identifier != expected_connectee) {
    LOG(ERROR) << "Connect() from process " << process_identifier
               << " for connection ID " << connection_id.ToString()
               << " which is not the remaining connectee";
    return Result::FAILURE;
  }

  DVLOG(1) << "Connection ID " << connection_id.ToString()
           << ": second Connect() from process identifier "
           << process_identifier;
  *peer_process_identifier = peer;
  *is_first = false;
  return ConnectImplHelperNoLock(process_identifier, peer, platform_handle);
}
// Decides how |process_identifier| should reach |peer_process_identifier| and,
// if a new channel is needed, provides the caller's end in |*platform_handle|.
// Does not take |mutex_| itself (hence "NoLock"); |ConnectImpl()| calls it while
// holding the lock.
//   - Same process: no handle, |SUCCESS_CONNECT_SAME_PROCESS|.
//   - Connection already running: no handle, |SUCCESS_CONNECT_REUSE_CONNECTION|.
//   - No connection yet: creates a channel pair, returns one end and parks the
//     other end as pending for the peer; |SUCCESS_CONNECT_NEW_CONNECTION|.
//   - Connection pending (the peer got here first): returns the parked end;
//     |SUCCESS_CONNECT_NEW_CONNECTION|.
ConnectionManager::Result MasterConnectionManager::ConnectImplHelperNoLock(
    ProcessIdentifier process_identifier,
    ProcessIdentifier peer_process_identifier,
    embedder::ScopedPlatformHandle* platform_handle) {
  typedef ProcessConnections::ConnectionStatus ConnectionStatus;

  if (process_identifier == peer_process_identifier) {
    platform_handle->reset();
    DVLOG(1) << "Connect: same process";
    return Result::SUCCESS_CONNECT_SAME_PROCESS;
  }

  // Both the caller and its peer should be known to us.
  DCHECK(connections_.find(process_identifier) != connections_.end());
  ProcessConnections* own_connections = connections_[process_identifier];
  DCHECK(connections_.find(peer_process_identifier) != connections_.end());

  // Note: for a pending connection this call also moves the parked handle into
  // |*platform_handle|.
  const ConnectionStatus status = own_connections->GetConnectionStatus(
      peer_process_identifier, platform_handle);

  if (status == ConnectionStatus::RUNNING) {
    // |process_identifier| already has a connection to
    // |peer_process_identifier|, so it should reuse that.
    platform_handle->reset();
    DVLOG(1) << "Connect: reuse connection";
    return Result::SUCCESS_CONNECT_REUSE_CONNECTION;
  }

  if (status == ConnectionStatus::NONE) {
    // TODO(vtl): In the "second connect" case, this should never be reached
    // (but it's not easy to DCHECK this invariant here).
    own_connections->AddConnection(peer_process_identifier,
                                   ConnectionStatus::RUNNING,
                                   embedder::ScopedPlatformHandle());
    embedder::PlatformChannelPair channel_pair;
    *platform_handle = channel_pair.PassServerHandle();
    connections_[peer_process_identifier]->AddConnection(
        process_identifier, ConnectionStatus::PENDING,
        channel_pair.PassClientHandle());
  } else if (status == ConnectionStatus::PENDING) {
    // The peer created the channel earlier, so from its side the connection
    // must already be running.
    DCHECK(connections_[peer_process_identifier]->GetConnectionStatus(
               process_identifier, nullptr) == ConnectionStatus::RUNNING);
  }

  DCHECK(platform_handle->is_valid());
  DVLOG(1) << "Connect: new connection";
  return Result::SUCCESS_CONNECT_NEW_CONNECTION;
}
void MasterConnectionManager::ShutdownOnPrivateThread() {
AssertOnPrivateThread();
if (!pending_connects_.empty()) {
DVLOG(1) << "Shutting down with connections pending";
for (auto& p : pending_connects_)
delete p.second;
pending_connects_.clear();
}
for (auto& p : connections_)
delete p.second;
connections_.clear();
if (!helpers_.empty()) {
DVLOG(1) << "Shutting down with slaves still connected";
for (auto& p : helpers_) {
embedder::SlaveInfo slave_info = p.second->Shutdown();
delete p.second;
CallOnSlaveDisconnect(slave_info);
}
helpers_.clear();
}
}
// Private-thread half of |AddSlave()|: creates and initializes the |Helper| for
// the new slave, stores it in |helpers_| (which takes ownership), and then
// signals |event| to unblock the waiting |AddSlave()| call.
void MasterConnectionManager::AddSlaveOnPrivateThread(
    embedder::SlaveInfo slave_info,
    embedder::ScopedPlatformHandle platform_handle,
    ProcessIdentifier slave_process_identifier,
    base::WaitableEvent* event) {
  DCHECK(platform_handle.is_valid());
  DCHECK(event);
  AssertOnPrivateThread();

  scoped_ptr<Helper> new_helper(new Helper(
      this, slave_process_identifier, slave_info, platform_handle.Pass()));
  new_helper->Init();

  DCHECK(helpers_.find(slave_process_identifier) == helpers_.end());
  helpers_[slave_process_identifier] = new_helper.release();

  DVLOG(1) << "Added slave process identifier " << slave_process_identifier;

  // Must come last: |AddSlave()| returns as soon as this is signaled.
  event->Signal();
}
// Called (on the private thread) by the |Helper| for |process_identifier| when
// its channel has failed. Shuts down and destroys that helper, drops every
// pending connect that involves the process, and notifies the delegate that the
// slave has disconnected.
void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) {
  DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
  AssertOnPrivateThread();

  auto helper_entry = helpers_.find(process_identifier);
  DCHECK(helper_entry != helpers_.end());
  Helper* failed_helper = helper_entry->second;
  embedder::SlaveInfo slave_info = failed_helper->Shutdown();
  helpers_.erase(helper_entry);
  delete failed_helper;

  {
    MutexLocker locker(&mutex_);

    // TODO(vtl): This isn't very efficient.
    auto pending_entry = pending_connects_.begin();
    while (pending_entry != pending_connects_.end()) {
      PendingConnectInfo* info = pending_entry->second;
      const bool involves_process = info->first == process_identifier ||
                                    info->second == process_identifier;
      if (!involves_process) {
        ++pending_entry;
        continue;
      }
      delete info;
      // Post-increment so that the iterator is advanced before the element it
      // pointed at is erased.
      pending_connects_.erase(pending_entry++);
    }
  }

  CallOnSlaveDisconnect(slave_info);
}
// Posts a task to |delegate_thread_task_runner_| that calls
// |OnSlaveDisconnect(slave_info)| on the master process delegate. Both the
// delegate and |slave_info| are bound unretained, i.e., nothing here keeps them
// alive until the task runs.
void MasterConnectionManager::CallOnSlaveDisconnect(
embedder::SlaveInfo slave_info) {
AssertOnPrivateThread();
DCHECK(master_process_delegate_);
delegate_thread_task_runner_->PostTask(
FROM_HERE, base::Bind(&embedder::MasterProcessDelegate::OnSlaveDisconnect,
base::Unretained(master_process_delegate_),
base::Unretained(slave_info)));
}
// Debug-only check that the caller is NOT running on the private thread (and
// that the private thread is currently running).
void MasterConnectionManager::AssertNotOnPrivateThread() const {
// This should only be called after |Init()| and before |Shutdown()|. (If not,
// the subsequent |DCHECK_NE()| is invalid, since the current thread may not
// have a message loop.)
DCHECK(private_thread_.message_loop());
DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop());
}
// Debug-only check that the caller IS running on the private thread, by
// comparing the current message loop with the private thread's.
void MasterConnectionManager::AssertOnPrivateThread() const {
// This should only be called after |Init()| and before |Shutdown()|.
DCHECK(private_thread_.message_loop());
DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop());
}
} // namespace system
} // namespace mojo