1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/ports/node.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10 #include <utility>
11
12 #include "base/atomicops.h"
13 #include "base/containers/stack_container.h"
14 #include "base/lazy_instance.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/optional.h"
18 #include "base/synchronization/lock.h"
19 #include "base/threading/thread_local.h"
20 #include "build/build_config.h"
21 #include "mojo/core/ports/event.h"
22 #include "mojo/core/ports/node_delegate.h"
23 #include "mojo/core/ports/port_locker.h"
24
25 #if !defined(OS_NACL)
26 #include "crypto/random.h"
27 #else
28 #include "base/rand_util.h"
29 #endif
30
31 namespace mojo {
32 namespace core {
33 namespace ports {
34
35 namespace {
36
// Number of PortName entries kept in RandomNameGenerator's cache. The cache is
// refilled with fresh random bytes once this many names have been handed out.
constexpr size_t kRandomNameCacheSize = 256;
38
39 // Random port name generator which maintains a cache of random bytes to draw
40 // from. This amortizes the cost of random name generation on platforms where
41 // RandBytes may have significant per-call overhead.
42 //
43 // Note that the use of this cache means one has to be careful about fork()ing
44 // a process once any port names have been generated, as that behavior can lead
45 // to collisions between independently generated names in different processes.
46 class RandomNameGenerator {
47 public:
48 RandomNameGenerator() = default;
49 ~RandomNameGenerator() = default;
50
GenerateRandomPortName()51 PortName GenerateRandomPortName() {
52 base::AutoLock lock(lock_);
53 if (cache_index_ == kRandomNameCacheSize) {
54 #if defined(OS_NACL)
55 base::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
56 #else
57 crypto::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
58 #endif
59 cache_index_ = 0;
60 }
61 return cache_[cache_index_++];
62 }
63
64 private:
65 base::Lock lock_;
66 PortName cache_[kRandomNameCacheSize];
67 size_t cache_index_ = kRandomNameCacheSize;
68
69 DISALLOW_COPY_AND_ASSIGN(RandomNameGenerator);
70 };
71
// Lazily-created, leaky (per its LazyInstance type) generator that backs the
// file-local GenerateRandomPortName() helper.
base::LazyInstance<RandomNameGenerator>::Leaky g_name_generator =
    LAZY_INSTANCE_INITIALIZER;
74
DebugError(const char * message,int error_code)75 int DebugError(const char* message, int error_code) {
76 NOTREACHED() << "Oops: " << message;
77 return error_code;
78 }
79
// Expands to DebugError("<x spelled out>", x): the stringified argument is
// used as the log message and the argument itself is the returned error code.
#define OOPS(x) DebugError(#x, x)
81
CanAcceptMoreMessages(const Port * port)82 bool CanAcceptMoreMessages(const Port* port) {
83 // Have we already doled out the last message (i.e., do we expect to NOT
84 // receive further messages)?
85 uint64_t next_sequence_num = port->message_queue.next_sequence_num();
86 if (port->state == Port::kClosed)
87 return false;
88 if (port->peer_closed || port->remove_proxy_on_last_message) {
89 if (port->last_sequence_num_to_receive == next_sequence_num - 1)
90 return false;
91 }
92 return true;
93 }
94
GenerateRandomPortName(PortName * name)95 void GenerateRandomPortName(PortName* name) {
96 *name = g_name_generator.Get().GenerateRandomPortName();
97 }
98
99 } // namespace
100
Node(const NodeName & name,NodeDelegate * delegate)101 Node::Node(const NodeName& name, NodeDelegate* delegate)
102 : name_(name), delegate_(this, delegate) {}
103
~Node()104 Node::~Node() {
105 if (!ports_.empty())
106 DLOG(WARNING) << "Unclean shutdown for node " << name_;
107 }
108
CanShutdownCleanly(ShutdownPolicy policy)109 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
110 PortLocker::AssertNoPortsLockedOnCurrentThread();
111 base::AutoLock ports_lock(ports_lock_);
112
113 if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
114 #if DCHECK_IS_ON()
115 for (auto& entry : ports_) {
116 DVLOG(2) << "Port " << entry.first << " referencing node "
117 << entry.second->peer_node_name << " is blocking shutdown of "
118 << "node " << name_ << " (state=" << entry.second->state << ")";
119 }
120 #endif
121 return ports_.empty();
122 }
123
124 DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
125
126 // NOTE: This is not efficient, though it probably doesn't need to be since
127 // relatively few ports should be open during shutdown and shutdown doesn't
128 // need to be blazingly fast.
129 bool can_shutdown = true;
130 for (auto& entry : ports_) {
131 PortRef port_ref(entry.first, entry.second);
132 SinglePortLocker locker(&port_ref);
133 auto* port = locker.port();
134 if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
135 can_shutdown = false;
136 #if DCHECK_IS_ON()
137 DVLOG(2) << "Port " << entry.first << " referencing node "
138 << port->peer_node_name << " is blocking shutdown of "
139 << "node " << name_ << " (state=" << port->state << ")";
140 #else
141 // Exit early when not debugging.
142 break;
143 #endif
144 }
145 }
146
147 return can_shutdown;
148 }
149
GetPort(const PortName & port_name,PortRef * port_ref)150 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
151 PortLocker::AssertNoPortsLockedOnCurrentThread();
152 base::AutoLock lock(ports_lock_);
153 auto iter = ports_.find(port_name);
154 if (iter == ports_.end())
155 return ERROR_PORT_UNKNOWN;
156
157 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
158 // Workaround for https://crbug.com/665869.
159 base::subtle::MemoryBarrier();
160 #endif
161
162 *port_ref = PortRef(port_name, iter->second);
163 return OK;
164 }
165
CreateUninitializedPort(PortRef * port_ref)166 int Node::CreateUninitializedPort(PortRef* port_ref) {
167 PortName port_name;
168 GenerateRandomPortName(&port_name);
169
170 scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
171 int rv = AddPortWithName(port_name, port);
172 if (rv != OK)
173 return rv;
174
175 *port_ref = PortRef(port_name, std::move(port));
176 return OK;
177 }
178
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)179 int Node::InitializePort(const PortRef& port_ref,
180 const NodeName& peer_node_name,
181 const PortName& peer_port_name) {
182 {
183 SinglePortLocker locker(&port_ref);
184 auto* port = locker.port();
185 if (port->state != Port::kUninitialized)
186 return ERROR_PORT_STATE_UNEXPECTED;
187
188 port->state = Port::kReceiving;
189 port->peer_node_name = peer_node_name;
190 port->peer_port_name = peer_port_name;
191 }
192
193 delegate_->PortStatusChanged(port_ref);
194
195 return OK;
196 }
197
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)198 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
199 int rv;
200
201 rv = CreateUninitializedPort(port0_ref);
202 if (rv != OK)
203 return rv;
204
205 rv = CreateUninitializedPort(port1_ref);
206 if (rv != OK)
207 return rv;
208
209 rv = InitializePort(*port0_ref, name_, port1_ref->name());
210 if (rv != OK)
211 return rv;
212
213 rv = InitializePort(*port1_ref, name_, port0_ref->name());
214 if (rv != OK)
215 return rv;
216
217 return OK;
218 }
219
SetUserData(const PortRef & port_ref,scoped_refptr<UserData> user_data)220 int Node::SetUserData(const PortRef& port_ref,
221 scoped_refptr<UserData> user_data) {
222 SinglePortLocker locker(&port_ref);
223 auto* port = locker.port();
224 if (port->state == Port::kClosed)
225 return ERROR_PORT_STATE_UNEXPECTED;
226
227 port->user_data = std::move(user_data);
228
229 return OK;
230 }
231
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)232 int Node::GetUserData(const PortRef& port_ref,
233 scoped_refptr<UserData>* user_data) {
234 SinglePortLocker locker(&port_ref);
235 auto* port = locker.port();
236 if (port->state == Port::kClosed)
237 return ERROR_PORT_STATE_UNEXPECTED;
238
239 *user_data = port->user_data;
240
241 return OK;
242 }
243
ClosePort(const PortRef & port_ref)244 int Node::ClosePort(const PortRef& port_ref) {
245 std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
246 NodeName peer_node_name;
247 PortName peer_port_name;
248 uint64_t last_sequence_num = 0;
249 bool was_initialized = false;
250 {
251 SinglePortLocker locker(&port_ref);
252 auto* port = locker.port();
253 switch (port->state) {
254 case Port::kUninitialized:
255 break;
256
257 case Port::kReceiving:
258 was_initialized = true;
259 port->state = Port::kClosed;
260
261 // We pass along the sequence number of the last message sent from this
262 // port to allow the peer to have the opportunity to consume all inbound
263 // messages before notifying the embedder that this port is closed.
264 last_sequence_num = port->next_sequence_num_to_send - 1;
265
266 peer_node_name = port->peer_node_name;
267 peer_port_name = port->peer_port_name;
268
269 // If the port being closed still has unread messages, then we need to
270 // take care to close those ports so as to avoid leaking memory.
271 port->message_queue.TakeAllMessages(&undelivered_messages);
272 break;
273
274 default:
275 return ERROR_PORT_STATE_UNEXPECTED;
276 }
277 }
278
279 ErasePort(port_ref.name());
280
281 if (was_initialized) {
282 DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
283 << name_ << " to " << peer_port_name << "@" << peer_node_name;
284 delegate_->ForwardEvent(peer_node_name,
285 std::make_unique<ObserveClosureEvent>(
286 peer_port_name, last_sequence_num));
287 for (const auto& message : undelivered_messages) {
288 for (size_t i = 0; i < message->num_ports(); ++i) {
289 PortRef ref;
290 if (GetPort(message->ports()[i], &ref) == OK)
291 ClosePort(ref);
292 }
293 }
294 }
295 return OK;
296 }
297
GetStatus(const PortRef & port_ref,PortStatus * port_status)298 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
299 SinglePortLocker locker(&port_ref);
300 auto* port = locker.port();
301 if (port->state != Port::kReceiving)
302 return ERROR_PORT_STATE_UNEXPECTED;
303
304 port_status->has_messages = port->message_queue.HasNextMessage();
305 port_status->receiving_messages = CanAcceptMoreMessages(port);
306 port_status->peer_closed = port->peer_closed;
307 port_status->peer_remote = port->peer_node_name != name_;
308 port_status->queued_message_count =
309 port->message_queue.queued_message_count();
310 port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
311 return OK;
312 }
313
GetMessage(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> * message,MessageFilter * filter)314 int Node::GetMessage(const PortRef& port_ref,
315 std::unique_ptr<UserMessageEvent>* message,
316 MessageFilter* filter) {
317 *message = nullptr;
318
319 DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
320
321 {
322 SinglePortLocker locker(&port_ref);
323 auto* port = locker.port();
324
325 // This could also be treated like the port being unknown since the
326 // embedder should no longer be referring to a port that has been sent.
327 if (port->state != Port::kReceiving)
328 return ERROR_PORT_STATE_UNEXPECTED;
329
330 // Let the embedder get messages until there are no more before reporting
331 // that the peer closed its end.
332 if (!CanAcceptMoreMessages(port))
333 return ERROR_PORT_PEER_CLOSED;
334
335 port->message_queue.GetNextMessage(message, filter);
336 }
337
338 // Allow referenced ports to trigger PortStatusChanged calls.
339 if (*message) {
340 for (size_t i = 0; i < (*message)->num_ports(); ++i) {
341 PortRef new_port_ref;
342 int rv = GetPort((*message)->ports()[i], &new_port_ref);
343
344 DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
345 << " does not exist!";
346
347 SinglePortLocker locker(&new_port_ref);
348 DCHECK(locker.port()->state == Port::kReceiving);
349 locker.port()->message_queue.set_signalable(true);
350 }
351
352 // The user may retransmit this message from another port. We reset the
353 // sequence number so that the message will get a new one if that happens.
354 (*message)->set_sequence_num(0);
355 }
356
357 return OK;
358 }
359
SendUserMessage(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> message)360 int Node::SendUserMessage(const PortRef& port_ref,
361 std::unique_ptr<UserMessageEvent> message) {
362 int rv = SendUserMessageInternal(port_ref, &message);
363 if (rv != OK) {
364 // If send failed, close all carried ports. Note that we're careful not to
365 // close the sending port itself if it happened to be one of the encoded
366 // ports (an invalid but possible condition.)
367 for (size_t i = 0; i < message->num_ports(); ++i) {
368 if (message->ports()[i] == port_ref.name())
369 continue;
370
371 PortRef port;
372 if (GetPort(message->ports()[i], &port) == OK)
373 ClosePort(port);
374 }
375 }
376 return rv;
377 }
378
AcceptEvent(ScopedEvent event)379 int Node::AcceptEvent(ScopedEvent event) {
380 switch (event->type()) {
381 case Event::Type::kUserMessage:
382 return OnUserMessage(Event::Cast<UserMessageEvent>(&event));
383 case Event::Type::kPortAccepted:
384 return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event));
385 case Event::Type::kObserveProxy:
386 return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event));
387 case Event::Type::kObserveProxyAck:
388 return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event));
389 case Event::Type::kObserveClosure:
390 return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event));
391 case Event::Type::kMergePort:
392 return OnMergePort(Event::Cast<MergePortEvent>(&event));
393 }
394 return OOPS(ERROR_NOT_IMPLEMENTED);
395 }
396
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)397 int Node::MergePorts(const PortRef& port_ref,
398 const NodeName& destination_node_name,
399 const PortName& destination_port_name) {
400 PortName new_port_name;
401 Event::PortDescriptor new_port_descriptor;
402 {
403 SinglePortLocker locker(&port_ref);
404
405 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
406 << " to " << destination_port_name << "@" << destination_node_name;
407
408 // Send the port-to-merge over to the destination node so it can be merged
409 // into the port cycle atomically there.
410 new_port_name = port_ref.name();
411 ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
412 &new_port_descriptor);
413 }
414
415 if (new_port_descriptor.peer_node_name == name_ &&
416 destination_node_name != name_) {
417 // Ensure that the locally retained peer of the new proxy gets a status
418 // update so it notices that its peer is now remote.
419 PortRef local_peer;
420 if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK)
421 delegate_->PortStatusChanged(local_peer);
422 }
423
424 delegate_->ForwardEvent(
425 destination_node_name,
426 std::make_unique<MergePortEvent>(destination_port_name, new_port_name,
427 new_port_descriptor));
428 return OK;
429 }
430
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)431 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
432 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
433 << " and " << port1_ref.name() << "@" << name_;
434 return MergePortsInternal(port0_ref, port1_ref,
435 true /* allow_close_on_bad_state */);
436 }
437
LostConnectionToNode(const NodeName & node_name)438 int Node::LostConnectionToNode(const NodeName& node_name) {
439 // We can no longer send events to the given node. We also can't expect any
440 // PortAccepted events.
441
442 DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
443 << node_name;
444
445 DestroyAllPortsWithPeer(node_name, kInvalidPortName);
446 return OK;
447 }
448
OnUserMessage(std::unique_ptr<UserMessageEvent> message)449 int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
450 PortName port_name = message->port_name();
451
452 #if DCHECK_IS_ON()
453 std::ostringstream ports_buf;
454 for (size_t i = 0; i < message->num_ports(); ++i) {
455 if (i > 0)
456 ports_buf << ",";
457 ports_buf << message->ports()[i];
458 }
459
460 DVLOG(4) << "OnUserMessage " << message->sequence_num()
461 << " [ports=" << ports_buf.str() << "] at " << port_name << "@"
462 << name_;
463 #endif
464
465 // Even if this port does not exist, cannot receive anymore messages or is
466 // buffering or proxying messages, we still need these ports to be bound to
467 // this node. When the message is forwarded, these ports will get transferred
468 // following the usual method. If the message cannot be accepted, then the
469 // newly bound ports will simply be closed.
470 for (size_t i = 0; i < message->num_ports(); ++i) {
471 Event::PortDescriptor& descriptor = message->port_descriptors()[i];
472 if (descriptor.referring_node_name == kInvalidNodeName) {
473 // If the referring node name is invalid, this descriptor can be ignored
474 // and the port should already exist locally.
475 PortRef port_ref;
476 if (GetPort(message->ports()[i], &port_ref) != OK)
477 return ERROR_PORT_UNKNOWN;
478 } else {
479 int rv = AcceptPort(message->ports()[i], descriptor);
480 if (rv != OK)
481 return rv;
482
483 // Ensure that the referring node is wiped out of this descriptor. This
484 // allows the event to be forwarded across multiple local hops without
485 // attempting to accept the port more than once.
486 descriptor.referring_node_name = kInvalidNodeName;
487 }
488 }
489
490 PortRef port_ref;
491 GetPort(port_name, &port_ref);
492 bool has_next_message = false;
493 bool message_accepted = false;
494 bool should_forward_messages = false;
495 if (port_ref.is_valid()) {
496 SinglePortLocker locker(&port_ref);
497 auto* port = locker.port();
498
499 // Reject spurious messages if we've already received the last expected
500 // message.
501 if (CanAcceptMoreMessages(port)) {
502 message_accepted = true;
503 port->message_queue.AcceptMessage(std::move(message), &has_next_message);
504
505 if (port->state == Port::kBuffering) {
506 has_next_message = false;
507 } else if (port->state == Port::kProxying) {
508 has_next_message = false;
509 should_forward_messages = true;
510 }
511 }
512 }
513
514 if (should_forward_messages) {
515 int rv = ForwardUserMessagesFromProxy(port_ref);
516 if (rv != OK)
517 return rv;
518 TryRemoveProxy(port_ref);
519 }
520
521 if (!message_accepted) {
522 DVLOG(2) << "Message not accepted!\n";
523 // Close all newly accepted ports as they are effectively orphaned.
524 for (size_t i = 0; i < message->num_ports(); ++i) {
525 PortRef attached_port_ref;
526 if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
527 ClosePort(attached_port_ref);
528 } else {
529 DLOG(WARNING) << "Cannot close non-existent port!\n";
530 }
531 }
532 } else if (has_next_message) {
533 delegate_->PortStatusChanged(port_ref);
534 }
535
536 return OK;
537 }
538
OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event)539 int Node::OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event) {
540 PortRef port_ref;
541 if (GetPort(event->port_name(), &port_ref) != OK)
542 return ERROR_PORT_UNKNOWN;
543
544 #if DCHECK_IS_ON()
545 {
546 SinglePortLocker locker(&port_ref);
547 DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
548 << " pointing to " << locker.port()->peer_port_name << "@"
549 << locker.port()->peer_node_name;
550 }
551 #endif
552
553 return BeginProxying(port_ref);
554 }
555
OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event)556 int Node::OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event) {
557 if (event->port_name() == kInvalidPortName) {
558 // An ObserveProxy with an invalid target port name is a broadcast used to
559 // inform ports when their peer (which was itself a proxy) has become
560 // defunct due to unexpected node disconnection.
561 //
562 // Receiving ports affected by this treat it as equivalent to peer closure.
563 // Proxies affected by this can be removed and will in turn broadcast their
564 // own death with a similar message.
565 DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
566 DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
567 DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
568 return OK;
569 }
570
571 // The port may have already been closed locally, in which case the
572 // ObserveClosure message will contain the last_sequence_num field.
573 // We can then silently ignore this message.
574 PortRef port_ref;
575 if (GetPort(event->port_name(), &port_ref) != OK) {
576 DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
577 << " not found";
578 return OK;
579 }
580
581 DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
582 << ", proxy at " << event->proxy_port_name() << "@"
583 << event->proxy_node_name() << " pointing to "
584 << event->proxy_target_port_name() << "@"
585 << event->proxy_target_node_name();
586
587 bool update_status = false;
588 ScopedEvent event_to_forward;
589 NodeName event_target_node;
590 {
591 SinglePortLocker locker(&port_ref);
592 auto* port = locker.port();
593
594 if (port->peer_node_name == event->proxy_node_name() &&
595 port->peer_port_name == event->proxy_port_name()) {
596 if (port->state == Port::kReceiving) {
597 port->peer_node_name = event->proxy_target_node_name();
598 port->peer_port_name = event->proxy_target_port_name();
599 event_target_node = event->proxy_node_name();
600 event_to_forward = std::make_unique<ObserveProxyAckEvent>(
601 event->proxy_port_name(), port->next_sequence_num_to_send - 1);
602 update_status = true;
603 DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
604 << "@" << name_ << " to " << event->proxy_port_name() << "@"
605 << event_target_node;
606 } else {
607 // As a proxy ourselves, we don't know how to honor the ObserveProxy
608 // event or to populate the last_sequence_num field of ObserveProxyAck.
609 // Afterall, another port could be sending messages to our peer now
610 // that we've sent out our own ObserveProxy event. Instead, we will
611 // send an ObserveProxyAck indicating that the ObserveProxy event
612 // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
613 // However, this has to be done after we are removed as a proxy.
614 // Otherwise, we might just find ourselves back here again, which
615 // would be akin to a busy loop.
616
617 DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
618 << "@" << event->proxy_node_name();
619
620 port->send_on_proxy_removal.reset(new std::pair<NodeName, ScopedEvent>(
621 event->proxy_node_name(),
622 std::make_unique<ObserveProxyAckEvent>(event->proxy_port_name(),
623 kInvalidSequenceNum)));
624 }
625 } else {
626 // Forward this event along to our peer. Eventually, it should find the
627 // port referring to the proxy.
628 event_target_node = port->peer_node_name;
629 event->set_port_name(port->peer_port_name);
630 event_to_forward = std::move(event);
631 }
632 }
633
634 if (event_to_forward)
635 delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
636
637 if (update_status)
638 delegate_->PortStatusChanged(port_ref);
639
640 return OK;
641 }
642
OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event)643 int Node::OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event) {
644 DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
645 << " (last_sequence_num=" << event->last_sequence_num() << ")";
646
647 PortRef port_ref;
648 if (GetPort(event->port_name(), &port_ref) != OK)
649 return ERROR_PORT_UNKNOWN; // The port may have observed closure first.
650
651 bool try_remove_proxy_immediately;
652 {
653 SinglePortLocker locker(&port_ref);
654 auto* port = locker.port();
655 if (port->state != Port::kProxying)
656 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
657
658 // If the last sequence number is invalid, this is a signal that we need to
659 // retransmit the ObserveProxy event for this port rather than flagging the
660 // the proxy for removal ASAP.
661 try_remove_proxy_immediately =
662 event->last_sequence_num() != kInvalidSequenceNum;
663 if (try_remove_proxy_immediately) {
664 // We can now remove this port once we have received and forwarded the
665 // last message addressed to this port.
666 port->remove_proxy_on_last_message = true;
667 port->last_sequence_num_to_receive = event->last_sequence_num();
668 }
669 }
670
671 if (try_remove_proxy_immediately)
672 TryRemoveProxy(port_ref);
673 else
674 InitiateProxyRemoval(port_ref);
675
676 return OK;
677 }
678
OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event)679 int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
680 // OK if the port doesn't exist, as it may have been closed already.
681 PortRef port_ref;
682 if (GetPort(event->port_name(), &port_ref) != OK)
683 return OK;
684
685 // This message tells the port that it should no longer expect more messages
686 // beyond last_sequence_num. This message is forwarded along until we reach
687 // the receiving end, and this message serves as an equivalent to
688 // ObserveProxyAck.
689
690 bool notify_delegate = false;
691 NodeName peer_node_name;
692 PortName peer_port_name;
693 bool try_remove_proxy = false;
694 {
695 SinglePortLocker locker(&port_ref);
696 auto* port = locker.port();
697
698 port->peer_closed = true;
699 port->last_sequence_num_to_receive = event->last_sequence_num();
700
701 DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
702 << " (state=" << port->state << ") pointing to "
703 << port->peer_port_name << "@" << port->peer_node_name
704 << " (last_sequence_num=" << event->last_sequence_num() << ")";
705
706 // We always forward ObserveClosure, even beyond the receiving port which
707 // cares about it. This ensures that any dead-end proxies beyond that port
708 // are notified to remove themselves.
709
710 if (port->state == Port::kReceiving) {
711 notify_delegate = true;
712
713 // When forwarding along the other half of the port cycle, this will only
714 // reach dead-end proxies. Tell them we've sent our last message so they
715 // can go away.
716 //
717 // TODO: Repurposing ObserveClosure for this has the desired result but
718 // may be semantically confusing since the forwarding port is not actually
719 // closed. Consider replacing this with a new event type.
720 event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
721 } else {
722 // We haven't yet reached the receiving peer of the closed port, so we'll
723 // forward the message along as-is.
724 // See about removing the port if it is a proxy as our peer won't be able
725 // to participate in proxy removal.
726 port->remove_proxy_on_last_message = true;
727 if (port->state == Port::kProxying)
728 try_remove_proxy = true;
729 }
730
731 DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
732 << name_ << " to peer " << port->peer_port_name << "@"
733 << port->peer_node_name
734 << " (last_sequence_num=" << event->last_sequence_num() << ")";
735
736 peer_node_name = port->peer_node_name;
737 peer_port_name = port->peer_port_name;
738 }
739
740 if (try_remove_proxy)
741 TryRemoveProxy(port_ref);
742
743 event->set_port_name(peer_port_name);
744 delegate_->ForwardEvent(peer_node_name, std::move(event));
745
746 if (notify_delegate)
747 delegate_->PortStatusChanged(port_ref);
748
749 return OK;
750 }
751
OnMergePort(std::unique_ptr<MergePortEvent> event)752 int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) {
753 PortRef port_ref;
754 GetPort(event->port_name(), &port_ref);
755
756 DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
757 << " merging with proxy " << event->new_port_name() << "@" << name_
758 << " pointing to " << event->new_port_descriptor().peer_port_name
759 << "@" << event->new_port_descriptor().peer_node_name
760 << " referred by "
761 << event->new_port_descriptor().referring_port_name << "@"
762 << event->new_port_descriptor().referring_node_name;
763
764 // Accept the new port. This is now the receiving end of the other port cycle
765 // to be merged with ours. Note that we always attempt to accept the new port
766 // first as otherwise its peer receiving port could be left stranded
767 // indefinitely.
768 if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
769 if (port_ref.is_valid())
770 ClosePort(port_ref);
771 return ERROR_PORT_STATE_UNEXPECTED;
772 }
773
774 PortRef new_port_ref;
775 GetPort(event->new_port_name(), &new_port_ref);
776 if (!port_ref.is_valid() && new_port_ref.is_valid()) {
777 ClosePort(new_port_ref);
778 return ERROR_PORT_UNKNOWN;
779 } else if (port_ref.is_valid() && !new_port_ref.is_valid()) {
780 ClosePort(port_ref);
781 return ERROR_PORT_UNKNOWN;
782 }
783
784 return MergePortsInternal(port_ref, new_port_ref,
785 false /* allow_close_on_bad_state */);
786 }
787
AddPortWithName(const PortName & port_name,scoped_refptr<Port> port)788 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
789 PortLocker::AssertNoPortsLockedOnCurrentThread();
790 base::AutoLock lock(ports_lock_);
791 if (!ports_.emplace(port_name, std::move(port)).second)
792 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
793 DVLOG(2) << "Created port " << port_name << "@" << name_;
794 return OK;
795 }
796
ErasePort(const PortName & port_name)797 void Node::ErasePort(const PortName& port_name) {
798 PortLocker::AssertNoPortsLockedOnCurrentThread();
799 scoped_refptr<Port> port;
800 {
801 base::AutoLock lock(ports_lock_);
802 auto it = ports_.find(port_name);
803 if (it == ports_.end())
804 return;
805 port = std::move(it->second);
806 ports_.erase(it);
807 }
808 // NOTE: We are careful not to release the port's messages while holding any
809 // locks, since they may run arbitrary user code upon destruction.
810 std::vector<std::unique_ptr<UserMessageEvent>> messages;
811 {
812 PortRef port_ref(port_name, std::move(port));
813 SinglePortLocker locker(&port_ref);
814 locker.port()->message_queue.TakeAllMessages(&messages);
815 }
816 DVLOG(2) << "Deleted port " << port_name << "@" << name_;
817 }
818
SendUserMessageInternal(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> * message)819 int Node::SendUserMessageInternal(const PortRef& port_ref,
820 std::unique_ptr<UserMessageEvent>* message) {
821 std::unique_ptr<UserMessageEvent>& m = *message;
822 for (size_t i = 0; i < m->num_ports(); ++i) {
823 if (m->ports()[i] == port_ref.name())
824 return ERROR_PORT_CANNOT_SEND_SELF;
825 }
826
827 NodeName target_node;
828 int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
829 false /* ignore_closed_peer */, m.get(),
830 &target_node);
831 if (rv != OK)
832 return rv;
833
834 // Beyond this point there's no sense in returning anything but OK. Even if
835 // message forwarding or acceptance fails, there's nothing the embedder can
836 // do to recover. Assume that failure beyond this point must be treated as a
837 // transport failure.
838
839 DCHECK_NE(kInvalidNodeName, target_node);
840 if (target_node != name_) {
841 delegate_->ForwardEvent(target_node, std::move(m));
842 return OK;
843 }
844
845 int accept_result = AcceptEvent(std::move(m));
846 if (accept_result != OK) {
847 // See comment above for why we don't return an error in this case.
848 DVLOG(2) << "AcceptEvent failed: " << accept_result;
849 }
850
851 return OK;
852 }
853
MergePortsInternal(const PortRef & port0_ref,const PortRef & port1_ref,bool allow_close_on_bad_state)854 int Node::MergePortsInternal(const PortRef& port0_ref,
855 const PortRef& port1_ref,
856 bool allow_close_on_bad_state) {
857 const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
858 {
859 base::Optional<PortLocker> locker(base::in_place, port_refs, 2);
860 auto* port0 = locker->GetPort(port0_ref);
861 auto* port1 = locker->GetPort(port1_ref);
862
863 // There are several conditions which must be met before we'll consider
864 // merging two ports:
865 //
866 // - They must both be in the kReceiving state
867 // - They must not be each other's peer
868 // - They must have never sent a user message
869 //
870 // If any of these criteria are not met, we fail early.
871 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
872 (port0->peer_node_name == name_ &&
873 port0->peer_port_name == port1_ref.name()) ||
874 (port1->peer_node_name == name_ &&
875 port1->peer_port_name == port0_ref.name()) ||
876 port0->next_sequence_num_to_send != kInitialSequenceNum ||
877 port1->next_sequence_num_to_send != kInitialSequenceNum) {
878 // On failure, we only close a port if it was at least properly in the
879 // |kReceiving| state. This avoids getting the system in an inconsistent
880 // state by e.g. closing a proxy abruptly.
881 //
882 // Note that we must release the port locks before closing ports.
883 const bool close_port0 =
884 port0->state == Port::kReceiving || allow_close_on_bad_state;
885 const bool close_port1 =
886 port1->state == Port::kReceiving || allow_close_on_bad_state;
887 locker.reset();
888 if (close_port0)
889 ClosePort(port0_ref);
890 if (close_port1)
891 ClosePort(port1_ref);
892 return ERROR_PORT_STATE_UNEXPECTED;
893 }
894
895 // Swap the ports' peer information and switch them both to proxying mode.
896 std::swap(port0->peer_node_name, port1->peer_node_name);
897 std::swap(port0->peer_port_name, port1->peer_port_name);
898 port0->state = Port::kProxying;
899 port1->state = Port::kProxying;
900 if (port0->peer_closed)
901 port0->remove_proxy_on_last_message = true;
902 if (port1->peer_closed)
903 port1->remove_proxy_on_last_message = true;
904 }
905
906 // Flush any queued messages from the new proxies and, if successful, complete
907 // the merge by initiating proxy removals.
908 if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
909 ForwardUserMessagesFromProxy(port1_ref) == OK) {
910 for (size_t i = 0; i < 2; ++i) {
911 bool try_remove_proxy_immediately = false;
912 ScopedEvent closure_event;
913 NodeName closure_event_target_node;
914 {
915 SinglePortLocker locker(port_refs[i]);
916 auto* port = locker.port();
917 DCHECK(port->state == Port::kProxying);
918 try_remove_proxy_immediately = port->remove_proxy_on_last_message;
919 if (try_remove_proxy_immediately || port->peer_closed) {
920 // If either end of the port cycle is closed, we propagate an
921 // ObserveClosure event.
922 closure_event_target_node = port->peer_node_name;
923 closure_event = std::make_unique<ObserveClosureEvent>(
924 port->peer_port_name, port->last_sequence_num_to_receive);
925 }
926 }
927 if (try_remove_proxy_immediately)
928 TryRemoveProxy(*port_refs[i]);
929 else
930 InitiateProxyRemoval(*port_refs[i]);
931
932 if (closure_event) {
933 delegate_->ForwardEvent(closure_event_target_node,
934 std::move(closure_event));
935 }
936 }
937
938 return OK;
939 }
940
941 // If we failed to forward proxied messages, we keep the system in a
942 // consistent state by undoing the peer swap and closing the ports.
943 {
944 PortLocker locker(port_refs, 2);
945 auto* port0 = locker.GetPort(port0_ref);
946 auto* port1 = locker.GetPort(port1_ref);
947 std::swap(port0->peer_node_name, port1->peer_node_name);
948 std::swap(port0->peer_port_name, port1->peer_port_name);
949 port0->remove_proxy_on_last_message = false;
950 port1->remove_proxy_on_last_message = false;
951 DCHECK_EQ(Port::kProxying, port0->state);
952 DCHECK_EQ(Port::kProxying, port1->state);
953 port0->state = Port::kReceiving;
954 port1->state = Port::kReceiving;
955 }
956
957 ClosePort(port0_ref);
958 ClosePort(port1_ref);
959 return ERROR_PORT_STATE_UNEXPECTED;
960 }
961
ConvertToProxy(Port * port,const NodeName & to_node_name,PortName * port_name,Event::PortDescriptor * port_descriptor)962 void Node::ConvertToProxy(Port* port,
963 const NodeName& to_node_name,
964 PortName* port_name,
965 Event::PortDescriptor* port_descriptor) {
966 port->AssertLockAcquired();
967 PortName local_port_name = *port_name;
968
969 PortName new_port_name;
970 GenerateRandomPortName(&new_port_name);
971
972 // Make sure we don't send messages to the new peer until after we know it
973 // exists. In the meantime, just buffer messages locally.
974 DCHECK(port->state == Port::kReceiving);
975 port->state = Port::kBuffering;
976
977 // If we already know our peer is closed, we already know this proxy can
978 // be removed once it receives and forwards its last expected message.
979 if (port->peer_closed)
980 port->remove_proxy_on_last_message = true;
981
982 *port_name = new_port_name;
983
984 port_descriptor->peer_node_name = port->peer_node_name;
985 port_descriptor->peer_port_name = port->peer_port_name;
986 port_descriptor->referring_node_name = name_;
987 port_descriptor->referring_port_name = local_port_name;
988 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
989 port_descriptor->next_sequence_num_to_receive =
990 port->message_queue.next_sequence_num();
991 port_descriptor->last_sequence_num_to_receive =
992 port->last_sequence_num_to_receive;
993 port_descriptor->peer_closed = port->peer_closed;
994 memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
995
996 // Configure the local port to point to the new port.
997 port->peer_node_name = to_node_name;
998 port->peer_port_name = new_port_name;
999 }
1000
AcceptPort(const PortName & port_name,const Event::PortDescriptor & port_descriptor)1001 int Node::AcceptPort(const PortName& port_name,
1002 const Event::PortDescriptor& port_descriptor) {
1003 scoped_refptr<Port> port =
1004 base::MakeRefCounted<Port>(port_descriptor.next_sequence_num_to_send,
1005 port_descriptor.next_sequence_num_to_receive);
1006 port->state = Port::kReceiving;
1007 port->peer_node_name = port_descriptor.peer_node_name;
1008 port->peer_port_name = port_descriptor.peer_port_name;
1009 port->last_sequence_num_to_receive =
1010 port_descriptor.last_sequence_num_to_receive;
1011 port->peer_closed = port_descriptor.peer_closed;
1012
1013 DVLOG(2) << "Accepting port " << port_name
1014 << " [peer_closed=" << port->peer_closed
1015 << "; last_sequence_num_to_receive="
1016 << port->last_sequence_num_to_receive << "]";
1017
1018 // A newly accepted port is not signalable until the message referencing the
1019 // new port finds its way to the consumer (see GetMessage).
1020 port->message_queue.set_signalable(false);
1021
1022 int rv = AddPortWithName(port_name, std::move(port));
1023 if (rv != OK)
1024 return rv;
1025
1026 // Allow referring port to forward messages.
1027 delegate_->ForwardEvent(
1028 port_descriptor.referring_node_name,
1029 std::make_unique<PortAcceptedEvent>(port_descriptor.referring_port_name));
1030 return OK;
1031 }
1032
PrepareToForwardUserMessage(const PortRef & forwarding_port_ref,Port::State expected_port_state,bool ignore_closed_peer,UserMessageEvent * message,NodeName * forward_to_node)1033 int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
1034 Port::State expected_port_state,
1035 bool ignore_closed_peer,
1036 UserMessageEvent* message,
1037 NodeName* forward_to_node) {
1038 bool target_is_remote = false;
1039 for (;;) {
1040 NodeName target_node_name;
1041 {
1042 SinglePortLocker locker(&forwarding_port_ref);
1043 target_node_name = locker.port()->peer_node_name;
1044 }
1045
1046 // NOTE: This may call out to arbitrary user code, so it's important to call
1047 // it only while no port locks are held on the calling thread.
1048 if (target_node_name != name_) {
1049 if (!message->NotifyWillBeRoutedExternally()) {
1050 LOG(ERROR) << "NotifyWillBeRoutedExternally failed unexpectedly.";
1051 return ERROR_PORT_STATE_UNEXPECTED;
1052 }
1053 }
1054
1055 // Simultaneously lock the forwarding port as well as all attached ports.
1056 base::StackVector<PortRef, 4> attached_port_refs;
1057 base::StackVector<const PortRef*, 5> ports_to_lock;
1058 attached_port_refs.container().resize(message->num_ports());
1059 ports_to_lock.container().resize(message->num_ports() + 1);
1060 ports_to_lock[0] = &forwarding_port_ref;
1061 for (size_t i = 0; i < message->num_ports(); ++i) {
1062 GetPort(message->ports()[i], &attached_port_refs[i]);
1063 DCHECK(attached_port_refs[i].is_valid());
1064 ports_to_lock[i + 1] = &attached_port_refs[i];
1065 }
1066 PortLocker locker(ports_to_lock.container().data(),
1067 ports_to_lock.container().size());
1068 auto* forwarding_port = locker.GetPort(forwarding_port_ref);
1069
1070 if (forwarding_port->peer_node_name != target_node_name) {
1071 // The target node has already changed since we last held the lock.
1072 if (target_node_name == name_) {
1073 // If the target node was previously this local node, we need to restart
1074 // the loop, since that means we may now route the message externally.
1075 continue;
1076 }
1077
1078 target_node_name = forwarding_port->peer_node_name;
1079 }
1080 target_is_remote = target_node_name != name_;
1081
1082 if (forwarding_port->state != expected_port_state)
1083 return ERROR_PORT_STATE_UNEXPECTED;
1084 if (forwarding_port->peer_closed && !ignore_closed_peer)
1085 return ERROR_PORT_PEER_CLOSED;
1086
1087 // Messages may already have a sequence number if they're being forwarded by
1088 // a proxy. Otherwise, use the next outgoing sequence number.
1089 if (message->sequence_num() == 0)
1090 message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
1091 #if DCHECK_IS_ON()
1092 std::ostringstream ports_buf;
1093 for (size_t i = 0; i < message->num_ports(); ++i) {
1094 if (i > 0)
1095 ports_buf << ",";
1096 ports_buf << message->ports()[i];
1097 }
1098 #endif
1099
1100 if (message->num_ports() > 0) {
1101 // Sanity check to make sure we can actually send all the attached ports.
1102 // They must all be in the |kReceiving| state and must not be the sender's
1103 // own peer.
1104 DCHECK_EQ(message->num_ports(), attached_port_refs.container().size());
1105 for (size_t i = 0; i < message->num_ports(); ++i) {
1106 auto* attached_port = locker.GetPort(attached_port_refs[i]);
1107 int error = OK;
1108 if (attached_port->state != Port::kReceiving) {
1109 error = ERROR_PORT_STATE_UNEXPECTED;
1110 } else if (attached_port_refs[i].name() ==
1111 forwarding_port->peer_port_name) {
1112 error = ERROR_PORT_CANNOT_SEND_PEER;
1113 }
1114
1115 if (error != OK) {
1116 // Not going to send. Backpedal on the sequence number.
1117 forwarding_port->next_sequence_num_to_send--;
1118 return error;
1119 }
1120 }
1121
1122 if (target_is_remote) {
1123 // We only bother to proxy and rewrite ports in the event if it's
1124 // going to be routed to an external node. This substantially reduces
1125 // the amount of port churn in the system, as many port-carrying
1126 // events are routed at least 1 or 2 intra-node hops before (if ever)
1127 // being routed externally.
1128 Event::PortDescriptor* port_descriptors = message->port_descriptors();
1129 for (size_t i = 0; i < message->num_ports(); ++i) {
1130 ConvertToProxy(locker.GetPort(attached_port_refs[i]),
1131 target_node_name, message->ports() + i,
1132 port_descriptors + i);
1133 }
1134 }
1135 }
1136
1137 #if DCHECK_IS_ON()
1138 DVLOG(4) << "Sending message " << message->sequence_num()
1139 << " [ports=" << ports_buf.str() << "]"
1140 << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
1141 << forwarding_port->peer_port_name << "@" << target_node_name;
1142 #endif
1143
1144 *forward_to_node = target_node_name;
1145 message->set_port_name(forwarding_port->peer_port_name);
1146 break;
1147 }
1148
1149 if (target_is_remote) {
1150 for (size_t i = 0; i < message->num_ports(); ++i) {
1151 // For any ports that were converted to proxies above, make sure their
1152 // prior local peer (if applicable) receives a status update so it can be
1153 // made aware of its peer's location.
1154 const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
1155 if (descriptor.peer_node_name == name_) {
1156 PortRef local_peer;
1157 if (GetPort(descriptor.peer_port_name, &local_peer) == OK)
1158 delegate_->PortStatusChanged(local_peer);
1159 }
1160 }
1161 }
1162
1163 return OK;
1164 }
1165
BeginProxying(const PortRef & port_ref)1166 int Node::BeginProxying(const PortRef& port_ref) {
1167 {
1168 SinglePortLocker locker(&port_ref);
1169 auto* port = locker.port();
1170 if (port->state != Port::kBuffering)
1171 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1172 port->state = Port::kProxying;
1173 }
1174
1175 int rv = ForwardUserMessagesFromProxy(port_ref);
1176 if (rv != OK)
1177 return rv;
1178
1179 bool try_remove_proxy_immediately;
1180 ScopedEvent closure_event;
1181 NodeName closure_target_node;
1182 {
1183 SinglePortLocker locker(&port_ref);
1184 auto* port = locker.port();
1185 if (port->state != Port::kProxying)
1186 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1187
1188 try_remove_proxy_immediately = port->remove_proxy_on_last_message;
1189 if (try_remove_proxy_immediately) {
1190 // Make sure we propagate closure to our current peer.
1191 closure_target_node = port->peer_node_name;
1192 closure_event = std::make_unique<ObserveClosureEvent>(
1193 port->peer_port_name, port->last_sequence_num_to_receive);
1194 }
1195 }
1196
1197 if (try_remove_proxy_immediately) {
1198 TryRemoveProxy(port_ref);
1199 delegate_->ForwardEvent(closure_target_node, std::move(closure_event));
1200 } else {
1201 InitiateProxyRemoval(port_ref);
1202 }
1203
1204 return OK;
1205 }
1206
ForwardUserMessagesFromProxy(const PortRef & port_ref)1207 int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
1208 for (;;) {
1209 // NOTE: We forward messages in sequential order here so that we maintain
1210 // the message queue's notion of next sequence number. That's useful for the
1211 // proxy removal process as we can tell when this port has seen all of the
1212 // messages it is expected to see.
1213 std::unique_ptr<UserMessageEvent> message;
1214 {
1215 SinglePortLocker locker(&port_ref);
1216 locker.port()->message_queue.GetNextMessage(&message, nullptr);
1217 if (!message)
1218 break;
1219 }
1220
1221 NodeName target_node;
1222 int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
1223 true /* ignore_closed_peer */,
1224 message.get(), &target_node);
1225 if (rv != OK)
1226 return rv;
1227
1228 delegate_->ForwardEvent(target_node, std::move(message));
1229 }
1230 return OK;
1231 }
1232
InitiateProxyRemoval(const PortRef & port_ref)1233 void Node::InitiateProxyRemoval(const PortRef& port_ref) {
1234 NodeName peer_node_name;
1235 PortName peer_port_name;
1236 {
1237 SinglePortLocker locker(&port_ref);
1238 auto* port = locker.port();
1239 peer_node_name = port->peer_node_name;
1240 peer_port_name = port->peer_port_name;
1241 }
1242
1243 // To remove this node, we start by notifying the connected graph that we are
1244 // a proxy. This allows whatever port is referencing this node to skip it.
1245 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1246 // the peer was closed in the meantime).
1247 delegate_->ForwardEvent(peer_node_name,
1248 std::make_unique<ObserveProxyEvent>(
1249 peer_port_name, name_, port_ref.name(),
1250 peer_node_name, peer_port_name));
1251 }
1252
TryRemoveProxy(const PortRef & port_ref)1253 void Node::TryRemoveProxy(const PortRef& port_ref) {
1254 bool should_erase = false;
1255 NodeName removal_target_node;
1256 ScopedEvent removal_event;
1257
1258 {
1259 SinglePortLocker locker(&port_ref);
1260 auto* port = locker.port();
1261 DCHECK(port->state == Port::kProxying);
1262
1263 // Make sure we have seen ObserveProxyAck before removing the port.
1264 if (!port->remove_proxy_on_last_message)
1265 return;
1266
1267 if (!CanAcceptMoreMessages(port)) {
1268 should_erase = true;
1269 if (port->send_on_proxy_removal) {
1270 removal_target_node = port->send_on_proxy_removal->first;
1271 removal_event = std::move(port->send_on_proxy_removal->second);
1272 }
1273 } else {
1274 DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1275 << " now; waiting for more messages";
1276 }
1277 }
1278
1279 if (should_erase)
1280 ErasePort(port_ref.name());
1281
1282 if (removal_event)
1283 delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
1284 }
1285
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1286 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1287 const PortName& port_name) {
1288 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1289 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1290 // node is matched.
1291
1292 std::vector<PortRef> ports_to_notify;
1293 std::vector<PortName> dead_proxies_to_broadcast;
1294 std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
1295
1296 {
1297 PortLocker::AssertNoPortsLockedOnCurrentThread();
1298 base::AutoLock ports_lock(ports_lock_);
1299
1300 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1301 PortRef port_ref(iter->first, iter->second);
1302 {
1303 SinglePortLocker locker(&port_ref);
1304 auto* port = locker.port();
1305
1306 if (port->peer_node_name == node_name &&
1307 (port_name == kInvalidPortName ||
1308 port->peer_port_name == port_name)) {
1309 if (!port->peer_closed) {
1310 // Treat this as immediate peer closure. It's an exceptional
1311 // condition akin to a broken pipe, so we don't care about losing
1312 // messages.
1313
1314 port->peer_closed = true;
1315 port->last_sequence_num_to_receive =
1316 port->message_queue.next_sequence_num() - 1;
1317
1318 if (port->state == Port::kReceiving)
1319 ports_to_notify.push_back(PortRef(iter->first, port));
1320 }
1321
1322 // We don't expect to forward any further messages, and we don't
1323 // expect to receive a Port{Accepted,Rejected} event. Because we're
1324 // a proxy with no active peer, we cannot use the normal proxy removal
1325 // procedure of forward-propagating an ObserveProxy. Instead we
1326 // broadcast our own death so it can be back-propagated. This is
1327 // inefficient but rare.
1328 if (port->state != Port::kReceiving) {
1329 dead_proxies_to_broadcast.push_back(iter->first);
1330 std::vector<std::unique_ptr<UserMessageEvent>> messages;
1331 iter->second->message_queue.TakeAllMessages(&messages);
1332 for (auto& message : messages)
1333 undelivered_messages.emplace_back(std::move(message));
1334 }
1335 }
1336 }
1337 }
1338 }
1339
1340 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1341 ErasePort(proxy_name);
1342 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1343 }
1344
1345 // Wake up any receiving ports who have just observed simulated peer closure.
1346 for (const auto& port : ports_to_notify)
1347 delegate_->PortStatusChanged(port);
1348
1349 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1350 // Broadcast an event signifying that this proxy is no longer functioning.
1351 delegate_->BroadcastEvent(std::make_unique<ObserveProxyEvent>(
1352 kInvalidPortName, name_, proxy_name, kInvalidNodeName,
1353 kInvalidPortName));
1354
1355 // Also process death locally since the port that points this closed one
1356 // could be on the current node.
1357 // Note: Although this is recursive, only a single port is involved which
1358 // limits the expected branching to 1.
1359 DestroyAllPortsWithPeer(name_, proxy_name);
1360 }
1361
1362 // Close any ports referenced by undelivered messages.
1363 for (const auto& message : undelivered_messages) {
1364 for (size_t i = 0; i < message->num_ports(); ++i) {
1365 PortRef ref;
1366 if (GetPort(message->ports()[i], &ref) == OK)
1367 ClosePort(ref);
1368 }
1369 }
1370 }
1371
DelegateHolder(Node * node,NodeDelegate * delegate)1372 Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
1373 : node_(node), delegate_(delegate) {
1374 DCHECK(node_);
1375 }
1376
~DelegateHolder()1377 Node::DelegateHolder::~DelegateHolder() {}
1378
1379 #if DCHECK_IS_ON()
EnsureSafeDelegateAccess() const1380 void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
1381 PortLocker::AssertNoPortsLockedOnCurrentThread();
1382 base::AutoLock lock(node_->ports_lock_);
1383 }
1384 #endif
1385
1386 } // namespace ports
1387 } // namespace core
1388 } // namespace mojo
1389