Movatterモバイル変換


[0]ホーム

URL:


Google Git
Sign in
chromium /chromium /src /refs/heads/main /. /ipc /ipc_mojo_bootstrap.cc
blob: 75056be9b688fa7cb4eb2fec7e324eb7d5da7be5 [file] [log] [blame] [edit]
// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include"ipc/ipc_mojo_bootstrap.h"
#include<inttypes.h>
#include<stdint.h>
#include<algorithm>
#include<map>
#include<memory>
#include<optional>
#include<set>
#include<utility>
#include<vector>
#include"base/check_op.h"
#include"base/containers/circular_deque.h"
#include"base/containers/contains.h"
#include"base/feature_list.h"
#include"base/functional/bind.h"
#include"base/functional/callback.h"
#include"base/hash/hash.h"
#include"base/memory/ptr_util.h"
#include"base/memory/raw_ptr.h"
#include"base/no_destructor.h"
#include"base/sequence_checker.h"
#include"base/strings/stringprintf.h"
#include"base/synchronization/lock.h"
#include"base/synchronization/waitable_event.h"
#include"base/task/common/task_annotator.h"
#include"base/task/sequenced_task_runner.h"
#include"base/task/single_thread_task_runner.h"
#include"base/thread_annotations.h"
#include"base/trace_event/memory_allocator_dump.h"
#include"base/trace_event/memory_dump_manager.h"
#include"base/trace_event/memory_dump_provider.h"
#include"base/trace_event/typed_macros.h"
#include"ipc/ipc_channel.h"
#include"ipc/urgent_message_observer.h"
#include"mojo/public/c/system/types.h"
#include"mojo/public/cpp/bindings/associated_group.h"
#include"mojo/public/cpp/bindings/associated_group_controller.h"
#include"mojo/public/cpp/bindings/connector.h"
#include"mojo/public/cpp/bindings/features.h"
#include"mojo/public/cpp/bindings/interface_endpoint_client.h"
#include"mojo/public/cpp/bindings/interface_endpoint_controller.h"
#include"mojo/public/cpp/bindings/interface_id.h"
#include"mojo/public/cpp/bindings/message.h"
#include"mojo/public/cpp/bindings/message_header_validator.h"
#include"mojo/public/cpp/bindings/mojo_buildflags.h"
#include"mojo/public/cpp/bindings/pipe_control_message_handler.h"
#include"mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
#include"mojo/public/cpp/bindings/pipe_control_message_proxy.h"
#include"mojo/public/cpp/bindings/scoped_message_error_crash_key.h"
#include"mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
#include"mojo/public/cpp/bindings/tracing_helpers.h"
namespace IPC{
classChannelAssociatedGroupController;
namespace{
constinit thread_localbool off_sequence_binding_allowed=false;
BASE_FEATURE(kMojoChannelAssociatedSendUsesRunOrPostTask,
"MojoChannelAssociatedSendUsesRunOrPostTask",
base::FEATURE_DISABLED_BY_DEFAULT);
// Used to track some internal Channel state in pursuit of message leaks.
//
// TODO(crbug.com/40563310): Remove this.
classControllerMemoryDumpProvider
:public base::trace_event::MemoryDumpProvider{
public:
ControllerMemoryDumpProvider(){
base::trace_event::MemoryDumpManager::GetInstance()->RegisterDumpProvider(
this,"IPCChannel",nullptr);
}
ControllerMemoryDumpProvider(constControllerMemoryDumpProvider&)=delete;
ControllerMemoryDumpProvider&operator=(constControllerMemoryDumpProvider&)=
delete;
~ControllerMemoryDumpProvider() override{
base::trace_event::MemoryDumpManager::GetInstance()->UnregisterDumpProvider(
this);
}
voidAddController(ChannelAssociatedGroupController* controller){
base::AutoLock lock(lock_);
controllers_.insert(controller);
}
voidRemoveController(ChannelAssociatedGroupController* controller){
base::AutoLock lock(lock_);
controllers_.erase(controller);
}
// base::trace_event::MemoryDumpProvider:
boolOnMemoryDump(const base::trace_event::MemoryDumpArgs& args,
base::trace_event::ProcessMemoryDump* pmd) override;
private:
base::Lock lock_;
std::set<raw_ptr<ChannelAssociatedGroupController,SetExperimental>>
controllers_;
};
ControllerMemoryDumpProvider&GetMemoryDumpProvider(){
static base::NoDestructor<ControllerMemoryDumpProvider> provider;
return*provider;
}
// Messages are grouped by this info when recording memory metrics.
structMessageMemoryDumpInfo{
MessageMemoryDumpInfo(const mojo::Message& message)
: id(message.name()), profiler_tag(message.heap_profiler_tag()){}
MessageMemoryDumpInfo()=default;
booloperator==(constMessageMemoryDumpInfo& other)const{
return other.id== id&& other.profiler_tag== profiler_tag;
}
uint32_t id=0;
constchar* profiler_tag=nullptr;
};
structMessageMemoryDumpInfoHash{
size_toperator()(constMessageMemoryDumpInfo& info)const{
return base::HashInts(
info.id, info.profiler_tag? base::FastHash(info.profiler_tag):0);
}
};
classScopedUrgentMessageNotification{
public:
explicitScopedUrgentMessageNotification(
UrgentMessageObserver* observer=nullptr)
: observer_(observer){
if(observer_){
observer_->OnUrgentMessageReceived();
}
}
~ScopedUrgentMessageNotification(){
if(observer_){
observer_->OnUrgentMessageProcessed();
}
}
ScopedUrgentMessageNotification(ScopedUrgentMessageNotification&& other)
: observer_(std::exchange(other.observer_,nullptr)){}
ScopedUrgentMessageNotification&operator=(
ScopedUrgentMessageNotification&& other){
observer_= std::exchange(other.observer_,nullptr);
return*this;
}
private:
raw_ptr<UrgentMessageObserver> observer_;
};
}// namespace
classChannelAssociatedGroupController
:public mojo::AssociatedGroupController,
public mojo::MessageReceiver,
public mojo::PipeControlMessageHandlerDelegate{
public:
ChannelAssociatedGroupController(
bool set_interface_id_namespace_bit,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
: task_runner_(ipc_task_runner),
proxy_task_runner_(proxy_task_runner),
set_interface_id_namespace_bit_(set_interface_id_namespace_bit),
dispatcher_(this),
control_message_handler_(this),
control_message_proxy_thunk_(this),
control_message_proxy_(&control_message_proxy_thunk_){
control_message_handler_.SetDescription(
"IPC::mojom::Bootstrap [primary] PipeControlMessageHandler");
dispatcher_.SetValidator(std::make_unique<mojo::MessageHeaderValidator>(
"IPC::mojom::Bootstrap [primary] MessageHeaderValidator"));
GetMemoryDumpProvider().AddController(this);
DETACH_FROM_SEQUENCE(sequence_checker_);
}
ChannelAssociatedGroupController(constChannelAssociatedGroupController&)=
delete;
ChannelAssociatedGroupController&operator=(
constChannelAssociatedGroupController&)=delete;
size_tGetQueuedMessageCount(){
base::AutoLock lock(outgoing_messages_lock_);
return outgoing_messages_.size();
}
voidGetTopQueuedMessageMemoryDumpInfo(MessageMemoryDumpInfo* info,
size_t* count){
std::unordered_map<MessageMemoryDumpInfo,size_t,MessageMemoryDumpInfoHash>
counts;
std::pair<MessageMemoryDumpInfo,size_t> top_message_info_and_count={
MessageMemoryDumpInfo(),0};
base::AutoLock lock(outgoing_messages_lock_);
for(constauto& message: outgoing_messages_){
auto it_and_inserted= counts.emplace(MessageMemoryDumpInfo(message),0);
it_and_inserted.first->second++;
if(it_and_inserted.first->second> top_message_info_and_count.second)
top_message_info_and_count=*it_and_inserted.first;
}
*info= top_message_info_and_count.first;
*count= top_message_info_and_count.second;
}
voidPause(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(was_bound_or_message_sent_);
CHECK(!paused_);
paused_=true;
}
voidUnpause(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(was_bound_or_message_sent_);
CHECK(paused_);
paused_=false;
}
voidFlushOutgoingMessages(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(was_bound_or_message_sent_);
std::vector<mojo::Message> outgoing_messages;
{
base::AutoLock lock(outgoing_messages_lock_);
std::swap(outgoing_messages, outgoing_messages_);
}
for(auto& message: outgoing_messages)
SendMessage(&message);
}
voidBind(mojo::ScopedMessagePipeHandle handle,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
connector_= std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
"IPC Channel");
connector_->set_incoming_receiver(&dispatcher_);
connector_->set_connection_error_handler(
base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
base::Unretained(this)));
connector_->set_enforce_errors_from_incoming_receiver(false);
// Don't let the Connector do any sort of queuing on our behalf. Individual
// messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
// of messages received by this Connector) are already individually
// scheduled for dispatch by ChannelProxy, so Connector's normal mode of
// operation would only introduce a redundant scheduling step for most
// messages.
connector_->set_force_immediate_dispatch(true);
mojo::InterfaceId sender_id, receiver_id;
if(set_interface_id_namespace_bit_){
sender_id=1| mojo::kInterfaceIdNamespaceMask;
receiver_id=1;
}else{
sender_id=1;
receiver_id=1| mojo::kInterfaceIdNamespaceMask;
}
{
base::AutoLock locker(lock_);
Endpoint* sender_endpoint=newEndpoint(this, sender_id);
Endpoint* receiver_endpoint=newEndpoint(this, receiver_id);
endpoints_.insert({ sender_id, sender_endpoint});
endpoints_.insert({ receiver_id, receiver_endpoint});
sender_endpoint->set_handle_created();
receiver_endpoint->set_handle_created();
}
mojo::ScopedInterfaceEndpointHandle sender_handle=
CreateScopedInterfaceEndpointHandle(sender_id);
mojo::ScopedInterfaceEndpointHandle receiver_handle=
CreateScopedInterfaceEndpointHandle(receiver_id);
*sender= mojo::PendingAssociatedRemote<mojom::Channel>(
std::move(sender_handle),0);
*receiver= mojo::PendingAssociatedReceiver<mojom::Channel>(
std::move(receiver_handle));
if(!was_bound_or_message_sent_){
was_bound_or_message_sent_=true;
DETACH_FROM_SEQUENCE(sequence_checker_);
}
}
voidStartReceiving(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(was_bound_or_message_sent_);
connector_->StartReceiving(task_runner_);
}
voidShutDown(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
shut_down_=true;
if(connector_)
connector_->CloseMessagePipe();
OnPipeError();
connector_.reset();
base::AutoLock lock(outgoing_messages_lock_);
outgoing_messages_.clear();
}
// mojo::AssociatedGroupController:
mojo::InterfaceIdAssociateInterface(
mojo::ScopedInterfaceEndpointHandle handle_to_send) override{
if(!handle_to_send.pending_association())
return mojo::kInvalidInterfaceId;
uint32_t id=0;
{
base::AutoLock locker(lock_);
do{
if(next_interface_id_>= mojo::kInterfaceIdNamespaceMask)
next_interface_id_=2;
id= next_interface_id_++;
if(set_interface_id_namespace_bit_)
id|= mojo::kInterfaceIdNamespaceMask;
}while(base::Contains(endpoints_, id));
Endpoint* endpoint=newEndpoint(this, id);
if(encountered_error_)
endpoint->set_peer_closed();
endpoint->set_handle_created();
endpoints_.insert({id, endpoint});
}
if(!NotifyAssociation(&handle_to_send, id)){
// The peer handle of |handle_to_send|, which is supposed to join this
// associated group, has been closed.
{
base::AutoLock locker(lock_);
Endpoint* endpoint=FindEndpoint(id);
if(endpoint)
MarkClosedAndMaybeRemove(endpoint);
}
control_message_proxy_.NotifyPeerEndpointClosed(
id, handle_to_send.disconnect_reason());
}
return id;
}
mojo::ScopedInterfaceEndpointHandleCreateLocalEndpointHandle(
mojo::InterfaceId id) override{
if(!mojo::IsValidInterfaceId(id))
return mojo::ScopedInterfaceEndpointHandle();
// Unless it is the primary ID, |id| is from the remote side and therefore
// its namespace bit is supposed to be different than the value that this
// router would use.
if(!mojo::IsPrimaryInterfaceId(id)&&
set_interface_id_namespace_bit_==
mojo::HasInterfaceIdNamespaceBitSet(id)){
return mojo::ScopedInterfaceEndpointHandle();
}
base::AutoLock locker(lock_);
bool inserted=false;
Endpoint* endpoint=FindOrInsertEndpoint(id,&inserted);
if(inserted){
DCHECK(!endpoint->handle_created());
if(encountered_error_)
endpoint->set_peer_closed();
}else{
if(endpoint->handle_created())
return mojo::ScopedInterfaceEndpointHandle();
}
endpoint->set_handle_created();
returnCreateScopedInterfaceEndpointHandle(id);
}
voidCloseEndpointHandle(
mojo::InterfaceId id,
const std::optional<mojo::DisconnectReason>& reason) override{
if(!mojo::IsValidInterfaceId(id))
return;
{
base::AutoLock locker(lock_);
DCHECK(base::Contains(endpoints_, id));
Endpoint* endpoint= endpoints_[id].get();
DCHECK(!endpoint->client());
DCHECK(!endpoint->closed());
MarkClosedAndMaybeRemove(endpoint);
}
if(!mojo::IsPrimaryInterfaceId(id)|| reason)
control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
}
voidNotifyLocalEndpointOfPeerClosure(mojo::InterfaceId id) override{
if(!task_runner_->RunsTasksInCurrentSequence()){
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::
NotifyLocalEndpointOfPeerClosure,
base::WrapRefCounted(this), id));
return;
}
OnPeerAssociatedEndpointClosed(id, std::nullopt);
}
mojo::InterfaceEndpointController*AttachEndpointClient(
const mojo::ScopedInterfaceEndpointHandle& handle,
mojo::InterfaceEndpointClient* client,
scoped_refptr<base::SequencedTaskRunner> runner) override{
const mojo::InterfaceId id= handle.id();
DCHECK(mojo::IsValidInterfaceId(id));
DCHECK(client);
base::AutoLock locker(lock_);
DCHECK(base::Contains(endpoints_, id));
Endpoint* endpoint= endpoints_[id].get();
endpoint->AttachClient(client, std::move(runner));
if(endpoint->peer_closed())
NotifyEndpointOfError(endpoint,true/* force_async */);
return endpoint;
}
voidDetachEndpointClient(
const mojo::ScopedInterfaceEndpointHandle& handle) override{
const mojo::InterfaceId id= handle.id();
DCHECK(mojo::IsValidInterfaceId(id));
base::AutoLock locker(lock_);
DCHECK(base::Contains(endpoints_, id));
Endpoint* endpoint= endpoints_[id].get();
endpoint->DetachClient();
}
voidRaiseError() override{
// We ignore errors on channel endpoints, leaving the pipe open. There are
// good reasons for this:
//
// * We should never close a channel endpoint in either process as long as
// the child process is still alive. The child's endpoint should only be
// closed implicitly by process death, and the browser's endpoint should
// only be closed after the child process is confirmed to be dead. Crash
// reporting logic in Chrome relies on this behavior in order to do the
// right thing.
//
// * There are two interesting conditions under which RaiseError() can be
// implicitly reached: an incoming message fails validation, or the
// local endpoint drops a response callback without calling it.
//
// * In the validation case, we also report the message as bad, and this
// will imminently trigger the common bad-IPC path in the browser,
// causing the browser to kill the offending renderer.
//
// * In the dropped response callback case, the net result of ignoring the
// issue is generally innocuous. While indicative of programmer error,
// it's not a severe failure and is already covered by separate DCHECKs.
//
// See https://crbug.com/861607 for additional discussion.
}
boolPrefersSerializedMessages() override{returntrue;}
voidSetUrgentMessageObserver(UrgentMessageObserver* observer){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(!was_bound_or_message_sent_);
urgent_message_observer_= observer;
DETACH_FROM_SEQUENCE(sequence_checker_);
}
private:
classEndpoint;
classControlMessageProxyThunk;
friendclassEndpoint;
friendclassControlMessageProxyThunk;
// MessageWrapper objects are always destroyed under the controller's lock. On
// destruction, if the message it wrappers contains
// ScopedInterfaceEndpointHandles (which cannot be destructed under the
// controller's lock), the wrapper unlocks to clean them up.
classMessageWrapper{
public:
MessageWrapper()=default;
MessageWrapper(ChannelAssociatedGroupController* controller,
mojo::Message message)
: controller_(controller), value_(std::move(message)){}
MessageWrapper(MessageWrapper&& other)
: controller_(other.controller_), value_(std::move(other.value_)){}
MessageWrapper(constMessageWrapper&)=delete;
MessageWrapper&operator=(constMessageWrapper&)=delete;
~MessageWrapper(){
if(value_.associated_endpoint_handles()->empty())
return;
controller_->lock_.AssertAcquired();
{
base::AutoUnlock unlocker(controller_->lock_);
value_.mutable_associated_endpoint_handles()->clear();
}
}
MessageWrapper&operator=(MessageWrapper&& other){
controller_= other.controller_;
value_= std::move(other.value_);
return*this;
}
boolHasRequestId(uint64_t request_id){
return!value_.IsNull()&& value_.version()>=1&&
value_.header_v1()->request_id== request_id;
}
mojo::Message& value(){return value_;}
private:
raw_ptr<ChannelAssociatedGroupController> controller_=nullptr;
mojo::Message value_;
};
classEndpoint:public base::RefCountedThreadSafe<Endpoint>,
public mojo::InterfaceEndpointController{
public:
Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
: controller_(controller), id_(id){}
Endpoint(constEndpoint&)=delete;
Endpoint&operator=(constEndpoint&)=delete;
mojo::InterfaceId id()const{return id_;}
bool closed()const{
controller_->lock_.AssertAcquired();
return closed_;
}
void set_closed(){
controller_->lock_.AssertAcquired();
closed_=true;
}
bool peer_closed()const{
controller_->lock_.AssertAcquired();
return peer_closed_;
}
void set_peer_closed(){
controller_->lock_.AssertAcquired();
peer_closed_=true;
}
bool handle_created()const{
controller_->lock_.AssertAcquired();
return handle_created_;
}
void set_handle_created(){
controller_->lock_.AssertAcquired();
handle_created_=true;
}
const std::optional<mojo::DisconnectReason>& disconnect_reason()const{
return disconnect_reason_;
}
void set_disconnect_reason(
const std::optional<mojo::DisconnectReason>& disconnect_reason){
disconnect_reason_= disconnect_reason;
}
base::SequencedTaskRunner* task_runner()const{
return task_runner_.get();
}
bool was_bound_off_sequence()const{return was_bound_off_sequence_;}
mojo::InterfaceEndpointClient* client()const{
controller_->lock_.AssertAcquired();
return client_;
}
voidAttachClient(mojo::InterfaceEndpointClient* client,
scoped_refptr<base::SequencedTaskRunner> runner){
controller_->lock_.AssertAcquired();
DCHECK(!client_);
DCHECK(!closed_);
task_runner_= std::move(runner);
client_= client;
if(off_sequence_binding_allowed){
was_bound_off_sequence_=true;
}
}
voidDetachClient(){
controller_->lock_.AssertAcquired();
DCHECK(client_);
DCHECK(!closed_);
task_runner_=nullptr;
client_=nullptr;
sync_watcher_.reset();
}
std::optional<uint32_t>EnqueueSyncMessage(MessageWrapper message){
controller_->lock_.AssertAcquired();
if(exclusive_wait_&& exclusive_wait_->TryFulfillingWith(message)){
exclusive_wait_=nullptr;
return std::nullopt;
}
uint32_t id=GenerateSyncMessageId();
sync_messages_.emplace_back(id, std::move(message));
SignalSyncMessageEvent();
return id;
}
voidSignalSyncMessageEvent(){
controller_->lock_.AssertAcquired();
if(sync_watcher_)
sync_watcher_->SignalEvent();
}
MessageWrapperPopSyncMessage(uint32_t id){
controller_->lock_.AssertAcquired();
if(sync_messages_.empty()|| sync_messages_.front().first!= id)
returnMessageWrapper();
MessageWrapper message= std::move(sync_messages_.front().second);
sync_messages_.pop_front();
return message;
}
// mojo::InterfaceEndpointController:
boolSendMessage(mojo::Message* message) override{
DCHECK(task_runner_->RunsTasksInCurrentSequence());
message->set_interface_id(id_);
return controller_->SendMessage(message);
}
voidAllowWokenUpBySyncWatchOnSameThread() override{
DCHECK(task_runner_->RunsTasksInCurrentSequence());
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
}
boolSyncWatch(constbool& should_stop) override{
DCHECK(task_runner_->RunsTasksInCurrentSequence());
// It's not legal to make sync calls from the primary endpoint's thread,
// and in fact they must only happen from the proxy task runner.
DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
EnsureSyncWatcherExists();
{
base::AutoLock locker(controller_->lock_);
if(peer_closed_){
SignalSyncMessageEvent();
}
}
return sync_watcher_->SyncWatch(&should_stop);
}
MessageWrapperWaitForIncomingSyncReply(uint64_t request_id){
std::optional<ExclusiveSyncWait> wait;
{
base::AutoLock lock(controller_->lock_);
for(auto&[id, message]: sync_messages_){
if(message.HasRequestId(request_id)){
return std::move(message);
}
}
DCHECK(!exclusive_wait_);
wait.emplace(request_id);
exclusive_wait_=&wait.value();
}
wait->event.Wait();
return std::move(wait->message);
}
boolSyncWatchExclusive(uint64_t request_id) override{
MessageWrapper message=WaitForIncomingSyncReply(request_id);
if(message.value().IsNull()||!client_){
returnfalse;
}
if(!client_->HandleIncomingMessage(&message.value())){
base::AutoLock locker(controller_->lock_);
controller_->RaiseError();
returnfalse;
}
returntrue;
}
voidRegisterExternalSyncWaiter(uint64_t request_id) override{}
private:
friendclass base::RefCountedThreadSafe<Endpoint>;
~Endpoint() override{
controller_->lock_.AssertAcquired();
DCHECK(!client_);
DCHECK(closed_);
DCHECK(peer_closed_);
DCHECK(!sync_watcher_);
if(exclusive_wait_){
exclusive_wait_->event.Signal();
}
}
voidOnSyncMessageEventReady(){
DCHECK(task_runner_->RunsTasksInCurrentSequence());
// SUBTLE: The order of these scoped_refptrs matters.
// `controller_keepalive` MUST outlive `keepalive` because the Endpoint
// holds raw pointer to the AssociatedGroupController.
scoped_refptr<AssociatedGroupController> controller_keepalive(
controller_.get());
scoped_refptr<Endpoint> keepalive(this);
base::AutoLock locker(controller_->lock_);
bool more_to_process=false;
if(!sync_messages_.empty()){
MessageWrapper message_wrapper=
std::move(sync_messages_.front().second);
sync_messages_.pop_front();
bool dispatch_succeeded;
mojo::InterfaceEndpointClient* client= client_;
{
base::AutoUnlock unlocker(controller_->lock_);
dispatch_succeeded=
client->HandleIncomingMessage(&message_wrapper.value());
}
if(!sync_messages_.empty())
more_to_process=true;
if(!dispatch_succeeded)
controller_->RaiseError();
}
if(!more_to_process)
sync_watcher_->ResetEvent();
// If there are no queued sync messages and the peer has closed, there
// there won't be incoming sync messages in the future. If any
// SyncWatch() calls are on the stack for this endpoint, resetting the
// watcher will allow them to exit as the stack undwinds.
if(!more_to_process&& peer_closed_)
sync_watcher_.reset();
}
voidEnsureSyncWatcherExists(){
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if(sync_watcher_)
return;
base::AutoLock locker(controller_->lock_);
sync_watcher_= std::make_unique<mojo::SequenceLocalSyncEventWatcher>(
base::BindRepeating(&Endpoint::OnSyncMessageEventReady,
base::Unretained(this)));
if(peer_closed_||!sync_messages_.empty())
SignalSyncMessageEvent();
}
uint32_tGenerateSyncMessageId(){
// Overflow is fine.
uint32_t id= next_sync_message_id_++;
DCHECK(sync_messages_.empty()|| sync_messages_.front().first!= id);
return id;
}
// Tracks the state of a pending sync wait which excludes all other incoming
// IPC on the waiting thread.
structExclusiveSyncWait{
explicitExclusiveSyncWait(uint64_t request_id)
: request_id(request_id){}
~ExclusiveSyncWait()=default;
boolTryFulfillingWith(MessageWrapper& wrapper){
if(!wrapper.HasRequestId(request_id)){
returnfalse;
}
message= std::move(wrapper);
event.Signal();
returntrue;
}
uint64_t request_id;
base::WaitableEvent event;
MessageWrapper message;
};
const raw_ptr<ChannelAssociatedGroupController> controller_;
const mojo::InterfaceId id_;
bool closed_=false;
bool peer_closed_=false;
bool handle_created_=false;
bool was_bound_off_sequence_=false;
std::optional<mojo::DisconnectReason> disconnect_reason_;
raw_ptr<mojo::InterfaceEndpointClient> client_=nullptr;
scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<mojo::SequenceLocalSyncEventWatcher> sync_watcher_;
base::circular_deque<std::pair<uint32_t,MessageWrapper>> sync_messages_;
raw_ptr<ExclusiveSyncWait> exclusive_wait_=nullptr;
uint32_t next_sync_message_id_=0;
};
classControlMessageProxyThunk:publicMessageReceiver{
public:
explicitControlMessageProxyThunk(
ChannelAssociatedGroupController* controller)
: controller_(controller){}
ControlMessageProxyThunk(constControlMessageProxyThunk&)=delete;
ControlMessageProxyThunk&operator=(constControlMessageProxyThunk&)=
delete;
private:
// MessageReceiver:
boolAccept(mojo::Message* message) override{
return controller_->SendMessage(message);
}
raw_ptr<ChannelAssociatedGroupController> controller_;
};
~ChannelAssociatedGroupController() override{
DCHECK(!connector_);
base::AutoLock locker(lock_);
for(auto iter= endpoints_.begin(); iter!= endpoints_.end();){
Endpoint* endpoint= iter->second.get();
++iter;
if(!endpoint->closed()){
// This happens when a NotifyPeerEndpointClosed message been received,
// but the interface ID hasn't been used to create local endpoint
// handle.
DCHECK(!endpoint->client());
DCHECK(endpoint->peer_closed());
MarkClosed(endpoint);
}else{
MarkPeerClosed(endpoint);
}
}
endpoints_.clear();
GetMemoryDumpProvider().RemoveController(this);
}
boolSendMessage(mojo::Message* message){
DCHECK(message->heap_profiler_tag());
if(task_runner_->BelongsToCurrentThread()){
returnSendMessageOnSequence(message);
}
// PostTask (or RunOrPostTask) so that `message` is sent after messages from
// tasks that are already queued (e.g. by `IPC::ChannelProxy::Send`).
auto callback= base::BindOnce(
&ChannelAssociatedGroupController::SendMessageOnSequenceViaTask,this,
std::move(*message));
if(base::FeatureList::IsEnabled(
kMojoChannelAssociatedSendUsesRunOrPostTask)){
task_runner_->RunOrPostTask(base::subtle::RunOrPostTaskPassKey(),
FROM_HERE, std::move(callback));
}else{
task_runner_->PostTask(FROM_HERE, std::move(callback));
}
returntrue;
}
boolSendMessageOnSequence(mojo::Message* message){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
was_bound_or_message_sent_=true;
if(!connector_|| paused_){
if(!shut_down_){
base::AutoLock lock(outgoing_messages_lock_);
outgoing_messages_.emplace_back(std::move(*message));
}
returntrue;
}
MojoResult result= connector_->AcceptAndGetResult(message);
if(result== MOJO_RESULT_OK){
returntrue;
}
CHECK(connector_->encountered_error());
returnfalse;
}
voidSendMessageOnSequenceViaTask(mojo::Message message){
if(!SendMessageOnSequence(&message)){
RaiseError();
}
}
voidOnPipeError(){
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// We keep |this| alive here because it's possible for the notifications
// below to release all other references.
scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
base::AutoLock locker(lock_);
encountered_error_=true;
std::vector<uint32_t> endpoints_to_remove;
std::vector<scoped_refptr<Endpoint>> endpoints_to_notify;
for(auto iter= endpoints_.begin(); iter!= endpoints_.end();){
Endpoint* endpoint= iter->second.get();
++iter;
if(endpoint->client()){
endpoints_to_notify.push_back(endpoint);
}
if(MarkPeerClosed(endpoint)){
endpoints_to_remove.push_back(endpoint->id());
}
}
for(auto& endpoint: endpoints_to_notify){
// Because a notification may in turn detach any endpoint, we have to
// check each client again here.
if(endpoint->client())
NotifyEndpointOfError(endpoint.get(),false/* force_async */);
}
for(uint32_t id: endpoints_to_remove){
endpoints_.erase(id);
}
}
voidNotifyEndpointOfError(Endpoint* endpoint,bool force_async)
EXCLUSIVE_LOCKS_REQUIRED(lock_){
DCHECK(endpoint->task_runner()&& endpoint->client());
if(endpoint->task_runner()->RunsTasksInCurrentSequence()&&!force_async){
mojo::InterfaceEndpointClient* client= endpoint->client();
std::optional<mojo::DisconnectReason> reason(
endpoint->disconnect_reason());
base::AutoUnlock unlocker(lock_);
client->NotifyError(reason);
}else{
endpoint->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&ChannelAssociatedGroupController::
NotifyEndpointOfErrorOnEndpointThread,
this, endpoint->id(),
// This is safe as `endpoint` is verified to be in
// `endpoints_` (a map with ownership) before use.
base::UnsafeDangling(endpoint)));
}
}
// `endpoint` might be a dangling ptr and must be checked before dereference.
voidNotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
MayBeDangling<Endpoint> endpoint){
base::AutoLock locker(lock_);
auto iter= endpoints_.find(id);
if(iter== endpoints_.end()|| iter->second.get()!= endpoint)
return;
if(!endpoint->client())
return;
DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
NotifyEndpointOfError(endpoint,false/* force_async */);
}
// Marks `endpoint` as closed and returns true if and only if its peer was
// also already closed.
boolMarkClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_){
endpoint->set_closed();
return endpoint->peer_closed();
}
// Marks `endpoint` as having a closed peer and returns true if and only if
// `endpoint` itself was also already closed.
boolMarkPeerClosed(Endpoint* endpoint) EXCLUSIVE_LOCKS_REQUIRED(lock_){
endpoint->set_peer_closed();
endpoint->SignalSyncMessageEvent();
return endpoint->closed();
}
voidMarkClosedAndMaybeRemove(Endpoint* endpoint)
EXCLUSIVE_LOCKS_REQUIRED(lock_){
if(MarkClosed(endpoint)){
endpoints_.erase(endpoint->id());
}
}
voidMarkPeerClosedAndMaybeRemove(Endpoint* endpoint)
EXCLUSIVE_LOCKS_REQUIRED(lock_){
if(MarkPeerClosed(endpoint)){
endpoints_.erase(endpoint->id());
}
}
Endpoint*FindOrInsertEndpoint(mojo::InterfaceId id,bool* inserted)
EXCLUSIVE_LOCKS_REQUIRED(lock_){
DCHECK(!inserted||!*inserted);
Endpoint* endpoint=FindEndpoint(id);
if(!endpoint){
endpoint=newEndpoint(this, id);
endpoints_.insert({id, endpoint});
if(inserted)
*inserted=true;
}
return endpoint;
}
Endpoint*FindEndpoint(mojo::InterfaceId id) EXCLUSIVE_LOCKS_REQUIRED(lock_){
auto iter= endpoints_.find(id);
return iter!= endpoints_.end()? iter->second.get():nullptr;
}
// mojo::MessageReceiver:
boolAccept(mojo::Message* message) override{
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if(!message->DeserializeAssociatedEndpointHandles(this))
returnfalse;
if(mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
return control_message_handler_.Accept(message);
mojo::InterfaceId id= message->interface_id();
if(!mojo::IsValidInterfaceId(id))
returnfalse;
base::ReleasableAutoLock locker(&lock_);
Endpoint* endpoint=FindEndpoint(id);
if(!endpoint)
returntrue;
mojo::InterfaceEndpointClient* client= endpoint->client();
if(!client||!endpoint->task_runner()->RunsTasksInCurrentSequence()){
// The ChannelProxy for this channel is bound to `proxy_task_runner_` and
// by default legacy IPCs must dispatch to either the IO thread or the
// proxy task runner. We generally impose the same constraint on
// associated interface endpoints so that FIFO can be guaranteed across
// all interfaces without stalling any of them to wait for a pending
// endpoint to be bound.
//
// This allows us to assume that if an endpoint is not yet bound when we
// receive a message targeting it, it *will* be bound on the proxy task
// runner by the time a newly posted task runs there. Hence we simply post
// a hopeful dispatch task to that task runner.
//
// As it turns out, there are even some instances of endpoints binding to
// alternative (non-IO-thread, non-proxy) task runners, but still
// ultimately relying on the fact that we schedule their messages on the
// proxy task runner. So even if the endpoint is already bound, we
// default to scheduling it on the proxy task runner as long as it's not
// bound specifically to the IO task runner.
// TODO(rockot): Try to sort out these cases and maybe eliminate them.
//
// Finally, it's also possible that an endpoint was bound to an
// alternative task runner and it really does want its messages to
// dispatch there. In that case `was_bound_off_sequence()` will be true to
// signal that we should really use that task runner.
const scoped_refptr<base::SequencedTaskRunner> task_runner=
client&& endpoint->was_bound_off_sequence()
? endpoint->task_runner()
: proxy_task_runner_.get();
ScopedUrgentMessageNotification scoped_urgent_message_notification(
message->has_flag(mojo::Message::kFlagIsUrgent)
? urgent_message_observer_
:nullptr);
if(message->has_flag(mojo::Message::kFlagIsSync)){
MessageWrapper message_wrapper(this, std::move(*message));
// Sync messages may need to be handled by the endpoint if it's blocking
// on a sync reply. We pass ownership of the message to the endpoint's
// sync message queue. If the endpoint was blocking, it will dequeue the
// message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
// call will dequeue the message and dispatch it.
std::optional<uint32_t> message_id=
endpoint->EnqueueSyncMessage(std::move(message_wrapper));
if(message_id){
task_runner->PostTask(
FROM_HERE,
base::BindOnce(
&ChannelAssociatedGroupController::AcceptSyncMessage,this,
id,*message_id,
std::move(scoped_urgent_message_notification)));
}
returntrue;
}
// If |task_runner| has been torn down already, this PostTask will fail
// and destroy |message|. That operation may need to in turn destroy
// in-transit associated endpoints and thus acquire |lock_|. We no longer
// need the lock to be held now, so we can release it before the PostTask.
{
// Grab interface name from |client| before releasing the lock to ensure
// that |client| is safe to access.
base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash(
client? client->interface_name():"unknown interface");
locker.Release();
task_runner->PostTask(
FROM_HERE,
base::BindOnce(
&ChannelAssociatedGroupController::AcceptOnEndpointThread,this,
std::move(*message),
std::move(scoped_urgent_message_notification)));
}
returntrue;
}
locker.Release();
// It's safe to access |client| here without holding a lock, because this
// code runs on a proxy thread and |client| can't be destroyed from any
// thread.
return client->HandleIncomingMessage(message);
}
voidAcceptOnEndpointThread(
mojo::Message message,
ScopedUrgentMessageNotification scoped_urgent_message_notification){
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
"ChannelAssociatedGroupController::AcceptOnEndpointThread");
mojo::InterfaceId id= message.interface_id();
DCHECK(mojo::IsValidInterfaceId(id)&&!mojo::IsPrimaryInterfaceId(id));
base::AutoLock locker(lock_);
Endpoint* endpoint=FindEndpoint(id);
if(!endpoint)
return;
mojo::InterfaceEndpointClient* client= endpoint->client();
if(!client)
return;
if(!endpoint->task_runner()->RunsTasksInCurrentSequence()&&
!proxy_task_runner_->RunsTasksInCurrentSequence()){
return;
}
// TODO(altimin): This event is temporarily kept as a debug fallback. Remove
// it once the new implementation proves to be stable.
TRACE_EVENT(
TRACE_DISABLED_BY_DEFAULT("mojom"),
// Using client->interface_name() is safe here because this is a static
// string defined for each mojo interface.
perfetto::StaticString(client->interface_name()),
[&](perfetto::EventContext& ctx){
staticconstuint8_t* toplevel_flow_enabled=
TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("toplevel.flow");
if(!*toplevel_flow_enabled)
return;
perfetto::Flow::Global(message.GetTraceId())(ctx);
});
// Sync messages should never make their way to this method.
DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
bool result=false;
{
base::AutoUnlock unlocker(lock_);
result= client->HandleIncomingMessage(&message);
}
if(!result)
RaiseError();
}
voidAcceptSyncMessage(
mojo::InterfaceId interface_id,
uint32_t message_id,
ScopedUrgentMessageNotification scoped_urgent_message_notification){
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"),
"ChannelAssociatedGroupController::AcceptSyncMessage");
base::AutoLock locker(lock_);
Endpoint* endpoint=FindEndpoint(interface_id);
if(!endpoint)
return;
// Careful, if the endpoint is detached its members are cleared. Check for
// that before dereferencing.
mojo::InterfaceEndpointClient* client= endpoint->client();
if(!client)
return;
if(!endpoint->task_runner()->RunsTasksInCurrentSequence()&&
!proxy_task_runner_->RunsTasksInCurrentSequence()){
return;
}
// Using client->interface_name() is safe here because this is a static
// string defined for each mojo interface.
TRACE_EVENT0("mojom", client->interface_name());
MessageWrapper message_wrapper= endpoint->PopSyncMessage(message_id);
// The message must have already been dequeued by the endpoint waking up
// from a sync wait. Nothing to do.
if(message_wrapper.value().IsNull())
return;
bool result=false;
{
base::AutoUnlock unlocker(lock_);
result= client->HandleIncomingMessage(&message_wrapper.value());
}
if(!result)
RaiseError();
}
// mojo::PipeControlMessageHandlerDelegate:
boolOnPeerAssociatedEndpointClosed(
mojo::InterfaceId id,
const std::optional<mojo::DisconnectReason>& reason) override{
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
base::AutoLock locker(lock_);
scoped_refptr<Endpoint> endpoint=FindOrInsertEndpoint(id,nullptr);
if(reason)
endpoint->set_disconnect_reason(reason);
if(!endpoint->peer_closed()){
if(endpoint->client())
NotifyEndpointOfError(endpoint.get(),false/* force_async */);
MarkPeerClosedAndMaybeRemove(endpoint.get());
}
returntrue;
}
boolWaitForFlushToComplete(
mojo::ScopedMessagePipeHandle flush_pipe) override{
// We don't support async flushing on the IPC Channel pipe.
returnfalse;
}
const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
const scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
constbool set_interface_id_namespace_bit_;
// Ensures sequenced access to members below.
SEQUENCE_CHECKER(sequence_checker_);
// Whether `Bind()` or `SendMessageOnSequence()` was called.
// `sequence_checker_` can be detached when this is `false`.
bool was_bound_or_message_sent_ GUARDED_BY_CONTEXT(sequence_checker_)=false;
bool paused_ GUARDED_BY_CONTEXT(sequence_checker_)=false;
bool shut_down_ GUARDED_BY_CONTEXT(sequence_checker_)=false;
std::unique_ptr<mojo::Connector> connector_
GUARDED_BY_CONTEXT(sequence_checker_);
mojo::MessageDispatcher dispatcher_ GUARDED_BY_CONTEXT(sequence_checker_);
mojo::PipeControlMessageHandler control_message_handler_
GUARDED_BY_CONTEXT(sequence_checker_);
ControlMessageProxyThunk control_message_proxy_thunk_
GUARDED_BY_CONTEXT(sequence_checker_);
raw_ptr<UrgentMessageObserver> urgent_message_observer_
GUARDED_BY_CONTEXT(sequence_checker_)=nullptr;
// NOTE: It is unsafe to call into this object while holding |lock_|.
mojo::PipeControlMessageProxy control_message_proxy_;
// Outgoing messages sent before this controller Bound() to a pipe or while it
// was paused. Protected by a lock to support memory dumps from any thread.
base::Lock outgoing_messages_lock_;
std::vector<mojo::Message> outgoing_messages_
GUARDED_BY(outgoing_messages_lock_);
// Guards the fields below for thread-safe access.
base::Lock lock_;
bool encountered_error_ GUARDED_BY(lock_)=false;
// ID #1 is reserved for the mojom::Channel interface.
uint32_t next_interface_id_ GUARDED_BY(lock_)=2;
std::map<uint32_t, scoped_refptr<Endpoint>> endpoints_ GUARDED_BY(lock_);
};
namespace{
boolControllerMemoryDumpProvider::OnMemoryDump(
const base::trace_event::MemoryDumpArgs& args,
base::trace_event::ProcessMemoryDump* pmd){
base::AutoLock lock(lock_);
for(ChannelAssociatedGroupController* controller: controllers_){
base::trace_event::MemoryAllocatorDump* dump= pmd->CreateAllocatorDump(
base::StringPrintf("mojo/queued_ipc_channel_message/0x%"PRIxPTR,
reinterpret_cast<uintptr_t>(controller)));
dump->AddScalar(base::trace_event::MemoryAllocatorDump::kNameObjectCount,
base::trace_event::MemoryAllocatorDump::kUnitsObjects,
controller->GetQueuedMessageCount());
MessageMemoryDumpInfo info;
size_t count=0;
controller->GetTopQueuedMessageMemoryDumpInfo(&info,&count);
dump->AddScalar("top_message_name","id", info.id);
dump->AddScalar("top_message_count",
base::trace_event::MemoryAllocatorDump::kUnitsObjects,
count);
if(info.profiler_tag){
// TODO(ssid): Memory dumps currently do not support adding string
// arguments in background dumps. So, add this value as a trace event for
// now.
TRACE_EVENT2(base::trace_event::MemoryDumpManager::kTraceCategory,
"ControllerMemoryDumpProvider::OnMemoryDump",
"top_queued_message_tag", info.profiler_tag,
"count", count);
}
}
returntrue;
}
classMojoBootstrapImpl:publicMojoBootstrap{
public:
MojoBootstrapImpl(
mojo::ScopedMessagePipeHandle handle,
const scoped_refptr<ChannelAssociatedGroupController> controller)
: controller_(controller),
associated_group_(controller),
handle_(std::move(handle)){}
MojoBootstrapImpl(constMojoBootstrapImpl&)=delete;
MojoBootstrapImpl&operator=(constMojoBootstrapImpl&)=delete;
~MojoBootstrapImpl() override{
controller_->ShutDown();
}
private:
voidConnect(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override{
controller_->Bind(std::move(handle_), sender, receiver);
}
voidStartReceiving() override{ controller_->StartReceiving();}
voidPause() override{
controller_->Pause();
}
voidUnpause() override{
controller_->Unpause();
}
voidFlush() override{
controller_->FlushOutgoingMessages();
}
mojo::AssociatedGroup*GetAssociatedGroup() override{
return&associated_group_;
}
voidSetUrgentMessageObserver(UrgentMessageObserver* observer) override{
controller_->SetUrgentMessageObserver(observer);
}
scoped_refptr<ChannelAssociatedGroupController> controller_;
mojo::AssociatedGroup associated_group_;
mojo::ScopedMessagePipeHandle handle_;
};
}// namespace
ScopedAllowOffSequenceChannelAssociatedBindings::
ScopedAllowOffSequenceChannelAssociatedBindings()
: resetter_(&off_sequence_binding_allowed,true){}
ScopedAllowOffSequenceChannelAssociatedBindings::
~ScopedAllowOffSequenceChannelAssociatedBindings()=default;
// static
std::unique_ptr<MojoBootstrap>MojoBootstrap::Create(
mojo::ScopedMessagePipeHandle handle,
Channel::Mode mode,
const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner){
return std::make_unique<MojoBootstrapImpl>(
std::move(handle),
base::MakeRefCounted<ChannelAssociatedGroupController>(
mode==Channel::MODE_SERVER, ipc_task_runner, proxy_task_runner));
}
}// namespace IPC

[8]ページ先頭

©2009-2025 Movatter.jp