1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/ports/node.h"
6
7 #include <string.h>
8
9 #include <utility>
10
11 #include "base/atomicops.h"
12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/edk/system/ports/node_delegate.h"
16
17 namespace mojo {
18 namespace edk {
19 namespace ports {
20
21 namespace {
22
// Helper behind the OOPS() macro. Trips a CHECK failure whose message contains
// |message|; |error_code| is handed back unchanged should execution continue
// past the CHECK.
int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}
27
// Routes error code |x| through DebugError(), using the stringified expression
// as the message. The expression's value is whatever DebugError() returns.
#define OOPS(x) DebugError(#x, x)
29
CanAcceptMoreMessages(const Port * port)30 bool CanAcceptMoreMessages(const Port* port) {
31 // Have we already doled out the last message (i.e., do we expect to NOT
32 // receive further messages)?
33 uint64_t next_sequence_num = port->message_queue.next_sequence_num();
34 if (port->state == Port::kClosed)
35 return false;
36 if (port->peer_closed || port->remove_proxy_on_last_message) {
37 if (port->last_sequence_num_to_receive == next_sequence_num - 1)
38 return false;
39 }
40 return true;
41 }
42
43 } // namespace
44
45 class Node::LockedPort {
46 public:
LockedPort(Port * port)47 explicit LockedPort(Port* port) : port_(port) {
48 port_->lock.AssertAcquired();
49 }
50
get() const51 Port* get() const { return port_; }
operator ->() const52 Port* operator->() const { return port_; }
53
54 private:
55 Port* const port_;
56 };
57
Node(const NodeName & name,NodeDelegate * delegate)58 Node::Node(const NodeName& name, NodeDelegate* delegate)
59 : name_(name),
60 delegate_(delegate) {
61 }
62
~Node()63 Node::~Node() {
64 if (!ports_.empty())
65 DLOG(WARNING) << "Unclean shutdown for node " << name_;
66 }
67
CanShutdownCleanly(ShutdownPolicy policy)68 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
69 base::AutoLock ports_lock(ports_lock_);
70
71 if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
72 #if DCHECK_IS_ON()
73 for (auto entry : ports_) {
74 DVLOG(2) << "Port " << entry.first << " referencing node "
75 << entry.second->peer_node_name << " is blocking shutdown of "
76 << "node " << name_ << " (state=" << entry.second->state << ")";
77 }
78 #endif
79 return ports_.empty();
80 }
81
82 DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
83
84 // NOTE: This is not efficient, though it probably doesn't need to be since
85 // relatively few ports should be open during shutdown and shutdown doesn't
86 // need to be blazingly fast.
87 bool can_shutdown = true;
88 for (auto entry : ports_) {
89 base::AutoLock lock(entry.second->lock);
90 if (entry.second->peer_node_name != name_ &&
91 entry.second->state != Port::kReceiving) {
92 can_shutdown = false;
93 #if DCHECK_IS_ON()
94 DVLOG(2) << "Port " << entry.first << " referencing node "
95 << entry.second->peer_node_name << " is blocking shutdown of "
96 << "node " << name_ << " (state=" << entry.second->state << ")";
97 #else
98 // Exit early when not debugging.
99 break;
100 #endif
101 }
102 }
103
104 return can_shutdown;
105 }
106
GetPort(const PortName & port_name,PortRef * port_ref)107 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
108 scoped_refptr<Port> port = GetPort(port_name);
109 if (!port)
110 return ERROR_PORT_UNKNOWN;
111
112 *port_ref = PortRef(port_name, std::move(port));
113 return OK;
114 }
115
CreateUninitializedPort(PortRef * port_ref)116 int Node::CreateUninitializedPort(PortRef* port_ref) {
117 PortName port_name;
118 delegate_->GenerateRandomPortName(&port_name);
119
120 scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
121 int rv = AddPortWithName(port_name, port);
122 if (rv != OK)
123 return rv;
124
125 *port_ref = PortRef(port_name, std::move(port));
126 return OK;
127 }
128
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)129 int Node::InitializePort(const PortRef& port_ref,
130 const NodeName& peer_node_name,
131 const PortName& peer_port_name) {
132 Port* port = port_ref.port();
133
134 {
135 base::AutoLock lock(port->lock);
136 if (port->state != Port::kUninitialized)
137 return ERROR_PORT_STATE_UNEXPECTED;
138
139 port->state = Port::kReceiving;
140 port->peer_node_name = peer_node_name;
141 port->peer_port_name = peer_port_name;
142 }
143
144 delegate_->PortStatusChanged(port_ref);
145
146 return OK;
147 }
148
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)149 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
150 int rv;
151
152 rv = CreateUninitializedPort(port0_ref);
153 if (rv != OK)
154 return rv;
155
156 rv = CreateUninitializedPort(port1_ref);
157 if (rv != OK)
158 return rv;
159
160 rv = InitializePort(*port0_ref, name_, port1_ref->name());
161 if (rv != OK)
162 return rv;
163
164 rv = InitializePort(*port1_ref, name_, port0_ref->name());
165 if (rv != OK)
166 return rv;
167
168 return OK;
169 }
170
SetUserData(const PortRef & port_ref,scoped_refptr<UserData> user_data)171 int Node::SetUserData(const PortRef& port_ref,
172 scoped_refptr<UserData> user_data) {
173 Port* port = port_ref.port();
174
175 base::AutoLock lock(port->lock);
176 if (port->state == Port::kClosed)
177 return ERROR_PORT_STATE_UNEXPECTED;
178
179 port->user_data = std::move(user_data);
180
181 return OK;
182 }
183
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)184 int Node::GetUserData(const PortRef& port_ref,
185 scoped_refptr<UserData>* user_data) {
186 Port* port = port_ref.port();
187
188 base::AutoLock lock(port->lock);
189 if (port->state == Port::kClosed)
190 return ERROR_PORT_STATE_UNEXPECTED;
191
192 *user_data = port->user_data;
193
194 return OK;
195 }
196
ClosePort(const PortRef & port_ref)197 int Node::ClosePort(const PortRef& port_ref) {
198 std::deque<PortName> referenced_port_names;
199
200 ObserveClosureEventData data;
201
202 NodeName peer_node_name;
203 PortName peer_port_name;
204 Port* port = port_ref.port();
205 {
206 // We may need to erase the port, which requires ports_lock_ to be held,
207 // but ports_lock_ must be acquired before any individual port locks.
208 base::AutoLock ports_lock(ports_lock_);
209
210 base::AutoLock lock(port->lock);
211 if (port->state == Port::kUninitialized) {
212 // If the port was not yet initialized, there's nothing interesting to do.
213 ErasePort_Locked(port_ref.name());
214 return OK;
215 }
216
217 if (port->state != Port::kReceiving)
218 return ERROR_PORT_STATE_UNEXPECTED;
219
220 port->state = Port::kClosed;
221
222 // We pass along the sequence number of the last message sent from this
223 // port to allow the peer to have the opportunity to consume all inbound
224 // messages before notifying the embedder that this port is closed.
225 data.last_sequence_num = port->next_sequence_num_to_send - 1;
226
227 peer_node_name = port->peer_node_name;
228 peer_port_name = port->peer_port_name;
229
230 // If the port being closed still has unread messages, then we need to take
231 // care to close those ports so as to avoid leaking memory.
232 port->message_queue.GetReferencedPorts(&referenced_port_names);
233
234 ErasePort_Locked(port_ref.name());
235 }
236
237 DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
238 << " to " << peer_port_name << "@" << peer_node_name;
239
240 delegate_->ForwardMessage(
241 peer_node_name,
242 NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
243
244 for (const auto& name : referenced_port_names) {
245 PortRef ref;
246 if (GetPort(name, &ref) == OK)
247 ClosePort(ref);
248 }
249 return OK;
250 }
251
GetStatus(const PortRef & port_ref,PortStatus * port_status)252 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
253 Port* port = port_ref.port();
254
255 base::AutoLock lock(port->lock);
256
257 if (port->state != Port::kReceiving)
258 return ERROR_PORT_STATE_UNEXPECTED;
259
260 port_status->has_messages = port->message_queue.HasNextMessage();
261 port_status->receiving_messages = CanAcceptMoreMessages(port);
262 port_status->peer_closed = port->peer_closed;
263 return OK;
264 }
265
GetMessage(const PortRef & port_ref,ScopedMessage * message,MessageFilter * filter)266 int Node::GetMessage(const PortRef& port_ref,
267 ScopedMessage* message,
268 MessageFilter* filter) {
269 *message = nullptr;
270
271 DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
272
273 Port* port = port_ref.port();
274 {
275 base::AutoLock lock(port->lock);
276
277 // This could also be treated like the port being unknown since the
278 // embedder should no longer be referring to a port that has been sent.
279 if (port->state != Port::kReceiving)
280 return ERROR_PORT_STATE_UNEXPECTED;
281
282 // Let the embedder get messages until there are no more before reporting
283 // that the peer closed its end.
284 if (!CanAcceptMoreMessages(port))
285 return ERROR_PORT_PEER_CLOSED;
286
287 port->message_queue.GetNextMessage(message, filter);
288 }
289
290 // Allow referenced ports to trigger PortStatusChanged calls.
291 if (*message) {
292 for (size_t i = 0; i < (*message)->num_ports(); ++i) {
293 const PortName& new_port_name = (*message)->ports()[i];
294 scoped_refptr<Port> new_port = GetPort(new_port_name);
295
296 DCHECK(new_port) << "Port " << new_port_name << "@" << name_
297 << " does not exist!";
298
299 base::AutoLock lock(new_port->lock);
300
301 DCHECK(new_port->state == Port::kReceiving);
302 new_port->message_queue.set_signalable(true);
303 }
304 }
305
306 return OK;
307 }
308
SendMessage(const PortRef & port_ref,ScopedMessage message)309 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
310 int rv = SendMessageInternal(port_ref, &message);
311 if (rv != OK) {
312 // If send failed, close all carried ports. Note that we're careful not to
313 // close the sending port itself if it happened to be one of the encoded
314 // ports (an invalid but possible condition.)
315 for (size_t i = 0; i < message->num_ports(); ++i) {
316 if (message->ports()[i] == port_ref.name())
317 continue;
318
319 PortRef port;
320 if (GetPort(message->ports()[i], &port) == OK)
321 ClosePort(port);
322 }
323 }
324 return rv;
325 }
326
AcceptMessage(ScopedMessage message)327 int Node::AcceptMessage(ScopedMessage message) {
328 const EventHeader* header = GetEventHeader(*message);
329 switch (header->type) {
330 case EventType::kUser:
331 return OnUserMessage(std::move(message));
332 case EventType::kPortAccepted:
333 return OnPortAccepted(header->port_name);
334 case EventType::kObserveProxy:
335 return OnObserveProxy(
336 header->port_name,
337 *GetEventData<ObserveProxyEventData>(*message));
338 case EventType::kObserveProxyAck:
339 return OnObserveProxyAck(
340 header->port_name,
341 GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
342 case EventType::kObserveClosure:
343 return OnObserveClosure(
344 header->port_name,
345 GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
346 case EventType::kMergePort:
347 return OnMergePort(header->port_name,
348 *GetEventData<MergePortEventData>(*message));
349 }
350 return OOPS(ERROR_NOT_IMPLEMENTED);
351 }
352
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)353 int Node::MergePorts(const PortRef& port_ref,
354 const NodeName& destination_node_name,
355 const PortName& destination_port_name) {
356 Port* port = port_ref.port();
357 MergePortEventData data;
358 {
359 base::AutoLock lock(port->lock);
360
361 DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
362 << " to " << destination_port_name << "@" << destination_node_name;
363
364 // Send the port-to-merge over to the destination node so it can be merged
365 // into the port cycle atomically there.
366 data.new_port_name = port_ref.name();
367 WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
368 &data.new_port_descriptor);
369 }
370 delegate_->ForwardMessage(
371 destination_node_name,
372 NewInternalMessage(destination_port_name,
373 EventType::kMergePort, data));
374 return OK;
375 }
376
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)377 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
378 Port* port0 = port0_ref.port();
379 Port* port1 = port1_ref.port();
380 int rv;
381 {
382 // |ports_lock_| must be held when acquiring overlapping port locks.
383 base::AutoLock ports_lock(ports_lock_);
384 base::AutoLock port0_lock(port0->lock);
385 base::AutoLock port1_lock(port1->lock);
386
387 DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
388 << " and " << port1_ref.name() << "@" << name_;
389
390 if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
391 rv = ERROR_PORT_STATE_UNEXPECTED;
392 else
393 rv = MergePorts_Locked(port0_ref, port1_ref);
394 }
395
396 if (rv != OK) {
397 ClosePort(port0_ref);
398 ClosePort(port1_ref);
399 }
400
401 return rv;
402 }
403
// Called when the connection to |node_name| is gone. Destroys every local port
// whose peer is on that node (kInvalidPortName is passed as the peer port
// name to DestroyAllPortsWithPeer()). Always returns OK.
int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}
414
// Handles a user message event addressed to one of this node's ports.
//
// Every port carried by the message is first accepted (bound) on this node.
// If the target port exists and can still accept messages, the message is
// queued on it; a proxying target then forwards its queued messages and
// MaybeRemoveProxy_Locked() is invoked. If the message is not accepted, the
// ports that were just bound are closed again. When the receiving port has a
// message ready, the delegate is notified via PortStatusChanged().
//
// Returns OK, unless accepting a carried port or forwarding fails, in which
// case that error is returned immediately.
int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if DCHECK_IS_ON()
  // Build a comma-separated list of the carried port names for the log line.
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(4) << "AcceptMessage " << event->sequence_num
           << " [ports=" << ports_buf.str() << "] at "
           << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, cannot receive anymore messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      // |message| is moved from here on; it is only dereferenced again below
      // on the path where it was NOT accepted.
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        // Buffering ports hold their messages; no delegate notification.
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence number.
        // That's useful for the proxy removal process as we can tell when this
        // port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    // Notify outside of the locks taken above.
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
497
OnPortAccepted(const PortName & port_name)498 int Node::OnPortAccepted(const PortName& port_name) {
499 scoped_refptr<Port> port = GetPort(port_name);
500 if (!port)
501 return ERROR_PORT_UNKNOWN;
502
503 DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
504 << " pointing to "
505 << port->peer_port_name << "@" << port->peer_node_name;
506
507 return BeginProxying(PortRef(port_name, std::move(port)));
508 }
509
// Handles an ObserveProxy event, which announces that the port
// |event.proxy_port_name|@|event.proxy_node_name| is a proxy pointing to
// |event.proxy_to_port_name|@|event.proxy_to_node_name|.
//
// If |port_name| is the port whose peer is that proxy, a receiving port
// re-points its peer past the proxy and acks with the sequence number of its
// last sent message; a non-receiving port stores a "retry" ack to be sent
// when it is itself removed as a proxy. Otherwise the event is forwarded to
// this port's peer. Always returns OK.
int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  if (port_name == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    // Is the announced proxy our direct peer?
    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        // Bypass the proxy: from now on we talk to whatever it pointed to.
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        // Tell the proxy the sequence number of the last message we sent.
        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // After all, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event. Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        // Stash the (destination node, ack message) pair on the port; any
        // previously stored pair is replaced.
        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}
594
OnObserveProxyAck(const PortName & port_name,uint64_t last_sequence_num)595 int Node::OnObserveProxyAck(const PortName& port_name,
596 uint64_t last_sequence_num) {
597 DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
598 << " (last_sequence_num=" << last_sequence_num << ")";
599
600 scoped_refptr<Port> port = GetPort(port_name);
601 if (!port)
602 return ERROR_PORT_UNKNOWN; // The port may have observed closure first, so
603 // this is not an "Oops".
604
605 {
606 base::AutoLock lock(port->lock);
607
608 if (port->state != Port::kProxying)
609 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
610
611 if (last_sequence_num == kInvalidSequenceNum) {
612 // Send again.
613 InitiateProxyRemoval(LockedPort(port.get()), port_name);
614 return OK;
615 }
616
617 // We can now remove this port once we have received and forwarded the last
618 // message addressed to this port.
619 port->remove_proxy_on_last_message = true;
620 port->last_sequence_num_to_receive = last_sequence_num;
621 }
622 TryRemoveProxy(PortRef(port_name, std::move(port)));
623 return OK;
624 }
625
// Handles an ObserveClosure event for |port_name|: the port is marked as
// having a closed peer and |last_sequence_num| is recorded as the last message
// it should expect.
//
// The event is always forwarded on to this port's peer. A receiving port
// forwards the sequence number of its own last sent message and has the
// delegate notified; any other port forwards |last_sequence_num| unchanged and
// is flagged for proxy removal (attempted immediately if it is already
// proxying). Returns OK, including when the port does not exist.
int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  // Decisions are made under the port lock below and acted on after it is
  // released.
  bool notify_delegate = false;
  ObserveClosureEventData forwarded_data;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        try_remove_proxy = true;
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    // Snapshot the peer address so it can be used after the lock is dropped.
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }
  if (try_remove_proxy)
    TryRemoveProxy(PortRef(port_name, port));

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure,
                         forwarded_data));

  if (notify_delegate) {
    PortRef port_ref(port_name, std::move(port));
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}
704
OnMergePort(const PortName & port_name,const MergePortEventData & event)705 int Node::OnMergePort(const PortName& port_name,
706 const MergePortEventData& event) {
707 scoped_refptr<Port> port = GetPort(port_name);
708
709 DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
710 << (port ? port->state : -1) << ") merging with proxy "
711 << event.new_port_name
712 << "@" << name_ << " pointing to "
713 << event.new_port_descriptor.peer_port_name << "@"
714 << event.new_port_descriptor.peer_node_name << " referred by "
715 << event.new_port_descriptor.referring_port_name << "@"
716 << event.new_port_descriptor.referring_node_name;
717
718 bool close_target_port = false;
719 bool close_new_port = false;
720
721 // Accept the new port. This is now the receiving end of the other port cycle
722 // to be merged with ours.
723 int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
724 if (rv != OK) {
725 close_target_port = true;
726 } else if (port) {
727 // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
728 // needs to hold |ports_lock_|. We also acquire multiple port locks within.
729 base::AutoLock ports_lock(ports_lock_);
730 base::AutoLock lock(port->lock);
731
732 if (port->state != Port::kReceiving) {
733 close_new_port = true;
734 } else {
735 scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
736 base::AutoLock new_port_lock(new_port->lock);
737 DCHECK(new_port->state == Port::kReceiving);
738
739 // Both ports are locked. Now all we have to do is swap their peer
740 // information and set them up as proxies.
741
742 PortRef port0_ref(port_name, port);
743 PortRef port1_ref(event.new_port_name, new_port);
744 int rv = MergePorts_Locked(port0_ref, port1_ref);
745 if (rv == OK)
746 return rv;
747
748 close_new_port = true;
749 close_target_port = true;
750 }
751 } else {
752 close_new_port = true;
753 }
754
755 if (close_target_port) {
756 PortRef target_port;
757 rv = GetPort(port_name, &target_port);
758 DCHECK(rv == OK);
759
760 ClosePort(target_port);
761 }
762
763 if (close_new_port) {
764 PortRef new_port;
765 rv = GetPort(event.new_port_name, &new_port);
766 DCHECK(rv == OK);
767
768 ClosePort(new_port);
769 }
770
771 return ERROR_PORT_STATE_UNEXPECTED;
772 }
773
AddPortWithName(const PortName & port_name,scoped_refptr<Port> port)774 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
775 base::AutoLock lock(ports_lock_);
776
777 if (!ports_.insert(std::make_pair(port_name, std::move(port))).second)
778 return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
779
780 DVLOG(2) << "Created port " << port_name << "@" << name_;
781 return OK;
782 }
783
// Removes |port_name| from this node's port map, taking |ports_lock_| for the
// duration of the call.
void Node::ErasePort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  ErasePort_Locked(port_name);
}
788
// Removes |port_name| from this node's port map. The caller must already hold
// |ports_lock_| (asserted).
void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
794
// Returns the port registered under |port_name|, or null if there is none.
// Takes |ports_lock_| for the duration of the lookup.
scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}
799
GetPort_Locked(const PortName & port_name)800 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
801 ports_lock_.AssertAcquired();
802 auto iter = ports_.find(port_name);
803 if (iter == ports_.end())
804 return nullptr;
805
806 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
807 // Workaround for https://crbug.com/665869.
808 base::subtle::MemoryBarrier();
809 #endif
810
811 return iter->second;
812 }
813
SendMessageInternal(const PortRef & port_ref,ScopedMessage * message)814 int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
815 ScopedMessage& m = *message;
816 for (size_t i = 0; i < m->num_ports(); ++i) {
817 if (m->ports()[i] == port_ref.name())
818 return ERROR_PORT_CANNOT_SEND_SELF;
819 }
820
821 Port* port = port_ref.port();
822 NodeName peer_node_name;
823 {
824 // We must acquire |ports_lock_| before grabbing any port locks, because
825 // WillSendMessage_Locked may need to lock multiple ports out of order.
826 base::AutoLock ports_lock(ports_lock_);
827 base::AutoLock lock(port->lock);
828
829 if (port->state != Port::kReceiving)
830 return ERROR_PORT_STATE_UNEXPECTED;
831
832 if (port->peer_closed)
833 return ERROR_PORT_PEER_CLOSED;
834
835 int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
836 if (rv != OK)
837 return rv;
838
839 // Beyond this point there's no sense in returning anything but OK. Even if
840 // message forwarding or acceptance fails, there's nothing the embedder can
841 // do to recover. Assume that failure beyond this point must be treated as a
842 // transport failure.
843
844 peer_node_name = port->peer_node_name;
845 }
846
847 if (peer_node_name != name_) {
848 delegate_->ForwardMessage(peer_node_name, std::move(m));
849 return OK;
850 }
851
852 int rv = AcceptMessage(std::move(m));
853 if (rv != OK) {
854 // See comment above for why we don't return an error in this case.
855 DVLOG(2) << "AcceptMessage failed: " << rv;
856 }
857
858 return OK;
859 }
860
// Merges two receiving ports by swapping their peer information and turning
// both into proxies.
//
// The caller must hold |ports_lock_| and both port locks (asserted), and both
// ports must be in the receiving state (CHECKed). The merge is refused with
// ERROR_PORT_STATE_UNEXPECTED if the two ports are each other's peer on this
// node, or if either has already sent a message. If either port fails to
// begin proxying, the peer swap and state changes are undone and
// ERROR_PORT_STATE_UNEXPECTED is returned. Returns OK on success.
int Node::MergePorts_Locked(const PortRef& port0_ref,
                            const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();

  ports_lock_.AssertAcquired();
  port0->lock.AssertAcquired();
  port1->lock.AssertAcquired();

  CHECK(port0->state == Port::kReceiving);
  CHECK(port1->state == Port::kReceiving);

  // Ports cannot be merged with their own receiving peer!
  if (port0->peer_node_name == name_ &&
      port0->peer_port_name == port1_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  if (port1->peer_node_name == name_ &&
      port1->peer_port_name == port0_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  // Only merge if both ports have never sent a message.
  if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
      port1->next_sequence_num_to_send == kInitialSequenceNum) {
    // Swap the ports' peer information and switch them both into buffering
    // (eventually proxying) mode.

    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);

    port0->state = Port::kBuffering;
    if (port0->peer_closed)
      port0->remove_proxy_on_last_message = true;

    port1->state = Port::kBuffering;
    if (port1->peer_closed)
      port1->remove_proxy_on_last_message = true;

    // Both calls are made even if the first one fails; the results are
    // checked together below.
    int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());

    if (rv1 == OK && rv2 == OK) {
      // If either merged port had a closed peer, its new peer needs to be
      // informed of this.
      if (port1->peer_closed) {
        // Sent to port0's (post-swap) peer with port0's last expected
        // sequence number.
        ObserveClosureEventData data;
        data.last_sequence_num = port0->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port0->peer_node_name,
            NewInternalMessage(port0->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      if (port0->peer_closed) {
        // Sent to port1's (post-swap) peer with port1's last expected
        // sequence number.
        ObserveClosureEventData data;
        data.last_sequence_num = port1->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port1->peer_node_name,
            NewInternalMessage(port1->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      return OK;
    }

    // If either proxy failed to initialize (e.g. had undeliverable messages
    // or ended up in a bad state somehow), we keep the system in a consistent
    // state by undoing the peer swap.
    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  return ERROR_PORT_STATE_UNEXPECTED;
}
939
WillSendPort(const LockedPort & port,const NodeName & to_node_name,PortName * port_name,PortDescriptor * port_descriptor)940 void Node::WillSendPort(const LockedPort& port,
941 const NodeName& to_node_name,
942 PortName* port_name,
943 PortDescriptor* port_descriptor) {
944 port->lock.AssertAcquired();
945
946 PortName local_port_name = *port_name;
947
948 PortName new_port_name;
949 delegate_->GenerateRandomPortName(&new_port_name);
950
951 // Make sure we don't send messages to the new peer until after we know it
952 // exists. In the meantime, just buffer messages locally.
953 DCHECK(port->state == Port::kReceiving);
954 port->state = Port::kBuffering;
955
956 // If we already know our peer is closed, we already know this proxy can
957 // be removed once it receives and forwards its last expected message.
958 if (port->peer_closed)
959 port->remove_proxy_on_last_message = true;
960
961 *port_name = new_port_name;
962
963 port_descriptor->peer_node_name = port->peer_node_name;
964 port_descriptor->peer_port_name = port->peer_port_name;
965 port_descriptor->referring_node_name = name_;
966 port_descriptor->referring_port_name = local_port_name;
967 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
968 port_descriptor->next_sequence_num_to_receive =
969 port->message_queue.next_sequence_num();
970 port_descriptor->last_sequence_num_to_receive =
971 port->last_sequence_num_to_receive;
972 port_descriptor->peer_closed = port->peer_closed;
973 memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
974
975 // Configure the local port to point to the new port.
976 port->peer_node_name = to_node_name;
977 port->peer_port_name = new_port_name;
978 }
979
AcceptPort(const PortName & port_name,const PortDescriptor & port_descriptor)980 int Node::AcceptPort(const PortName& port_name,
981 const PortDescriptor& port_descriptor) {
982 scoped_refptr<Port> port = make_scoped_refptr(
983 new Port(port_descriptor.next_sequence_num_to_send,
984 port_descriptor.next_sequence_num_to_receive));
985 port->state = Port::kReceiving;
986 port->peer_node_name = port_descriptor.peer_node_name;
987 port->peer_port_name = port_descriptor.peer_port_name;
988 port->last_sequence_num_to_receive =
989 port_descriptor.last_sequence_num_to_receive;
990 port->peer_closed = port_descriptor.peer_closed;
991
992 DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
993 << port->peer_closed << "; last_sequence_num_to_receive="
994 << port->last_sequence_num_to_receive << "]";
995
996 // A newly accepted port is not signalable until the message referencing the
997 // new port finds its way to the consumer (see GetMessage).
998 port->message_queue.set_signalable(false);
999
1000 int rv = AddPortWithName(port_name, std::move(port));
1001 if (rv != OK)
1002 return rv;
1003
1004 // Allow referring port to forward messages.
1005 delegate_->ForwardMessage(
1006 port_descriptor.referring_node_name,
1007 NewInternalMessage(port_descriptor.referring_port_name,
1008 EventType::kPortAccepted));
1009 return OK;
1010 }
1011
WillSendMessage_Locked(const LockedPort & port,const PortName & port_name,Message * message)1012 int Node::WillSendMessage_Locked(const LockedPort& port,
1013 const PortName& port_name,
1014 Message* message) {
1015 ports_lock_.AssertAcquired();
1016 port->lock.AssertAcquired();
1017
1018 DCHECK(message);
1019
1020 // Messages may already have a sequence number if they're being forwarded
1021 // by a proxy. Otherwise, use the next outgoing sequence number.
1022 uint64_t* sequence_num =
1023 &GetMutableEventData<UserEventData>(message)->sequence_num;
1024 if (*sequence_num == 0)
1025 *sequence_num = port->next_sequence_num_to_send++;
1026
1027 #if DCHECK_IS_ON()
1028 std::ostringstream ports_buf;
1029 for (size_t i = 0; i < message->num_ports(); ++i) {
1030 if (i > 0)
1031 ports_buf << ",";
1032 ports_buf << message->ports()[i];
1033 }
1034 #endif
1035
1036 if (message->num_ports() > 0) {
1037 // Note: Another thread could be trying to send the same ports, so we need
1038 // to ensure that they are ours to send before we mutate their state.
1039
1040 std::vector<scoped_refptr<Port>> ports;
1041 ports.resize(message->num_ports());
1042
1043 {
1044 for (size_t i = 0; i < message->num_ports(); ++i) {
1045 ports[i] = GetPort_Locked(message->ports()[i]);
1046 DCHECK(ports[i]);
1047
1048 ports[i]->lock.Acquire();
1049 int error = OK;
1050 if (ports[i]->state != Port::kReceiving)
1051 error = ERROR_PORT_STATE_UNEXPECTED;
1052 else if (message->ports()[i] == port->peer_port_name)
1053 error = ERROR_PORT_CANNOT_SEND_PEER;
1054
1055 if (error != OK) {
1056 // Oops, we cannot send this port.
1057 for (size_t j = 0; j <= i; ++j)
1058 ports[i]->lock.Release();
1059 // Backpedal on the sequence number.
1060 port->next_sequence_num_to_send--;
1061 return error;
1062 }
1063 }
1064 }
1065
1066 PortDescriptor* port_descriptors =
1067 GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
1068
1069 for (size_t i = 0; i < message->num_ports(); ++i) {
1070 WillSendPort(LockedPort(ports[i].get()),
1071 port->peer_node_name,
1072 message->mutable_ports() + i,
1073 port_descriptors + i);
1074 }
1075
1076 for (size_t i = 0; i < message->num_ports(); ++i)
1077 ports[i]->lock.Release();
1078 }
1079
1080 #if DCHECK_IS_ON()
1081 DVLOG(4) << "Sending message "
1082 << GetEventData<UserEventData>(*message)->sequence_num
1083 << " [ports=" << ports_buf.str() << "]"
1084 << " from " << port_name << "@" << name_
1085 << " to " << port->peer_port_name << "@" << port->peer_node_name;
1086 #endif
1087
1088 GetMutableEventHeader(message)->port_name = port->peer_port_name;
1089 return OK;
1090 }
1091
BeginProxying_Locked(const LockedPort & port,const PortName & port_name)1092 int Node::BeginProxying_Locked(const LockedPort& port,
1093 const PortName& port_name) {
1094 ports_lock_.AssertAcquired();
1095 port->lock.AssertAcquired();
1096
1097 if (port->state != Port::kBuffering)
1098 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1099
1100 port->state = Port::kProxying;
1101
1102 int rv = ForwardMessages_Locked(LockedPort(port), port_name);
1103 if (rv != OK)
1104 return rv;
1105
1106 // We may have observed closure while buffering. In that case, we can advance
1107 // to removing the proxy without sending out an ObserveProxy message. We
1108 // already know the last expected message, etc.
1109
1110 if (port->remove_proxy_on_last_message) {
1111 MaybeRemoveProxy_Locked(LockedPort(port), port_name);
1112
1113 // Make sure we propagate closure to our current peer.
1114 ObserveClosureEventData data;
1115 data.last_sequence_num = port->last_sequence_num_to_receive;
1116 delegate_->ForwardMessage(
1117 port->peer_node_name,
1118 NewInternalMessage(port->peer_port_name,
1119 EventType::kObserveClosure, data));
1120 } else {
1121 InitiateProxyRemoval(LockedPort(port), port_name);
1122 }
1123
1124 return OK;
1125 }
1126
BeginProxying(PortRef port_ref)1127 int Node::BeginProxying(PortRef port_ref) {
1128 Port* port = port_ref.port();
1129 {
1130 base::AutoLock ports_lock(ports_lock_);
1131 base::AutoLock lock(port->lock);
1132
1133 if (port->state != Port::kBuffering)
1134 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1135
1136 port->state = Port::kProxying;
1137
1138 int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
1139 if (rv != OK)
1140 return rv;
1141 }
1142
1143 bool should_remove;
1144 NodeName peer_node_name;
1145 ScopedMessage closure_message;
1146 {
1147 base::AutoLock lock(port->lock);
1148 if (port->state != Port::kProxying)
1149 return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1150
1151 should_remove = port->remove_proxy_on_last_message;
1152 if (should_remove) {
1153 // Make sure we propagate closure to our current peer.
1154 ObserveClosureEventData data;
1155 data.last_sequence_num = port->last_sequence_num_to_receive;
1156 peer_node_name = port->peer_node_name;
1157 closure_message = NewInternalMessage(port->peer_port_name,
1158 EventType::kObserveClosure, data);
1159 } else {
1160 InitiateProxyRemoval(LockedPort(port), port_ref.name());
1161 }
1162 }
1163
1164 if (should_remove) {
1165 TryRemoveProxy(port_ref);
1166 delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
1167 }
1168
1169 return OK;
1170 }
1171
ForwardMessages_Locked(const LockedPort & port,const PortName & port_name)1172 int Node::ForwardMessages_Locked(const LockedPort& port,
1173 const PortName &port_name) {
1174 ports_lock_.AssertAcquired();
1175 port->lock.AssertAcquired();
1176
1177 for (;;) {
1178 ScopedMessage message;
1179 port->message_queue.GetNextMessage(&message, nullptr);
1180 if (!message)
1181 break;
1182
1183 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
1184 if (rv != OK)
1185 return rv;
1186
1187 delegate_->ForwardMessage(port->peer_node_name, std::move(message));
1188 }
1189 return OK;
1190 }
1191
InitiateProxyRemoval(const LockedPort & port,const PortName & port_name)1192 void Node::InitiateProxyRemoval(const LockedPort& port,
1193 const PortName& port_name) {
1194 port->lock.AssertAcquired();
1195
1196 // To remove this node, we start by notifying the connected graph that we are
1197 // a proxy. This allows whatever port is referencing this node to skip it.
1198 // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1199 // the peer was closed in the meantime).
1200
1201 ObserveProxyEventData data;
1202 data.proxy_node_name = name_;
1203 data.proxy_port_name = port_name;
1204 data.proxy_to_node_name = port->peer_node_name;
1205 data.proxy_to_port_name = port->peer_port_name;
1206
1207 delegate_->ForwardMessage(
1208 port->peer_node_name,
1209 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
1210 }
1211
MaybeRemoveProxy_Locked(const LockedPort & port,const PortName & port_name)1212 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
1213 const PortName& port_name) {
1214 // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
1215 ports_lock_.AssertAcquired();
1216 port->lock.AssertAcquired();
1217
1218 DCHECK(port->state == Port::kProxying);
1219
1220 // Make sure we have seen ObserveProxyAck before removing the port.
1221 if (!port->remove_proxy_on_last_message)
1222 return;
1223
1224 if (!CanAcceptMoreMessages(port.get())) {
1225 // This proxy port is done. We can now remove it!
1226 ErasePort_Locked(port_name);
1227
1228 if (port->send_on_proxy_removal) {
1229 NodeName to_node = port->send_on_proxy_removal->first;
1230 ScopedMessage& message = port->send_on_proxy_removal->second;
1231
1232 delegate_->ForwardMessage(to_node, std::move(message));
1233 port->send_on_proxy_removal.reset();
1234 }
1235 } else {
1236 DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1237 << " now; waiting for more messages";
1238 }
1239 }
1240
TryRemoveProxy(PortRef port_ref)1241 void Node::TryRemoveProxy(PortRef port_ref) {
1242 Port* port = port_ref.port();
1243 bool should_erase = false;
1244 ScopedMessage msg;
1245 NodeName to_node;
1246 {
1247 base::AutoLock lock(port->lock);
1248
1249 // Port already removed. Nothing to do.
1250 if (port->state == Port::kClosed)
1251 return;
1252
1253 DCHECK(port->state == Port::kProxying);
1254
1255 // Make sure we have seen ObserveProxyAck before removing the port.
1256 if (!port->remove_proxy_on_last_message)
1257 return;
1258
1259 if (!CanAcceptMoreMessages(port)) {
1260 // This proxy port is done. We can now remove it!
1261 should_erase = true;
1262
1263 if (port->send_on_proxy_removal) {
1264 to_node = port->send_on_proxy_removal->first;
1265 msg = std::move(port->send_on_proxy_removal->second);
1266 port->send_on_proxy_removal.reset();
1267 }
1268 } else {
1269 DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1270 << " now; waiting for more messages";
1271 }
1272 }
1273
1274 if (should_erase)
1275 ErasePort(port_ref.name());
1276
1277 if (msg)
1278 delegate_->ForwardMessage(to_node, std::move(msg));
1279 }
1280
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1281 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1282 const PortName& port_name) {
1283 // Wipes out all ports whose peer node matches |node_name| and whose peer port
1284 // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1285 // node is matched.
1286
1287 std::vector<PortRef> ports_to_notify;
1288 std::vector<PortName> dead_proxies_to_broadcast;
1289 std::deque<PortName> referenced_port_names;
1290
1291 {
1292 base::AutoLock ports_lock(ports_lock_);
1293
1294 for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1295 Port* port = iter->second.get();
1296 {
1297 base::AutoLock port_lock(port->lock);
1298
1299 if (port->peer_node_name == node_name &&
1300 (port_name == kInvalidPortName ||
1301 port->peer_port_name == port_name)) {
1302 if (!port->peer_closed) {
1303 // Treat this as immediate peer closure. It's an exceptional
1304 // condition akin to a broken pipe, so we don't care about losing
1305 // messages.
1306
1307 port->peer_closed = true;
1308 port->last_sequence_num_to_receive =
1309 port->message_queue.next_sequence_num() - 1;
1310
1311 if (port->state == Port::kReceiving)
1312 ports_to_notify.push_back(PortRef(iter->first, port));
1313 }
1314
1315 // We don't expect to forward any further messages, and we don't
1316 // expect to receive a Port{Accepted,Rejected} event. Because we're
1317 // a proxy with no active peer, we cannot use the normal proxy removal
1318 // procedure of forward-propagating an ObserveProxy. Instead we
1319 // broadcast our own death so it can be back-propagated. This is
1320 // inefficient but rare.
1321 if (port->state != Port::kReceiving) {
1322 dead_proxies_to_broadcast.push_back(iter->first);
1323 iter->second->message_queue.GetReferencedPorts(
1324 &referenced_port_names);
1325 }
1326 }
1327 }
1328 }
1329
1330 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1331 ports_.erase(proxy_name);
1332 DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1333 }
1334 }
1335
1336 // Wake up any receiving ports who have just observed simulated peer closure.
1337 for (const auto& port : ports_to_notify)
1338 delegate_->PortStatusChanged(port);
1339
1340 for (const auto& proxy_name : dead_proxies_to_broadcast) {
1341 // Broadcast an event signifying that this proxy is no longer functioning.
1342 ObserveProxyEventData event;
1343 event.proxy_node_name = name_;
1344 event.proxy_port_name = proxy_name;
1345 event.proxy_to_node_name = kInvalidNodeName;
1346 event.proxy_to_port_name = kInvalidPortName;
1347 delegate_->BroadcastMessage(NewInternalMessage(
1348 kInvalidPortName, EventType::kObserveProxy, event));
1349
1350 // Also process death locally since the port that points this closed one
1351 // could be on the current node.
1352 // Note: Although this is recursive, only a single port is involved which
1353 // limits the expected branching to 1.
1354 DestroyAllPortsWithPeer(name_, proxy_name);
1355 }
1356
1357 // Close any ports referenced by the closed proxies.
1358 for (const auto& name : referenced_port_names) {
1359 PortRef ref;
1360 if (GetPort(name, &ref) == OK)
1361 ClosePort(ref);
1362 }
1363 }
1364
NewInternalMessage_Helper(const PortName & port_name,const EventType & type,const void * data,size_t num_data_bytes)1365 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1366 const EventType& type,
1367 const void* data,
1368 size_t num_data_bytes) {
1369 ScopedMessage message;
1370 delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1371
1372 EventHeader* header = GetMutableEventHeader(message.get());
1373 header->port_name = port_name;
1374 header->type = type;
1375 header->padding = 0;
1376
1377 if (num_data_bytes)
1378 memcpy(header + 1, data, num_data_bytes);
1379
1380 return message;
1381 }
1382
1383 } // namespace ports
1384 } // namespace edk
1385 } // namespace mojo
1386