• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 
10 #include <map>
11 #include <sstream>
12 #include <utility>
13 
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/containers/queue.h"
17 #include "base/logging.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_piece.h"
20 #include "base/strings/stringprintf.h"
21 #include "base/synchronization/lock.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/test/scoped_task_environment.h"
24 #include "base/threading/thread.h"
25 #include "mojo/core/ports/event.h"
26 #include "mojo/core/ports/node.h"
27 #include "mojo/core/ports/node_delegate.h"
28 #include "mojo/core/ports/user_message.h"
29 #include "testing/gtest/include/gtest/gtest.h"
30 
31 namespace mojo {
32 namespace core {
33 namespace ports {
34 namespace test {
35 
36 namespace {
37 
38 // TODO(rockot): Remove this unnecessary alias.
39 using ScopedMessage = std::unique_ptr<UserMessageEvent>;
40 
41 class TestMessage : public UserMessage {
42  public:
43   static const TypeInfo kUserMessageTypeInfo;
44 
TestMessage(const base::StringPiece & payload)45   TestMessage(const base::StringPiece& payload)
46       : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
~TestMessage()47   ~TestMessage() override {}
48 
payload() const49   const std::string& payload() const { return payload_; }
50 
51  private:
52   std::string payload_;
53 };
54 
55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
56 
NewUserMessageEvent(const base::StringPiece & payload,size_t num_ports)57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload,
58                                   size_t num_ports) {
59   auto event = std::make_unique<UserMessageEvent>(num_ports);
60   event->AttachMessage(std::make_unique<TestMessage>(payload));
61   return event;
62 }
63 
MessageEquals(const ScopedMessage & message,const base::StringPiece & s)64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
65   return message->GetMessage<TestMessage>()->payload() == s;
66 }
67 
68 class TestNode;
69 
70 class MessageRouter {
71  public:
~MessageRouter()72   virtual ~MessageRouter() {}
73 
74   virtual void ForwardEvent(TestNode* from_node,
75                             const NodeName& node_name,
76                             ScopedEvent event) = 0;
77   virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
78 };
79 
80 class TestNode : public NodeDelegate {
81  public:
TestNode(uint64_t id)82   explicit TestNode(uint64_t id)
83       : node_name_(id, 1),
84         node_(node_name_, this),
85         node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
86         events_available_event_(
87             base::WaitableEvent::ResetPolicy::AUTOMATIC,
88             base::WaitableEvent::InitialState::NOT_SIGNALED),
89         idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
90                     base::WaitableEvent::InitialState::SIGNALED) {}
91 
~TestNode()92   ~TestNode() override {
93     StopWhenIdle();
94     node_thread_.Stop();
95   }
96 
name() const97   const NodeName& name() const { return node_name_; }
98 
99   // NOTE: Node is thread-safe.
node()100   Node& node() { return node_; }
101 
idle_event()102   base::WaitableEvent& idle_event() { return idle_event_; }
103 
IsIdle()104   bool IsIdle() {
105     base::AutoLock lock(lock_);
106     return started_ && !dispatching_ &&
107            (incoming_events_.empty() || (block_on_event_ && blocked_));
108   }
109 
BlockOnEvent(Event::Type type)110   void BlockOnEvent(Event::Type type) {
111     base::AutoLock lock(lock_);
112     blocked_event_type_ = type;
113     block_on_event_ = true;
114   }
115 
Unblock()116   void Unblock() {
117     base::AutoLock lock(lock_);
118     block_on_event_ = false;
119     events_available_event_.Signal();
120   }
121 
Start(MessageRouter * router)122   void Start(MessageRouter* router) {
123     router_ = router;
124     node_thread_.Start();
125     node_thread_.task_runner()->PostTask(
126         FROM_HERE,
127         base::Bind(&TestNode::ProcessEvents, base::Unretained(this)));
128   }
129 
StopWhenIdle()130   void StopWhenIdle() {
131     base::AutoLock lock(lock_);
132     should_quit_ = true;
133     events_available_event_.Signal();
134   }
135 
WakeUp()136   void WakeUp() { events_available_event_.Signal(); }
137 
SendStringMessage(const PortRef & port,const std::string & s)138   int SendStringMessage(const PortRef& port, const std::string& s) {
139     return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
140   }
141 
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortName & sent_port_name)142   int SendStringMessageWithPort(const PortRef& port,
143                                 const std::string& s,
144                                 const PortName& sent_port_name) {
145     auto event = NewUserMessageEvent(s, 1);
146     event->ports()[0] = sent_port_name;
147     return node_.SendUserMessage(port, std::move(event));
148   }
149 
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortRef & sent_port)150   int SendStringMessageWithPort(const PortRef& port,
151                                 const std::string& s,
152                                 const PortRef& sent_port) {
153     return SendStringMessageWithPort(port, s, sent_port.name());
154   }
155 
set_drop_messages(bool value)156   void set_drop_messages(bool value) {
157     base::AutoLock lock(lock_);
158     drop_messages_ = value;
159   }
160 
set_save_messages(bool value)161   void set_save_messages(bool value) {
162     base::AutoLock lock(lock_);
163     save_messages_ = value;
164   }
165 
ReadMessage(const PortRef & port,ScopedMessage * message)166   bool ReadMessage(const PortRef& port, ScopedMessage* message) {
167     return node_.GetMessage(port, message, nullptr) == OK && *message;
168   }
169 
GetSavedMessage(ScopedMessage * message)170   bool GetSavedMessage(ScopedMessage* message) {
171     base::AutoLock lock(lock_);
172     if (saved_messages_.empty()) {
173       message->reset();
174       return false;
175     }
176     std::swap(*message, saved_messages_.front());
177     saved_messages_.pop();
178     return true;
179   }
180 
EnqueueEvent(ScopedEvent event)181   void EnqueueEvent(ScopedEvent event) {
182     idle_event_.Reset();
183 
184     // NOTE: This may be called from ForwardMessage and thus must not reenter
185     // |node_|.
186     base::AutoLock lock(lock_);
187     incoming_events_.emplace(std::move(event));
188     events_available_event_.Signal();
189   }
190 
ForwardEvent(const NodeName & node_name,ScopedEvent event)191   void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
192     {
193       base::AutoLock lock(lock_);
194       if (drop_messages_) {
195         DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
196                  << node_name;
197 
198         base::AutoUnlock unlock(lock_);
199         ClosePortsInEvent(event.get());
200         return;
201       }
202     }
203 
204     DCHECK(router_);
205     DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
206     router_->ForwardEvent(this, node_name, std::move(event));
207   }
208 
BroadcastEvent(ScopedEvent event)209   void BroadcastEvent(ScopedEvent event) override {
210     router_->BroadcastEvent(this, std::move(event));
211   }
212 
PortStatusChanged(const PortRef & port)213   void PortStatusChanged(const PortRef& port) override {
214     // The port may be closed, in which case we ignore the notification.
215     base::AutoLock lock(lock_);
216     if (!save_messages_)
217       return;
218 
219     for (;;) {
220       ScopedMessage message;
221       {
222         base::AutoUnlock unlock(lock_);
223         if (!ReadMessage(port, &message))
224           break;
225       }
226 
227       saved_messages_.emplace(std::move(message));
228     }
229   }
230 
ClosePortsInEvent(Event * event)231   void ClosePortsInEvent(Event* event) {
232     if (event->type() != Event::Type::kUserMessage)
233       return;
234 
235     UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
236     for (size_t i = 0; i < message_event->num_ports(); ++i) {
237       PortRef port;
238       ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
239       EXPECT_EQ(OK, node_.ClosePort(port));
240     }
241   }
242 
243  private:
ProcessEvents()244   void ProcessEvents() {
245     for (;;) {
246       events_available_event_.Wait();
247       base::AutoLock lock(lock_);
248 
249       if (should_quit_)
250         return;
251 
252       dispatching_ = true;
253       while (!incoming_events_.empty()) {
254         if (block_on_event_ &&
255             incoming_events_.front()->type() == blocked_event_type_) {
256           blocked_ = true;
257           // Go idle if we hit a blocked event type.
258           break;
259         } else {
260           blocked_ = false;
261         }
262         ScopedEvent event = std::move(incoming_events_.front());
263         incoming_events_.pop();
264 
265         // NOTE: AcceptMessage() can re-enter this object to call any of the
266         // NodeDelegate interface methods.
267         base::AutoUnlock unlock(lock_);
268         node_.AcceptEvent(std::move(event));
269       }
270 
271       dispatching_ = false;
272       started_ = true;
273       idle_event_.Signal();
274     };
275   }
276 
277   const NodeName node_name_;
278   Node node_;
279   MessageRouter* router_ = nullptr;
280 
281   base::Thread node_thread_;
282   base::WaitableEvent events_available_event_;
283   base::WaitableEvent idle_event_;
284 
285   // Guards fields below.
286   base::Lock lock_;
287   bool started_ = false;
288   bool dispatching_ = false;
289   bool should_quit_ = false;
290   bool drop_messages_ = false;
291   bool save_messages_ = false;
292   bool blocked_ = false;
293   bool block_on_event_ = false;
294   Event::Type blocked_event_type_;
295   base::queue<ScopedEvent> incoming_events_;
296   base::queue<ScopedMessage> saved_messages_;
297 };
298 
299 class PortsTest : public testing::Test, public MessageRouter {
300  public:
AddNode(TestNode * node)301   void AddNode(TestNode* node) {
302     {
303       base::AutoLock lock(lock_);
304       nodes_[node->name()] = node;
305     }
306     node->Start(this);
307   }
308 
RemoveNode(TestNode * node)309   void RemoveNode(TestNode* node) {
310     {
311       base::AutoLock lock(lock_);
312       nodes_.erase(node->name());
313     }
314 
315     for (const auto& entry : nodes_)
316       entry.second->node().LostConnectionToNode(node->name());
317   }
318 
319   // Waits until all known Nodes are idle. Message forwarding and processing
320   // is handled in such a way that idleness is a stable state: once all nodes in
321   // the system are idle, they will remain idle until the test explicitly
322   // initiates some further event (e.g. sending a message, closing a port, or
323   // removing a Node).
WaitForIdle()324   void WaitForIdle() {
325     for (;;) {
326       base::AutoLock global_lock(global_lock_);
327       bool all_nodes_idle = true;
328       for (const auto& entry : nodes_) {
329         if (!entry.second->IsIdle())
330           all_nodes_idle = false;
331         entry.second->WakeUp();
332       }
333       if (all_nodes_idle)
334         return;
335 
336       // Wait for any Node to signal that it's idle.
337       base::AutoUnlock global_unlock(global_lock_);
338       std::vector<base::WaitableEvent*> events;
339       for (const auto& entry : nodes_)
340         events.push_back(&entry.second->idle_event());
341       base::WaitableEvent::WaitMany(events.data(), events.size());
342     }
343   }
344 
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)345   void CreatePortPair(TestNode* node0,
346                       PortRef* port0,
347                       TestNode* node1,
348                       PortRef* port1) {
349     if (node0 == node1) {
350       EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
351     } else {
352       EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
353       EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
354       EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
355                                                  port1->name()));
356       EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
357                                                  port0->name()));
358     }
359   }
360 
361  private:
362   // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)363   void ForwardEvent(TestNode* from_node,
364                     const NodeName& node_name,
365                     ScopedEvent event) override {
366     base::AutoLock global_lock(global_lock_);
367     base::AutoLock lock(lock_);
368     // Drop messages from nodes that have been removed.
369     if (nodes_.find(from_node->name()) == nodes_.end()) {
370       from_node->ClosePortsInEvent(event.get());
371       return;
372     }
373 
374     auto it = nodes_.find(node_name);
375     if (it == nodes_.end()) {
376       DVLOG(1) << "Node not found: " << node_name;
377       return;
378     }
379 
380     it->second->EnqueueEvent(std::move(event));
381   }
382 
BroadcastEvent(TestNode * from_node,ScopedEvent event)383   void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
384     base::AutoLock global_lock(global_lock_);
385     base::AutoLock lock(lock_);
386 
387     // Drop messages from nodes that have been removed.
388     if (nodes_.find(from_node->name()) == nodes_.end())
389       return;
390 
391     for (const auto& entry : nodes_) {
392       TestNode* node = entry.second;
393       // Broadcast doesn't deliver to the local node.
394       if (node == from_node)
395         continue;
396       node->EnqueueEvent(event->Clone());
397     }
398   }
399 
400   base::test::ScopedTaskEnvironment scoped_task_environment_;
401 
402   // Acquired before any operation which makes a Node busy, and before testing
403   // if all nodes are idle.
404   base::Lock global_lock_;
405 
406   base::Lock lock_;
407   std::map<NodeName, TestNode*> nodes_;
408 };
409 
410 }  // namespace
411 
TEST_F(PortsTest,Basic1)412 TEST_F(PortsTest, Basic1) {
413   TestNode node0(0);
414   AddNode(&node0);
415 
416   TestNode node1(1);
417   AddNode(&node1);
418 
419   PortRef x0, x1;
420   CreatePortPair(&node0, &x0, &node1, &x1);
421 
422   PortRef a0, a1;
423   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
424   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
425   EXPECT_EQ(OK, node0.node().ClosePort(a0));
426 
427   EXPECT_EQ(OK, node0.node().ClosePort(x0));
428   EXPECT_EQ(OK, node1.node().ClosePort(x1));
429 
430   WaitForIdle();
431 
432   EXPECT_TRUE(node0.node().CanShutdownCleanly());
433   EXPECT_TRUE(node1.node().CanShutdownCleanly());
434 }
435 
TEST_F(PortsTest,Basic2)436 TEST_F(PortsTest, Basic2) {
437   TestNode node0(0);
438   AddNode(&node0);
439 
440   TestNode node1(1);
441   AddNode(&node1);
442 
443   PortRef x0, x1;
444   CreatePortPair(&node0, &x0, &node1, &x1);
445 
446   PortRef b0, b1;
447   EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
448   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
449   EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
450 
451   EXPECT_EQ(OK, node0.node().ClosePort(b0));
452 
453   EXPECT_EQ(OK, node0.node().ClosePort(x0));
454   EXPECT_EQ(OK, node1.node().ClosePort(x1));
455 
456   WaitForIdle();
457 
458   EXPECT_TRUE(node0.node().CanShutdownCleanly());
459   EXPECT_TRUE(node1.node().CanShutdownCleanly());
460 }
461 
TEST_F(PortsTest,Basic3)462 TEST_F(PortsTest, Basic3) {
463   TestNode node0(0);
464   AddNode(&node0);
465 
466   TestNode node1(1);
467   AddNode(&node1);
468 
469   PortRef x0, x1;
470   CreatePortPair(&node0, &x0, &node1, &x1);
471 
472   PortRef a0, a1;
473   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
474 
475   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
476   EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
477 
478   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
479 
480   PortRef b0, b1;
481   EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
482   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
483   EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
484 
485   EXPECT_EQ(OK, node0.node().ClosePort(b0));
486 
487   EXPECT_EQ(OK, node0.node().ClosePort(x0));
488   EXPECT_EQ(OK, node1.node().ClosePort(x1));
489 
490   WaitForIdle();
491 
492   EXPECT_TRUE(node0.node().CanShutdownCleanly());
493   EXPECT_TRUE(node1.node().CanShutdownCleanly());
494 }
495 
TEST_F(PortsTest,LostConnectionToNode1)496 TEST_F(PortsTest, LostConnectionToNode1) {
497   TestNode node0(0);
498   AddNode(&node0);
499 
500   TestNode node1(1);
501   AddNode(&node1);
502   node1.set_drop_messages(true);
503 
504   PortRef x0, x1;
505   CreatePortPair(&node0, &x0, &node1, &x1);
506 
507   // Transfer a port to node1 and simulate a lost connection to node1.
508 
509   PortRef a0, a1;
510   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
511   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
512 
513   WaitForIdle();
514 
515   RemoveNode(&node1);
516 
517   WaitForIdle();
518 
519   EXPECT_EQ(OK, node0.node().ClosePort(a0));
520   EXPECT_EQ(OK, node0.node().ClosePort(x0));
521   EXPECT_EQ(OK, node1.node().ClosePort(x1));
522 
523   WaitForIdle();
524 
525   EXPECT_TRUE(node0.node().CanShutdownCleanly());
526   EXPECT_TRUE(node1.node().CanShutdownCleanly());
527 }
528 
TEST_F(PortsTest,LostConnectionToNode2)529 TEST_F(PortsTest, LostConnectionToNode2) {
530   TestNode node0(0);
531   AddNode(&node0);
532 
533   TestNode node1(1);
534   AddNode(&node1);
535 
536   PortRef x0, x1;
537   CreatePortPair(&node0, &x0, &node1, &x1);
538 
539   PortRef a0, a1;
540   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
541   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
542 
543   WaitForIdle();
544 
545   node1.set_drop_messages(true);
546 
547   RemoveNode(&node1);
548 
549   WaitForIdle();
550 
551   // a0 should have eventually detected peer closure after node loss.
552   ScopedMessage message;
553   EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
554             node0.node().GetMessage(a0, &message, nullptr));
555   EXPECT_FALSE(message);
556 
557   EXPECT_EQ(OK, node0.node().ClosePort(a0));
558 
559   EXPECT_EQ(OK, node0.node().ClosePort(x0));
560 
561   EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
562   EXPECT_TRUE(message);
563   node1.ClosePortsInEvent(message.get());
564 
565   EXPECT_EQ(OK, node1.node().ClosePort(x1));
566 
567   WaitForIdle();
568 
569   EXPECT_TRUE(node0.node().CanShutdownCleanly());
570   EXPECT_TRUE(node1.node().CanShutdownCleanly());
571 }
572 
TEST_F(PortsTest,LostConnectionToNodeWithSecondaryProxy)573 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
574   // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
575   // node.
576 
577   TestNode node0(0);
578   AddNode(&node0);
579 
580   TestNode node1(1);
581   AddNode(&node1);
582 
583   TestNode node2(2);
584   AddNode(&node2);
585 
586   // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
587   PortRef A, B, C, D;
588   CreatePortPair(&node0, &A, &node1, &B);
589   CreatePortPair(&node1, &C, &node2, &D);
590 
591   // Create E-F and send F over A to node 1.
592   PortRef E, F;
593   EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
594   EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
595 
596   WaitForIdle();
597 
598   ScopedMessage message;
599   ASSERT_TRUE(node1.ReadMessage(B, &message));
600   ASSERT_EQ(1u, message->num_ports());
601 
602   EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
603 
604   // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
605   // will trivially become aware of the loss, and this test verifies that the
606   // port A on node 0 will eventually also become aware of it.
607 
608   // Make sure node2 stops processing events when it encounters an ObserveProxy.
609   node2.BlockOnEvent(Event::Type::kObserveProxy);
610 
611   EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
612   WaitForIdle();
613 
614   // Simulate node 1 and 2 disconnecting.
615   EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
616 
617   // Let node2 continue processing events and wait for everyone to go idle.
618   node2.Unblock();
619   WaitForIdle();
620 
621   // Port F should be gone.
622   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
623 
624   // Port E should have detected peer closure despite the fact that there is
625   // no longer a continuous route from F to E over which the event could travel.
626   PortStatus status;
627   EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
628   EXPECT_TRUE(status.peer_closed);
629 
630   EXPECT_EQ(OK, node0.node().ClosePort(A));
631   EXPECT_EQ(OK, node1.node().ClosePort(B));
632   EXPECT_EQ(OK, node1.node().ClosePort(C));
633   EXPECT_EQ(OK, node0.node().ClosePort(E));
634 
635   WaitForIdle();
636 
637   EXPECT_TRUE(node0.node().CanShutdownCleanly());
638   EXPECT_TRUE(node1.node().CanShutdownCleanly());
639 }
640 
TEST_F(PortsTest,LostConnectionToNodeWithLocalProxy)641 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
642   // Tests that a proxy gets cleaned up when its direct peer lives on a lost
643   // node and it's predecessor lives on the same node.
644 
645   TestNode node0(0);
646   AddNode(&node0);
647 
648   TestNode node1(1);
649   AddNode(&node1);
650 
651   PortRef A, B;
652   CreatePortPair(&node0, &A, &node1, &B);
653 
654   PortRef C, D;
655   EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
656 
657   // Send D but block node0 on an ObserveProxy event.
658   node0.BlockOnEvent(Event::Type::kObserveProxy);
659   EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
660 
661   // node0 won't collapse the proxy but node1 will receive the message before
662   // going idle.
663   WaitForIdle();
664 
665   ScopedMessage message;
666   ASSERT_TRUE(node1.ReadMessage(B, &message));
667   ASSERT_EQ(1u, message->num_ports());
668   PortRef E;
669   EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
670 
671   RemoveNode(&node1);
672 
673   node0.Unblock();
674   WaitForIdle();
675 
676   // Port C should have detected peer closure.
677   PortStatus status;
678   EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
679   EXPECT_TRUE(status.peer_closed);
680 
681   EXPECT_EQ(OK, node0.node().ClosePort(A));
682   EXPECT_EQ(OK, node1.node().ClosePort(B));
683   EXPECT_EQ(OK, node0.node().ClosePort(C));
684   EXPECT_EQ(OK, node1.node().ClosePort(E));
685 
686   EXPECT_TRUE(node0.node().CanShutdownCleanly());
687   EXPECT_TRUE(node1.node().CanShutdownCleanly());
688 }
689 
TEST_F(PortsTest,GetMessage1)690 TEST_F(PortsTest, GetMessage1) {
691   TestNode node(0);
692   AddNode(&node);
693 
694   PortRef a0, a1;
695   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
696 
697   ScopedMessage message;
698   EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
699   EXPECT_FALSE(message);
700 
701   EXPECT_EQ(OK, node.node().ClosePort(a1));
702 
703   WaitForIdle();
704 
705   EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
706             node.node().GetMessage(a0, &message, nullptr));
707   EXPECT_FALSE(message);
708 
709   EXPECT_EQ(OK, node.node().ClosePort(a0));
710 
711   WaitForIdle();
712 
713   EXPECT_TRUE(node.node().CanShutdownCleanly());
714 }
715 
TEST_F(PortsTest,GetMessage2)716 TEST_F(PortsTest, GetMessage2) {
717   TestNode node(0);
718   AddNode(&node);
719 
720   PortRef a0, a1;
721   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
722 
723   EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
724 
725   ScopedMessage message;
726   EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
727 
728   ASSERT_TRUE(message);
729   EXPECT_TRUE(MessageEquals(message, "1"));
730 
731   EXPECT_EQ(OK, node.node().ClosePort(a0));
732   EXPECT_EQ(OK, node.node().ClosePort(a1));
733 
734   EXPECT_TRUE(node.node().CanShutdownCleanly());
735 }
736 
TEST_F(PortsTest,GetMessage3)737 TEST_F(PortsTest, GetMessage3) {
738   TestNode node(0);
739   AddNode(&node);
740 
741   PortRef a0, a1;
742   EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
743 
744   const char* kStrings[] = {"1", "2", "3"};
745 
746   for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i)
747     EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
748 
749   ScopedMessage message;
750   for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) {
751     EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
752     ASSERT_TRUE(message);
753     EXPECT_TRUE(MessageEquals(message, kStrings[i]));
754   }
755 
756   EXPECT_EQ(OK, node.node().ClosePort(a0));
757   EXPECT_EQ(OK, node.node().ClosePort(a1));
758 
759   EXPECT_TRUE(node.node().CanShutdownCleanly());
760 }
761 
TEST_F(PortsTest,Delegation1)762 TEST_F(PortsTest, Delegation1) {
763   TestNode node0(0);
764   AddNode(&node0);
765 
766   TestNode node1(1);
767   AddNode(&node1);
768 
769   PortRef x0, x1;
770   CreatePortPair(&node0, &x0, &node1, &x1);
771 
772   // In this test, we send a message to a port that has been moved.
773 
774   PortRef a0, a1;
775   EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
776   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
777   WaitForIdle();
778 
779   ScopedMessage message;
780   ASSERT_TRUE(node1.ReadMessage(x1, &message));
781   ASSERT_EQ(1u, message->num_ports());
782   EXPECT_TRUE(MessageEquals(message, "a1"));
783 
784   // This is "a1" from the point of view of node1.
785   PortName a2_name = message->ports()[0];
786   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
787   EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
788 
789   WaitForIdle();
790 
791   ASSERT_TRUE(node0.ReadMessage(x0, &message));
792   ASSERT_EQ(1u, message->num_ports());
793   EXPECT_TRUE(MessageEquals(message, "a2"));
794 
795   // This is "a2" from the point of view of node1.
796   PortName a3_name = message->ports()[0];
797 
798   PortRef a3;
799   EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
800 
801   ASSERT_TRUE(node0.ReadMessage(a3, &message));
802   EXPECT_EQ(0u, message->num_ports());
803   EXPECT_TRUE(MessageEquals(message, "hello"));
804 
805   EXPECT_EQ(OK, node0.node().ClosePort(a0));
806   EXPECT_EQ(OK, node0.node().ClosePort(a3));
807 
808   EXPECT_EQ(OK, node0.node().ClosePort(x0));
809   EXPECT_EQ(OK, node1.node().ClosePort(x1));
810 
811   EXPECT_TRUE(node0.node().CanShutdownCleanly());
812   EXPECT_TRUE(node1.node().CanShutdownCleanly());
813 }
814 
TEST_F(PortsTest,Delegation2)815 TEST_F(PortsTest, Delegation2) {
816   TestNode node0(0);
817   AddNode(&node0);
818 
819   TestNode node1(1);
820   AddNode(&node1);
821 
822   for (int i = 0; i < 100; ++i) {
823     // Setup pipe a<->b between node0 and node1.
824     PortRef A, B;
825     CreatePortPair(&node0, &A, &node1, &B);
826 
827     PortRef C, D;
828     EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
829 
830     PortRef E, F;
831     EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
832 
833     node1.set_save_messages(true);
834 
835     // Pass D over A to B.
836     EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
837 
838     // Pass F over C to D.
839     EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
840 
841     // This message should find its way to node1.
842     EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
843 
844     WaitForIdle();
845 
846     EXPECT_EQ(OK, node0.node().ClosePort(C));
847     EXPECT_EQ(OK, node0.node().ClosePort(E));
848 
849     EXPECT_EQ(OK, node0.node().ClosePort(A));
850     EXPECT_EQ(OK, node1.node().ClosePort(B));
851 
852     bool got_hello = false;
853     ScopedMessage message;
854     while (node1.GetSavedMessage(&message)) {
855       node1.ClosePortsInEvent(message.get());
856       if (MessageEquals(message, "hello")) {
857         got_hello = true;
858         break;
859       }
860     }
861 
862     EXPECT_TRUE(got_hello);
863 
864     WaitForIdle();  // Because closing ports may have generated tasks.
865   }
866 
867   EXPECT_TRUE(node0.node().CanShutdownCleanly());
868   EXPECT_TRUE(node1.node().CanShutdownCleanly());
869 }
870 
TEST_F(PortsTest,SendUninitialized)871 TEST_F(PortsTest, SendUninitialized) {
872   TestNode node(0);
873   AddNode(&node);
874 
875   PortRef x0;
876   EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
877   EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
878   EXPECT_EQ(OK, node.node().ClosePort(x0));
879   EXPECT_TRUE(node.node().CanShutdownCleanly());
880 }
881 
TEST_F(PortsTest,SendFailure)882 TEST_F(PortsTest, SendFailure) {
883   TestNode node(0);
884   AddNode(&node);
885 
886   node.set_save_messages(true);
887 
888   PortRef A, B;
889   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
890 
891   // Try to send A over itself.
892 
893   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
894             node.SendStringMessageWithPort(A, "oops", A));
895 
896   // Try to send B over A.
897 
898   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
899             node.SendStringMessageWithPort(A, "nope", B));
900 
901   // B should be closed immediately.
902   EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
903 
904   WaitForIdle();
905 
906   // There should have been no messages accepted.
907   ScopedMessage message;
908   EXPECT_FALSE(node.GetSavedMessage(&message));
909 
910   EXPECT_EQ(OK, node.node().ClosePort(A));
911 
912   WaitForIdle();
913 
914   EXPECT_TRUE(node.node().CanShutdownCleanly());
915 }
916 
TEST_F(PortsTest,DontLeakUnreceivedPorts)917 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
918   TestNode node(0);
919   AddNode(&node);
920 
921   PortRef A, B, C, D;
922   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
923   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
924 
925   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
926 
927   EXPECT_EQ(OK, node.node().ClosePort(C));
928   EXPECT_EQ(OK, node.node().ClosePort(A));
929   EXPECT_EQ(OK, node.node().ClosePort(B));
930 
931   WaitForIdle();
932 
933   EXPECT_TRUE(node.node().CanShutdownCleanly());
934 }
935 
TEST_F(PortsTest,AllowShutdownWithLocalPortsOpen)936 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
937   TestNode node(0);
938   AddNode(&node);
939 
940   PortRef A, B, C, D;
941   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
942   EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
943 
944   EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
945 
946   ScopedMessage message;
947   EXPECT_TRUE(node.ReadMessage(B, &message));
948   ASSERT_EQ(1u, message->num_ports());
949   EXPECT_TRUE(MessageEquals(message, "foo"));
950   PortRef E;
951   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
952 
953   EXPECT_TRUE(
954       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
955 
956   WaitForIdle();
957 
958   EXPECT_TRUE(
959       node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
960   EXPECT_FALSE(node.node().CanShutdownCleanly());
961 
962   EXPECT_EQ(OK, node.node().ClosePort(A));
963   EXPECT_EQ(OK, node.node().ClosePort(B));
964   EXPECT_EQ(OK, node.node().ClosePort(C));
965   EXPECT_EQ(OK, node.node().ClosePort(E));
966 
967   WaitForIdle();
968 
969   EXPECT_TRUE(node.node().CanShutdownCleanly());
970 }
971 
TEST_F(PortsTest,ProxyCollapse1)972 TEST_F(PortsTest, ProxyCollapse1) {
973   TestNode node(0);
974   AddNode(&node);
975 
976   PortRef A, B;
977   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
978 
979   PortRef X, Y;
980   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
981 
982   ScopedMessage message;
983 
984   // Send B and receive it as C.
985   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
986   ASSERT_TRUE(node.ReadMessage(Y, &message));
987   ASSERT_EQ(1u, message->num_ports());
988   PortRef C;
989   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
990 
991   // Send C and receive it as D.
992   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
993   ASSERT_TRUE(node.ReadMessage(Y, &message));
994   ASSERT_EQ(1u, message->num_ports());
995   PortRef D;
996   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
997 
998   // Send D and receive it as E.
999   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
1000   ASSERT_TRUE(node.ReadMessage(Y, &message));
1001   ASSERT_EQ(1u, message->num_ports());
1002   PortRef E;
1003   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1004 
1005   EXPECT_EQ(OK, node.node().ClosePort(X));
1006   EXPECT_EQ(OK, node.node().ClosePort(Y));
1007 
1008   EXPECT_EQ(OK, node.node().ClosePort(A));
1009   EXPECT_EQ(OK, node.node().ClosePort(E));
1010 
1011   // The node should not idle until all proxies are collapsed.
1012   WaitForIdle();
1013 
1014   EXPECT_TRUE(node.node().CanShutdownCleanly());
1015 }
1016 
TEST_F(PortsTest,ProxyCollapse2)1017 TEST_F(PortsTest, ProxyCollapse2) {
1018   TestNode node(0);
1019   AddNode(&node);
1020 
1021   PortRef A, B;
1022   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1023 
1024   PortRef X, Y;
1025   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1026 
1027   ScopedMessage message;
1028 
1029   // Send B and A to create proxies in each direction.
1030   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1031   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1032 
1033   EXPECT_EQ(OK, node.node().ClosePort(X));
1034   EXPECT_EQ(OK, node.node().ClosePort(Y));
1035 
1036   // At this point we have a scenario with:
1037   //
1038   // D -> [B] -> C -> [A]
1039   //
1040   // Ensure that the proxies can collapse. The sent ports will be closed
1041   // eventually as a result of Y's closure.
1042 
1043   WaitForIdle();
1044 
1045   EXPECT_TRUE(node.node().CanShutdownCleanly());
1046 }
1047 
TEST_F(PortsTest,SendWithClosedPeer)1048 TEST_F(PortsTest, SendWithClosedPeer) {
1049   // This tests that if a port is sent when its peer is already known to be
1050   // closed, the newly created port will be aware of that peer closure, and the
1051   // proxy will eventually collapse.
1052 
1053   TestNode node(0);
1054   AddNode(&node);
1055 
1056   // Send a message from A to B, then close A.
1057   PortRef A, B;
1058   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1059   EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
1060   EXPECT_EQ(OK, node.node().ClosePort(A));
1061 
1062   // Now send B over X-Y as new port C.
1063   PortRef X, Y;
1064   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1065   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1066   ScopedMessage message;
1067   ASSERT_TRUE(node.ReadMessage(Y, &message));
1068   ASSERT_EQ(1u, message->num_ports());
1069   PortRef C;
1070   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1071 
1072   EXPECT_EQ(OK, node.node().ClosePort(X));
1073   EXPECT_EQ(OK, node.node().ClosePort(Y));
1074 
1075   WaitForIdle();
1076 
1077   // C should have received the message originally sent to B, and it should also
1078   // be aware of A's closure.
1079 
1080   ASSERT_TRUE(node.ReadMessage(C, &message));
1081   EXPECT_TRUE(MessageEquals(message, "hey"));
1082 
1083   PortStatus status;
1084   EXPECT_EQ(OK, node.node().GetStatus(C, &status));
1085   EXPECT_FALSE(status.receiving_messages);
1086   EXPECT_FALSE(status.has_messages);
1087   EXPECT_TRUE(status.peer_closed);
1088 
1089   node.node().ClosePort(C);
1090 
1091   WaitForIdle();
1092 
1093   EXPECT_TRUE(node.node().CanShutdownCleanly());
1094 }
1095 
TEST_F(PortsTest,SendWithClosedPeerSent)1096 TEST_F(PortsTest, SendWithClosedPeerSent) {
1097   // This tests that if a port is closed while some number of proxies are still
1098   // routing messages (directly or indirectly) to it, that the peer port is
1099   // eventually notified of the closure, and the dead-end proxies will
1100   // eventually be removed.
1101 
1102   TestNode node(0);
1103   AddNode(&node);
1104 
1105   PortRef X, Y;
1106   EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1107 
1108   PortRef A, B;
1109   EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1110 
1111   ScopedMessage message;
1112 
1113   // Send A as new port C.
1114   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1115 
1116   ASSERT_TRUE(node.ReadMessage(Y, &message));
1117   ASSERT_EQ(1u, message->num_ports());
1118   PortRef C;
1119   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1120 
1121   // Send C as new port D.
1122   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1123 
1124   ASSERT_TRUE(node.ReadMessage(Y, &message));
1125   ASSERT_EQ(1u, message->num_ports());
1126   PortRef D;
1127   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1128 
1129   // Send a message to B through D, then close D.
1130   EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1131   EXPECT_EQ(OK, node.node().ClosePort(D));
1132 
1133   // Now send B as new port E.
1134 
1135   EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1136   EXPECT_EQ(OK, node.node().ClosePort(X));
1137 
1138   ASSERT_TRUE(node.ReadMessage(Y, &message));
1139   ASSERT_EQ(1u, message->num_ports());
1140   PortRef E;
1141   ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1142 
1143   EXPECT_EQ(OK, node.node().ClosePort(Y));
1144 
1145   WaitForIdle();
1146 
1147   // E should receive the message originally sent to B, and it should also be
1148   // aware of D's closure.
1149 
1150   ASSERT_TRUE(node.ReadMessage(E, &message));
1151   EXPECT_TRUE(MessageEquals(message, "hey"));
1152 
1153   PortStatus status;
1154   EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1155   EXPECT_FALSE(status.receiving_messages);
1156   EXPECT_FALSE(status.has_messages);
1157   EXPECT_TRUE(status.peer_closed);
1158 
1159   EXPECT_EQ(OK, node.node().ClosePort(E));
1160 
1161   WaitForIdle();
1162 
1163   EXPECT_TRUE(node.node().CanShutdownCleanly());
1164 }
1165 
TEST_F(PortsTest,MergePorts)1166 TEST_F(PortsTest, MergePorts) {
1167   TestNode node0(0);
1168   AddNode(&node0);
1169 
1170   TestNode node1(1);
1171   AddNode(&node1);
1172 
1173   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1174   PortRef A, B, C, D;
1175   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1176   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1177 
1178   // Write a message on A.
1179   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1180 
1181   // Initiate a merge between B and C.
1182   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1183 
1184   WaitForIdle();
1185 
1186   // Expect all proxies to be gone once idle.
1187   EXPECT_TRUE(
1188       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1189   EXPECT_TRUE(
1190       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1191 
1192   // Expect D to have received the message sent on A.
1193   ScopedMessage message;
1194   ASSERT_TRUE(node1.ReadMessage(D, &message));
1195   EXPECT_TRUE(MessageEquals(message, "hey"));
1196 
1197   EXPECT_EQ(OK, node0.node().ClosePort(A));
1198   EXPECT_EQ(OK, node1.node().ClosePort(D));
1199 
1200   // No more ports should be open.
1201   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1202   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1203 }
1204 
TEST_F(PortsTest,MergePortWithClosedPeer1)1205 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1206   // This tests that the right thing happens when initiating a merge on a port
1207   // whose peer has already been closed.
1208 
1209   TestNode node0(0);
1210   AddNode(&node0);
1211 
1212   TestNode node1(1);
1213   AddNode(&node1);
1214 
1215   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1216   PortRef A, B, C, D;
1217   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1218   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1219 
1220   // Write a message on A.
1221   EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1222 
1223   // Close A.
1224   EXPECT_EQ(OK, node0.node().ClosePort(A));
1225 
1226   // Initiate a merge between B and C.
1227   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1228 
1229   WaitForIdle();
1230 
1231   // Expect all proxies to be gone once idle. node0 should have no ports since
1232   // A was explicitly closed.
1233   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1234   EXPECT_TRUE(
1235       node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1236 
1237   // Expect D to have received the message sent on A.
1238   ScopedMessage message;
1239   ASSERT_TRUE(node1.ReadMessage(D, &message));
1240   EXPECT_TRUE(MessageEquals(message, "hey"));
1241 
1242   EXPECT_EQ(OK, node1.node().ClosePort(D));
1243 
1244   // No more ports should be open.
1245   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1246   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1247 }
1248 
TEST_F(PortsTest,MergePortWithClosedPeer2)1249 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1250   // This tests that the right thing happens when merging into a port whose peer
1251   // has already been closed.
1252 
1253   TestNode node0(0);
1254   AddNode(&node0);
1255 
1256   TestNode node1(1);
1257   AddNode(&node1);
1258 
1259   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1260   PortRef A, B, C, D;
1261   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1262   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1263 
1264   // Write a message on D and close it.
1265   EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
1266   EXPECT_EQ(OK, node1.node().ClosePort(D));
1267 
1268   // Initiate a merge between B and C.
1269   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1270 
1271   WaitForIdle();
1272 
1273   // Expect all proxies to be gone once idle. node1 should have no ports since
1274   // D was explicitly closed.
1275   EXPECT_TRUE(
1276       node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1277   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1278 
1279   // Expect A to have received the message sent on D.
1280   ScopedMessage message;
1281   ASSERT_TRUE(node0.ReadMessage(A, &message));
1282   EXPECT_TRUE(MessageEquals(message, "hey"));
1283 
1284   EXPECT_EQ(OK, node0.node().ClosePort(A));
1285 
1286   // No more ports should be open.
1287   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1288   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1289 }
1290 
TEST_F(PortsTest,MergePortsWithClosedPeers)1291 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1292   // This tests that no residual ports are left behind if two ports are merged
1293   // when both of their peers have been closed.
1294 
1295   TestNode node0(0);
1296   AddNode(&node0);
1297 
1298   TestNode node1(1);
1299   AddNode(&node1);
1300 
1301   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1302   PortRef A, B, C, D;
1303   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1304   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1305 
1306   // Close A and D.
1307   EXPECT_EQ(OK, node0.node().ClosePort(A));
1308   EXPECT_EQ(OK, node1.node().ClosePort(D));
1309 
1310   WaitForIdle();
1311 
1312   // Initiate a merge between B and C.
1313   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1314 
1315   WaitForIdle();
1316 
1317   // Expect everything to have gone away.
1318   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1319   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1320 }
1321 
TEST_F(PortsTest,MergePortsWithMovedPeers)1322 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1323   // This tests that ports can be merged successfully even if their peers are
1324   // moved around.
1325 
1326   TestNode node0(0);
1327   AddNode(&node0);
1328 
1329   TestNode node1(1);
1330   AddNode(&node1);
1331 
1332   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1333   PortRef A, B, C, D;
1334   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1335   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1336 
1337   // Set up another pair X-Y for moving ports on node0.
1338   PortRef X, Y;
1339   EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
1340 
1341   ScopedMessage message;
1342 
1343   // Move A to new port E.
1344   EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
1345   ASSERT_TRUE(node0.ReadMessage(Y, &message));
1346   ASSERT_EQ(1u, message->num_ports());
1347   PortRef E;
1348   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1349 
1350   EXPECT_EQ(OK, node0.node().ClosePort(X));
1351   EXPECT_EQ(OK, node0.node().ClosePort(Y));
1352 
1353   // Write messages on E and D.
1354   EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
1355   EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
1356 
1357   // Initiate a merge between B and C.
1358   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1359 
1360   WaitForIdle();
1361 
1362   // Expect to receive D's message on E and E's message on D.
1363   ASSERT_TRUE(node0.ReadMessage(E, &message));
1364   EXPECT_TRUE(MessageEquals(message, "hi"));
1365   ASSERT_TRUE(node1.ReadMessage(D, &message));
1366   EXPECT_TRUE(MessageEquals(message, "hey"));
1367 
1368   // Close E and D.
1369   EXPECT_EQ(OK, node0.node().ClosePort(E));
1370   EXPECT_EQ(OK, node1.node().ClosePort(D));
1371 
1372   WaitForIdle();
1373 
1374   // Expect everything to have gone away.
1375   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1376   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1377 }
1378 
TEST_F(PortsTest,MergePortsFailsGracefully)1379 TEST_F(PortsTest, MergePortsFailsGracefully) {
1380   // This tests that the system remains in a well-defined state if something
1381   // goes wrong during port merge.
1382 
1383   TestNode node0(0);
1384   AddNode(&node0);
1385 
1386   TestNode node1(1);
1387   AddNode(&node1);
1388 
1389   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1390   PortRef A, B, C, D;
1391   EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1392   EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1393 
1394   ScopedMessage message;
1395   PortRef X, Y;
1396   EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
1397   EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
1398   EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
1399   EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));
1400 
1401   // Block the merge from proceeding until we can do something stupid with port
1402   // C. This avoids the test logic racing with async merge logic.
1403   node1.BlockOnEvent(Event::Type::kMergePort);
1404 
1405   // Initiate the merge between B and C.
1406   EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1407 
1408   // Move C to a new port E. This is not a sane use of Node's public API but
1409   // is still hypothetically possible. It allows us to force a merge failure
1410   // because C will be in an invalid state by the time the merge is processed.
1411   // As a result, B should be closed.
1412   EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));
1413 
1414   node1.Unblock();
1415 
1416   WaitForIdle();
1417 
1418   ASSERT_TRUE(node0.ReadMessage(X, &message));
1419   ASSERT_EQ(1u, message->num_ports());
1420   PortRef E;
1421   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1422 
1423   EXPECT_EQ(OK, node0.node().ClosePort(X));
1424   EXPECT_EQ(OK, node1.node().ClosePort(Y));
1425 
1426   WaitForIdle();
1427 
1428   // C goes away as a result of normal proxy removal. B should have been closed
1429   // cleanly by the failed MergePorts.
1430   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1431   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1432 
1433   // Close A, D, and E.
1434   EXPECT_EQ(OK, node0.node().ClosePort(A));
1435   EXPECT_EQ(OK, node1.node().ClosePort(D));
1436   EXPECT_EQ(OK, node0.node().ClosePort(E));
1437 
1438   WaitForIdle();
1439 
1440   // Expect everything to have gone away.
1441   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1442   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1443 }
1444 
TEST_F(PortsTest,RemotePeerStatus)1445 TEST_F(PortsTest, RemotePeerStatus) {
1446   TestNode node0(0);
1447   AddNode(&node0);
1448 
1449   TestNode node1(1);
1450   AddNode(&node1);
1451 
1452   // Create a local port pair. Neither port should appear to have a remote peer.
1453   PortRef a, b;
1454   PortStatus status;
1455   node0.node().CreatePortPair(&a, &b);
1456   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1457   EXPECT_FALSE(status.peer_remote);
1458   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1459   EXPECT_FALSE(status.peer_remote);
1460 
1461   // Create a port pair spanning the two nodes. Both spanning ports should
1462   // immediately appear to have a remote peer.
1463   PortRef x0, x1;
1464   CreatePortPair(&node0, &x0, &node1, &x1);
1465 
1466   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1467   EXPECT_TRUE(status.peer_remote);
1468   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1469   EXPECT_TRUE(status.peer_remote);
1470 
1471   PortRef x2, x3;
1472   CreatePortPair(&node0, &x2, &node1, &x3);
1473 
1474   // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
1475   // remote and the remote peers local.
1476   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
1477   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
1478   WaitForIdle();
1479 
1480   ScopedMessage message;
1481   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1482   ASSERT_EQ(1u, message->num_ports());
1483   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));
1484 
1485   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1486   ASSERT_EQ(1u, message->num_ports());
1487   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));
1488 
1489   // Now x0-x1 should be local to node0 and a-b should span the nodes.
1490   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1491   EXPECT_FALSE(status.peer_remote);
1492   ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
1493   EXPECT_FALSE(status.peer_remote);
1494   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1495   EXPECT_TRUE(status.peer_remote);
1496   ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
1497   EXPECT_TRUE(status.peer_remote);
1498 
1499   // And swap them back one more time.
1500   EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
1501   EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
1502   WaitForIdle();
1503 
1504   ASSERT_TRUE(node0.ReadMessage(x2, &message));
1505   ASSERT_EQ(1u, message->num_ports());
1506   ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));
1507 
1508   ASSERT_TRUE(node1.ReadMessage(x3, &message));
1509   ASSERT_EQ(1u, message->num_ports());
1510   ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));
1511 
1512   ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
1513   EXPECT_TRUE(status.peer_remote);
1514   ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
1515   EXPECT_TRUE(status.peer_remote);
1516   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1517   EXPECT_FALSE(status.peer_remote);
1518   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1519   EXPECT_FALSE(status.peer_remote);
1520 
1521   EXPECT_EQ(OK, node0.node().ClosePort(x0));
1522   EXPECT_EQ(OK, node1.node().ClosePort(x1));
1523   EXPECT_EQ(OK, node0.node().ClosePort(x2));
1524   EXPECT_EQ(OK, node1.node().ClosePort(x3));
1525   EXPECT_EQ(OK, node0.node().ClosePort(a));
1526   EXPECT_EQ(OK, node0.node().ClosePort(b));
1527 
1528   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1529   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1530 }
1531 
TEST_F(PortsTest,RemotePeerStatusAfterLocalPortMerge)1532 TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
1533   TestNode node0(0);
1534   AddNode(&node0);
1535 
1536   TestNode node1(1);
1537   AddNode(&node1);
1538 
1539   // Set up a-b on node0 and c-d spanning node0-node1.
1540   PortRef a, b, c, d;
1541   node0.node().CreatePortPair(&a, &b);
1542   CreatePortPair(&node0, &c, &node1, &d);
1543 
1544   PortStatus status;
1545   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1546   EXPECT_FALSE(status.peer_remote);
1547   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1548   EXPECT_FALSE(status.peer_remote);
1549   ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
1550   EXPECT_TRUE(status.peer_remote);
1551   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1552   EXPECT_TRUE(status.peer_remote);
1553 
1554   EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
1555   WaitForIdle();
1556 
1557   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1558   EXPECT_TRUE(status.peer_remote);
1559   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1560   EXPECT_TRUE(status.peer_remote);
1561 
1562   EXPECT_EQ(OK, node0.node().ClosePort(a));
1563   EXPECT_EQ(OK, node1.node().ClosePort(d));
1564   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1565   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1566 }
1567 
TEST_F(PortsTest,RemotePeerStatusAfterRemotePortMerge)1568 TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
1569   TestNode node0(0);
1570   AddNode(&node0);
1571 
1572   TestNode node1(1);
1573   AddNode(&node1);
1574 
1575   // Set up a-b on node0 and c-d on node1.
1576   PortRef a, b, c, d;
1577   node0.node().CreatePortPair(&a, &b);
1578   node1.node().CreatePortPair(&c, &d);
1579 
1580   PortStatus status;
1581   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1582   EXPECT_FALSE(status.peer_remote);
1583   ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
1584   EXPECT_FALSE(status.peer_remote);
1585   ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
1586   EXPECT_FALSE(status.peer_remote);
1587   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1588   EXPECT_FALSE(status.peer_remote);
1589 
1590   EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
1591   WaitForIdle();
1592 
1593   ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
1594   EXPECT_TRUE(status.peer_remote);
1595   ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
1596   EXPECT_TRUE(status.peer_remote);
1597 
1598   EXPECT_EQ(OK, node0.node().ClosePort(a));
1599   EXPECT_EQ(OK, node1.node().ClosePort(d));
1600   EXPECT_TRUE(node0.node().CanShutdownCleanly());
1601   EXPECT_TRUE(node1.node().CanShutdownCleanly());
1602 }
1603 
TEST_F(PortsTest,RetransmitUserMessageEvents)1604 TEST_F(PortsTest, RetransmitUserMessageEvents) {
1605   // Ensures that user message events can be retransmitted properly.
1606   TestNode node0(0);
1607   AddNode(&node0);
1608 
1609   PortRef a, b;
1610   node0.node().CreatePortPair(&a, &b);
1611 
1612   // Ping.
1613   const char* kMessage = "hey";
1614   ScopedMessage message;
1615   EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
1616   ASSERT_TRUE(node0.ReadMessage(b, &message));
1617   EXPECT_TRUE(MessageEquals(message, kMessage));
1618 
1619   // Pong.
1620   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1621   EXPECT_FALSE(message);
1622   ASSERT_TRUE(node0.ReadMessage(a, &message));
1623   EXPECT_TRUE(MessageEquals(message, kMessage));
1624 
1625   // Ping again.
1626   EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
1627   EXPECT_FALSE(message);
1628   ASSERT_TRUE(node0.ReadMessage(b, &message));
1629   EXPECT_TRUE(MessageEquals(message, kMessage));
1630 
1631   // Pong again!
1632   EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
1633   EXPECT_FALSE(message);
1634   ASSERT_TRUE(node0.ReadMessage(a, &message));
1635   EXPECT_TRUE(MessageEquals(message, kMessage));
1636 
1637   EXPECT_EQ(OK, node0.node().ClosePort(a));
1638   EXPECT_EQ(OK, node0.node().ClosePort(b));
1639 }
1640 
1641 }  // namespace test
1642 }  // namespace ports
1643 }  // namespace core
1644 }  // namespace mojo
1645