1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/node_channel.h"
6
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/channel.h"
16 #include "mojo/core/configuration.h"
17 #include "mojo/core/core.h"
18 #include "mojo/core/request_context.h"
19
20 namespace mojo {
21 namespace core {
22
23 namespace {
24
// Discriminates the control messages exchanged over a NodeChannel. The value
// is carried in Header::type at the front of every message payload.
//
// NOTE: Please ONLY append messages to the end of this enum. Enumerators use
// implicit sequential numbering, so inserting or reordering entries renumbers
// everything after them. The relay entries only exist on Windows and non-iOS
// macOS, so the numeric values of the entries following them differ between
// those platforms and all others.
enum class MessageType : uint32_t {
  ACCEPT_INVITEE = 0,    // Written by NodeChannel::AcceptInvitee().
  ACCEPT_INVITATION,     // Written by NodeChannel::AcceptInvitation().
  ADD_BROKER_CLIENT,     // Written by NodeChannel::AddBrokerClient().
  BROKER_CLIENT_ADDED,   // Written by NodeChannel::BrokerClientAdded().
  ACCEPT_BROKER_CLIENT,  // Written by NodeChannel::AcceptBrokerClient().
  EVENT_MESSAGE,         // Written by NodeChannel::CreateEventMessage().
  REQUEST_PORT_MERGE,    // Written by NodeChannel::RequestPortMerge().
  REQUEST_INTRODUCTION,  // Written by NodeChannel::RequestIntroduction().
  INTRODUCE,             // Written by NodeChannel::Introduce().
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  RELAY_EVENT_MESSAGE,  // Written by NodeChannel::RelayEventMessage().
#endif
  BROADCAST_EVENT,  // Written by NodeChannel::Broadcast().
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  EVENT_MESSAGE_FROM_RELAY,  // Written by NodeChannel::EventMessageFromRelay().
#endif
  ACCEPT_PEER,  // Written by NodeChannel::AcceptPeer().
};
45
46 struct Header {
47 MessageType type;
48 uint32_t padding;
49 };
50
51 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
52 "Invalid header size.");
53
54 struct AcceptInviteeData {
55 ports::NodeName inviter_name;
56 ports::NodeName token;
57 };
58
59 struct AcceptInvitationData {
60 ports::NodeName token;
61 ports::NodeName invitee_name;
62 };
63
64 struct AcceptPeerData {
65 ports::NodeName token;
66 ports::NodeName peer_name;
67 ports::PortName port_name;
68 };
69
70 // This message may include a process handle on plaforms that require it.
71 struct AddBrokerClientData {
72 ports::NodeName client_name;
73 #if !defined(OS_WIN)
74 uint32_t process_handle;
75 uint32_t padding;
76 #endif
77 };
78
79 #if !defined(OS_WIN)
80 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
81 "Unexpected pid size");
82 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
83 "Invalid AddBrokerClientData size.");
84 #endif
85
86 // This data is followed by a platform channel handle to the broker.
87 struct BrokerClientAddedData {
88 ports::NodeName client_name;
89 };
90
91 // This data may be followed by a platform channel handle to the broker. If not,
92 // then the inviter is the broker and its channel should be used as such.
93 struct AcceptBrokerClientData {
94 ports::NodeName broker_name;
95 };
96
97 // This is followed by arbitrary payload data which is interpreted as a token
98 // string for port location.
99 struct RequestPortMergeData {
100 ports::PortName connector_port_name;
101 };
102
103 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
104 //
105 // For INTRODUCE the message also includes a valid platform handle for a channel
106 // the receiver may use to communicate with the named node directly, or an
107 // invalid platform handle if the node is unknown to the sender or otherwise
108 // cannot be introduced.
109 struct IntroductionData {
110 ports::NodeName name;
111 };
112
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Payload prefix of a RELAY_EVENT_MESSAGE message.
//
// This struct is followed by the full payload of a message to be relayed.
struct RelayEventMessageData {
  // Node the relayed message is ultimately addressed to.
  ports::NodeName destination;
};

// Payload prefix of an EVENT_MESSAGE_FROM_RELAY message.
//
// This struct is followed by the full payload of a relayed message.
struct EventMessageFromRelayData {
  // Node the relayed message originally came from.
  ports::NodeName source;
};
#endif
124
125 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)126 Channel::MessagePtr CreateMessage(MessageType type,
127 size_t payload_size,
128 size_t num_handles,
129 DataType** out_data,
130 size_t capacity = 0) {
131 const size_t total_size = payload_size + sizeof(Header);
132 if (capacity == 0)
133 capacity = total_size;
134 else
135 capacity = std::max(total_size, capacity);
136 auto message =
137 std::make_unique<Channel::Message>(capacity, total_size, num_handles);
138 Header* header = reinterpret_cast<Header*>(message->mutable_payload());
139 header->type = type;
140 header->padding = 0;
141 *out_data = reinterpret_cast<DataType*>(&header[1]);
142 return message;
143 }
144
145 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)146 bool GetMessagePayload(const void* bytes,
147 size_t num_bytes,
148 DataType** out_data) {
149 static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
150 if (num_bytes < sizeof(Header) + sizeof(DataType))
151 return false;
152 *out_data = reinterpret_cast<const DataType*>(
153 static_cast<const char*>(bytes) + sizeof(Header));
154 return true;
155 }
156
157 } // namespace
158
159 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)160 scoped_refptr<NodeChannel> NodeChannel::Create(
161 Delegate* delegate,
162 ConnectionParams connection_params,
163 scoped_refptr<base::TaskRunner> io_task_runner,
164 const ProcessErrorCallback& process_error_callback) {
165 #if defined(OS_NACL_SFI)
166 LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
167 return nullptr;
168 #else
169 return new NodeChannel(delegate, std::move(connection_params), io_task_runner,
170 process_error_callback);
171 #endif
172 }
173
174 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)175 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
176 size_t payload_size,
177 void** payload,
178 size_t num_handles) {
179 return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
180 payload, capacity);
181 }
182
183 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)184 void NodeChannel::GetEventMessageData(Channel::Message* message,
185 void** data,
186 size_t* num_data_bytes) {
187 // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
188 // with a payload of fewer than |sizeof(Header)| bytes.
189 *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
190 *num_data_bytes = message->payload_size() - sizeof(Header);
191 }
192
Start()193 void NodeChannel::Start() {
194 base::AutoLock lock(channel_lock_);
195 // ShutDown() may have already been called, in which case |channel_| is null.
196 if (channel_)
197 channel_->Start();
198 }
199
ShutDown()200 void NodeChannel::ShutDown() {
201 base::AutoLock lock(channel_lock_);
202 if (channel_) {
203 channel_->ShutDown();
204 channel_ = nullptr;
205 }
206 }
207
LeakHandleOnShutdown()208 void NodeChannel::LeakHandleOnShutdown() {
209 base::AutoLock lock(channel_lock_);
210 if (channel_) {
211 channel_->LeakHandle();
212 }
213 }
214
NotifyBadMessage(const std::string & error)215 void NodeChannel::NotifyBadMessage(const std::string& error) {
216 if (!process_error_callback_.is_null())
217 process_error_callback_.Run("Received bad user message: " + error);
218 }
219
SetRemoteProcessHandle(ScopedProcessHandle process_handle)220 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
221 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
222 {
223 base::AutoLock lock(channel_lock_);
224 if (channel_)
225 channel_->set_remote_process(process_handle.Clone());
226 }
227 base::AutoLock lock(remote_process_handle_lock_);
228 DCHECK(!remote_process_handle_.is_valid());
229 CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
230 remote_process_handle_ = std::move(process_handle);
231 }
232
HasRemoteProcessHandle()233 bool NodeChannel::HasRemoteProcessHandle() {
234 base::AutoLock lock(remote_process_handle_lock_);
235 return remote_process_handle_.is_valid();
236 }
237
CloneRemoteProcessHandle()238 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
239 base::AutoLock lock(remote_process_handle_lock_);
240 if (!remote_process_handle_.is_valid())
241 return ScopedProcessHandle();
242 return remote_process_handle_.Clone();
243 }
244
SetRemoteNodeName(const ports::NodeName & name)245 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
246 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
247 remote_node_name_ = name;
248 }
249
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)250 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
251 const ports::NodeName& token) {
252 AcceptInviteeData* data;
253 Channel::MessagePtr message = CreateMessage(
254 MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
255 data->inviter_name = inviter_name;
256 data->token = token;
257 WriteChannelMessage(std::move(message));
258 }
259
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)260 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
261 const ports::NodeName& invitee_name) {
262 AcceptInvitationData* data;
263 Channel::MessagePtr message = CreateMessage(
264 MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
265 data->token = token;
266 data->invitee_name = invitee_name;
267 WriteChannelMessage(std::move(message));
268 }
269
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)270 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
271 const ports::NodeName& token,
272 const ports::PortName& port_name) {
273 AcceptPeerData* data;
274 Channel::MessagePtr message =
275 CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
276 data->token = token;
277 data->peer_name = sender_name;
278 data->port_name = port_name;
279 WriteChannelMessage(std::move(message));
280 }
281
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)282 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
283 ScopedProcessHandle process_handle) {
284 AddBrokerClientData* data;
285 std::vector<PlatformHandle> handles;
286 #if defined(OS_WIN)
287 handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
288 #endif
289 Channel::MessagePtr message =
290 CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
291 handles.size(), &data);
292 message->SetHandles(std::move(handles));
293 data->client_name = client_name;
294 #if !defined(OS_WIN)
295 data->process_handle = process_handle.get();
296 data->padding = 0;
297 #endif
298 WriteChannelMessage(std::move(message));
299 }
300
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)301 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
302 PlatformHandle broker_channel) {
303 BrokerClientAddedData* data;
304 std::vector<PlatformHandle> handles;
305 if (broker_channel.is_valid())
306 handles.emplace_back(std::move(broker_channel));
307 Channel::MessagePtr message =
308 CreateMessage(MessageType::BROKER_CLIENT_ADDED,
309 sizeof(BrokerClientAddedData), handles.size(), &data);
310 message->SetHandles(std::move(handles));
311 data->client_name = client_name;
312 WriteChannelMessage(std::move(message));
313 }
314
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)315 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
316 PlatformHandle broker_channel) {
317 AcceptBrokerClientData* data;
318 std::vector<PlatformHandle> handles;
319 if (broker_channel.is_valid())
320 handles.emplace_back(std::move(broker_channel));
321 Channel::MessagePtr message =
322 CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
323 sizeof(AcceptBrokerClientData), handles.size(), &data);
324 message->SetHandles(std::move(handles));
325 data->broker_name = broker_name;
326 WriteChannelMessage(std::move(message));
327 }
328
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)329 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
330 const std::string& token) {
331 RequestPortMergeData* data;
332 Channel::MessagePtr message =
333 CreateMessage(MessageType::REQUEST_PORT_MERGE,
334 sizeof(RequestPortMergeData) + token.size(), 0, &data);
335 data->connector_port_name = connector_port_name;
336 memcpy(data + 1, token.data(), token.size());
337 WriteChannelMessage(std::move(message));
338 }
339
RequestIntroduction(const ports::NodeName & name)340 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
341 IntroductionData* data;
342 Channel::MessagePtr message = CreateMessage(
343 MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
344 data->name = name;
345 WriteChannelMessage(std::move(message));
346 }
347
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)348 void NodeChannel::Introduce(const ports::NodeName& name,
349 PlatformHandle channel_handle) {
350 IntroductionData* data;
351 std::vector<PlatformHandle> handles;
352 if (channel_handle.is_valid())
353 handles.emplace_back(std::move(channel_handle));
354 Channel::MessagePtr message = CreateMessage(
355 MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
356 message->SetHandles(std::move(handles));
357 data->name = name;
358 WriteChannelMessage(std::move(message));
359 }
360
SendChannelMessage(Channel::MessagePtr message)361 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
362 WriteChannelMessage(std::move(message));
363 }
364
Broadcast(Channel::MessagePtr message)365 void NodeChannel::Broadcast(Channel::MessagePtr message) {
366 DCHECK(!message->has_handles());
367 void* data;
368 Channel::MessagePtr broadcast_message = CreateMessage(
369 MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
370 memcpy(data, message->data(), message->data_num_bytes());
371 WriteChannelMessage(std::move(broadcast_message));
372 }
373
374 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)375 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
376 Channel::MessagePtr message) {
377 #if defined(OS_WIN)
378 DCHECK(message->has_handles());
379
380 // Note that this is only used on Windows, and on Windows all platform
381 // handles are included in the message data. We blindly copy all the data
382 // here and the relay node (the broker) will duplicate handles as needed.
383 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
384 RelayEventMessageData* data;
385 Channel::MessagePtr relay_message =
386 CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
387 data->destination = destination;
388 memcpy(data + 1, message->data(), message->data_num_bytes());
389
390 // When the handles are duplicated in the broker, the source handles will
391 // be closed. If the broker never receives this message then these handles
392 // will leak, but that means something else has probably broken and the
393 // sending process won't likely be around much longer.
394 //
395 // TODO(https://crbug.com/813112): We would like to be able to violate the
396 // above stated assumption. We should not leak handles in cases where we
397 // outlive the broker, as we may continue existing and eventually accept a new
398 // broker invitation.
399 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
400 for (auto& handle : handles)
401 handle.TakeHandle().release();
402
403 #else
404 DCHECK(message->has_mach_ports());
405
406 // On OSX, the handles are extracted from the relayed message and attached to
407 // the wrapper. The broker then takes the handles attached to the wrapper and
408 // moves them back to the relayed message. This is necessary because the
409 // message may contain fds which need to be attached to the outer message so
410 // that they can be transferred to the broker.
411 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
412 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
413 RelayEventMessageData* data;
414 Channel::MessagePtr relay_message = CreateMessage(
415 MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
416 data->destination = destination;
417 memcpy(data + 1, message->data(), message->data_num_bytes());
418 relay_message->SetHandles(std::move(handles));
419 #endif // defined(OS_WIN)
420
421 WriteChannelMessage(std::move(relay_message));
422 }
423
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)424 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
425 Channel::MessagePtr message) {
426 size_t num_bytes =
427 sizeof(EventMessageFromRelayData) + message->payload_size();
428 EventMessageFromRelayData* data;
429 Channel::MessagePtr relayed_message =
430 CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
431 message->num_handles(), &data);
432 data->source = source;
433 if (message->payload_size())
434 memcpy(data + 1, message->payload(), message->payload_size());
435 relayed_message->SetHandles(message->TakeHandles());
436 WriteChannelMessage(std::move(relayed_message));
437 }
438 #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
439
NodeChannel(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)440 NodeChannel::NodeChannel(Delegate* delegate,
441 ConnectionParams connection_params,
442 scoped_refptr<base::TaskRunner> io_task_runner,
443 const ProcessErrorCallback& process_error_callback)
444 : delegate_(delegate),
445 io_task_runner_(io_task_runner),
446 process_error_callback_(process_error_callback)
447 #if !defined(OS_NACL_SFI)
448 ,
449 channel_(
450 Channel::Create(this, std::move(connection_params), io_task_runner_))
451 #endif
452 {
453 }
454
~NodeChannel()455 NodeChannel::~NodeChannel() {
456 ShutDown();
457 }
458
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)459 void NodeChannel::OnChannelMessage(const void* payload,
460 size_t payload_size,
461 std::vector<PlatformHandle> handles) {
462 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
463
464 RequestContext request_context(RequestContext::Source::SYSTEM);
465
466 // Ensure this NodeChannel stays alive through the extent of this method. The
467 // delegate may have the only other reference to this object and it may choose
468 // to drop it here in response to, e.g., a malformed message.
469 scoped_refptr<NodeChannel> keepalive = this;
470
471 if (payload_size <= sizeof(Header)) {
472 delegate_->OnChannelError(remote_node_name_, this);
473 return;
474 }
475
476 const Header* header = static_cast<const Header*>(payload);
477 switch (header->type) {
478 case MessageType::ACCEPT_INVITEE: {
479 const AcceptInviteeData* data;
480 if (GetMessagePayload(payload, payload_size, &data)) {
481 delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
482 data->token);
483 return;
484 }
485 break;
486 }
487
488 case MessageType::ACCEPT_INVITATION: {
489 const AcceptInvitationData* data;
490 if (GetMessagePayload(payload, payload_size, &data)) {
491 delegate_->OnAcceptInvitation(remote_node_name_, data->token,
492 data->invitee_name);
493 return;
494 }
495 break;
496 }
497
498 case MessageType::ADD_BROKER_CLIENT: {
499 const AddBrokerClientData* data;
500 if (GetMessagePayload(payload, payload_size, &data)) {
501 #if defined(OS_WIN)
502 if (handles.size() != 1) {
503 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
504 break;
505 }
506 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
507 handles[0].ReleaseHandle());
508 #else
509 if (!handles.empty()) {
510 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
511 break;
512 }
513 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
514 data->process_handle);
515 #endif
516 return;
517 }
518 break;
519 }
520
521 case MessageType::BROKER_CLIENT_ADDED: {
522 const BrokerClientAddedData* data;
523 if (GetMessagePayload(payload, payload_size, &data)) {
524 if (handles.size() != 1) {
525 DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
526 break;
527 }
528 delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
529 std::move(handles[0]));
530 return;
531 }
532 break;
533 }
534
535 case MessageType::ACCEPT_BROKER_CLIENT: {
536 const AcceptBrokerClientData* data;
537 if (GetMessagePayload(payload, payload_size, &data)) {
538 PlatformHandle broker_channel;
539 if (handles.size() > 1) {
540 DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
541 break;
542 }
543 if (handles.size() == 1)
544 broker_channel = std::move(handles[0]);
545
546 delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
547 std::move(broker_channel));
548 return;
549 }
550 break;
551 }
552
553 case MessageType::EVENT_MESSAGE: {
554 Channel::MessagePtr message(
555 new Channel::Message(payload_size, handles.size()));
556 message->SetHandles(std::move(handles));
557 memcpy(message->mutable_payload(), payload, payload_size);
558 delegate_->OnEventMessage(remote_node_name_, std::move(message));
559 return;
560 }
561
562 case MessageType::REQUEST_PORT_MERGE: {
563 const RequestPortMergeData* data;
564 if (GetMessagePayload(payload, payload_size, &data)) {
565 // Don't accept an empty token.
566 size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
567 if (token_size == 0)
568 break;
569 std::string token(reinterpret_cast<const char*>(data + 1), token_size);
570 delegate_->OnRequestPortMerge(remote_node_name_,
571 data->connector_port_name, token);
572 return;
573 }
574 break;
575 }
576
577 case MessageType::REQUEST_INTRODUCTION: {
578 const IntroductionData* data;
579 if (GetMessagePayload(payload, payload_size, &data)) {
580 delegate_->OnRequestIntroduction(remote_node_name_, data->name);
581 return;
582 }
583 break;
584 }
585
586 case MessageType::INTRODUCE: {
587 const IntroductionData* data;
588 if (GetMessagePayload(payload, payload_size, &data)) {
589 if (handles.size() > 1) {
590 DLOG(ERROR) << "Dropping invalid introduction message.";
591 break;
592 }
593 PlatformHandle channel_handle;
594 if (handles.size() == 1)
595 channel_handle = std::move(handles[0]);
596
597 delegate_->OnIntroduce(remote_node_name_, data->name,
598 std::move(channel_handle));
599 return;
600 }
601 break;
602 }
603
604 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
605 case MessageType::RELAY_EVENT_MESSAGE: {
606 base::ProcessHandle from_process;
607 {
608 base::AutoLock lock(remote_process_handle_lock_);
609 // NOTE: It's safe to retain a weak reference to this process handle
610 // through the extent of this call because |this| is kept alive and
611 // |remote_process_handle_| is never reset once set.
612 from_process = remote_process_handle_.get();
613 }
614 const RelayEventMessageData* data;
615 if (GetMessagePayload(payload, payload_size, &data)) {
616 // Don't try to relay an empty message.
617 if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
618 break;
619
620 const void* message_start = data + 1;
621 Channel::MessagePtr message = Channel::Message::Deserialize(
622 message_start, payload_size - sizeof(Header) - sizeof(*data),
623 from_process);
624 if (!message) {
625 DLOG(ERROR) << "Dropping invalid relay message.";
626 break;
627 }
628 #if defined(OS_MACOSX) && !defined(OS_IOS)
629 message->SetHandles(std::move(handles));
630 #endif
631 delegate_->OnRelayEventMessage(remote_node_name_, from_process,
632 data->destination, std::move(message));
633 return;
634 }
635 break;
636 }
637 #endif
638
639 case MessageType::BROADCAST_EVENT: {
640 if (payload_size <= sizeof(Header))
641 break;
642 const void* data = static_cast<const void*>(
643 reinterpret_cast<const Header*>(payload) + 1);
644 Channel::MessagePtr message =
645 Channel::Message::Deserialize(data, payload_size - sizeof(Header));
646 if (!message || message->has_handles()) {
647 DLOG(ERROR) << "Dropping invalid broadcast message.";
648 break;
649 }
650 delegate_->OnBroadcast(remote_node_name_, std::move(message));
651 return;
652 }
653
654 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
655 case MessageType::EVENT_MESSAGE_FROM_RELAY:
656 const EventMessageFromRelayData* data;
657 if (GetMessagePayload(payload, payload_size, &data)) {
658 size_t num_bytes = payload_size - sizeof(*data);
659 if (num_bytes < sizeof(Header))
660 break;
661 num_bytes -= sizeof(Header);
662
663 Channel::MessagePtr message(
664 new Channel::Message(num_bytes, handles.size()));
665 message->SetHandles(std::move(handles));
666 if (num_bytes)
667 memcpy(message->mutable_payload(), data + 1, num_bytes);
668 delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
669 std::move(message));
670 return;
671 }
672 break;
673
674 #endif // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
675
676 case MessageType::ACCEPT_PEER: {
677 const AcceptPeerData* data;
678 if (GetMessagePayload(payload, payload_size, &data)) {
679 delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
680 data->port_name);
681 return;
682 }
683 break;
684 }
685
686 default:
687 // Ignore unrecognized message types, allowing for future extensibility.
688 return;
689 }
690
691 DLOG(ERROR) << "Received invalid message. Closing channel.";
692 if (process_error_callback_)
693 process_error_callback_.Run("NodeChannel received a malformed message");
694 delegate_->OnChannelError(remote_node_name_, this);
695 }
696
OnChannelError(Channel::Error error)697 void NodeChannel::OnChannelError(Channel::Error error) {
698 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
699
700 RequestContext request_context(RequestContext::Source::SYSTEM);
701
702 ShutDown();
703
704 if (process_error_callback_ &&
705 error == Channel::Error::kReceivedMalformedData) {
706 process_error_callback_.Run("Channel received a malformed message");
707 }
708
709 // |OnChannelError()| may cause |this| to be destroyed, but still need access
710 // to the name name after that destruction. So may a copy of
711 // |remote_node_name_| so it can be used if |this| becomes destroyed.
712 ports::NodeName node_name = remote_node_name_;
713 delegate_->OnChannelError(node_name, this);
714 }
715
WriteChannelMessage(Channel::MessagePtr message)716 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
717 // Force a crash if this process attempts to send a message larger than the
718 // maximum allowed size. This is more useful than killing a Channel when we
719 // *receive* an oversized message, as we should consider oversized message
720 // transmission to be a bug and this helps easily identify offending code.
721 CHECK(message->data_num_bytes() < GetConfiguration().max_message_num_bytes);
722
723 base::AutoLock lock(channel_lock_);
724 if (!channel_)
725 DLOG(ERROR) << "Dropping message on closed channel.";
726 else
727 channel_->Write(std::move(message));
728 }
729
730 } // namespace core
731 } // namespace mojo
732