1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/ports/node.h"
6
7 #include <string.h>
8
9 #include <utility>
10
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/synchronization/lock.h"
14 #include "mojo/edk/system/ports/node_delegate.h"
15
16 namespace mojo {
17 namespace edk {
18 namespace ports {
19
20 namespace {
21
DebugError(const char * message,int error_code)22 int DebugError(const char* message, int error_code) {
23 CHECK(false) << "Oops: " << message;
24 return error_code;
25 }
26
27 #define OOPS(x) DebugError(#x, x)
28
CanAcceptMoreMessages(const Port * port)29 bool CanAcceptMoreMessages(const Port* port) {
30 // Have we already doled out the last message (i.e., do we expect to NOT
31 // receive further messages)?
32 uint64_t next_sequence_num = port->message_queue.next_sequence_num();
33 if (port->state == Port::kClosed)
34 return false;
35 if (port->peer_closed || port->remove_proxy_on_last_message) {
36 if (port->last_sequence_num_to_receive == next_sequence_num - 1)
37 return false;
38 }
39 return true;
40 }
41
42 } // namespace
43
44 class Node::LockedPort {
45 public:
LockedPort(Port * port)46 explicit LockedPort(Port* port) : port_(port) {
47 port_->lock.AssertAcquired();
48 }
49
get() const50 Port* get() const { return port_; }
operator ->() const51 Port* operator->() const { return port_; }
52
53 private:
54 Port* const port_;
55 };
56
Node(const NodeName & name,NodeDelegate * delegate)57 Node::Node(const NodeName& name, NodeDelegate* delegate)
58 : name_(name),
59 delegate_(delegate) {
60 }
61
~Node()62 Node::~Node() {
63 if (!ports_.empty())
64 DLOG(WARNING) << "Unclean shutdown for node " << name_;
65 }
66
CanShutdownCleanly(bool allow_local_ports)67 bool Node::CanShutdownCleanly(bool allow_local_ports) {
68 base::AutoLock ports_lock(ports_lock_);
69
70 if (!allow_local_ports) {
71 #if DCHECK_IS_ON()
72 for (auto entry : ports_) {
73 DVLOG(2) << "Port " << entry.first << " referencing node "
74 << entry.second->peer_node_name << " is blocking shutdown of "
75 << "node " << name_ << " (state=" << entry.second->state << ")";
76 }
77 #endif
78 return ports_.empty();
79 }
80
81 // NOTE: This is not efficient, though it probably doesn't need to be since
82 // relatively few ports should be open during shutdown and shutdown doesn't
83 // need to be blazingly fast.
84 bool can_shutdown = true;
85 for (auto entry : ports_) {
86 base::AutoLock lock(entry.second->lock);
87 if (entry.second->peer_node_name != name_ &&
88 entry.second->state != Port::kReceiving) {
89 can_shutdown = false;
90 #if DCHECK_IS_ON()
91 DVLOG(2) << "Port " << entry.first << " referencing node "
92 << entry.second->peer_node_name << " is blocking shutdown of "
93 << "node " << name_ << " (state=" << entry.second->state << ")";
94 #else
95 // Exit early when not debugging.
96 break;
97 #endif
98 }
99 }
100
101 return can_shutdown;
102 }
103
GetPort(const PortName & port_name,PortRef * port_ref)104 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
105 scoped_refptr<Port> port = GetPort(port_name);
106 if (!port)
107 return ERROR_PORT_UNKNOWN;
108
109 *port_ref = PortRef(port_name, std::move(port));
110 return OK;
111 }
112
CreateUninitializedPort(PortRef * port_ref)113 int Node::CreateUninitializedPort(PortRef* port_ref) {
114 PortName port_name;
115 delegate_->GenerateRandomPortName(&port_name);
116
117 scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
118 kInitialSequenceNum));
119 int rv = AddPortWithName(port_name, port);
120 if (rv != OK)
121 return rv;
122
123 *port_ref = PortRef(port_name, std::move(port));
124 return OK;
125 }
126
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)127 int Node::InitializePort(const PortRef& port_ref,
128 const NodeName& peer_node_name,
129 const PortName& peer_port_name) {
130 Port* port = port_ref.port();
131
132 {
133 base::AutoLock lock(port->lock);
134 if (port->state != Port::kUninitialized)
135 return ERROR_PORT_STATE_UNEXPECTED;
136
137 port->state = Port::kReceiving;
138 port->peer_node_name = peer_node_name;
139 port->peer_port_name = peer_port_name;
140 }
141
142 delegate_->PortStatusChanged(port_ref);
143
144 return OK;
145 }
146
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)147 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
148 int rv;
149
150 rv = CreateUninitializedPort(port0_ref);
151 if (rv != OK)
152 return rv;
153
154 rv = CreateUninitializedPort(port1_ref);
155 if (rv != OK)
156 return rv;
157
158 rv = InitializePort(*port0_ref, name_, port1_ref->name());
159 if (rv != OK)
160 return rv;
161
162 rv = InitializePort(*port1_ref, name_, port0_ref->name());
163 if (rv != OK)
164 return rv;
165
166 return OK;
167 }
168
SetUserData(const PortRef & port_ref,const scoped_refptr<UserData> & user_data)169 int Node::SetUserData(const PortRef& port_ref,
170 const scoped_refptr<UserData>& user_data) {
171 Port* port = port_ref.port();
172
173 base::AutoLock lock(port->lock);
174 if (port->state == Port::kClosed)
175 return ERROR_PORT_STATE_UNEXPECTED;
176
177 port->user_data = std::move(user_data);
178
179 return OK;
180 }
181
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)182 int Node::GetUserData(const PortRef& port_ref,
183 scoped_refptr<UserData>* user_data) {
184 Port* port = port_ref.port();
185
186 base::AutoLock lock(port->lock);
187 if (port->state == Port::kClosed)
188 return ERROR_PORT_STATE_UNEXPECTED;
189
190 *user_data = port->user_data;
191
192 return OK;
193 }
194
ClosePort(const PortRef & port_ref)195 int Node::ClosePort(const PortRef& port_ref) {
196 std::deque<PortName> referenced_port_names;
197
198 ObserveClosureEventData data;
199
200 NodeName peer_node_name;
201 PortName peer_port_name;
202 Port* port = port_ref.port();
203 {
204 // We may need to erase the port, which requires ports_lock_ to be held,
205 // but ports_lock_ must be acquired before any individual port locks.
206 base::AutoLock ports_lock(ports_lock_);
207
208 base::AutoLock lock(port->lock);
209 if (port->state == Port::kUninitialized) {
210 // If the port was not yet initialized, there's nothing interesting to do.
211 ErasePort_Locked(port_ref.name());
212 return OK;
213 }
214
215 if (port->state != Port::kReceiving)
216 return ERROR_PORT_STATE_UNEXPECTED;
217
218 port->state = Port::kClosed;
219
220 // We pass along the sequence number of the last message sent from this
221 // port to allow the peer to have the opportunity to consume all inbound
222 // messages before notifying the embedder that this port is closed.
223 data.last_sequence_num = port->next_sequence_num_to_send - 1;
224
225 peer_node_name = port->peer_node_name;
226 peer_port_name = port->peer_port_name;
227
228 // If the port being closed still has unread messages, then we need to take
229 // care to close those ports so as to avoid leaking memory.
230 port->message_queue.GetReferencedPorts(&referenced_port_names);
231
232 ErasePort_Locked(port_ref.name());
233 }
234
235 DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
236 << " to " << peer_port_name << "@" << peer_node_name;
237
238 delegate_->ForwardMessage(
239 peer_node_name,
240 NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
241
242 for (const auto& name : referenced_port_names) {
243 PortRef ref;
244 if (GetPort(name, &ref) == OK)
245 ClosePort(ref);
246 }
247 return OK;
248 }
249
GetStatus(const PortRef & port_ref,PortStatus * port_status)250 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
251 Port* port = port_ref.port();
252
253 base::AutoLock lock(port->lock);
254
255 if (port->state != Port::kReceiving)
256 return ERROR_PORT_STATE_UNEXPECTED;
257
258 port_status->has_messages = port->message_queue.HasNextMessage();
259 port_status->receiving_messages = CanAcceptMoreMessages(port);
260 port_status->peer_closed = port->peer_closed;
261 return OK;
262 }
263
GetMessage(const PortRef & port_ref,ScopedMessage * message)264 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
265 return GetMessageIf(port_ref, nullptr, message);
266 }
267
GetMessageIf(const PortRef & port_ref,std::function<bool (const Message &)> selector,ScopedMessage * message)268 int Node::GetMessageIf(const PortRef& port_ref,
269 std::function<bool(const Message&)> selector,
270 ScopedMessage* message) {
271 *message = nullptr;
272
273 DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;
274
275 Port* port = port_ref.port();
276 {
277 base::AutoLock lock(port->lock);
278
279 // This could also be treated like the port being unknown since the
280 // embedder should no longer be referring to a port that has been sent.
281 if (port->state != Port::kReceiving)
282 return ERROR_PORT_STATE_UNEXPECTED;
283
284 // Let the embedder get messages until there are no more before reporting
285 // that the peer closed its end.
286 if (!CanAcceptMoreMessages(port))
287 return ERROR_PORT_PEER_CLOSED;
288
289 port->message_queue.GetNextMessageIf(std::move(selector), message);
290 }
291
292 // Allow referenced ports to trigger PortStatusChanged calls.
293 if (*message) {
294 for (size_t i = 0; i < (*message)->num_ports(); ++i) {
295 const PortName& new_port_name = (*message)->ports()[i];
296 scoped_refptr<Port> new_port = GetPort(new_port_name);
297
298 DCHECK(new_port) << "Port " << new_port_name << "@" << name_
299 << " does not exist!";
300
301 base::AutoLock lock(new_port->lock);
302
303 DCHECK(new_port->state == Port::kReceiving);
304 new_port->message_queue.set_signalable(true);
305 }
306 }
307
308 return OK;
309 }
310
SendMessage(const PortRef & port_ref,ScopedMessage message)311 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
312 int rv = SendMessageInternal(port_ref, &message);
313 if (rv != OK) {
314 // If send failed, close all carried ports. Note that we're careful not to
315 // close the sending port itself if it happened to be one of the encoded
316 // ports (an invalid but possible condition.)
317 for (size_t i = 0; i < message->num_ports(); ++i) {
318 if (message->ports()[i] == port_ref.name())
319 continue;
320
321 PortRef port;
322 if (GetPort(message->ports()[i], &port) == OK)
323 ClosePort(port);
324 }
325 }
326 return rv;
327 }
328
AcceptMessage(ScopedMessage message)329 int Node::AcceptMessage(ScopedMessage message) {
330 const EventHeader* header = GetEventHeader(*message);
331 switch (header->type) {
332 case EventType::kUser:
333 return OnUserMessage(std::move(message));
334 case EventType::kPortAccepted:
335 return OnPortAccepted(header->port_name);
336 case EventType::kObserveProxy:
337 return OnObserveProxy(
338 header->port_name,
339 *GetEventData<ObserveProxyEventData>(*message));
340 case EventType::kObserveProxyAck:
341 return OnObserveProxyAck(
342 header->port_name,
343 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
344 case EventType::kObserveClosure:
345 return OnObserveClosure(
346 header->port_name,
347 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
348 case EventType::kMergePort:
349 return OnMergePort(header->port_name,
350 *GetEventData<MergePortEventData>(*message));
351 }
352 return OOPS(ERROR_NOT_IMPLEMENTED);
353 }
354
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)355 int Node::MergePorts(const PortRef& port_ref,
356 const NodeName& destination_node_name,
357 const PortName& destination_port_name) {
358 Port* port = port_ref.port();
359 MergePortEventData data;
360 {
361 base::AutoLock lock(port->lock);
362
363 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
364 << " to " << destination_port_name << "@" << destination_node_name;
365
366 // Send the port-to-merge over to the destination node so it can be merged
367 // into the port cycle atomically there.
368 data.new_port_name = port_ref.name();
369 WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
370 &data.new_port_descriptor);
371 }
372 delegate_->ForwardMessage(
373 destination_node_name,
374 NewInternalMessage(destination_port_name,
375 EventType::kMergePort, data));
376 return OK;
377 }
378
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)379 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
380 Port* port0 = port0_ref.port();
381 Port* port1 = port1_ref.port();
382 int rv;
383 {
384 // |ports_lock_| must be held when acquiring overlapping port locks.
385 base::AutoLock ports_lock(ports_lock_);
386 base::AutoLock port0_lock(port0->lock);
387 base::AutoLock port1_lock(port1->lock);
388
389 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
390 << " and " << port1_ref.name() << "@" << name_;
391
392 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
393 rv = ERROR_PORT_STATE_UNEXPECTED;
394 else
395 rv = MergePorts_Locked(port0_ref, port1_ref);
396 }
397
398 if (rv != OK) {
399 ClosePort(port0_ref);
400 ClosePort(port1_ref);
401 }
402
403 return rv;
404 }
405
LostConnectionToNode(const NodeName & node_name)406 int Node::LostConnectionToNode(const NodeName& node_name) {
407 // We can no longer send events to the given node. We also can't expect any
408 // PortAccepted events.
409
410 DVLOG(1) << "Observing lost connection from node " << name_
411 << " to node " << node_name;
412
413 DestroyAllPortsWithPeer(node_name, kInvalidPortName);
414 return OK;
415 }
416
OnUserMessage(ScopedMessage message)417 int Node::OnUserMessage(ScopedMessage message) {
418 PortName port_name = GetEventHeader(*message)->port_name;
419 const auto* event = GetEventData<UserEventData>(*message);
420
421 #if DCHECK_IS_ON()
422 std::ostringstream ports_buf;
423 for (size_t i = 0; i < message->num_ports(); ++i) {
424 if (i > 0)
425 ports_buf << ",";
426 ports_buf << message->ports()[i];
427 }
428
429 DVLOG(2) << "AcceptMessage " << event->sequence_num
430 << " [ports=" << ports_buf.str() << "] at "
431 << port_name << "@" << name_;
432 #endif
433
434 scoped_refptr<Port> port = GetPort(port_name);
435
436 // Even if this port does not exist, cannot receive anymore messages or is
437 // buffering or proxying messages, we still need these ports to be bound to
438 // this node. When the message is forwarded, these ports will get transferred
439 // following the usual method. If the message cannot be accepted, then the
440 // newly bound ports will simply be closed.
441
442 for (size_t i = 0; i < message->num_ports(); ++i) {
443 int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
444 if (rv != OK)
445 return rv;
446 }
447
448 bool has_next_message = false;
449 bool message_accepted = false;
450
451 if (port) {
452 // We may want to forward messages once the port lock is held, so we must
453 // acquire |ports_lock_| first.
454 base::AutoLock ports_lock(ports_lock_);
455 base::AutoLock lock(port->lock);
456
457 // Reject spurious messages if we've already received the last expected
458 // message.
459 if (CanAcceptMoreMessages(port.get())) {
460 message_accepted = true;
461 port->message_queue.AcceptMessage(std::move(message), &has_next_message);
462
463 if (port->state == Port::kBuffering) {
464 has_next_message = false;
465 } else if (port->state == Port::kProxying) {
466 has_next_message = false;
467
468 // Forward messages. We forward messages in sequential order here so
469 // that we maintain the message queue's notion of next sequence number.
470 // That's useful for the proxy removal process as we can tell when this
471 // port has seen all of the messages it is expected to see.
472 int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
473 if (rv != OK)
474 return rv;
475
476 MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
477 }
478 }
479 }
480
481 if (!message_accepted) {
482 DVLOG(2) << "Message not accepted!\n";
483 // Close all newly accepted ports as they are effectively orphaned.
484 for (size_t i = 0; i < message->num_ports(); ++i) {
485 PortRef port_ref;
486 if (GetPort(message->ports()[i], &port_ref) == OK) {
487 ClosePort(port_ref);
488 } else {
489 DLOG(WARNING) << "Cannot close non-existent port!\n";
490 }
491 }
492 } else if (has_next_message) {
493 PortRef port_ref(port_name, port);
494 delegate_->PortStatusChanged(port_ref);
495 }
496
497 return OK;
498 }
499
OnPortAccepted(const PortName & port_name)500 int Node::OnPortAccepted(const PortName& port_name) {
501 scoped_refptr<Port> port = GetPort(port_name);
502 if (!port)
503 return ERROR_PORT_UNKNOWN;
504
505 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
506 << " pointing to "
507 << port->peer_port_name << "@" << port->peer_node_name;
508
509 return BeginProxying(PortRef(port_name, port));
510 }
511
OnObserveProxy(const PortName & port_name,const ObserveProxyEventData & event)512 int Node::OnObserveProxy(const PortName& port_name,
513 const ObserveProxyEventData& event) {
514 if (port_name == kInvalidPortName) {
515 // An ObserveProxy with an invalid target port name is a broadcast used to
516 // inform ports when their peer (which was itself a proxy) has become
517 // defunct due to unexpected node disconnection.
518 //
519 // Receiving ports affected by this treat it as equivalent to peer closure.
520 // Proxies affected by this can be removed and will in turn broadcast their
521 // own death with a similar message.
522 CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
523 CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
524 DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
525 return OK;
526 }
527
528 // The port may have already been closed locally, in which case the
529 // ObserveClosure message will contain the last_sequence_num field.
530 // We can then silently ignore this message.
531 scoped_refptr<Port> port = GetPort(port_name);
532 if (!port) {
533 DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
534
535 if (port_name != event.proxy_port_name &&
536 port_name != event.proxy_to_port_name) {
537 // The receiving port may have been removed while this message was in
538 // transit. In this case, we restart the ObserveProxy circulation from
539 // the referenced proxy port to avoid leaking the proxy.
540 delegate_->ForwardMessage(
541 event.proxy_node_name,
542 NewInternalMessage(
543 event.proxy_port_name, EventType::kObserveProxy, event));
544 }
545 return OK;
546 }
547
548 DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
549 << event.proxy_port_name << "@"
550 << event.proxy_node_name << " pointing to "
551 << event.proxy_to_port_name << "@"
552 << event.proxy_to_node_name;
553
554 {
555 base::AutoLock lock(port->lock);
556
557 if (port->peer_node_name == event.proxy_node_name &&
558 port->peer_port_name == event.proxy_port_name) {
559 if (port->state == Port::kReceiving) {
560 port->peer_node_name = event.proxy_to_node_name;
561 port->peer_port_name = event.proxy_to_port_name;
562
563 ObserveProxyAckEventData ack;
564 ack.last_sequence_num = port->next_sequence_num_to_send - 1;
565
566 delegate_->ForwardMessage(
567 event.proxy_node_name,
568 NewInternalMessage(event.proxy_port_name,
569 EventType::kObserveProxyAck,
570 ack));
571 } else {
572 // As a proxy ourselves, we don't know how to honor the ObserveProxy
573 // event or to populate the last_sequence_num field of ObserveProxyAck.
574 // Afterall, another port could be sending messages to our peer now
575 // that we've sent out our own ObserveProxy event. Instead, we will
576 // send an ObserveProxyAck indicating that the ObserveProxy event
577 // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
578 // However, this has to be done after we are removed as a proxy.
579 // Otherwise, we might just find ourselves back here again, which
580 // would be akin to a busy loop.
581
582 DVLOG(2) << "Delaying ObserveProxyAck to "
583 << event.proxy_port_name << "@" << event.proxy_node_name;
584
585 ObserveProxyAckEventData ack;
586 ack.last_sequence_num = kInvalidSequenceNum;
587
588 port->send_on_proxy_removal.reset(
589 new std::pair<NodeName, ScopedMessage>(
590 event.proxy_node_name,
591 NewInternalMessage(event.proxy_port_name,
592 EventType::kObserveProxyAck,
593 ack)));
594 }
595 } else {
596 // Forward this event along to our peer. Eventually, it should find the
597 // port referring to the proxy.
598 delegate_->ForwardMessage(
599 port->peer_node_name,
600 NewInternalMessage(port->peer_port_name,
601 EventType::kObserveProxy,
602 event));
603 }
604 }
605 return OK;
606 }
607
OnObserveProxyAck(const PortName & port_name,uint64_t last_sequence_num)608 int Node::OnObserveProxyAck(const PortName& port_name,
609 uint64_t last_sequence_num) {
610 DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
611 << " (last_sequence_num=" << last_sequence_num << ")";
612
613 scoped_refptr<Port> port = GetPort(port_name);
614 if (!port)
615 return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so
616 // this is not an "Oops".
617
618 {
619 base::AutoLock lock(port->lock);
620
621 if (port->state != Port::kProxying)
622 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
623
624 if (last_sequence_num == kInvalidSequenceNum) {
625 // Send again.
626 InitiateProxyRemoval(LockedPort(port.get()), port_name);
627 return OK;
628 }
629
630 // We can now remove this port once we have received and forwarded the last
631 // message addressed to this port.
632 port->remove_proxy_on_last_message = true;
633 port->last_sequence_num_to_receive = last_sequence_num;
634 }
635 TryRemoveProxy(PortRef(port_name, port));
636 return OK;
637 }
638
OnObserveClosure(const PortName & port_name,uint64_t last_sequence_num)639 int Node::OnObserveClosure(const PortName& port_name,
640 uint64_t last_sequence_num) {
641 // OK if the port doesn't exist, as it may have been closed already.
642 scoped_refptr<Port> port = GetPort(port_name);
643 if (!port)
644 return OK;
645
646 // This message tells the port that it should no longer expect more messages
647 // beyond last_sequence_num. This message is forwarded along until we reach
648 // the receiving end, and this message serves as an equivalent to
649 // ObserveProxyAck.
650
651 bool notify_delegate = false;
652 ObserveClosureEventData forwarded_data;
653 NodeName peer_node_name;
654 PortName peer_port_name;
655 bool try_remove_proxy = false;
656 {
657 base::AutoLock lock(port->lock);
658
659 port->peer_closed = true;
660 port->last_sequence_num_to_receive = last_sequence_num;
661
662 DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
663 << " (state=" << port->state << ") pointing to "
664 << port->peer_port_name << "@" << port->peer_node_name
665 << " (last_sequence_num=" << last_sequence_num << ")";
666
667 // We always forward ObserveClosure, even beyond the receiving port which
668 // cares about it. This ensures that any dead-end proxies beyond that port
669 // are notified to remove themselves.
670
671 if (port->state == Port::kReceiving) {
672 notify_delegate = true;
673
674 // When forwarding along the other half of the port cycle, this will only
675 // reach dead-end proxies. Tell them we've sent our last message so they
676 // can go away.
677 //
678 // TODO: Repurposing ObserveClosure for this has the desired result but
679 // may be semantically confusing since the forwarding port is not actually
680 // closed. Consider replacing this with a new event type.
681 forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
682 } else {
683 // We haven't yet reached the receiving peer of the closed port, so
684 // forward the message along as-is.
685 forwarded_data.last_sequence_num = last_sequence_num;
686
687 // See about removing the port if it is a proxy as our peer won't be able
688 // to participate in proxy removal.
689 port->remove_proxy_on_last_message = true;
690 if (port->state == Port::kProxying)
691 try_remove_proxy = true;
692 }
693
694 DVLOG(2) << "Forwarding ObserveClosure from "
695 << port_name << "@" << name_ << " to peer "
696 << port->peer_port_name << "@" << port->peer_node_name
697 << " (last_sequence_num=" << forwarded_data.last_sequence_num
698 << ")";
699
700 peer_node_name = port->peer_node_name;
701 peer_port_name = port->peer_port_name;
702 }
703 if (try_remove_proxy)
704 TryRemoveProxy(PortRef(port_name, port));
705
706 delegate_->ForwardMessage(
707 peer_node_name,
708 NewInternalMessage(peer_port_name, EventType::kObserveClosure,
709 forwarded_data));
710
711 if (notify_delegate) {
712 PortRef port_ref(port_name, port);
713 delegate_->PortStatusChanged(port_ref);
714 }
715 return OK;
716 }
717
OnMergePort(const PortName & port_name,const MergePortEventData & event)718 int Node::OnMergePort(const PortName& port_name,
719 const MergePortEventData& event) {
720 scoped_refptr<Port> port = GetPort(port_name);
721
722 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
723 << (port ? port->state : -1) << ") merging with proxy "
724 << event.new_port_name
725 << "@" << name_ << " pointing to "
726 << event.new_port_descriptor.peer_port_name << "@"
727 << event.new_port_descriptor.peer_node_name << " referred by "
728 << event.new_port_descriptor.referring_port_name << "@"
729 << event.new_port_descriptor.referring_node_name;
730
731 bool close_target_port = false;
732 bool close_new_port = false;
733
734 // Accept the new port. This is now the receiving end of the other port cycle
735 // to be merged with ours.
736 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
737 if (rv != OK) {
738 close_target_port = true;
739 } else if (port) {
740 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
741 // needs to hold |ports_lock_|. We also acquire multiple port locks within.
742 base::AutoLock ports_lock(ports_lock_);
743 base::AutoLock lock(port->lock);
744
745 if (port->state != Port::kReceiving) {
746 close_new_port = true;
747 } else {
748 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
749 base::AutoLock new_port_lock(new_port->lock);
750 DCHECK(new_port->state == Port::kReceiving);
751
752 // Both ports are locked. Now all we have to do is swap their peer
753 // information and set them up as proxies.
754
755 PortRef port0_ref(port_name, port);
756 PortRef port1_ref(event.new_port_name, new_port);
757 int rv = MergePorts_Locked(port0_ref, port1_ref);
758 if (rv == OK)
759 return rv;
760
761 close_new_port = true;
762 close_target_port = true;
763 }
764 } else {
765 close_new_port = true;
766 }
767
768 if (close_target_port) {
769 PortRef target_port;
770 rv = GetPort(port_name, &target_port);
771 DCHECK(rv == OK);
772
773 ClosePort(target_port);
774 }
775
776 if (close_new_port) {
777 PortRef new_port;
778 rv = GetPort(event.new_port_name, &new_port);
779 DCHECK(rv == OK);
780
781 ClosePort(new_port);
782 }
783
784 return ERROR_PORT_STATE_UNEXPECTED;
785 }
786
AddPortWithName(const PortName & port_name,const scoped_refptr<Port> & port)787 int Node::AddPortWithName(const PortName& port_name,
788 const scoped_refptr<Port>& port) {
789 base::AutoLock lock(ports_lock_);
790
791 if (!ports_.insert(std::make_pair(port_name, port)).second)
792 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
793
794 DVLOG(2) << "Created port " << port_name << "@" << name_;
795 return OK;
796 }
797
ErasePort(const PortName & port_name)798 void Node::ErasePort(const PortName& port_name) {
799 base::AutoLock lock(ports_lock_);
800 ErasePort_Locked(port_name);
801 }
802
ErasePort_Locked(const PortName & port_name)803 void Node::ErasePort_Locked(const PortName& port_name) {
804 ports_lock_.AssertAcquired();
805 ports_.erase(port_name);
806 DVLOG(2) << "Deleted port " << port_name << "@" << name_;
807 }
808
GetPort(const PortName & port_name)809 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
810 base::AutoLock lock(ports_lock_);
811 return GetPort_Locked(port_name);
812 }
813
GetPort_Locked(const PortName & port_name)814 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
815 ports_lock_.AssertAcquired();
816 auto iter = ports_.find(port_name);
817 if (iter == ports_.end())
818 return nullptr;
819
820 return iter->second;
821 }
822
SendMessageInternal(const PortRef & port_ref,ScopedMessage * message)823 int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
824 ScopedMessage& m = *message;
825 for (size_t i = 0; i < m->num_ports(); ++i) {
826 if (m->ports()[i] == port_ref.name())
827 return ERROR_PORT_CANNOT_SEND_SELF;
828 }
829
830 Port* port = port_ref.port();
831 NodeName peer_node_name;
832 {
833 // We must acquire |ports_lock_| before grabbing any port locks, because
834 // WillSendMessage_Locked may need to lock multiple ports out of order.
835 base::AutoLock ports_lock(ports_lock_);
836 base::AutoLock lock(port->lock);
837
838 if (port->state != Port::kReceiving)
839 return ERROR_PORT_STATE_UNEXPECTED;
840
841 if (port->peer_closed)
842 return ERROR_PORT_PEER_CLOSED;
843
844 int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
845 if (rv != OK)
846 return rv;
847
848 // Beyond this point there's no sense in returning anything but OK. Even if
849 // message forwarding or acceptance fails, there's nothing the embedder can
850 // do to recover. Assume that failure beyond this point must be treated as a
851 // transport failure.
852
853 peer_node_name = port->peer_node_name;
854 }
855
856 if (peer_node_name != name_) {
857 delegate_->ForwardMessage(peer_node_name, std::move(m));
858 return OK;
859 }
860
861 int rv = AcceptMessage(std::move(m));
862 if (rv != OK) {
863 // See comment above for why we don't return an error in this case.
864 DVLOG(2) << "AcceptMessage failed: " << rv;
865 }
866
867 return OK;
868 }
869
MergePorts_Locked(const PortRef & port0_ref,const PortRef & port1_ref)870 int Node::MergePorts_Locked(const PortRef& port0_ref,
871 const PortRef& port1_ref) {
872 Port* port0 = port0_ref.port();
873 Port* port1 = port1_ref.port();
874
875 ports_lock_.AssertAcquired();
876 port0->lock.AssertAcquired();
877 port1->lock.AssertAcquired();
878
879 CHECK(port0->state == Port::kReceiving);
880 CHECK(port1->state == Port::kReceiving);
881
882 // Ports cannot be merged with their own receiving peer!
883 if (port0->peer_node_name == name_ &&
884 port0->peer_port_name == port1_ref.name())
885 return ERROR_PORT_STATE_UNEXPECTED;
886
887 if (port1->peer_node_name == name_ &&
888 port1->peer_port_name == port0_ref.name())
889 return ERROR_PORT_STATE_UNEXPECTED;
890
891 // Only merge if both ports have never sent a message.
892 if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
893 port1->next_sequence_num_to_send == kInitialSequenceNum) {
894 // Swap the ports' peer information and switch them both into buffering
895 // (eventually proxying) mode.
896
897 std::swap(port0->peer_node_name, port1->peer_node_name);
898 std::swap(port0->peer_port_name, port1->peer_port_name);
899
900 port0->state = Port::kBuffering;
901 if (port0->peer_closed)
902 port0->remove_proxy_on_last_message = true;
903
904 port1->state = Port::kBuffering;
905 if (port1->peer_closed)
906 port1->remove_proxy_on_last_message = true;
907
908 int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
909 int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());
910
911 if (rv1 == OK && rv2 == OK) {
912 // If either merged port had a closed peer, its new peer needs to be
913 // informed of this.
914 if (port1->peer_closed) {
915 ObserveClosureEventData data;
916 data.last_sequence_num = port0->last_sequence_num_to_receive;
917 delegate_->ForwardMessage(
918 port0->peer_node_name,
919 NewInternalMessage(port0->peer_port_name,
920 EventType::kObserveClosure, data));
921 }
922
923 if (port0->peer_closed) {
924 ObserveClosureEventData data;
925 data.last_sequence_num = port1->last_sequence_num_to_receive;
926 delegate_->ForwardMessage(
927 port1->peer_node_name,
928 NewInternalMessage(port1->peer_port_name,
929 EventType::kObserveClosure, data));
930 }
931
932 return OK;
933 }
934
935 // If either proxy failed to initialize (e.g. had undeliverable messages
936 // or ended up in a bad state somehow), we keep the system in a consistent
937 // state by undoing the peer swap.
938 std::swap(port0->peer_node_name, port1->peer_node_name);
939 std::swap(port0->peer_port_name, port1->peer_port_name);
940 port0->remove_proxy_on_last_message = false;
941 port1->remove_proxy_on_last_message = false;
942 port0->state = Port::kReceiving;
943 port1->state = Port::kReceiving;
944 }
945
946 return ERROR_PORT_STATE_UNEXPECTED;
947 }
948
WillSendPort(const LockedPort & port,const NodeName & to_node_name,PortName * port_name,PortDescriptor * port_descriptor)949 void Node::WillSendPort(const LockedPort& port,
950 const NodeName& to_node_name,
951 PortName* port_name,
952 PortDescriptor* port_descriptor) {
953 port->lock.AssertAcquired();
954
955 PortName local_port_name = *port_name;
956
957 PortName new_port_name;
958 delegate_->GenerateRandomPortName(&new_port_name);
959
960 // Make sure we don't send messages to the new peer until after we know it
961 // exists. In the meantime, just buffer messages locally.
962 DCHECK(port->state == Port::kReceiving);
963 port->state = Port::kBuffering;
964
965 // If we already know our peer is closed, we already know this proxy can
966 // be removed once it receives and forwards its last expected message.
967 if (port->peer_closed)
968 port->remove_proxy_on_last_message = true;
969
970 *port_name = new_port_name;
971
972 port_descriptor->peer_node_name = port->peer_node_name;
973 port_descriptor->peer_port_name = port->peer_port_name;
974 port_descriptor->referring_node_name = name_;
975 port_descriptor->referring_port_name = local_port_name;
976 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
977 port_descriptor->next_sequence_num_to_receive =
978 port->message_queue.next_sequence_num();
979 port_descriptor->last_sequence_num_to_receive =
980 port->last_sequence_num_to_receive;
981 port_descriptor->peer_closed = port->peer_closed;
982 memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
983
984 // Configure the local port to point to the new port.
985 port->peer_node_name = to_node_name;
986 port->peer_port_name = new_port_name;
987 }
988
AcceptPort(const PortName & port_name,const PortDescriptor & port_descriptor)989 int Node::AcceptPort(const PortName& port_name,
990 const PortDescriptor& port_descriptor) {
991 scoped_refptr<Port> port = make_scoped_refptr(
992 new Port(port_descriptor.next_sequence_num_to_send,
993 port_descriptor.next_sequence_num_to_receive));
994 port->state = Port::kReceiving;
995 port->peer_node_name = port_descriptor.peer_node_name;
996 port->peer_port_name = port_descriptor.peer_port_name;
997 port->last_sequence_num_to_receive =
998 port_descriptor.last_sequence_num_to_receive;
999 port->peer_closed = port_descriptor.peer_closed;
1000
1001 DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
1002 << port->peer_closed << "; last_sequence_num_to_receive="
1003 << port->last_sequence_num_to_receive << "]";
1004
1005 // A newly accepted port is not signalable until the message referencing the
1006 // new port finds its way to the consumer (see GetMessageIf).
1007 port->message_queue.set_signalable(false);
1008
1009 int rv = AddPortWithName(port_name, port);
1010 if (rv != OK)
1011 return rv;
1012
1013 // Allow referring port to forward messages.
1014 delegate_->ForwardMessage(
1015 port_descriptor.referring_node_name,
1016 NewInternalMessage(port_descriptor.referring_port_name,
1017 EventType::kPortAccepted));
1018 return OK;
1019 }
1020
WillSendMessage_Locked(const LockedPort & port,const PortName & port_name,Message * message)1021 int Node::WillSendMessage_Locked(const LockedPort& port,
1022 const PortName& port_name,
1023 Message* message) {
1024 ports_lock_.AssertAcquired();
1025 port->lock.AssertAcquired();
1026
1027 DCHECK(message);
1028
1029 // Messages may already have a sequence number if they're being forwarded
1030 // by a proxy. Otherwise, use the next outgoing sequence number.
1031 uint64_t* sequence_num =
1032 &GetMutableEventData<UserEventData>(message)->sequence_num;
1033 if (*sequence_num == 0)
1034 *sequence_num = port->next_sequence_num_to_send++;
1035
1036 #if DCHECK_IS_ON()
1037 std::ostringstream ports_buf;
1038 for (size_t i = 0; i < message->num_ports(); ++i) {
1039 if (i > 0)
1040 ports_buf << ",";
1041 ports_buf << message->ports()[i];
1042 }
1043 #endif
1044
1045 if (message->num_ports() > 0) {
1046 // Note: Another thread could be trying to send the same ports, so we need
1047 // to ensure that they are ours to send before we mutate their state.
1048
1049 std::vector<scoped_refptr<Port>> ports;
1050 ports.resize(message->num_ports());
1051
1052 {
1053 for (size_t i = 0; i < message->num_ports(); ++i) {
1054 ports[i] = GetPort_Locked(message->ports()[i]);
1055 DCHECK(ports[i]);
1056
1057 ports[i]->lock.Acquire();
1058 int error = OK;
1059 if (ports[i]->state != Port::kReceiving)
1060 error = ERROR_PORT_STATE_UNEXPECTED;
1061 else if (message->ports()[i] == port->peer_port_name)
1062 error = ERROR_PORT_CANNOT_SEND_PEER;
1063
1064 if (error != OK) {
1065 // Oops, we cannot send this port.
1066 for (size_t j = 0; j <= i; ++j)
1067 ports[i]->lock.Release();
1068 // Backpedal on the sequence number.
1069 port->next_sequence_num_to_send--;
1070 return error;
1071 }
1072 }
1073 }
1074
1075 PortDescriptor* port_descriptors =
1076 GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
1077
1078 for (size_t i = 0; i < message->num_ports(); ++i) {
1079 WillSendPort(LockedPort(ports[i].get()),
1080 port->peer_node_name,
1081 message->mutable_ports() + i,
1082 port_descriptors + i);
1083 }
1084
1085 for (size_t i = 0; i < message->num_ports(); ++i)
1086 ports[i]->lock.Release();
1087 }
1088
1089 #if DCHECK_IS_ON()
1090 DVLOG(2) << "Sending message "
1091 << GetEventData<UserEventData>(*message)->sequence_num
1092 << " [ports=" << ports_buf.str() << "]"
1093 << " from " << port_name << "@" << name_
1094 << " to " << port->peer_port_name << "@" << port->peer_node_name;
1095 #endif
1096
1097 GetMutableEventHeader(message)->port_name = port->peer_port_name;
1098 return OK;
1099 }
1100
BeginProxying_Locked(const LockedPort & port,const PortName & port_name)1101 int Node::BeginProxying_Locked(const LockedPort& port,
1102 const PortName& port_name) {
1103 ports_lock_.AssertAcquired();
1104 port->lock.AssertAcquired();
1105
1106 if (port->state != Port::kBuffering)
1107 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1108
1109 port->state = Port::kProxying;
1110
1111 int rv = ForwardMessages_Locked(LockedPort(port), port_name);
1112 if (rv != OK)
1113 return rv;
1114
1115 // We may have observed closure while buffering. In that case, we can advance
1116 // to removing the proxy without sending out an ObserveProxy message. We
1117 // already know the last expected message, etc.
1118
1119 if (port->remove_proxy_on_last_message) {
1120 MaybeRemoveProxy_Locked(LockedPort(port), port_name);
1121
1122 // Make sure we propagate closure to our current peer.
1123 ObserveClosureEventData data;
1124 data.last_sequence_num = port->last_sequence_num_to_receive;
1125 delegate_->ForwardMessage(
1126 port->peer_node_name,
1127 NewInternalMessage(port->peer_port_name,
1128 EventType::kObserveClosure, data));
1129 } else {
1130 InitiateProxyRemoval(LockedPort(port), port_name);
1131 }
1132
1133 return OK;
1134 }
1135
BeginProxying(PortRef port_ref)1136 int Node::BeginProxying(PortRef port_ref) {
1137 Port* port = port_ref.port();
1138 {
1139 base::AutoLock ports_lock(ports_lock_);
1140 base::AutoLock lock(port->lock);
1141
1142 if (port->state != Port::kBuffering)
1143 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1144
1145 port->state = Port::kProxying;
1146
1147 int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
1148 if (rv != OK)
1149 return rv;
1150 }
1151
1152 bool should_remove;
1153 NodeName peer_node_name;
1154 ScopedMessage closure_message;
1155 {
1156 base::AutoLock lock(port->lock);
1157 if (port->state != Port::kProxying)
1158 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1159
1160 should_remove = port->remove_proxy_on_last_message;
1161 if (should_remove) {
1162 // Make sure we propagate closure to our current peer.
1163 ObserveClosureEventData data;
1164 data.last_sequence_num = port->last_sequence_num_to_receive;
1165 peer_node_name = port->peer_node_name;
1166 closure_message = NewInternalMessage(port->peer_port_name,
1167 EventType::kObserveClosure, data);
1168 } else {
1169 InitiateProxyRemoval(LockedPort(port), port_ref.name());
1170 }
1171 }
1172
1173 if (should_remove) {
1174 TryRemoveProxy(port_ref);
1175 delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
1176 }
1177
1178 return OK;
1179 }
1180
ForwardMessages_Locked(const LockedPort & port,const PortName & port_name)1181 int Node::ForwardMessages_Locked(const LockedPort& port,
1182 const PortName &port_name) {
1183 ports_lock_.AssertAcquired();
1184 port->lock.AssertAcquired();
1185
1186 for (;;) {
1187 ScopedMessage message;
1188 port->message_queue.GetNextMessageIf(nullptr, &message);
1189 if (!message)
1190 break;
1191
1192 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
1193 if (rv != OK)
1194 return rv;
1195
1196 delegate_->ForwardMessage(port->peer_node_name, std::move(message));
1197 }
1198 return OK;
1199 }
1200
InitiateProxyRemoval(const LockedPort & port,const PortName & port_name)1201 void Node::InitiateProxyRemoval(const LockedPort& port,
1202 const PortName& port_name) {
1203 port->lock.AssertAcquired();
1204
1205 // To remove this node, we start by notifying the connected graph that we are
1206 // a proxy. This allows whatever port is referencing this node to skip it.
1207 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1208 // the peer was closed in the meantime).
1209
1210 ObserveProxyEventData data;
1211 data.proxy_node_name = name_;
1212 data.proxy_port_name = port_name;
1213 data.proxy_to_node_name = port->peer_node_name;
1214 data.proxy_to_port_name = port->peer_port_name;
1215
1216 delegate_->ForwardMessage(
1217 port->peer_node_name,
1218 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
1219 }
1220
MaybeRemoveProxy_Locked(const LockedPort & port,const PortName & port_name)1221 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
1222 const PortName& port_name) {
1223 // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
1224 ports_lock_.AssertAcquired();
1225 port->lock.AssertAcquired();
1226
1227 DCHECK(port->state == Port::kProxying);
1228
1229 // Make sure we have seen ObserveProxyAck before removing the port.
1230 if (!port->remove_proxy_on_last_message)
1231 return;
1232
1233 if (!CanAcceptMoreMessages(port.get())) {
1234 // This proxy port is done. We can now remove it!
1235 ErasePort_Locked(port_name);
1236
1237 if (port->send_on_proxy_removal) {
1238 NodeName to_node = port->send_on_proxy_removal->first;
1239 ScopedMessage& message = port->send_on_proxy_removal->second;
1240
1241 delegate_->ForwardMessage(to_node, std::move(message));
1242 port->send_on_proxy_removal.reset();
1243 }
1244 } else {
1245 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1246 << " now; waiting for more messages";
1247 }
1248 }
1249
TryRemoveProxy(PortRef port_ref)1250 void Node::TryRemoveProxy(PortRef port_ref) {
1251 Port* port = port_ref.port();
1252 bool should_erase = false;
1253 ScopedMessage msg;
1254 NodeName to_node;
1255 {
1256 base::AutoLock lock(port->lock);
1257
1258 // Port already removed. Nothing to do.
1259 if (port->state == Port::kClosed)
1260 return;
1261
1262 DCHECK(port->state == Port::kProxying);
1263
1264 // Make sure we have seen ObserveProxyAck before removing the port.
1265 if (!port->remove_proxy_on_last_message)
1266 return;
1267
1268 if (!CanAcceptMoreMessages(port)) {
1269 // This proxy port is done. We can now remove it!
1270 should_erase = true;
1271
1272 if (port->send_on_proxy_removal) {
1273 to_node = port->send_on_proxy_removal->first;
1274 msg = std::move(port->send_on_proxy_removal->second);
1275 port->send_on_proxy_removal.reset();
1276 }
1277 } else {
1278 DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1279 << " now; waiting for more messages";
1280 }
1281 }
1282
1283 if (should_erase)
1284 ErasePort(port_ref.name());
1285
1286 if (msg)
1287 delegate_->ForwardMessage(to_node, std::move(msg));
1288 }
1289
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1290 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1291 const PortName& port_name) {
1292 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1293 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1294 // node is matched.
1295
1296 std::vector<PortRef> ports_to_notify;
1297 std::vector<PortName> dead_proxies_to_broadcast;
1298 std::deque<PortName> referenced_port_names;
1299
1300 {
1301 base::AutoLock ports_lock(ports_lock_);
1302
1303 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1304 Port* port = iter->second.get();
1305 {
1306 base::AutoLock port_lock(port->lock);
1307
1308 if (port->peer_node_name == node_name &&
1309 (port_name == kInvalidPortName ||
1310 port->peer_port_name == port_name)) {
1311 if (!port->peer_closed) {
1312 // Treat this as immediate peer closure. It's an exceptional
1313 // condition akin to a broken pipe, so we don't care about losing
1314 // messages.
1315
1316 port->peer_closed = true;
1317 port->last_sequence_num_to_receive =
1318 port->message_queue.next_sequence_num() - 1;
1319
1320 if (port->state == Port::kReceiving)
1321 ports_to_notify.push_back(PortRef(iter->first, port));
1322 }
1323
1324 // We don't expect to forward any further messages, and we don't
1325 // expect to receive a Port{Accepted,Rejected} event. Because we're
1326 // a proxy with no active peer, we cannot use the normal proxy removal
1327 // procedure of forward-propagating an ObserveProxy. Instead we
1328 // broadcast our own death so it can be back-propagated. This is
1329 // inefficient but rare.
1330 if (port->state != Port::kReceiving) {
1331 dead_proxies_to_broadcast.push_back(iter->first);
1332 iter->second->message_queue.GetReferencedPorts(
1333 &referenced_port_names);
1334 }
1335 }
1336 }
1337 }
1338
1339 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1340 ports_.erase(proxy_name);
1341 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1342 }
1343 }
1344
1345 // Wake up any receiving ports who have just observed simulated peer closure.
1346 for (const auto& port : ports_to_notify)
1347 delegate_->PortStatusChanged(port);
1348
1349 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1350 // Broadcast an event signifying that this proxy is no longer functioning.
1351 ObserveProxyEventData event;
1352 event.proxy_node_name = name_;
1353 event.proxy_port_name = proxy_name;
1354 event.proxy_to_node_name = kInvalidNodeName;
1355 event.proxy_to_port_name = kInvalidPortName;
1356 delegate_->BroadcastMessage(NewInternalMessage(
1357 kInvalidPortName, EventType::kObserveProxy, event));
1358
1359 // Also process death locally since the port that points this closed one
1360 // could be on the current node.
1361 // Note: Although this is recursive, only a single port is involved which
1362 // limits the expected branching to 1.
1363 DestroyAllPortsWithPeer(name_, proxy_name);
1364 }
1365
1366 // Close any ports referenced by the closed proxies.
1367 for (const auto& name : referenced_port_names) {
1368 PortRef ref;
1369 if (GetPort(name, &ref) == OK)
1370 ClosePort(ref);
1371 }
1372 }
1373
NewInternalMessage_Helper(const PortName & port_name,const EventType & type,const void * data,size_t num_data_bytes)1374 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1375 const EventType& type,
1376 const void* data,
1377 size_t num_data_bytes) {
1378 ScopedMessage message;
1379 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1380
1381 EventHeader* header = GetMutableEventHeader(message.get());
1382 header->port_name = port_name;
1383 header->type = type;
1384 header->padding = 0;
1385
1386 if (num_data_bytes)
1387 memcpy(header + 1, data, num_data_bytes);
1388
1389 return message;
1390 }
1391
1392 } // namespace ports
1393 } // namespace edk
1394 } // namespace mojo
1395