• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 #include <map>
10 #include <queue>
11 #include <sstream>
12 
13 #include "base/logging.h"
14 #include "base/rand_util.h"
15 #include "mojo/edk/system/ports/event.h"
16 #include "mojo/edk/system/ports/node.h"
17 #include "mojo/edk/system/ports/node_delegate.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19 
20 namespace mojo {
21 namespace edk {
22 namespace ports {
23 namespace test {
24 
25 namespace {
26 
LogMessage(const Message * message)27 void LogMessage(const Message* message) {
28   std::stringstream ports;
29   for (size_t i = 0; i < message->num_ports(); ++i) {
30     if (i > 0)
31       ports << ",";
32     ports << message->ports()[i];
33   }
34   DVLOG(1) << "message: \""
35            << static_cast<const char*>(message->payload_bytes())
36            << "\" ports=[" << ports.str() << "]";
37 }
38 
ClosePortsInMessage(Node * node,Message * message)39 void ClosePortsInMessage(Node* node, Message* message) {
40   for (size_t i = 0; i < message->num_ports(); ++i) {
41     PortRef port;
42     ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
43     EXPECT_EQ(OK, node->ClosePort(port));
44   }
45 }
46 
47 class TestMessage : public Message {
48  public:
NewUserMessage(size_t num_payload_bytes,size_t num_ports)49   static ScopedMessage NewUserMessage(size_t num_payload_bytes,
50                                       size_t num_ports) {
51     return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
52   }
53 
TestMessage(size_t num_payload_bytes,size_t num_ports)54   TestMessage(size_t num_payload_bytes, size_t num_ports)
55       : Message(num_payload_bytes, num_ports) {
56     start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes];
57     InitializeUserMessageHeader(start_);
58   }
59 
TestMessage(size_t num_header_bytes,size_t num_payload_bytes,size_t num_ports_bytes)60   TestMessage(size_t num_header_bytes,
61               size_t num_payload_bytes,
62               size_t num_ports_bytes)
63       : Message(num_header_bytes,
64                 num_payload_bytes,
65                 num_ports_bytes) {
66     start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes];
67   }
68 
~TestMessage()69   ~TestMessage() override {
70     delete[] start_;
71   }
72 };
73 
74 struct Task {
Taskmojo::edk::ports::test::__anon79fd6c780111::Task75   Task(NodeName node_name, ScopedMessage message)
76       : node_name(node_name),
77         message(std::move(message)),
78         priority(base::RandUint64()) {
79   }
80 
81   NodeName node_name;
82   ScopedMessage message;
83   uint64_t priority;
84 };
85 
86 struct TaskComparator {
operator ()mojo::edk::ports::test::__anon79fd6c780111::TaskComparator87   bool operator()(const Task* a, const Task* b) {
88     return a->priority < b->priority;
89   }
90 };
91 
92 const size_t kMaxNodes = 3;
93 
94 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
95 Node* node_map[kMaxNodes];
96 
GetNode(const NodeName & name)97 Node* GetNode(const NodeName& name) {
98   return node_map[name.v1];
99 }
100 
SetNode(const NodeName & name,Node * node)101 void SetNode(const NodeName& name, Node* node) {
102   node_map[name.v1] = node;
103 }
104 
PumpTasks()105 void PumpTasks() {
106   while (!task_queue.empty()) {
107     Task* task = task_queue.top();
108     task_queue.pop();
109 
110     Node* node = GetNode(task->node_name);
111     if (node)
112       node->AcceptMessage(std::move(task->message));
113 
114     delete task;
115   }
116 }
117 
PumpUntilTask(EventType type)118 void PumpUntilTask(EventType type) {
119   while (!task_queue.empty()) {
120     Task* task = task_queue.top();
121 
122     const EventHeader* header = GetEventHeader(*task->message);
123     if (header->type == type)
124       return;
125 
126     task_queue.pop();
127 
128     Node* node = GetNode(task->node_name);
129     if (node)
130       node->AcceptMessage(std::move(task->message));
131 
132     delete task;
133   }
134 }
135 
DiscardPendingTasks()136 void DiscardPendingTasks() {
137   while (!task_queue.empty()) {
138     Task* task = task_queue.top();
139     task_queue.pop();
140     delete task;
141   }
142 }
143 
SendStringMessage(Node * node,const PortRef & port,const std::string & s)144 int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
145   size_t size = s.size() + 1;
146   ScopedMessage message = TestMessage::NewUserMessage(size, 0);
147   memcpy(message->mutable_payload_bytes(), s.data(), size);
148   return node->SendMessage(port, std::move(message));
149 }
150 
SendStringMessageWithPort(Node * node,const PortRef & port,const std::string & s,const PortName & sent_port_name)151 int SendStringMessageWithPort(Node* node,
152                               const PortRef& port,
153                               const std::string& s,
154                               const PortName& sent_port_name) {
155   size_t size = s.size() + 1;
156   ScopedMessage message = TestMessage::NewUserMessage(size, 1);
157   memcpy(message->mutable_payload_bytes(), s.data(), size);
158   message->mutable_ports()[0] = sent_port_name;
159   return node->SendMessage(port, std::move(message));
160 }
161 
SendStringMessageWithPort(Node * node,const PortRef & port,const std::string & s,const PortRef & sent_port)162 int SendStringMessageWithPort(Node* node,
163                               const PortRef& port,
164                               const std::string& s,
165                               const PortRef& sent_port) {
166   return SendStringMessageWithPort(node, port, s, sent_port.name());
167 }
168 
ToString(const ScopedMessage & message)169 const char* ToString(const ScopedMessage& message) {
170   return static_cast<const char*>(message->payload_bytes());
171 }
172 
173 class TestNodeDelegate : public NodeDelegate {
174  public:
TestNodeDelegate(const NodeName & node_name)175   explicit TestNodeDelegate(const NodeName& node_name)
176       : node_name_(node_name),
177         drop_messages_(false),
178         read_messages_(true),
179         save_messages_(false) {
180   }
181 
set_drop_messages(bool value)182   void set_drop_messages(bool value) { drop_messages_ = value; }
set_read_messages(bool value)183   void set_read_messages(bool value) { read_messages_ = value; }
set_save_messages(bool value)184   void set_save_messages(bool value) { save_messages_ = value; }
185 
GetSavedMessage(ScopedMessage * message)186   bool GetSavedMessage(ScopedMessage* message) {
187     if (saved_messages_.empty()) {
188       message->reset();
189       return false;
190     }
191     *message = std::move(saved_messages_.front());
192     saved_messages_.pop();
193     return true;
194   }
195 
GenerateRandomPortName(PortName * port_name)196   void GenerateRandomPortName(PortName* port_name) override {
197     static uint64_t next_port_name = 1;
198     port_name->v1 = next_port_name++;
199     port_name->v2 = 0;
200   }
201 
AllocMessage(size_t num_header_bytes,ScopedMessage * message)202   void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
203     message->reset(new TestMessage(num_header_bytes, 0, 0));
204   }
205 
ForwardMessage(const NodeName & node_name,ScopedMessage message)206   void ForwardMessage(const NodeName& node_name,
207                       ScopedMessage message) override {
208     if (drop_messages_) {
209       DVLOG(1) << "Dropping ForwardMessage from node "
210                << node_name_ << " to " << node_name;
211       ClosePortsInMessage(GetNode(node_name), message.get());
212       return;
213     }
214     DVLOG(1) << "ForwardMessage from node "
215              << node_name_ << " to " << node_name;
216     task_queue.push(new Task(node_name, std::move(message)));
217   }
218 
BroadcastMessage(ScopedMessage message)219   void BroadcastMessage(ScopedMessage message) override {
220     for (size_t i = 0; i < kMaxNodes; ++i) {
221       Node* node = node_map[i];
222       // Broadcast doesn't deliver to the local node.
223       if (node && node != GetNode(node_name_)) {
224         // NOTE: We only need to support broadcast of events, which have no
225         // payload or ports bytes.
226         ScopedMessage new_message(
227             new TestMessage(message->num_header_bytes(), 0, 0));
228         memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
229                message->num_header_bytes());
230         node->AcceptMessage(std::move(new_message));
231       }
232     }
233   }
234 
PortStatusChanged(const PortRef & port)235   void PortStatusChanged(const PortRef& port) override {
236     DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
237     if (!read_messages_)
238       return;
239     Node* node = GetNode(node_name_);
240     for (;;) {
241       ScopedMessage message;
242       int rv = node->GetMessage(port, &message);
243       EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
244       if (rv == ERROR_PORT_PEER_CLOSED || !message)
245         break;
246       if (save_messages_) {
247         SaveMessage(std::move(message));
248       } else {
249         LogMessage(message.get());
250         for (size_t i = 0; i < message->num_ports(); ++i) {
251           std::stringstream buf;
252           buf << "got port: " << message->ports()[i];
253 
254           PortRef received_port;
255           node->GetPort(message->ports()[i], &received_port);
256 
257           SendStringMessage(node, received_port, buf.str());
258 
259           // Avoid leaking these ports.
260           node->ClosePort(received_port);
261         }
262       }
263     }
264   }
265 
266  private:
SaveMessage(ScopedMessage message)267   void SaveMessage(ScopedMessage message) {
268     saved_messages_.emplace(std::move(message));
269   }
270 
271   std::queue<ScopedMessage> saved_messages_;
272   NodeName node_name_;
273   bool drop_messages_;
274   bool read_messages_;
275   bool save_messages_;
276 };
277 
278 class PortsTest : public testing::Test {
279  public:
SetUp()280   void SetUp() override {
281     DiscardPendingTasks();
282     SetNode(NodeName(0, 1), nullptr);
283     SetNode(NodeName(1, 1), nullptr);
284     SetNode(NodeName(2, 1), nullptr);
285   }
286 };
287 
288 }  // namespace
289 
TEST_F(PortsTest,Basic1)290 TEST_F(PortsTest, Basic1) {
291   NodeName node0_name(0, 1);
292   TestNodeDelegate node0_delegate(node0_name);
293   Node node0(node0_name, &node0_delegate);
294   SetNode(node0_name, &node0);
295 
296   NodeName node1_name(1, 1);
297   TestNodeDelegate node1_delegate(node1_name);
298   Node node1(node1_name, &node1_delegate);
299   SetNode(node1_name, &node1);
300 
301   // Setup pipe between node0 and node1.
302   PortRef x0, x1;
303   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
304   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
305   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
306   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
307 
308   // Transfer a port from node0 to node1.
309   PortRef a0, a1;
310   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
311   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
312 
313   EXPECT_EQ(OK, node0.ClosePort(a0));
314 
315   EXPECT_EQ(OK, node0.ClosePort(x0));
316   EXPECT_EQ(OK, node1.ClosePort(x1));
317 
318   PumpTasks();
319 
320   EXPECT_TRUE(node0.CanShutdownCleanly(false));
321   EXPECT_TRUE(node1.CanShutdownCleanly(false));
322 }
323 
TEST_F(PortsTest,Basic2)324 TEST_F(PortsTest, Basic2) {
325   NodeName node0_name(0, 1);
326   TestNodeDelegate node0_delegate(node0_name);
327   Node node0(node0_name, &node0_delegate);
328   SetNode(node0_name, &node0);
329 
330   NodeName node1_name(1, 1);
331   TestNodeDelegate node1_delegate(node1_name);
332   Node node1(node1_name, &node1_delegate);
333   SetNode(node1_name, &node1);
334 
335   // Setup pipe between node0 and node1.
336   PortRef x0, x1;
337   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
338   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
339   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
340   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
341 
342   PortRef b0, b1;
343   EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
344   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
345   EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));
346 
347   // This may cause a SendMessage(b1) failure.
348   EXPECT_EQ(OK, node0.ClosePort(b0));
349 
350   EXPECT_EQ(OK, node0.ClosePort(x0));
351   EXPECT_EQ(OK, node1.ClosePort(x1));
352 
353   PumpTasks();
354 
355   EXPECT_TRUE(node0.CanShutdownCleanly(false));
356   EXPECT_TRUE(node1.CanShutdownCleanly(false));
357 }
358 
TEST_F(PortsTest,Basic3)359 TEST_F(PortsTest, Basic3) {
360   NodeName node0_name(0, 1);
361   TestNodeDelegate node0_delegate(node0_name);
362   Node node0(node0_name, &node0_delegate);
363   SetNode(node0_name, &node0);
364 
365   NodeName node1_name(1, 1);
366   TestNodeDelegate node1_delegate(node1_name);
367   Node node1(node1_name, &node1_delegate);
368   SetNode(node1_name, &node1);
369 
370   // Setup pipe between node0 and node1.
371   PortRef x0, x1;
372   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
373   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
374   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
375   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
376 
377   // Transfer a port from node0 to node1.
378   PortRef a0, a1;
379   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
380   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
381   EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
382 
383   // Transfer a0 as well.
384   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
385 
386   PortRef b0, b1;
387   EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
388   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
389   EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));
390 
391   // This may cause a SendMessage(b1) failure.
392   EXPECT_EQ(OK, node0.ClosePort(b0));
393 
394   EXPECT_EQ(OK, node0.ClosePort(x0));
395   EXPECT_EQ(OK, node1.ClosePort(x1));
396 
397   PumpTasks();
398 
399   EXPECT_TRUE(node0.CanShutdownCleanly(false));
400   EXPECT_TRUE(node1.CanShutdownCleanly(false));
401 }
402 
TEST_F(PortsTest,LostConnectionToNode1)403 TEST_F(PortsTest, LostConnectionToNode1) {
404   NodeName node0_name(0, 1);
405   TestNodeDelegate node0_delegate(node0_name);
406   Node node0(node0_name, &node0_delegate);
407   SetNode(node0_name, &node0);
408 
409   NodeName node1_name(1, 1);
410   TestNodeDelegate node1_delegate(node1_name);
411   Node node1(node1_name, &node1_delegate);
412   SetNode(node1_name, &node1);
413 
414   // Setup pipe between node0 and node1.
415   PortRef x0, x1;
416   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
417   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
418   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
419   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
420 
421   // Transfer port to node1 and simulate a lost connection to node1. Dropping
422   // events from node1 is how we simulate the lost connection.
423 
424   node1_delegate.set_drop_messages(true);
425 
426   PortRef a0, a1;
427   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
428   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));
429 
430   PumpTasks();
431 
432   EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
433 
434   PumpTasks();
435 
436   EXPECT_EQ(OK, node0.ClosePort(a0));
437   EXPECT_EQ(OK, node0.ClosePort(x0));
438   EXPECT_EQ(OK, node1.ClosePort(x1));
439 
440   PumpTasks();
441 
442   EXPECT_TRUE(node0.CanShutdownCleanly(false));
443   EXPECT_TRUE(node1.CanShutdownCleanly(false));
444 }
445 
TEST_F(PortsTest,LostConnectionToNode2)446 TEST_F(PortsTest, LostConnectionToNode2) {
447   NodeName node0_name(0, 1);
448   TestNodeDelegate node0_delegate(node0_name);
449   Node node0(node0_name, &node0_delegate);
450   node_map[0] = &node0;
451 
452   NodeName node1_name(1, 1);
453   TestNodeDelegate node1_delegate(node1_name);
454   Node node1(node1_name, &node1_delegate);
455   node_map[1] = &node1;
456 
457   // Setup pipe between node0 and node1.
458   PortRef x0, x1;
459   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
460   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
461   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
462   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
463 
464   node1_delegate.set_read_messages(false);
465 
466   PortRef a0, a1;
467   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
468   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));
469 
470   PumpTasks();
471 
472   node1_delegate.set_drop_messages(true);
473 
474   EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
475 
476   PumpTasks();
477 
478   ScopedMessage message;
479   EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
480   EXPECT_FALSE(message);
481 
482   EXPECT_EQ(OK, node0.ClosePort(a0));
483 
484   EXPECT_EQ(OK, node0.ClosePort(x0));
485 
486   EXPECT_EQ(OK, node1.GetMessage(x1, &message));
487   EXPECT_TRUE(message);
488   ClosePortsInMessage(&node1, message.get());
489 
490   EXPECT_EQ(OK, node1.ClosePort(x1));
491 
492   PumpTasks();
493 
494   EXPECT_TRUE(node0.CanShutdownCleanly(false));
495   EXPECT_TRUE(node1.CanShutdownCleanly(false));
496 }
497 
TEST_F(PortsTest,LostConnectionToNodeWithSecondaryProxy)498 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
499   // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
500   // node.
501 
502   NodeName node0_name(0, 1);
503   TestNodeDelegate node0_delegate(node0_name);
504   Node node0(node0_name, &node0_delegate);
505   node_map[0] = &node0;
506 
507   NodeName node1_name(1, 1);
508   TestNodeDelegate node1_delegate(node1_name);
509   Node node1(node1_name, &node1_delegate);
510   node_map[1] = &node1;
511 
512   NodeName node2_name(2, 1);
513   TestNodeDelegate node2_delegate(node2_name);
514   Node node2(node2_name, &node2_delegate);
515   node_map[2] = &node2;
516 
517   node1_delegate.set_save_messages(true);
518 
519   // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
520   PortRef A, B, C, D;
521   EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
522   EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
523   EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
524   EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
525   EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
526   EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
527   EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
528   EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));
529 
530   // Create E-F and send F over A to node 1.
531   PortRef E, F;
532   EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
533   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));
534 
535   PumpTasks();
536 
537   ScopedMessage message;
538   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
539   ASSERT_EQ(1u, message->num_ports());
540 
541   EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));
542 
543   // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
544   // will trivially become aware of the loss, and this test verifies that the
545   // port A on node 0 will eventually also become aware of it.
546 
547   EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));
548 
549   node_map[2] = nullptr;
550   EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));
551 
552   PumpTasks();
553 
554   // Port F should be gone.
555   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));
556 
557   // Port E should have detected peer closure despite the fact that there is
558   // no longer a continuous route from F to E over which the event could travel.
559   PortStatus status;
560   EXPECT_EQ(OK, node0.GetStatus(E, &status));
561   EXPECT_TRUE(status.peer_closed);
562 
563   EXPECT_EQ(OK, node0.ClosePort(A));
564   EXPECT_EQ(OK, node1.ClosePort(B));
565   EXPECT_EQ(OK, node1.ClosePort(C));
566   EXPECT_EQ(OK, node0.ClosePort(E));
567 
568   EXPECT_TRUE(node0.CanShutdownCleanly(false));
569   EXPECT_TRUE(node1.CanShutdownCleanly(false));
570 }
571 
TEST_F(PortsTest,LostConnectionToNodeWithLocalProxy)572 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
573   // Tests that a proxy gets cleaned up when its direct peer lives on a lost
574   // node and it's predecessor lives on the same node.
575 
576   NodeName node0_name(0, 1);
577   TestNodeDelegate node0_delegate(node0_name);
578   Node node0(node0_name, &node0_delegate);
579   node_map[0] = &node0;
580 
581   NodeName node1_name(1, 1);
582   TestNodeDelegate node1_delegate(node1_name);
583   Node node1(node1_name, &node1_delegate);
584   node_map[1] = &node1;
585 
586   node1_delegate.set_save_messages(true);
587 
588   // Create A-B spanning nodes 0 and 1.
589   PortRef A, B;
590   EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
591   EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
592   EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
593   EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
594 
595   // Create C-D and send D over A to node 1.
596   PortRef C, D;
597   EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
598   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));
599 
600   // Pump tasks until the start of port collapse for port D, which should become
601   // a proxy.
602   PumpUntilTask(EventType::kObserveProxy);
603 
604   ScopedMessage message;
605   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
606   ASSERT_EQ(1u, message->num_ports());
607 
608   PortRef E;
609   EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));
610 
611   EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
612   PumpTasks();
613 
614   // Port C should have detected peer closure.
615   PortStatus status;
616   EXPECT_EQ(OK, node0.GetStatus(C, &status));
617   EXPECT_TRUE(status.peer_closed);
618 
619   EXPECT_EQ(OK, node0.ClosePort(A));
620   EXPECT_EQ(OK, node1.ClosePort(B));
621   EXPECT_EQ(OK, node0.ClosePort(C));
622   EXPECT_EQ(OK, node1.ClosePort(E));
623 
624   EXPECT_TRUE(node0.CanShutdownCleanly(false));
625   EXPECT_TRUE(node1.CanShutdownCleanly(false));
626 }
627 
TEST_F(PortsTest,GetMessage1)628 TEST_F(PortsTest, GetMessage1) {
629   NodeName node0_name(0, 1);
630   TestNodeDelegate node0_delegate(node0_name);
631   Node node0(node0_name, &node0_delegate);
632   node_map[0] = &node0;
633 
634   PortRef a0, a1;
635   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
636 
637   ScopedMessage message;
638   EXPECT_EQ(OK, node0.GetMessage(a0, &message));
639   EXPECT_FALSE(message);
640 
641   EXPECT_EQ(OK, node0.ClosePort(a1));
642 
643   EXPECT_EQ(OK, node0.GetMessage(a0, &message));
644   EXPECT_FALSE(message);
645 
646   PumpTasks();
647 
648   EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
649   EXPECT_FALSE(message);
650 
651   EXPECT_EQ(OK, node0.ClosePort(a0));
652 
653   EXPECT_TRUE(node0.CanShutdownCleanly(false));
654 }
655 
TEST_F(PortsTest,GetMessage2)656 TEST_F(PortsTest, GetMessage2) {
657   NodeName node0_name(0, 1);
658   TestNodeDelegate node0_delegate(node0_name);
659   Node node0(node0_name, &node0_delegate);
660   node_map[0] = &node0;
661 
662   node0_delegate.set_read_messages(false);
663 
664   PortRef a0, a1;
665   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
666 
667   EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));
668 
669   ScopedMessage message;
670   EXPECT_EQ(OK, node0.GetMessage(a0, &message));
671 
672   ASSERT_TRUE(message);
673   EXPECT_EQ(0, strcmp("1", ToString(message)));
674 
675   EXPECT_EQ(OK, node0.ClosePort(a0));
676   EXPECT_EQ(OK, node0.ClosePort(a1));
677 
678   EXPECT_TRUE(node0.CanShutdownCleanly(false));
679 }
680 
TEST_F(PortsTest,GetMessage3)681 TEST_F(PortsTest, GetMessage3) {
682   NodeName node0_name(0, 1);
683   TestNodeDelegate node0_delegate(node0_name);
684   Node node0(node0_name, &node0_delegate);
685   node_map[0] = &node0;
686 
687   node0_delegate.set_read_messages(false);
688 
689   PortRef a0, a1;
690   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
691 
692   const char* kStrings[] = {
693     "1",
694     "2",
695     "3"
696   };
697 
698   for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i)
699     EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i]));
700 
701   ScopedMessage message;
702   for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
703     EXPECT_EQ(OK, node0.GetMessage(a0, &message));
704     ASSERT_TRUE(message);
705     EXPECT_EQ(0, strcmp(kStrings[i], ToString(message)));
706     DVLOG(1) << "got " << kStrings[i];
707   }
708 
709   EXPECT_EQ(OK, node0.ClosePort(a0));
710   EXPECT_EQ(OK, node0.ClosePort(a1));
711 
712   EXPECT_TRUE(node0.CanShutdownCleanly(false));
713 }
714 
TEST_F(PortsTest,Delegation1)715 TEST_F(PortsTest, Delegation1) {
716   NodeName node0_name(0, 1);
717   TestNodeDelegate node0_delegate(node0_name);
718   Node node0(node0_name, &node0_delegate);
719   SetNode(node0_name, &node0);
720 
721   NodeName node1_name(1, 1);
722   TestNodeDelegate node1_delegate(node1_name);
723   Node node1(node1_name, &node1_delegate);
724   node_map[1] = &node1;
725 
726   node0_delegate.set_save_messages(true);
727   node1_delegate.set_save_messages(true);
728 
729   // Setup pipe between node0 and node1.
730   PortRef x0, x1;
731   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
732   EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
733   EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
734   EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
735 
736   // In this test, we send a message to a port that has been moved.
737 
738   PortRef a0, a1;
739   EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
740 
741   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));
742 
743   PumpTasks();
744 
745   ScopedMessage message;
746   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
747 
748   ASSERT_EQ(1u, message->num_ports());
749 
750   // This is "a1" from the point of view of node1.
751   PortName a2_name = message->ports()[0];
752 
753   EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));
754 
755   PumpTasks();
756 
757   EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));
758 
759   PumpTasks();
760 
761   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
762 
763   ASSERT_EQ(1u, message->num_ports());
764 
765   // This is "a2" from the point of view of node1.
766   PortName a3_name = message->ports()[0];
767 
768   PortRef a3;
769   EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));
770 
771   EXPECT_EQ(0, strcmp("a2", ToString(message)));
772 
773   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
774 
775   EXPECT_EQ(0u, message->num_ports());
776   EXPECT_EQ(0, strcmp("hello", ToString(message)));
777 
778   EXPECT_EQ(OK, node0.ClosePort(a0));
779   EXPECT_EQ(OK, node0.ClosePort(a3));
780 
781   EXPECT_EQ(OK, node0.ClosePort(x0));
782   EXPECT_EQ(OK, node1.ClosePort(x1));
783 
784   EXPECT_TRUE(node0.CanShutdownCleanly(false));
785   EXPECT_TRUE(node1.CanShutdownCleanly(false));
786 }
787 
TEST_F(PortsTest,Delegation2)788 TEST_F(PortsTest, Delegation2) {
789   NodeName node0_name(0, 1);
790   TestNodeDelegate node0_delegate(node0_name);
791   Node node0(node0_name, &node0_delegate);
792   SetNode(node0_name, &node0);
793 
794   NodeName node1_name(1, 1);
795   TestNodeDelegate node1_delegate(node1_name);
796   Node node1(node1_name, &node1_delegate);
797   node_map[1] = &node1;
798 
799   node0_delegate.set_save_messages(true);
800   node1_delegate.set_save_messages(true);
801 
802   for (int i = 0; i < 10; ++i) {
803     // Setup pipe a<->b between node0 and node1.
804     PortRef A, B;
805     EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
806     EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
807     EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
808     EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
809 
810     PortRef C, D;
811     EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
812 
813     PortRef E, F;
814     EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
815 
816     // Pass D over A to B.
817     EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));
818 
819     // Pass F over C to D.
820     EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));
821 
822     // This message should find its way to node1.
823     EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));
824 
825     PumpTasks();
826 
827     EXPECT_EQ(OK, node0.ClosePort(C));
828     EXPECT_EQ(OK, node0.ClosePort(E));
829 
830     EXPECT_EQ(OK, node0.ClosePort(A));
831     EXPECT_EQ(OK, node1.ClosePort(B));
832 
833     for (;;) {
834       ScopedMessage message;
835       if (node1_delegate.GetSavedMessage(&message)) {
836         ClosePortsInMessage(&node1, message.get());
837         if (strcmp("hello", ToString(message)) == 0)
838           break;
839       } else {
840         ASSERT_TRUE(false);  // "hello" message not delivered!
841         break;
842       }
843     }
844 
845     PumpTasks();  // Because ClosePort may have generated tasks.
846   }
847 
848   EXPECT_TRUE(node0.CanShutdownCleanly(false));
849   EXPECT_TRUE(node1.CanShutdownCleanly(false));
850 }
851 
TEST_F(PortsTest,SendUninitialized)852 TEST_F(PortsTest, SendUninitialized) {
853   NodeName node0_name(0, 1);
854   TestNodeDelegate node0_delegate(node0_name);
855   Node node0(node0_name, &node0_delegate);
856   node_map[0] = &node0;
857 
858   PortRef x0;
859   EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
860   EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
861             SendStringMessage(&node0, x0, "oops"));
862   EXPECT_EQ(OK, node0.ClosePort(x0));
863   EXPECT_TRUE(node0.CanShutdownCleanly(false));
864 }
865 
TEST_F(PortsTest,SendFailure)866 TEST_F(PortsTest, SendFailure) {
867   NodeName node0_name(0, 1);
868   TestNodeDelegate node0_delegate(node0_name);
869   Node node0(node0_name, &node0_delegate);
870   node_map[0] = &node0;
871 
872   node0_delegate.set_save_messages(true);
873 
874   PortRef A, B;
875   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
876 
877   // Try to send A over itself.
878 
879   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
880             SendStringMessageWithPort(&node0, A, "oops", A));
881 
882   // Try to send B over A.
883 
884   EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
885             SendStringMessageWithPort(&node0, A, "nope", B));
886 
887   // B should be closed immediately.
888   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
889 
890   PumpTasks();
891 
892   // There should have been no messages accepted.
893   ScopedMessage message;
894   EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));
895 
896   EXPECT_EQ(OK, node0.ClosePort(A));
897 
898   PumpTasks();
899 
900   EXPECT_TRUE(node0.CanShutdownCleanly(false));
901 }
902 
TEST_F(PortsTest,DontLeakUnreceivedPorts)903 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
904   NodeName node0_name(0, 1);
905   TestNodeDelegate node0_delegate(node0_name);
906   Node node0(node0_name, &node0_delegate);
907   node_map[0] = &node0;
908 
909   node0_delegate.set_read_messages(false);
910 
911   PortRef A, B;
912   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
913 
914   PortRef C, D;
915   EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
916 
917   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
918 
919   PumpTasks();
920 
921   EXPECT_EQ(OK, node0.ClosePort(C));
922 
923   EXPECT_EQ(OK, node0.ClosePort(A));
924   EXPECT_EQ(OK, node0.ClosePort(B));
925 
926   PumpTasks();
927 
928   EXPECT_TRUE(node0.CanShutdownCleanly(false));
929 }
930 
TEST_F(PortsTest,AllowShutdownWithLocalPortsOpen)931 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
932   NodeName node0_name(0, 1);
933   TestNodeDelegate node0_delegate(node0_name);
934   Node node0(node0_name, &node0_delegate);
935   node_map[0] = &node0;
936 
937   node0_delegate.set_save_messages(true);
938 
939   PortRef A, B;
940   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
941 
942   PortRef C, D;
943   EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
944 
945   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
946 
947   ScopedMessage message;
948   EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
949   ASSERT_EQ(1u, message->num_ports());
950 
951   PortRef E;
952   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
953 
954   EXPECT_TRUE(node0.CanShutdownCleanly(true));
955 
956   PumpTasks();
957 
958   EXPECT_TRUE(node0.CanShutdownCleanly(true));
959   EXPECT_FALSE(node0.CanShutdownCleanly(false));
960 
961   EXPECT_EQ(OK, node0.ClosePort(A));
962   EXPECT_EQ(OK, node0.ClosePort(B));
963   EXPECT_EQ(OK, node0.ClosePort(C));
964   EXPECT_EQ(OK, node0.ClosePort(E));
965 
966   PumpTasks();
967 
968   EXPECT_TRUE(node0.CanShutdownCleanly(false));
969 }
970 
TEST_F(PortsTest,ProxyCollapse1)971 TEST_F(PortsTest, ProxyCollapse1) {
972   NodeName node0_name(0, 1);
973   TestNodeDelegate node0_delegate(node0_name);
974   Node node0(node0_name, &node0_delegate);
975   node_map[0] = &node0;
976 
977   node0_delegate.set_save_messages(true);
978 
979   PortRef A, B;
980   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
981 
982   PortRef X, Y;
983   EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
984 
985   ScopedMessage message;
986 
987   // Send B and receive it as C.
988   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
989   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
990   ASSERT_EQ(1u, message->num_ports());
991   PortRef C;
992   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
993 
994   // Send C and receive it as D.
995   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
996   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
997   ASSERT_EQ(1u, message->num_ports());
998   PortRef D;
999   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1000 
1001   // Send D and receive it as E.
1002   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
1003   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1004   ASSERT_EQ(1u, message->num_ports());
1005   PortRef E;
1006   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
1007 
1008   EXPECT_EQ(OK, node0.ClosePort(X));
1009   EXPECT_EQ(OK, node0.ClosePort(Y));
1010 
1011   EXPECT_EQ(OK, node0.ClosePort(A));
1012   EXPECT_EQ(OK, node0.ClosePort(E));
1013 
1014   PumpTasks();
1015 
1016   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1017 }
1018 
TEST_F(PortsTest,ProxyCollapse2)1019 TEST_F(PortsTest, ProxyCollapse2) {
1020   NodeName node0_name(0, 1);
1021   TestNodeDelegate node0_delegate(node0_name);
1022   Node node0(node0_name, &node0_delegate);
1023   node_map[0] = &node0;
1024 
1025   node0_delegate.set_save_messages(true);
1026 
1027   PortRef A, B;
1028   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1029 
1030   PortRef X, Y;
1031   EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1032 
1033   ScopedMessage message;
1034 
1035   // Send B and receive it as C.
1036   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1037   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1038   ASSERT_EQ(1u, message->num_ports());
1039   PortRef C;
1040   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
1041 
1042   // Send A and receive it as D.
1043   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
1044   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1045   ASSERT_EQ(1u, message->num_ports());
1046   PortRef D;
1047   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1048 
1049   // At this point we have a scenario with:
1050   //
1051   // D -> [B] -> C -> [A]
1052   //
1053   // Ensure that the proxies can collapse.
1054 
1055   EXPECT_EQ(OK, node0.ClosePort(X));
1056   EXPECT_EQ(OK, node0.ClosePort(Y));
1057 
1058   EXPECT_EQ(OK, node0.ClosePort(C));
1059   EXPECT_EQ(OK, node0.ClosePort(D));
1060 
1061   PumpTasks();
1062 
1063   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1064 }
1065 
TEST_F(PortsTest,SendWithClosedPeer)1066 TEST_F(PortsTest, SendWithClosedPeer) {
1067   // This tests that if a port is sent when its peer is already known to be
1068   // closed, the newly created port will be aware of that peer closure, and the
1069   // proxy will eventually collapse.
1070 
1071   NodeName node0_name(0, 1);
1072   TestNodeDelegate node0_delegate(node0_name);
1073   Node node0(node0_name, &node0_delegate);
1074   node_map[0] = &node0;
1075 
1076   node0_delegate.set_read_messages(false);
1077 
1078   // Send a message from A to B, then close A.
1079   PortRef A, B;
1080   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1081   EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
1082   EXPECT_EQ(OK, node0.ClosePort(A));
1083 
1084   PumpTasks();
1085 
1086   // Now send B over X-Y as new port C.
1087   PortRef X, Y;
1088   EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1089 
1090   node0_delegate.set_read_messages(true);
1091   node0_delegate.set_save_messages(true);
1092   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1093 
1094   EXPECT_EQ(OK, node0.ClosePort(X));
1095   EXPECT_EQ(OK, node0.ClosePort(Y));
1096 
1097   ScopedMessage message;
1098   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1099   ASSERT_EQ(1u, message->num_ports());
1100 
1101   PortRef C;
1102   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
1103 
1104   PumpTasks();
1105 
1106   // C should receive the message originally sent to B, and it should also be
1107   // aware of A's closure.
1108 
1109   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1110   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1111 
1112   PortStatus status;
1113   EXPECT_EQ(OK, node0.GetStatus(C, &status));
1114   EXPECT_FALSE(status.receiving_messages);
1115   EXPECT_FALSE(status.has_messages);
1116   EXPECT_TRUE(status.peer_closed);
1117 
1118   node0.ClosePort(C);
1119 
1120   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1121 }
1122 
TEST_F(PortsTest,SendWithClosedPeerSent)1123 TEST_F(PortsTest, SendWithClosedPeerSent) {
1124   // This tests that if a port is closed while some number of proxies are still
1125   // routing messages (directly or indirectly) to it, that the peer port is
1126   // eventually notified of the closure, and the dead-end proxies will
1127   // eventually be removed.
1128 
1129   NodeName node0_name(0, 1);
1130   TestNodeDelegate node0_delegate(node0_name);
1131   Node node0(node0_name, &node0_delegate);
1132   node_map[0] = &node0;
1133 
1134   node0_delegate.set_save_messages(true);
1135 
1136   PortRef X, Y;
1137   EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1138 
1139   PortRef A, B;
1140   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1141 
1142   ScopedMessage message;
1143 
1144   // Send A as new port C.
1145   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
1146   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1147   ASSERT_EQ(1u, message->num_ports());
1148   PortRef C;
1149   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
1150 
1151   // Send C as new port D.
1152   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
1153   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1154   ASSERT_EQ(1u, message->num_ports());
1155   PortRef D;
1156   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1157 
1158   node0_delegate.set_read_messages(false);
1159 
1160   // Send a message to B through D, then close D.
1161   EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
1162   EXPECT_EQ(OK, node0.ClosePort(D));
1163 
1164   PumpTasks();
1165 
1166   // Now send B as new port E.
1167 
1168   node0_delegate.set_read_messages(true);
1169   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1170 
1171   EXPECT_EQ(OK, node0.ClosePort(X));
1172   EXPECT_EQ(OK, node0.ClosePort(Y));
1173 
1174   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1175   ASSERT_EQ(1u, message->num_ports());
1176 
1177   PortRef E;
1178   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
1179 
1180   PumpTasks();
1181 
1182   // E should receive the message originally sent to B, and it should also be
1183   // aware of D's closure.
1184 
1185   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1186   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1187 
1188   PortStatus status;
1189   EXPECT_EQ(OK, node0.GetStatus(E, &status));
1190   EXPECT_FALSE(status.receiving_messages);
1191   EXPECT_FALSE(status.has_messages);
1192   EXPECT_TRUE(status.peer_closed);
1193 
1194   node0.ClosePort(E);
1195 
1196   PumpTasks();
1197 
1198   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1199 }
1200 
TEST_F(PortsTest,MergePorts)1201 TEST_F(PortsTest, MergePorts) {
1202   NodeName node0_name(0, 1);
1203   TestNodeDelegate node0_delegate(node0_name);
1204   Node node0(node0_name, &node0_delegate);
1205   node_map[0] = &node0;
1206 
1207   NodeName node1_name(1, 1);
1208   TestNodeDelegate node1_delegate(node1_name);
1209   Node node1(node1_name, &node1_delegate);
1210   node_map[1] = &node1;
1211 
1212   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1213   PortRef A, B, C, D;
1214   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1215   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1216 
1217   node0_delegate.set_read_messages(false);
1218   node1_delegate.set_save_messages(true);
1219 
1220   // Write a message on A.
1221   EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
1222 
1223   PumpTasks();
1224 
1225   // Initiate a merge between B and C.
1226   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1227 
1228   PumpTasks();
1229 
1230   // Expect only two receiving ports to be left after pumping tasks.
1231   EXPECT_TRUE(node0.CanShutdownCleanly(true));
1232   EXPECT_TRUE(node1.CanShutdownCleanly(true));
1233 
1234   // Expect D to have received the message sent on A.
1235   ScopedMessage message;
1236   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
1237   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1238 
1239   EXPECT_EQ(OK, node0.ClosePort(A));
1240   EXPECT_EQ(OK, node1.ClosePort(D));
1241 
1242   // No more ports should be open.
1243   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1244   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1245 }
1246 
TEST_F(PortsTest,MergePortWithClosedPeer1)1247 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1248   // This tests that the right thing happens when initiating a merge on a port
1249   // whose peer has already been closed.
1250 
1251   NodeName node0_name(0, 1);
1252   TestNodeDelegate node0_delegate(node0_name);
1253   Node node0(node0_name, &node0_delegate);
1254   node_map[0] = &node0;
1255 
1256   NodeName node1_name(1, 1);
1257   TestNodeDelegate node1_delegate(node1_name);
1258   Node node1(node1_name, &node1_delegate);
1259   node_map[1] = &node1;
1260 
1261   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1262   PortRef A, B, C, D;
1263   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1264   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1265 
1266   node0_delegate.set_read_messages(false);
1267   node1_delegate.set_save_messages(true);
1268 
1269   // Write a message on A.
1270   EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
1271 
1272   PumpTasks();
1273 
1274   // Close A.
1275   EXPECT_EQ(OK, node0.ClosePort(A));
1276 
1277   // Initiate a merge between B and C.
1278   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1279 
1280   PumpTasks();
1281 
1282   // Expect only one receiving port to be left after pumping tasks.
1283   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1284   EXPECT_TRUE(node1.CanShutdownCleanly(true));
1285 
1286   // Expect D to have received the message sent on A.
1287   ScopedMessage message;
1288   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
1289   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1290 
1291   EXPECT_EQ(OK, node1.ClosePort(D));
1292 
1293   // No more ports should be open.
1294   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1295   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1296 }
1297 
TEST_F(PortsTest,MergePortWithClosedPeer2)1298 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1299   // This tests that the right thing happens when merging into a port whose peer
1300   // has already been closed.
1301 
1302   NodeName node0_name(0, 1);
1303   TestNodeDelegate node0_delegate(node0_name);
1304   Node node0(node0_name, &node0_delegate);
1305   node_map[0] = &node0;
1306 
1307   NodeName node1_name(1, 1);
1308   TestNodeDelegate node1_delegate(node1_name);
1309   Node node1(node1_name, &node1_delegate);
1310   node_map[1] = &node1;
1311 
1312   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1313   PortRef A, B, C, D;
1314   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1315   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1316 
1317   node0_delegate.set_save_messages(true);
1318   node1_delegate.set_read_messages(false);
1319 
1320   // Write a message on D.
1321   EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
1322 
1323   PumpTasks();
1324 
1325   // Close D.
1326   EXPECT_EQ(OK, node1.ClosePort(D));
1327 
1328   // Initiate a merge between B and C.
1329   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1330 
1331   PumpTasks();
1332 
1333   // Expect only one receiving port to be left after pumping tasks.
1334   EXPECT_TRUE(node0.CanShutdownCleanly(true));
1335   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1336 
1337   // Expect A to have received the message sent on D.
1338   ScopedMessage message;
1339   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1340   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1341 
1342   EXPECT_EQ(OK, node0.ClosePort(A));
1343 
1344   // No more ports should be open.
1345   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1346   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1347 }
1348 
TEST_F(PortsTest,MergePortsWithClosedPeers)1349 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1350   // This tests that no residual ports are left behind if two ports are merged
1351   // when both of their peers have been closed.
1352 
1353   NodeName node0_name(0, 1);
1354   TestNodeDelegate node0_delegate(node0_name);
1355   Node node0(node0_name, &node0_delegate);
1356   node_map[0] = &node0;
1357 
1358   NodeName node1_name(1, 1);
1359   TestNodeDelegate node1_delegate(node1_name);
1360   Node node1(node1_name, &node1_delegate);
1361   node_map[1] = &node1;
1362 
1363   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1364   PortRef A, B, C, D;
1365   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1366   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1367 
1368   node0_delegate.set_save_messages(true);
1369   node1_delegate.set_read_messages(false);
1370 
1371   // Close A and D.
1372   EXPECT_EQ(OK, node0.ClosePort(A));
1373   EXPECT_EQ(OK, node1.ClosePort(D));
1374 
1375   PumpTasks();
1376 
1377   // Initiate a merge between B and C.
1378   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1379 
1380   PumpTasks();
1381 
1382   // Expect everything to have gone away.
1383   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1384   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1385 }
1386 
TEST_F(PortsTest,MergePortsWithMovedPeers)1387 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1388   // This tests that no ports can be merged successfully even if their peers
1389   // are moved around.
1390 
1391   NodeName node0_name(0, 1);
1392   TestNodeDelegate node0_delegate(node0_name);
1393   Node node0(node0_name, &node0_delegate);
1394   node_map[0] = &node0;
1395 
1396   NodeName node1_name(1, 1);
1397   TestNodeDelegate node1_delegate(node1_name);
1398   Node node1(node1_name, &node1_delegate);
1399   node_map[1] = &node1;
1400 
1401   node0_delegate.set_save_messages(true);
1402   node1_delegate.set_read_messages(false);
1403 
1404   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1405   PortRef A, B, C, D;
1406   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1407   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1408 
1409   // Set up another pair X-Y for moving ports on node0.
1410   PortRef X, Y;
1411   EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1412 
1413   ScopedMessage message;
1414 
1415   // Move A to new port E.
1416   EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
1417   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1418   ASSERT_EQ(1u, message->num_ports());
1419   PortRef E;
1420   ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
1421 
1422   EXPECT_EQ(OK, node0.ClosePort(X));
1423   EXPECT_EQ(OK, node0.ClosePort(Y));
1424 
1425   node0_delegate.set_read_messages(false);
1426 
1427   // Write messages on E and D.
1428   EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
1429   EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
1430 
1431   // Initiate a merge between B and C.
1432   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1433 
1434   node0_delegate.set_read_messages(true);
1435   node1_delegate.set_read_messages(true);
1436   node1_delegate.set_save_messages(true);
1437 
1438   PumpTasks();
1439 
1440   // Expect to receive D's message on E and E's message on D.
1441   ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1442   EXPECT_EQ(0, strcmp("hi", ToString(message)));
1443   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
1444   EXPECT_EQ(0, strcmp("hey", ToString(message)));
1445 
1446   // Close E and D.
1447   EXPECT_EQ(OK, node0.ClosePort(E));
1448   EXPECT_EQ(OK, node1.ClosePort(D));
1449 
1450   PumpTasks();
1451 
1452   // Expect everything to have gone away.
1453   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1454   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1455 }
1456 
TEST_F(PortsTest,MergePortsFailsGracefully)1457 TEST_F(PortsTest, MergePortsFailsGracefully) {
1458   // This tests that the system remains in a well-defined state if something
1459   // goes wrong during port merge.
1460 
1461   NodeName node0_name(0, 1);
1462   TestNodeDelegate node0_delegate(node0_name);
1463   Node node0(node0_name, &node0_delegate);
1464   node_map[0] = &node0;
1465 
1466   NodeName node1_name(1, 1);
1467   TestNodeDelegate node1_delegate(node1_name);
1468   Node node1(node1_name, &node1_delegate);
1469   node_map[1] = &node1;
1470 
1471   // Setup two independent port pairs, A-B on node0 and C-D on node1.
1472   PortRef A, B, C, D;
1473   EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1474   EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1475 
1476   PumpTasks();
1477 
1478   // Initiate a merge between B and C.
1479   EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1480 
1481   // Move C to a new port E. This is dumb and nobody should do it, but it's
1482   // possible. MergePorts will fail as a result because C won't be in a
1483   // receiving state when the event arrives at node1, so B should be closed.
1484   ScopedMessage message;
1485   PortRef X, Y;
1486   EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
1487   node1_delegate.set_save_messages(true);
1488   EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
1489   ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
1490   ASSERT_EQ(1u, message->num_ports());
1491   PortRef E;
1492   ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
1493   EXPECT_EQ(OK, node1.ClosePort(X));
1494   EXPECT_EQ(OK, node1.ClosePort(Y));
1495 
1496   // C goes away as a result of normal proxy removal.
1497   PumpTasks();
1498 
1499   EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
1500 
1501   // B should have been closed cleanly.
1502   EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
1503 
1504   // Close A, D, and E.
1505   EXPECT_EQ(OK, node0.ClosePort(A));
1506   EXPECT_EQ(OK, node1.ClosePort(D));
1507   EXPECT_EQ(OK, node1.ClosePort(E));
1508 
1509   PumpTasks();
1510 
1511   // Expect everything to have gone away.
1512   EXPECT_TRUE(node0.CanShutdownCleanly(false));
1513   EXPECT_TRUE(node1.CanShutdownCleanly(false));
1514 }
1515 
1516 }  // namespace test
1517 }  // namespace ports
1518 }  // namespace edk
1519 }  // namespace mojo
1520