1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8
9 #include <map>
10 #include <queue>
11 #include <sstream>
12
13 #include "base/logging.h"
14 #include "base/rand_util.h"
15 #include "mojo/edk/system/ports/event.h"
16 #include "mojo/edk/system/ports/node.h"
17 #include "mojo/edk/system/ports/node_delegate.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19
20 namespace mojo {
21 namespace edk {
22 namespace ports {
23 namespace test {
24
25 namespace {
26
LogMessage(const Message * message)27 void LogMessage(const Message* message) {
28 std::stringstream ports;
29 for (size_t i = 0; i < message->num_ports(); ++i) {
30 if (i > 0)
31 ports << ",";
32 ports << message->ports()[i];
33 }
34 DVLOG(1) << "message: \""
35 << static_cast<const char*>(message->payload_bytes())
36 << "\" ports=[" << ports.str() << "]";
37 }
38
ClosePortsInMessage(Node * node,Message * message)39 void ClosePortsInMessage(Node* node, Message* message) {
40 for (size_t i = 0; i < message->num_ports(); ++i) {
41 PortRef port;
42 ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
43 EXPECT_EQ(OK, node->ClosePort(port));
44 }
45 }
46
47 class TestMessage : public Message {
48 public:
NewUserMessage(size_t num_payload_bytes,size_t num_ports)49 static ScopedMessage NewUserMessage(size_t num_payload_bytes,
50 size_t num_ports) {
51 return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
52 }
53
TestMessage(size_t num_payload_bytes,size_t num_ports)54 TestMessage(size_t num_payload_bytes, size_t num_ports)
55 : Message(num_payload_bytes, num_ports) {
56 start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes];
57 InitializeUserMessageHeader(start_);
58 }
59
TestMessage(size_t num_header_bytes,size_t num_payload_bytes,size_t num_ports_bytes)60 TestMessage(size_t num_header_bytes,
61 size_t num_payload_bytes,
62 size_t num_ports_bytes)
63 : Message(num_header_bytes,
64 num_payload_bytes,
65 num_ports_bytes) {
66 start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes];
67 }
68
~TestMessage()69 ~TestMessage() override {
70 delete[] start_;
71 }
72 };
73
74 struct Task {
Taskmojo::edk::ports::test::__anon79fd6c780111::Task75 Task(NodeName node_name, ScopedMessage message)
76 : node_name(node_name),
77 message(std::move(message)),
78 priority(base::RandUint64()) {
79 }
80
81 NodeName node_name;
82 ScopedMessage message;
83 uint64_t priority;
84 };
85
86 struct TaskComparator {
operator ()mojo::edk::ports::test::__anon79fd6c780111::TaskComparator87 bool operator()(const Task* a, const Task* b) {
88 return a->priority < b->priority;
89 }
90 };
91
92 const size_t kMaxNodes = 3;
93
94 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
95 Node* node_map[kMaxNodes];
96
GetNode(const NodeName & name)97 Node* GetNode(const NodeName& name) {
98 return node_map[name.v1];
99 }
100
SetNode(const NodeName & name,Node * node)101 void SetNode(const NodeName& name, Node* node) {
102 node_map[name.v1] = node;
103 }
104
PumpTasks()105 void PumpTasks() {
106 while (!task_queue.empty()) {
107 Task* task = task_queue.top();
108 task_queue.pop();
109
110 Node* node = GetNode(task->node_name);
111 if (node)
112 node->AcceptMessage(std::move(task->message));
113
114 delete task;
115 }
116 }
117
PumpUntilTask(EventType type)118 void PumpUntilTask(EventType type) {
119 while (!task_queue.empty()) {
120 Task* task = task_queue.top();
121
122 const EventHeader* header = GetEventHeader(*task->message);
123 if (header->type == type)
124 return;
125
126 task_queue.pop();
127
128 Node* node = GetNode(task->node_name);
129 if (node)
130 node->AcceptMessage(std::move(task->message));
131
132 delete task;
133 }
134 }
135
DiscardPendingTasks()136 void DiscardPendingTasks() {
137 while (!task_queue.empty()) {
138 Task* task = task_queue.top();
139 task_queue.pop();
140 delete task;
141 }
142 }
143
SendStringMessage(Node * node,const PortRef & port,const std::string & s)144 int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
145 size_t size = s.size() + 1;
146 ScopedMessage message = TestMessage::NewUserMessage(size, 0);
147 memcpy(message->mutable_payload_bytes(), s.data(), size);
148 return node->SendMessage(port, std::move(message));
149 }
150
SendStringMessageWithPort(Node * node,const PortRef & port,const std::string & s,const PortName & sent_port_name)151 int SendStringMessageWithPort(Node* node,
152 const PortRef& port,
153 const std::string& s,
154 const PortName& sent_port_name) {
155 size_t size = s.size() + 1;
156 ScopedMessage message = TestMessage::NewUserMessage(size, 1);
157 memcpy(message->mutable_payload_bytes(), s.data(), size);
158 message->mutable_ports()[0] = sent_port_name;
159 return node->SendMessage(port, std::move(message));
160 }
161
SendStringMessageWithPort(Node * node,const PortRef & port,const std::string & s,const PortRef & sent_port)162 int SendStringMessageWithPort(Node* node,
163 const PortRef& port,
164 const std::string& s,
165 const PortRef& sent_port) {
166 return SendStringMessageWithPort(node, port, s, sent_port.name());
167 }
168
ToString(const ScopedMessage & message)169 const char* ToString(const ScopedMessage& message) {
170 return static_cast<const char*>(message->payload_bytes());
171 }
172
173 class TestNodeDelegate : public NodeDelegate {
174 public:
TestNodeDelegate(const NodeName & node_name)175 explicit TestNodeDelegate(const NodeName& node_name)
176 : node_name_(node_name),
177 drop_messages_(false),
178 read_messages_(true),
179 save_messages_(false) {
180 }
181
set_drop_messages(bool value)182 void set_drop_messages(bool value) { drop_messages_ = value; }
set_read_messages(bool value)183 void set_read_messages(bool value) { read_messages_ = value; }
set_save_messages(bool value)184 void set_save_messages(bool value) { save_messages_ = value; }
185
GetSavedMessage(ScopedMessage * message)186 bool GetSavedMessage(ScopedMessage* message) {
187 if (saved_messages_.empty()) {
188 message->reset();
189 return false;
190 }
191 *message = std::move(saved_messages_.front());
192 saved_messages_.pop();
193 return true;
194 }
195
GenerateRandomPortName(PortName * port_name)196 void GenerateRandomPortName(PortName* port_name) override {
197 static uint64_t next_port_name = 1;
198 port_name->v1 = next_port_name++;
199 port_name->v2 = 0;
200 }
201
AllocMessage(size_t num_header_bytes,ScopedMessage * message)202 void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
203 message->reset(new TestMessage(num_header_bytes, 0, 0));
204 }
205
ForwardMessage(const NodeName & node_name,ScopedMessage message)206 void ForwardMessage(const NodeName& node_name,
207 ScopedMessage message) override {
208 if (drop_messages_) {
209 DVLOG(1) << "Dropping ForwardMessage from node "
210 << node_name_ << " to " << node_name;
211 ClosePortsInMessage(GetNode(node_name), message.get());
212 return;
213 }
214 DVLOG(1) << "ForwardMessage from node "
215 << node_name_ << " to " << node_name;
216 task_queue.push(new Task(node_name, std::move(message)));
217 }
218
BroadcastMessage(ScopedMessage message)219 void BroadcastMessage(ScopedMessage message) override {
220 for (size_t i = 0; i < kMaxNodes; ++i) {
221 Node* node = node_map[i];
222 // Broadcast doesn't deliver to the local node.
223 if (node && node != GetNode(node_name_)) {
224 // NOTE: We only need to support broadcast of events, which have no
225 // payload or ports bytes.
226 ScopedMessage new_message(
227 new TestMessage(message->num_header_bytes(), 0, 0));
228 memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
229 message->num_header_bytes());
230 node->AcceptMessage(std::move(new_message));
231 }
232 }
233 }
234
PortStatusChanged(const PortRef & port)235 void PortStatusChanged(const PortRef& port) override {
236 DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
237 if (!read_messages_)
238 return;
239 Node* node = GetNode(node_name_);
240 for (;;) {
241 ScopedMessage message;
242 int rv = node->GetMessage(port, &message);
243 EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
244 if (rv == ERROR_PORT_PEER_CLOSED || !message)
245 break;
246 if (save_messages_) {
247 SaveMessage(std::move(message));
248 } else {
249 LogMessage(message.get());
250 for (size_t i = 0; i < message->num_ports(); ++i) {
251 std::stringstream buf;
252 buf << "got port: " << message->ports()[i];
253
254 PortRef received_port;
255 node->GetPort(message->ports()[i], &received_port);
256
257 SendStringMessage(node, received_port, buf.str());
258
259 // Avoid leaking these ports.
260 node->ClosePort(received_port);
261 }
262 }
263 }
264 }
265
266 private:
SaveMessage(ScopedMessage message)267 void SaveMessage(ScopedMessage message) {
268 saved_messages_.emplace(std::move(message));
269 }
270
271 std::queue<ScopedMessage> saved_messages_;
272 NodeName node_name_;
273 bool drop_messages_;
274 bool read_messages_;
275 bool save_messages_;
276 };
277
278 class PortsTest : public testing::Test {
279 public:
SetUp()280 void SetUp() override {
281 DiscardPendingTasks();
282 SetNode(NodeName(0, 1), nullptr);
283 SetNode(NodeName(1, 1), nullptr);
284 SetNode(NodeName(2, 1), nullptr);
285 }
286 };
287
288 } // namespace
289
// Sends one end of a local port pair across a two-node pipe, closes every port
// still held by the test, and checks that both nodes can shut down cleanly
// once all queued tasks have run.
TEST_F(PortsTest, Basic1) {
  const NodeName node0_name(0, 1);
  TestNodeDelegate delegate0(node0_name);
  Node node0(node0_name, &delegate0);
  SetNode(node0_name, &node0);

  const NodeName node1_name(1, 1);
  TestNodeDelegate delegate1(node1_name);
  Node node1(node1_name, &delegate1);
  SetNode(node1_name, &node1);

  // Build the x0 <-> x1 pipe linking node0 and node1.
  PortRef x0;
  PortRef x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Create a0 <-> a1 on node0 and ship a1 over the pipe to node1.
  PortRef a0;
  PortRef a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));

  // Close everything the test still holds before any task has been delivered.
  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
323
// Like Basic1, but a message is also sent through the retained end of the
// pair (b0) after its peer (b1) has been sent to node1, and b0 is then closed
// before any task has been delivered.
TEST_F(PortsTest, Basic2) {
  const NodeName node0_name(0, 1);
  TestNodeDelegate delegate0(node0_name);
  Node node0(node0_name, &delegate0);
  SetNode(node0_name, &node0);

  const NodeName node1_name(1, 1);
  TestNodeDelegate delegate1(node1_name);
  Node node1(node1_name, &delegate1);
  SetNode(node1_name, &node1);

  // Build the x0 <-> x1 pipe linking node0 and node1.
  PortRef x0;
  PortRef x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Ship b1 to node1, then talk to it through b0.
  PortRef b0;
  PortRef b1;
  EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
  EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));

  // Closing b0 this early may make a later SendMessage on b1 fail.
  EXPECT_EQ(OK, node0.ClosePort(b0));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
358
// Sends both ends of one port pair (a0, a1) and one end of a second pair (b1)
// over the same pipe, with messages sent through a0 and b0 along the way, then
// checks that both nodes can shut down cleanly after pumping.
TEST_F(PortsTest, Basic3) {
  const NodeName node0_name(0, 1);
  TestNodeDelegate delegate0(node0_name);
  Node node0(node0_name, &delegate0);
  SetNode(node0_name, &node0);

  const NodeName node1_name(1, 1);
  TestNodeDelegate delegate1(node1_name);
  Node node1(node1_name, &delegate1);
  SetNode(node1_name, &node1);

  // Build the x0 <-> x1 pipe linking node0 and node1.
  PortRef x0;
  PortRef x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Ship a1 to node1 and send it a message through a0.
  PortRef a0;
  PortRef a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
  EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));

  // Now ship a0 as well, so both ends of the pair have been sent over x0.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));

  // Second pair: ship b1 and send it a message through b0.
  PortRef b0;
  PortRef b1;
  EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
  EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));

  // Closing b0 this early may make a later SendMessage on b1 fail.
  EXPECT_EQ(OK, node0.ClosePort(b0));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
402
// Sends a port to node1 while node1's delegate drops everything node1 tries to
// forward, tells node0 the connection to node1 is lost, and checks that both
// nodes can still shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
  const NodeName node0_name(0, 1);
  TestNodeDelegate delegate0(node0_name);
  Node node0(node0_name, &delegate0);
  SetNode(node0_name, &node0);

  const NodeName node1_name(1, 1);
  TestNodeDelegate delegate1(node1_name);
  Node node1(node1_name, &delegate1);
  SetNode(node1_name, &node1);

  // Build the x0 <-> x1 pipe linking node0 and node1.
  PortRef x0;
  PortRef x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // The lost connection is simulated by dropping the events node1 sends; the
  // port is transferred to node1 with that in effect.
  delegate1.set_drop_messages(true);

  PortRef a0;
  PortRef a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));

  PumpTasks();

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));

  PumpTasks();

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
445
// Sends a1 to node1 (which does not read its messages), then reports node1 as
// lost to node0. Verifies that a0 observes peer closure, that the unread
// message is still retrievable on node1, and that both nodes can shut down
// cleanly once the ports carried by that message are closed.
TEST_F(PortsTest, LostConnectionToNode2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // Leave incoming messages queued on node1's ports so the test can read them
  // explicitly below.
  node1_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));

  PumpTasks();

  // From here on, everything node1 forwards is dropped.
  node1_delegate.set_drop_messages(true);

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));

  PumpTasks();

  // a0's peer now lives on the lost node, so a0 must report peer closure.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_EQ(OK, node0.ClosePort(x0));

  // The "take a1" message is still waiting on x1; close the port it carries.
  EXPECT_EQ(OK, node1.GetMessage(x1, &message));
  EXPECT_TRUE(message);
  ClosePortsInMessage(&node1, message.get());

  EXPECT_EQ(OK, node1.ClosePort(x1));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
497
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  NodeName node2_name(2, 1);
  TestNodeDelegate node2_delegate(node2_name);
  Node node2(node2_name, &node2_delegate);
  SetNode(node2_name, &node2);

  // Keep messages arriving at node 1 so the test can pull out the received
  // port below.
  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
  EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
  EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
  EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // From here on |F| refers to the port as received on node 1.
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.

  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));

  // Unregister node 2 so that tasks addressed to it are discarded by the pump.
  SetNode(node2_name, nullptr);
  EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));

  PumpTasks();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node1.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
571
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and its predecessor lives on the same node.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  // Keep messages arriving at node 1 so the test can pull out the received
  // port below.
  node1_delegate.set_save_messages(true);

  // Create A-B spanning nodes 0 and 1.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
  EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
  EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

  // Create C-D and send D over A to node 1.
  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));

  // Pump tasks until the start of port collapse for port D, which should become
  // a proxy.
  PumpUntilTask(EventType::kObserveProxy);

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is D as received on node 1.
  PortRef E;
  EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
  PumpTasks();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node1.ClosePort(E));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
627
// Verifies what GetMessage() returns on an empty local port before and after
// its peer is closed: OK with no message until the queued tasks have been
// pumped, ERROR_PORT_PEER_CLOSED afterwards.
TEST_F(PortsTest, GetMessage1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  // Nothing has been sent yet.
  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a1));

  // The closure has not been delivered to a0 yet, so this still succeeds.
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  PumpTasks();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.ClosePort(a0));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
655
// Sends one string between the two ends of a local port pair and verifies it
// can be read back from the receiving end without pumping any tasks.
TEST_F(PortsTest, GetMessage2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  // The test reads the message itself, so the delegate must not consume it.
  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node0.GetMessage(a0, &message));

  ASSERT_TRUE(message);
  // EXPECT_STREQ prints both strings on mismatch.
  EXPECT_STREQ("1", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
680
// Sends several strings between the two ends of a local port pair and
// verifies they are read back in the order they were sent.
TEST_F(PortsTest, GetMessage3) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  // The test reads the messages itself, so the delegate must not consume them.
  node0_delegate.set_read_messages(false);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  const char* const kStrings[] = {
    "1",
    "2",
    "3"
  };

  for (const char* sent : kStrings)
    EXPECT_EQ(OK, SendStringMessage(&node0, a1, sent));

  ScopedMessage message;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, node0.GetMessage(a0, &message));
    ASSERT_TRUE(message);
    // EXPECT_STREQ prints both strings on mismatch.
    EXPECT_STREQ(expected, ToString(message));
    DVLOG(1) << "got " << expected;
  }

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
714
// Moves a port from node0 to node1 and back again, then sends a message
// through the original peer (a0) and verifies it arrives at node0 together
// with the message that returned the port.
TEST_F(PortsTest, Delegation1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  // Setup pipe between node0 and node1.
  PortRef x0, x1;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
  EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
  EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));

  PumpTasks();

  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];

  // Send the port straight back to node0.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));

  PumpTasks();

  EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));

  PumpTasks();

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  ASSERT_EQ(1u, message->num_ports());

  // This is "a2" from the point of view of node0.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));

  EXPECT_STREQ("a2", ToString(message));

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(0u, message->num_ports());
  EXPECT_STREQ("hello", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(a0));
  EXPECT_EQ(OK, node0.ClosePort(a3));

  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_EQ(OK, node1.ClosePort(x1));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
787
// Repeats the following ten times: sends D to node1, sends F through C (D's
// peer), then sends "hello" through E (F's peer), and verifies that "hello" is
// delivered on node1. Delivery order is randomized by the task queue.
TEST_F(PortsTest, Delegation2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  SetNode(node1_name, &node1);

  node0_delegate.set_save_messages(true);
  node1_delegate.set_save_messages(true);

  for (int i = 0; i < 10; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
    EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
    EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
    EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));

    PortRef C, D;
    EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));

    // Pass D over A to B.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));

    PumpTasks();

    EXPECT_EQ(OK, node0.ClosePort(C));
    EXPECT_EQ(OK, node0.ClosePort(E));

    EXPECT_EQ(OK, node0.ClosePort(A));
    EXPECT_EQ(OK, node1.ClosePort(B));

    // Drain node1's saved messages, closing any ports they carry, until
    // "hello" turns up. Running out of messages first is a fatal failure.
    for (;;) {
      ScopedMessage message;
      ASSERT_TRUE(node1_delegate.GetSavedMessage(&message))
          << "\"hello\" message not delivered!";
      ClosePortsInMessage(&node1, message.get());
      if (strcmp("hello", ToString(message)) == 0)
        break;
    }

    PumpTasks();  // Because ClosePort may have generated tasks.
  }

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
851
// Verifies that sending on a port that was created but never initialized is
// rejected with ERROR_PORT_STATE_UNEXPECTED, and that the port can still be
// closed cleanly afterwards.
TEST_F(PortsTest, SendUninitialized) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  PortRef x0;
  EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            SendStringMessage(&node0, x0, "oops"));
  EXPECT_EQ(OK, node0.ClosePort(x0));
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
865
// Verifies that a port cannot be sent over itself or over its own peer, that
// the port involved in the failed peer send is closed, and that neither
// attempt results in a delivered message.
TEST_F(PortsTest, SendFailure) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  // Save anything that gets delivered so the test can assert nothing was.
  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            SendStringMessageWithPort(&node0, A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            SendStringMessageWithPort(&node0, A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));

  PumpTasks();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));

  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
902
// Sends port D in a message that is never read, closes the ports the test
// still holds (A, B and C), and verifies the node can still shut down cleanly,
// i.e. the port carried by the unread message is not leaked.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  // The message carrying D must stay unread for this test to be meaningful.
  node0_delegate.set_read_messages(false);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  PumpTasks();

  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
930
// Verifies the two modes of CanShutdownCleanly(): with the argument true the
// node reports a clean shutdown while its local ports are still open, with
// false it does not until every port has been closed.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));

  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));

  // Fatal on failure: |message| is dereferenced on the next line and would be
  // null if nothing had been saved.
  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  // E is D as received from the message.
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_TRUE(node0.CanShutdownCleanly(true));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_FALSE(node0.CanShutdownCleanly(false));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(B));
  EXPECT_EQ(OK, node0.ClosePort(C));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
970
// Sends the same logical port through the X-Y pipe three times in a row (B is
// received as C, C as D, D as E) without pumping in between, then closes
// everything and verifies the node can shut down cleanly.
TEST_F(PortsTest, ProxyCollapse1) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  SetNode(node0_name, &node0);

  node0_delegate.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // Only A and E are closed here; B, C and D were sent away above.
  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
1018
// Transfers both ends of a port pair over a carrier pair on a single node, so
// that each new endpoint is reached through a proxy, then closes everything
// and checks that the node reports it can shut down cleanly.
TEST_F(PortsTest, ProxyCollapse2) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Delivered messages are fetched below through GetSavedMessage().
  node0_delegate.set_save_messages(true);

  // The pair whose two ends both get moved.
  PortRef port_a;
  PortRef port_b;
  EXPECT_EQ(OK, node0.CreatePortPair(&port_a, &port_b));

  // The carrier pair used to transfer ports.
  PortRef port_x;
  PortRef port_y;
  EXPECT_EQ(OK, node0.CreatePortPair(&port_x, &port_y));

  ScopedMessage received;

  // B goes out over X and is picked up as C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, port_x, "foo", port_b));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef port_c;
  ASSERT_EQ(OK, node0.GetPort(received->ports()[0], &port_c));

  // A goes out over X and is picked up as D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, port_x, "foo", port_a));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef port_d;
  ASSERT_EQ(OK, node0.GetPort(received->ports()[0], &port_d));

  // The routing now looks like this (bracketed ports are proxies):
  //
  //   D -> [B] -> C -> [A]
  //
  // The proxies must be able to collapse from this state.

  EXPECT_EQ(OK, node0.ClosePort(port_x));
  EXPECT_EQ(OK, node0.ClosePort(port_y));

  EXPECT_EQ(OK, node0.ClosePort(port_c));
  EXPECT_EQ(OK, node0.ClosePort(port_d));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
1065
TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Reading is turned off here and back on before B is transferred; the test
  // later expects "hey" to arrive on C, so it must not be consumed on B.
  node0_delegate.set_read_messages(false);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(A));

  PumpTasks();

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  node0_delegate.set_read_messages(true);
  node0_delegate.set_save_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // The first saved message is the "foo" carrier; it holds the new port C.
  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  PumpTasks();

  // C should receive the message originally sent to B, and it should also be
  // aware of A's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  EXPECT_STREQ("hey", ToString(message));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result so a failed close is reported at this line instead of
  // only showing up indirectly in the shutdown check below.
  EXPECT_EQ(OK, node0.ClosePort(C));

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
1122
TEST_F(PortsTest, SendWithClosedPeerSent) {
  // This tests that if a port is closed while some number of proxies are still
  // routing messages (directly or indirectly) to it, that the peer port is
  // eventually notified of the closure, and the dead-end proxies will
  // eventually be removed.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  // Delivered messages are fetched below through GetSavedMessage().
  node0_delegate.set_save_messages(true);

  // X-Y is the carrier pair used to move the other ports around.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));

  // Reading is turned off until B has been queued for transfer; the test
  // later expects "hey" to arrive on E, so it must not be consumed on B.
  node0_delegate.set_read_messages(false);

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
  EXPECT_EQ(OK, node0.ClosePort(D));

  PumpTasks();

  // Now send B as new port E.

  node0_delegate.set_read_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // The next saved message is the "foo" carrier; it holds the new port E.
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());

  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  PumpTasks();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  EXPECT_STREQ("hey", ToString(message));

  PortStatus status;
  EXPECT_EQ(OK, node0.GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result so a failed close is reported at this line instead of
  // only showing up indirectly in the shutdown check below.
  EXPECT_EQ(OK, node0.ClosePort(E));

  PumpTasks();

  EXPECT_TRUE(node0.CanShutdownCleanly(false));
}
1200
TEST_F(PortsTest, MergePorts) {
  // This tests the basic merge path: a message written on A before B is merged
  // with C must end up being delivered to D.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // node0 must not read the message itself; node1 saves what it receives so
  // the test can inspect it through GetSavedMessage().
  node0_delegate.set_read_messages(false);
  node1_delegate.set_save_messages(true);

  // Write a message on A.
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));

  PumpTasks();

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  PumpTasks();

  // Expect only two receiving ports to be left after pumping tasks.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  EXPECT_STREQ("hey", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(A));
  EXPECT_EQ(OK, node1.ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1246
TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // This tests that the right thing happens when initiating a merge on a port
  // whose peer has already been closed.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // node0 must not read the message itself; node1 saves what it receives so
  // the test can inspect it through GetSavedMessage().
  node0_delegate.set_read_messages(false);
  node1_delegate.set_save_messages(true);

  // Write a message on A.
  EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));

  PumpTasks();

  // Close A.
  EXPECT_EQ(OK, node0.ClosePort(A));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  PumpTasks();

  // Expect only one receiving port to be left after pumping tasks.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(true));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  EXPECT_STREQ("hey", ToString(message));

  EXPECT_EQ(OK, node1.ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1297
TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // node1 must not read the message itself; node0 saves what it receives so
  // the test can inspect it through GetSavedMessage().
  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Write a message on D. D was created on node1, so the send has to go
  // through node1 rather than node0.
  EXPECT_EQ(OK, SendStringMessage(&node1, D, "hey"));

  PumpTasks();

  // Close D.
  EXPECT_EQ(OK, node1.ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  PumpTasks();

  // Expect only one receiving port to be left after pumping tasks.
  EXPECT_TRUE(node0.CanShutdownCleanly(true));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  EXPECT_STREQ("hey", ToString(message));

  EXPECT_EQ(OK, node0.ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1348
// Merges two ports whose peers were both closed beforehand and checks that
// neither node is left holding any port afterwards.
TEST_F(PortsTest, MergePortsWithClosedPeers) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Two unrelated pairs: A-B lives on node0, C-D lives on node1.
  PortRef port_a;
  PortRef port_b;
  PortRef port_c;
  PortRef port_d;
  EXPECT_EQ(OK, node0.CreatePortPair(&port_a, &port_b));
  EXPECT_EQ(OK, node1.CreatePortPair(&port_c, &port_d));

  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Close the outer ends (A and D) before any merge is attempted.
  EXPECT_EQ(OK, node0.ClosePort(port_a));
  EXPECT_EQ(OK, node1.ClosePort(port_d));

  PumpTasks();

  // Merge the two remaining ends, B and C.
  EXPECT_EQ(OK, node0.MergePorts(port_b, node1_name, port_c.name()));

  PumpTasks();

  // Nothing should remain on either node.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1386
TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // This tests that ports can be merged successfully even if their peers are
  // moved around.

  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  node0_delegate.set_save_messages(true);
  node1_delegate.set_read_messages(false);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));

  // Set up another pair X-Y for moving ports on node0.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Move A to new port E.
  EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.ClosePort(X));
  EXPECT_EQ(OK, node0.ClosePort(Y));

  // Reading stays off on both nodes until the merge has been initiated, so the
  // messages written below are only picked up after B and C are merged.
  node0_delegate.set_read_messages(false);

  // Write messages on E and D.
  EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
  EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));

  node0_delegate.set_read_messages(true);
  node1_delegate.set_read_messages(true);
  node1_delegate.set_save_messages(true);

  PumpTasks();

  // Expect to receive D's message on E and E's message on D.
  // EXPECT_STREQ prints both strings on mismatch, unlike a strcmp() == 0 check.
  ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
  EXPECT_STREQ("hi", ToString(message));
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
  EXPECT_STREQ("hey", ToString(message));

  // Close E and D.
  EXPECT_EQ(OK, node0.ClosePort(E));
  EXPECT_EQ(OK, node1.ClosePort(D));

  PumpTasks();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1456
// Checks that a port merge which cannot complete leaves both nodes in a
// well-defined state: the merge target is moved away before the merge event is
// processed, and afterwards every port can still be closed and both nodes
// report that they can shut down cleanly.
TEST_F(PortsTest, MergePortsFailsGracefully) {
  NodeName node0_name(0, 1);
  TestNodeDelegate node0_delegate(node0_name);
  Node node0(node0_name, &node0_delegate);
  node_map[0] = &node0;

  NodeName node1_name(1, 1);
  TestNodeDelegate node1_delegate(node1_name);
  Node node1(node1_name, &node1_delegate);
  node_map[1] = &node1;

  // Two unrelated pairs: A-B lives on node0, C-D lives on node1.
  PortRef port_a;
  PortRef port_b;
  PortRef port_c;
  PortRef port_d;
  EXPECT_EQ(OK, node0.CreatePortPair(&port_a, &port_b));
  EXPECT_EQ(OK, node1.CreatePortPair(&port_c, &port_d));

  PumpTasks();

  // Kick off the merge of B with C; tasks are not pumped yet, so the merge
  // event has not been handled by node1 at this point.
  EXPECT_EQ(OK, node0.MergePorts(port_b, node1_name, port_c.name()));

  // Transfer C so that it becomes new port E. Nobody should do this in
  // practice, but nothing prevents it. Because C is no longer in a receiving
  // state when the merge event reaches node1, the merge fails and B is
  // expected to be closed as a consequence.
  ScopedMessage received;
  PortRef port_x;
  PortRef port_y;
  EXPECT_EQ(OK, node1.CreatePortPair(&port_x, &port_y));
  node1_delegate.set_save_messages(true);
  EXPECT_EQ(OK, SendStringMessageWithPort(&node1, port_x, "foo", port_c));
  ASSERT_TRUE(node1_delegate.GetSavedMessage(&received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef port_e;
  ASSERT_EQ(OK, node1.GetPort(received->ports()[0], &port_e));
  EXPECT_EQ(OK, node1.ClosePort(port_x));
  EXPECT_EQ(OK, node1.ClosePort(port_y));

  // Ordinary proxy removal gets rid of C once tasks run.
  PumpTasks();

  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(port_c.name(), &port_c));

  // B must be gone as well, having been closed by the failed merge.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(port_b.name(), &port_b));

  // Close the ports that are still alive: A, D and E.
  EXPECT_EQ(OK, node0.ClosePort(port_a));
  EXPECT_EQ(OK, node1.ClosePort(port_d));
  EXPECT_EQ(OK, node1.ClosePort(port_e));

  PumpTasks();

  // Nothing should remain on either node.
  EXPECT_TRUE(node0.CanShutdownCleanly(false));
  EXPECT_TRUE(node1.CanShutdownCleanly(false));
}
1515
1516 } // namespace test
1517 } // namespace ports
1518 } // namespace edk
1519 } // namespace mojo
1520