1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <inttypes.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 #include <map>
11 #include <sstream>
12 #include <utility>
13
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/containers/queue.h"
17 #include "base/logging.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_piece.h"
20 #include "base/strings/stringprintf.h"
21 #include "base/synchronization/lock.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/test/scoped_task_environment.h"
24 #include "base/threading/thread.h"
25 #include "mojo/core/ports/event.h"
26 #include "mojo/core/ports/node.h"
27 #include "mojo/core/ports/node_delegate.h"
28 #include "mojo/core/ports/user_message.h"
29 #include "testing/gtest/include/gtest/gtest.h"
30
31 namespace mojo {
32 namespace core {
33 namespace ports {
34 namespace test {
35
36 namespace {
37
38 // TODO(rockot): Remove this unnecessary alias.
39 using ScopedMessage = std::unique_ptr<UserMessageEvent>;
40
41 class TestMessage : public UserMessage {
42 public:
43 static const TypeInfo kUserMessageTypeInfo;
44
TestMessage(const base::StringPiece & payload)45 TestMessage(const base::StringPiece& payload)
46 : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
~TestMessage()47 ~TestMessage() override {}
48
payload() const49 const std::string& payload() const { return payload_; }
50
51 private:
52 std::string payload_;
53 };
54
55 const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};
56
NewUserMessageEvent(const base::StringPiece & payload,size_t num_ports)57 ScopedMessage NewUserMessageEvent(const base::StringPiece& payload,
58 size_t num_ports) {
59 auto event = std::make_unique<UserMessageEvent>(num_ports);
60 event->AttachMessage(std::make_unique<TestMessage>(payload));
61 return event;
62 }
63
MessageEquals(const ScopedMessage & message,const base::StringPiece & s)64 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
65 return message->GetMessage<TestMessage>()->payload() == s;
66 }
67
68 class TestNode;
69
70 class MessageRouter {
71 public:
~MessageRouter()72 virtual ~MessageRouter() {}
73
74 virtual void ForwardEvent(TestNode* from_node,
75 const NodeName& node_name,
76 ScopedEvent event) = 0;
77 virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
78 };
79
80 class TestNode : public NodeDelegate {
81 public:
TestNode(uint64_t id)82 explicit TestNode(uint64_t id)
83 : node_name_(id, 1),
84 node_(node_name_, this),
85 node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
86 events_available_event_(
87 base::WaitableEvent::ResetPolicy::AUTOMATIC,
88 base::WaitableEvent::InitialState::NOT_SIGNALED),
89 idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
90 base::WaitableEvent::InitialState::SIGNALED) {}
91
~TestNode()92 ~TestNode() override {
93 StopWhenIdle();
94 node_thread_.Stop();
95 }
96
name() const97 const NodeName& name() const { return node_name_; }
98
99 // NOTE: Node is thread-safe.
node()100 Node& node() { return node_; }
101
idle_event()102 base::WaitableEvent& idle_event() { return idle_event_; }
103
IsIdle()104 bool IsIdle() {
105 base::AutoLock lock(lock_);
106 return started_ && !dispatching_ &&
107 (incoming_events_.empty() || (block_on_event_ && blocked_));
108 }
109
BlockOnEvent(Event::Type type)110 void BlockOnEvent(Event::Type type) {
111 base::AutoLock lock(lock_);
112 blocked_event_type_ = type;
113 block_on_event_ = true;
114 }
115
Unblock()116 void Unblock() {
117 base::AutoLock lock(lock_);
118 block_on_event_ = false;
119 events_available_event_.Signal();
120 }
121
Start(MessageRouter * router)122 void Start(MessageRouter* router) {
123 router_ = router;
124 node_thread_.Start();
125 node_thread_.task_runner()->PostTask(
126 FROM_HERE,
127 base::Bind(&TestNode::ProcessEvents, base::Unretained(this)));
128 }
129
StopWhenIdle()130 void StopWhenIdle() {
131 base::AutoLock lock(lock_);
132 should_quit_ = true;
133 events_available_event_.Signal();
134 }
135
WakeUp()136 void WakeUp() { events_available_event_.Signal(); }
137
SendStringMessage(const PortRef & port,const std::string & s)138 int SendStringMessage(const PortRef& port, const std::string& s) {
139 return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
140 }
141
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortName & sent_port_name)142 int SendStringMessageWithPort(const PortRef& port,
143 const std::string& s,
144 const PortName& sent_port_name) {
145 auto event = NewUserMessageEvent(s, 1);
146 event->ports()[0] = sent_port_name;
147 return node_.SendUserMessage(port, std::move(event));
148 }
149
SendStringMessageWithPort(const PortRef & port,const std::string & s,const PortRef & sent_port)150 int SendStringMessageWithPort(const PortRef& port,
151 const std::string& s,
152 const PortRef& sent_port) {
153 return SendStringMessageWithPort(port, s, sent_port.name());
154 }
155
set_drop_messages(bool value)156 void set_drop_messages(bool value) {
157 base::AutoLock lock(lock_);
158 drop_messages_ = value;
159 }
160
set_save_messages(bool value)161 void set_save_messages(bool value) {
162 base::AutoLock lock(lock_);
163 save_messages_ = value;
164 }
165
ReadMessage(const PortRef & port,ScopedMessage * message)166 bool ReadMessage(const PortRef& port, ScopedMessage* message) {
167 return node_.GetMessage(port, message, nullptr) == OK && *message;
168 }
169
GetSavedMessage(ScopedMessage * message)170 bool GetSavedMessage(ScopedMessage* message) {
171 base::AutoLock lock(lock_);
172 if (saved_messages_.empty()) {
173 message->reset();
174 return false;
175 }
176 std::swap(*message, saved_messages_.front());
177 saved_messages_.pop();
178 return true;
179 }
180
EnqueueEvent(ScopedEvent event)181 void EnqueueEvent(ScopedEvent event) {
182 idle_event_.Reset();
183
184 // NOTE: This may be called from ForwardMessage and thus must not reenter
185 // |node_|.
186 base::AutoLock lock(lock_);
187 incoming_events_.emplace(std::move(event));
188 events_available_event_.Signal();
189 }
190
ForwardEvent(const NodeName & node_name,ScopedEvent event)191 void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
192 {
193 base::AutoLock lock(lock_);
194 if (drop_messages_) {
195 DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
196 << node_name;
197
198 base::AutoUnlock unlock(lock_);
199 ClosePortsInEvent(event.get());
200 return;
201 }
202 }
203
204 DCHECK(router_);
205 DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
206 router_->ForwardEvent(this, node_name, std::move(event));
207 }
208
BroadcastEvent(ScopedEvent event)209 void BroadcastEvent(ScopedEvent event) override {
210 router_->BroadcastEvent(this, std::move(event));
211 }
212
PortStatusChanged(const PortRef & port)213 void PortStatusChanged(const PortRef& port) override {
214 // The port may be closed, in which case we ignore the notification.
215 base::AutoLock lock(lock_);
216 if (!save_messages_)
217 return;
218
219 for (;;) {
220 ScopedMessage message;
221 {
222 base::AutoUnlock unlock(lock_);
223 if (!ReadMessage(port, &message))
224 break;
225 }
226
227 saved_messages_.emplace(std::move(message));
228 }
229 }
230
ClosePortsInEvent(Event * event)231 void ClosePortsInEvent(Event* event) {
232 if (event->type() != Event::Type::kUserMessage)
233 return;
234
235 UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
236 for (size_t i = 0; i < message_event->num_ports(); ++i) {
237 PortRef port;
238 ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
239 EXPECT_EQ(OK, node_.ClosePort(port));
240 }
241 }
242
243 private:
ProcessEvents()244 void ProcessEvents() {
245 for (;;) {
246 events_available_event_.Wait();
247 base::AutoLock lock(lock_);
248
249 if (should_quit_)
250 return;
251
252 dispatching_ = true;
253 while (!incoming_events_.empty()) {
254 if (block_on_event_ &&
255 incoming_events_.front()->type() == blocked_event_type_) {
256 blocked_ = true;
257 // Go idle if we hit a blocked event type.
258 break;
259 } else {
260 blocked_ = false;
261 }
262 ScopedEvent event = std::move(incoming_events_.front());
263 incoming_events_.pop();
264
265 // NOTE: AcceptMessage() can re-enter this object to call any of the
266 // NodeDelegate interface methods.
267 base::AutoUnlock unlock(lock_);
268 node_.AcceptEvent(std::move(event));
269 }
270
271 dispatching_ = false;
272 started_ = true;
273 idle_event_.Signal();
274 };
275 }
276
277 const NodeName node_name_;
278 Node node_;
279 MessageRouter* router_ = nullptr;
280
281 base::Thread node_thread_;
282 base::WaitableEvent events_available_event_;
283 base::WaitableEvent idle_event_;
284
285 // Guards fields below.
286 base::Lock lock_;
287 bool started_ = false;
288 bool dispatching_ = false;
289 bool should_quit_ = false;
290 bool drop_messages_ = false;
291 bool save_messages_ = false;
292 bool blocked_ = false;
293 bool block_on_event_ = false;
294 Event::Type blocked_event_type_;
295 base::queue<ScopedEvent> incoming_events_;
296 base::queue<ScopedMessage> saved_messages_;
297 };
298
299 class PortsTest : public testing::Test, public MessageRouter {
300 public:
AddNode(TestNode * node)301 void AddNode(TestNode* node) {
302 {
303 base::AutoLock lock(lock_);
304 nodes_[node->name()] = node;
305 }
306 node->Start(this);
307 }
308
RemoveNode(TestNode * node)309 void RemoveNode(TestNode* node) {
310 {
311 base::AutoLock lock(lock_);
312 nodes_.erase(node->name());
313 }
314
315 for (const auto& entry : nodes_)
316 entry.second->node().LostConnectionToNode(node->name());
317 }
318
319 // Waits until all known Nodes are idle. Message forwarding and processing
320 // is handled in such a way that idleness is a stable state: once all nodes in
321 // the system are idle, they will remain idle until the test explicitly
322 // initiates some further event (e.g. sending a message, closing a port, or
323 // removing a Node).
WaitForIdle()324 void WaitForIdle() {
325 for (;;) {
326 base::AutoLock global_lock(global_lock_);
327 bool all_nodes_idle = true;
328 for (const auto& entry : nodes_) {
329 if (!entry.second->IsIdle())
330 all_nodes_idle = false;
331 entry.second->WakeUp();
332 }
333 if (all_nodes_idle)
334 return;
335
336 // Wait for any Node to signal that it's idle.
337 base::AutoUnlock global_unlock(global_lock_);
338 std::vector<base::WaitableEvent*> events;
339 for (const auto& entry : nodes_)
340 events.push_back(&entry.second->idle_event());
341 base::WaitableEvent::WaitMany(events.data(), events.size());
342 }
343 }
344
CreatePortPair(TestNode * node0,PortRef * port0,TestNode * node1,PortRef * port1)345 void CreatePortPair(TestNode* node0,
346 PortRef* port0,
347 TestNode* node1,
348 PortRef* port1) {
349 if (node0 == node1) {
350 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
351 } else {
352 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
353 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
354 EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
355 port1->name()));
356 EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
357 port0->name()));
358 }
359 }
360
361 private:
362 // MessageRouter:
ForwardEvent(TestNode * from_node,const NodeName & node_name,ScopedEvent event)363 void ForwardEvent(TestNode* from_node,
364 const NodeName& node_name,
365 ScopedEvent event) override {
366 base::AutoLock global_lock(global_lock_);
367 base::AutoLock lock(lock_);
368 // Drop messages from nodes that have been removed.
369 if (nodes_.find(from_node->name()) == nodes_.end()) {
370 from_node->ClosePortsInEvent(event.get());
371 return;
372 }
373
374 auto it = nodes_.find(node_name);
375 if (it == nodes_.end()) {
376 DVLOG(1) << "Node not found: " << node_name;
377 return;
378 }
379
380 it->second->EnqueueEvent(std::move(event));
381 }
382
BroadcastEvent(TestNode * from_node,ScopedEvent event)383 void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
384 base::AutoLock global_lock(global_lock_);
385 base::AutoLock lock(lock_);
386
387 // Drop messages from nodes that have been removed.
388 if (nodes_.find(from_node->name()) == nodes_.end())
389 return;
390
391 for (const auto& entry : nodes_) {
392 TestNode* node = entry.second;
393 // Broadcast doesn't deliver to the local node.
394 if (node == from_node)
395 continue;
396 node->EnqueueEvent(event->Clone());
397 }
398 }
399
400 base::test::ScopedTaskEnvironment scoped_task_environment_;
401
402 // Acquired before any operation which makes a Node busy, and before testing
403 // if all nodes are idle.
404 base::Lock global_lock_;
405
406 base::Lock lock_;
407 std::map<NodeName, TestNode*> nodes_;
408 };
409
410 } // namespace
411
// Ships one end of a local pipe across a cross-node pipe, closes every port
// the test still holds, and expects both nodes to be able to shut down
// cleanly once the system has gone idle.
TEST_F(PortsTest, Basic1) {
  TestNode origin(0);
  AddNode(&origin);

  TestNode target(1);
  AddNode(&target);

  // Pipe spanning the two nodes.
  PortRef bridge_origin, bridge_target;
  CreatePortPair(&origin, &bridge_origin, &target, &bridge_target);

  // Local pipe; its second end travels with "hello".
  PortRef kept, shipped;
  EXPECT_EQ(OK, origin.node().CreatePortPair(&kept, &shipped));
  EXPECT_EQ(OK,
            origin.SendStringMessageWithPort(bridge_origin, "hello", shipped));
  EXPECT_EQ(OK, origin.node().ClosePort(kept));

  EXPECT_EQ(OK, origin.node().ClosePort(bridge_origin));
  EXPECT_EQ(OK, target.node().ClosePort(bridge_target));

  WaitForIdle();

  EXPECT_TRUE(origin.node().CanShutdownCleanly());
  EXPECT_TRUE(target.node().CanShutdownCleanly());
}
435
// Like Basic1, but a message is also written to the retained end of the
// local pipe after its peer has been shipped to the other node.
TEST_F(PortsTest, Basic2) {
  TestNode origin(0);
  AddNode(&origin);

  TestNode target(1);
  AddNode(&target);

  // Pipe spanning the two nodes.
  PortRef bridge_origin, bridge_target;
  CreatePortPair(&origin, &bridge_origin, &target, &bridge_target);

  PortRef kept, shipped;
  EXPECT_EQ(OK, origin.node().CreatePortPair(&kept, &shipped));
  EXPECT_EQ(OK,
            origin.SendStringMessageWithPort(bridge_origin, "hello", shipped));
  // The peer of |kept| is already in transit when this is sent.
  EXPECT_EQ(OK, origin.SendStringMessage(kept, "hello again"));

  EXPECT_EQ(OK, origin.node().ClosePort(kept));

  EXPECT_EQ(OK, origin.node().ClosePort(bridge_origin));
  EXPECT_EQ(OK, target.node().ClosePort(bridge_target));

  WaitForIdle();

  EXPECT_TRUE(origin.node().CanShutdownCleanly());
  EXPECT_TRUE(target.node().CanShutdownCleanly());
}
461
// Ships both ends of one local pipe, plus one end of a second local pipe,
// across a cross-node pipe while messages are still being written locally,
// then expects a clean shutdown on both nodes.
TEST_F(PortsTest, Basic3) {
  TestNode origin(0);
  AddNode(&origin);

  TestNode target(1);
  AddNode(&target);

  // Pipe spanning the two nodes.
  PortRef bridge_origin, bridge_target;
  CreatePortPair(&origin, &bridge_origin, &target, &bridge_target);

  // First local pipe: ship one end, write to the other, then ship that too.
  PortRef first0, first1;
  EXPECT_EQ(OK, origin.node().CreatePortPair(&first0, &first1));

  EXPECT_EQ(OK,
            origin.SendStringMessageWithPort(bridge_origin, "hello", first1));
  EXPECT_EQ(OK, origin.SendStringMessage(first0, "hello again"));

  EXPECT_EQ(OK,
            origin.SendStringMessageWithPort(bridge_origin, "foo", first0));

  // Second local pipe: ship one end, write to the other, then close it.
  PortRef second0, second1;
  EXPECT_EQ(OK, origin.node().CreatePortPair(&second0, &second1));
  EXPECT_EQ(OK,
            origin.SendStringMessageWithPort(bridge_origin, "bar", second1));
  EXPECT_EQ(OK, origin.SendStringMessage(second0, "baz"));

  EXPECT_EQ(OK, origin.node().ClosePort(second0));

  EXPECT_EQ(OK, origin.node().ClosePort(bridge_origin));
  EXPECT_EQ(OK, target.node().ClosePort(bridge_target));

  WaitForIdle();

  EXPECT_TRUE(origin.node().CanShutdownCleanly());
  EXPECT_TRUE(target.node().CanShutdownCleanly());
}
495
// A port is sent towards a node that drops everything it would forward, and
// that node is then removed. All ports the test still holds must close with
// OK and both nodes must be able to shut down cleanly.
TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode survivor(0);
  AddNode(&survivor);

  TestNode doomed(1);
  AddNode(&doomed);
  doomed.set_drop_messages(true);

  // Pipe spanning the two nodes.
  PortRef bridge_survivor, bridge_doomed;
  CreatePortPair(&survivor, &bridge_survivor, &doomed, &bridge_doomed);

  // Hand one end of a local pipe to the doomed node, then simulate losing the
  // connection to that node.

  PortRef kept, shipped;
  EXPECT_EQ(OK, survivor.node().CreatePortPair(&kept, &shipped));
  EXPECT_EQ(OK,
            survivor.SendStringMessageWithPort(bridge_survivor, "foo", shipped));

  WaitForIdle();

  RemoveNode(&doomed);

  WaitForIdle();

  EXPECT_EQ(OK, survivor.node().ClosePort(kept));
  EXPECT_EQ(OK, survivor.node().ClosePort(bridge_survivor));
  EXPECT_EQ(OK, doomed.node().ClosePort(bridge_doomed));

  WaitForIdle();

  EXPECT_TRUE(survivor.node().CanShutdownCleanly());
  EXPECT_TRUE(doomed.node().CanShutdownCleanly());
}
528
// A port is delivered to a second node, which is then removed. The retained
// peer on the first node must observe peer closure, and the undelivered
// message must still be readable on the removed node so its ports can be
// closed.
TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode survivor(0);
  AddNode(&survivor);

  TestNode doomed(1);
  AddNode(&doomed);

  // Pipe spanning the two nodes.
  PortRef bridge_survivor, bridge_doomed;
  CreatePortPair(&survivor, &bridge_survivor, &doomed, &bridge_doomed);

  PortRef kept, shipped;
  EXPECT_EQ(OK, survivor.node().CreatePortPair(&kept, &shipped));
  EXPECT_EQ(OK, survivor.SendStringMessageWithPort(bridge_survivor, "take a1",
                                                   shipped));

  WaitForIdle();

  // From here on the doomed node discards whatever it tries to forward.
  doomed.set_drop_messages(true);

  RemoveNode(&doomed);

  WaitForIdle();

  // |kept| should have eventually detected peer closure after node loss.
  ScopedMessage received;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            survivor.node().GetMessage(kept, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, survivor.node().ClosePort(kept));

  EXPECT_EQ(OK, survivor.node().ClosePort(bridge_survivor));

  // The "take a1" message is still queued on the removed node; read it and
  // close the port it carries.
  EXPECT_EQ(OK, doomed.node().GetMessage(bridge_doomed, &received, nullptr));
  EXPECT_TRUE(received);
  doomed.ClosePortsInEvent(received.get());

  EXPECT_EQ(OK, doomed.node().ClosePort(bridge_doomed));

  WaitForIdle();

  EXPECT_TRUE(survivor.node().CanShutdownCleanly());
  EXPECT_TRUE(doomed.node().CanShutdownCleanly());
}
572
// Verifies that a proxy is cleaned up when its indirect peer lives on a node
// that has been lost.
TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // A-B spans nodes 0 and 1; C-D spans nodes 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // E-F is local to node 0; F is sent over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage carrier;
  ASSERT_TRUE(node1.ReadMessage(B, &carrier));
  ASSERT_EQ(1u, carrier->num_ports());

  // Rebind F to the port as it now exists on node 1.
  EXPECT_EQ(OK, node1.node().GetPort(carrier->ports()[0], &F));

  // Next, F goes over C to node 2 and node 1 is told that node 2 is lost.
  // Node 1 will trivially become aware of the loss, and this test verifies
  // that the port A on node 0 will eventually also become aware of it.

  // Have node 2 stall as soon as an ObserveProxy event reaches the front of
  // its queue.
  node2.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate nodes 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Release node 2 and let the whole system settle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus e_status;
  EXPECT_EQ(OK, node0.node().GetStatus(E, &e_status));
  EXPECT_TRUE(e_status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
640
// Verifies that a proxy is cleaned up when its direct peer lives on a lost
// node and its predecessor lives on the same node as the proxy.
TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // A-B spans the two nodes; C-D is local to node0.
  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Ship D over A, with node0 set to stall on an ObserveProxy event.
  node0.BlockOnEvent(Event::Type::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  ScopedMessage carrier;
  ASSERT_TRUE(node1.ReadMessage(B, &carrier));
  ASSERT_EQ(1u, carrier->num_ports());
  // E is the port that arrived on node1 with the message.
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(carrier->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus c_status;
  EXPECT_EQ(OK, node0.node().GetStatus(C, &c_status));
  EXPECT_TRUE(c_status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
689
// Reading from an empty port succeeds without yielding a message; once the
// peer has been closed the same read reports ERROR_PORT_PEER_CLOSED.
TEST_F(PortsTest, GetMessage1) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef reader, writer;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&reader, &writer));

  // Nothing has been sent yet.
  ScopedMessage received;
  EXPECT_EQ(OK, solo.node().GetMessage(reader, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, solo.node().ClosePort(writer));

  WaitForIdle();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            solo.node().GetMessage(reader, &received, nullptr));
  EXPECT_FALSE(received);

  EXPECT_EQ(OK, solo.node().ClosePort(reader));

  WaitForIdle();

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
715
// A single message written to one end of a local pipe is readable from the
// other end without waiting for the node to go idle.
TEST_F(PortsTest, GetMessage2) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef reader, writer;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&reader, &writer));

  EXPECT_EQ(OK, solo.SendStringMessage(writer, "1"));

  ScopedMessage received;
  EXPECT_EQ(OK, solo.node().GetMessage(reader, &received, nullptr));

  ASSERT_TRUE(received);
  EXPECT_TRUE(MessageEquals(received, "1"));

  EXPECT_EQ(OK, solo.node().ClosePort(reader));
  EXPECT_EQ(OK, solo.node().ClosePort(writer));

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
736
// Several messages written back-to-back to one end of a local pipe are read
// from the other end in the order they were sent.
TEST_F(PortsTest, GetMessage3) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef reader, writer;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&reader, &writer));

  const char* kStrings[] = {"1", "2", "3"};

  // Write everything first...
  for (const char* outgoing : kStrings)
    EXPECT_EQ(OK, solo.SendStringMessage(writer, outgoing));

  // ...then read it all back, expecting the same sequence.
  ScopedMessage received;
  for (const char* expected : kStrings) {
    EXPECT_EQ(OK, solo.node().GetMessage(reader, &received, nullptr));
    ASSERT_TRUE(received);
    EXPECT_TRUE(MessageEquals(received, expected));
  }

  EXPECT_EQ(OK, solo.node().ClosePort(reader));
  EXPECT_EQ(OK, solo.node().ClosePort(writer));

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
761
// Sends a message to a port that has been moved: a1 travels from node0 to
// node1 and straight back again, and a message written to a0 in the meantime
// must arrive at the port's final location on node0.
TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Pipe spanning the two nodes.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage received;
  ASSERT_TRUE(node1.ReadMessage(x1, &received));
  ASSERT_EQ(1u, received->num_ports());
  EXPECT_TRUE(MessageEquals(received, "a1"));

  // Name under which the transferred "a1" is known on node1.
  PortName name_on_node1 = received->ports()[0];
  // Bounce it straight back, and write to its peer while it is in flight.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", name_on_node1));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &received));
  ASSERT_EQ(1u, received->num_ports());
  EXPECT_TRUE(MessageEquals(received, "a2"));

  // Name under which the returned "a2" is known on node0.
  PortName name_on_node0 = received->ports()[0];

  PortRef returned;
  EXPECT_EQ(OK, node0.node().GetPort(name_on_node0, &returned));

  // "hello" must have followed the port to where it ended up.
  ASSERT_TRUE(node0.ReadMessage(returned, &received));
  EXPECT_EQ(0u, received->num_ports());
  EXPECT_TRUE(MessageEquals(received, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(returned));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
814
// Repeatedly nests port transfers (D over A, then F over C to D) and checks
// that a message written to E is among the messages saved by node1.
TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int round = 0; round < 100; ++round) {
    // Pipe A<->B between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    // Two pipes local to node0.
    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    // Walk the saved messages, closing the ports each one carries, and stop
    // at the first "hello".
    bool got_hello = false;
    ScopedMessage saved;
    while (!got_hello && node1.GetSavedMessage(&saved)) {
      node1.ClosePortsInEvent(saved.get());
      got_hello = MessageEquals(saved, "hello");
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
870
// Sending on a port that was created but never initialized is rejected with
// ERROR_PORT_STATE_UNEXPECTED; the port can still be closed afterwards.
TEST_F(PortsTest, SendUninitialized) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef raw_port;
  EXPECT_EQ(OK, solo.node().CreateUninitializedPort(&raw_port));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
            solo.SendStringMessage(raw_port, "oops"));
  EXPECT_EQ(OK, solo.node().ClosePort(raw_port));
  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
881
// A port can be sent neither over itself nor over its own peer. Both attempts
// fail with a dedicated error code and no message is delivered.
TEST_F(PortsTest, SendFailure) {
  TestNode solo(0);
  AddNode(&solo);

  solo.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&A, &B));

  // A cannot carry itself.
  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            solo.SendStringMessageWithPort(A, "oops", A));

  // A cannot carry its peer B either.
  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            solo.SendStringMessageWithPort(A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, solo.node().GetPort(B.name(), &B));

  WaitForIdle();

  // There should have been no messages accepted.
  ScopedMessage saved;
  EXPECT_FALSE(solo.GetSavedMessage(&saved));

  EXPECT_EQ(OK, solo.node().ClosePort(A));

  WaitForIdle();

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
916
// A port attached to a message that is never read must not prevent a clean
// shutdown once the pipe carrying that message has been closed.
TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, solo.node().CreatePortPair(&C, &D));

  // D rides in a message that nobody ever reads from B.
  EXPECT_EQ(OK, solo.SendStringMessageWithPort(A, "foo", D));

  EXPECT_EQ(OK, solo.node().ClosePort(C));
  EXPECT_EQ(OK, solo.node().ClosePort(A));
  EXPECT_EQ(OK, solo.node().ClosePort(B));

  WaitForIdle();

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
935
// With ShutdownPolicy::ALLOW_LOCAL_PORTS a node that still has open local
// ports reports that it can shut down cleanly; with the default policy it
// does not until those ports are closed.
TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, solo.node().CreatePortPair(&C, &D));

  EXPECT_EQ(OK, solo.SendStringMessageWithPort(A, "foo", D));

  // Receive D on the other side of A-B; it is referred to as E from here on.
  ScopedMessage received;
  EXPECT_TRUE(solo.ReadMessage(B, &received));
  ASSERT_EQ(1u, received->num_ports());
  EXPECT_TRUE(MessageEquals(received, "foo"));
  PortRef E;
  ASSERT_EQ(OK, solo.node().GetPort(received->ports()[0], &E));

  EXPECT_TRUE(
      solo.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  WaitForIdle();

  // Same answer after settling, while the default policy still refuses.
  EXPECT_TRUE(
      solo.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_FALSE(solo.node().CanShutdownCleanly());

  EXPECT_EQ(OK, solo.node().ClosePort(A));
  EXPECT_EQ(OK, solo.node().ClosePort(B));
  EXPECT_EQ(OK, solo.node().ClosePort(C));
  EXPECT_EQ(OK, solo.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
971
// Sends the same logical port through a local pipe three times in a row
// (B -> C -> D -> E) and checks that the node can shut down cleanly once the
// remaining ports are closed and the node has gone idle.
TEST_F(PortsTest, ProxyCollapse1) {
  TestNode solo(0);
  AddNode(&solo);

  PortRef A, B;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&A, &B));

  // X-Y is the pipe used to carry the port each time.
  PortRef X, Y;
  EXPECT_EQ(OK, solo.node().CreatePortPair(&X, &Y));

  ScopedMessage carrier;

  // Hop 1: B goes in, C comes out.
  EXPECT_EQ(OK, solo.SendStringMessageWithPort(X, "foo", B));
  ASSERT_TRUE(solo.ReadMessage(Y, &carrier));
  ASSERT_EQ(1u, carrier->num_ports());
  PortRef C;
  ASSERT_EQ(OK, solo.node().GetPort(carrier->ports()[0], &C));

  // Hop 2: C goes in, D comes out.
  EXPECT_EQ(OK, solo.SendStringMessageWithPort(X, "foo", C));
  ASSERT_TRUE(solo.ReadMessage(Y, &carrier));
  ASSERT_EQ(1u, carrier->num_ports());
  PortRef D;
  ASSERT_EQ(OK, solo.node().GetPort(carrier->ports()[0], &D));

  // Hop 3: D goes in, E comes out.
  EXPECT_EQ(OK, solo.SendStringMessageWithPort(X, "foo", D));
  ASSERT_TRUE(solo.ReadMessage(Y, &carrier));
  ASSERT_EQ(1u, carrier->num_ports());
  PortRef E;
  ASSERT_EQ(OK, solo.node().GetPort(carrier->ports()[0], &E));

  EXPECT_EQ(OK, solo.node().ClosePort(X));
  EXPECT_EQ(OK, solo.node().ClosePort(Y));

  EXPECT_EQ(OK, solo.node().ClosePort(A));
  EXPECT_EQ(OK, solo.node().ClosePort(E));

  // The node should not idle until all proxies are collapsed.
  WaitForIdle();

  EXPECT_TRUE(solo.node().CanShutdownCleanly());
}
1016
// Sends both ends of a local pipe over another local pipe, so that proxies
// exist in each direction, then closes the carrying pipe without reading
// from it and expects the node to be able to shut down cleanly.
TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1047
// This tests that if a port is sent when its peer is already known to be
// closed, the newly created port will be aware of that peer closure, and the
// proxy will eventually collapse.
TEST_F(PortsTest, SendWithClosedPeer) {
  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.

  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  // Check the result like every other ClosePort() call in this file, so a
  // failure to close C is reported here rather than only by the shutdown
  // check below.
  EXPECT_EQ(OK, node.node().ClosePort(C));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1095
TEST_F(PortsTest, SendWithClosedPeerSent) {
  // Scenario: a port is closed while some number of proxies are still routing
  // messages (directly or indirectly) to it. The peer port must eventually be
  // notified of the closure, and the dead-end proxies must eventually be
  // removed.

  TestNode node(0);
  AddNode(&node);

  // X-Y is the pair over which other ports are transferred in this test.
  PortRef X;
  PortRef Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  PortRef A;
  PortRef B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  ScopedMessage received;

  // First hop: A goes over X-Y and is received as C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  ASSERT_TRUE(node.ReadMessage(Y, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(received->ports()[0], &C));

  // Second hop: C goes over X-Y and is received as D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));

  ASSERT_TRUE(node.ReadMessage(Y, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(received->ports()[0], &D));

  // Write to B by way of D, and then close D.
  EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(D));

  // Finally transfer B itself; it is received as E. X is closed as soon as
  // the transfer has been sent.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.node().ClosePort(X));

  ASSERT_TRUE(node.ReadMessage(Y, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(received->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // The message originally addressed to B must now be readable from E, and E
  // must also know that D has been closed.
  ASSERT_TRUE(node.ReadMessage(E, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  PortStatus port_status;
  EXPECT_EQ(OK, node.node().GetStatus(E, &port_status));
  EXPECT_FALSE(port_status.receiving_messages);
  EXPECT_FALSE(port_status.has_messages);
  EXPECT_TRUE(port_status.peer_closed);

  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}
1165
TEST_F(PortsTest, MergePorts) {
  // Merges B (node0) with C (node1) and checks that a message written on A
  // before the merge is delivered to D.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Two independent port pairs: A-B lives on node0, C-D lives on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Queue a message from A before the merge is requested.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Ask node0 to merge B with node1's C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // By the time the nodes are idle, every proxy should have been removed.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // The message written on A must have arrived at D.
  ScopedMessage received;
  ASSERT_TRUE(node1.ReadMessage(D, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // With A and D closed, neither node should have any port left open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1204
TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // Covers initiating a merge on a port (B) whose peer (A) has already been
  // closed by the time the merge is requested.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Two independent port pairs: A-B lives on node0, C-D lives on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // A writes one message and is then closed, all before the merge starts.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // Ask node0 to merge B with node1's C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Every proxy should be gone once idle. Since A was explicitly closed,
  // node0 is expected to hold no ports at all.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // The message written on A must have arrived at D.
  ScopedMessage received;
  ASSERT_TRUE(node1.ReadMessage(D, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // With D closed as well, neither node should have any port left open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1248
TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it. D was created on node1, so the send is
  // issued through node1, the node that owns the port (it is also the node
  // used to close D on the next line).
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1290
TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // Merging two ports whose peers have both already been closed must not
  // leave any residual ports behind on either node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Two independent port pairs: A-B lives on node0, C-D lives on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Close the far end of each pair (A and D), then wait for idle before
  // requesting the merge.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Ask node0 to merge B with node1's C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Nothing should remain on either node.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1321
TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // A merge must still succeed when the peer of a merged port has been moved
  // to a new port beforehand.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Two independent port pairs: A-B lives on node0, C-D lives on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // A third pair, X-Y on node0, is used only to move a port.
  PortRef X;
  PortRef Y;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));

  ScopedMessage received;

  // Transfer A over X-Y; it is received as the new port E.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
  ASSERT_TRUE(node0.ReadMessage(Y, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(received->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node0.node().ClosePort(Y));

  // Each outer endpoint writes one message before the merge is requested.
  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));

  // Ask node0 to merge B with node1's C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // The messages must cross over: E reads what D wrote and D reads what E
  // wrote.
  ASSERT_TRUE(node0.ReadMessage(E, &received));
  EXPECT_TRUE(MessageEquals(received, "hi"));
  ASSERT_TRUE(node1.ReadMessage(D, &received));
  EXPECT_TRUE(MessageEquals(received, "hey"));

  // Tear down the two remaining endpoints.
  EXPECT_EQ(OK, node0.node().ClosePort(E));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Nothing should remain on either node.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1378
TEST_F(PortsTest, MergePortsFailsGracefully) {
  // Forces a port merge to fail and checks that the system is still in a
  // well-defined state afterwards.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Two independent port pairs: A-B lives on node0, C-D lives on node1.
  PortRef A;
  PortRef B;
  PortRef C;
  PortRef D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Build a cross-node pair by hand: X on node0 and Y on node1, each
  // initialized with the other as its peer.
  PortRef X;
  PortRef Y;
  EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
  EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
  EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name()));
  EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name()));

  // Hold back merge events on node1 until port C has been tampered with
  // below. Without this the test logic would race with the async merge logic.
  node1.BlockOnEvent(Event::Type::kMergePort);

  // Request the merge between B and C.
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  // Transfer C so that it becomes a new port E. This is not a sane use of
  // Node's public API but is still hypothetically possible. It forces the
  // merge to fail, because C will be in an invalid state by the time the merge
  // is processed. B should be closed as a consequence.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));

  node1.Unblock();

  WaitForIdle();

  // Pick up the transferred port on the node0 side of X-Y.
  ScopedMessage received;
  ASSERT_TRUE(node0.ReadMessage(X, &received));
  ASSERT_EQ(1u, received->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(received->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node1.node().ClosePort(Y));

  WaitForIdle();

  // Neither C nor B may still be known to its node: C disappears through
  // normal proxy removal, and B should have been closed cleanly by the failed
  // MergePorts.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));

  // Tear down the remaining ports: A, D, and E.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Nothing should remain on either node.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1444
TEST_F(PortsTest, RemotePeerStatus) {
  // This tests that PortStatus::peer_remote tracks whether a port's peer lives
  // on another node as ports are transferred back and forth between nodes.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Create a local port pair. Neither port should appear to have a remote peer.
  PortRef a, b;
  PortStatus status;
  // Check the creation result, as the other tests in this file do, so a
  // failure is reported here rather than surfacing in later status checks.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  // Create a port pair spanning the two nodes. Both spanning ports should
  // immediately appear to have a remote peer.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);

  // A second spanning pair, used only to carry the transferred ports.
  PortRef x2, x3;
  CreatePortPair(&node0, &x2, &node1, &x3);

  // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
  // remote and the remote peers local.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));

  // Now x0-x1 should be local to node0 and a-b should span the nodes.
  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
  EXPECT_TRUE(status.peer_remote);

  // And swap them back one more time.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));

  // Back to the initial layout: x0-x1 spans the nodes and a-b is local to
  // node0.
  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));
  EXPECT_EQ(OK, node0.node().ClosePort(x2));
  EXPECT_EQ(OK, node1.node().ClosePort(x3));
  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1531
TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
  // This tests that after two ports on the same node are merged, the
  // remaining endpoints report a remote peer when they live on different
  // nodes.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d spanning node0-node1.
  PortRef a, b, c, d;
  // Check the creation result, as the other tests in this file do.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
  CreatePortPair(&node0, &c, &node1, &d);

  // Before the merge: a-b is local, c-d is remote.
  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  // Merge the two inner ports, both of which live on node0.
  EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
  WaitForIdle();

  // After the merge, a (node0) and d (node1) should each see a remote peer.
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1567
TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
  // This tests that after ports on two different nodes are merged, the
  // remaining endpoints, which previously had local peers, report a remote
  // peer.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d on node1.
  PortRef a, b, c, d;
  // Check the creation results, as the other tests in this file do.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&c, &d));

  // Before the merge every port has a local peer.
  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_FALSE(status.peer_remote);

  // Merge b (node0) with c (node1).
  EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
  WaitForIdle();

  // After the merge, a (node0) and d (node1) should each see a remote peer.
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}
1603
TEST_F(PortsTest, RetransmitUserMessageEvents) {
  // Ensures that user message events can be retransmitted properly: the same
  // event object that was read from one port is sent again from that port and
  // must arrive intact at the other end, several times in a row.
  TestNode node0(0);
  AddNode(&node0);

  PortRef a, b;
  // Check the creation result, as the other tests in this file do.
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a, &b));

  // Ping.
  constexpr char kMessage[] = "hey";
  ScopedMessage message;
  EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong. The holder is checked to be empty after each send, since the event
  // was moved into SendUserMessage.
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Ping again.
  EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong again!
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));
}
1640
1641 } // namespace test
1642 } // namespace ports
1643 } // namespace core
1644 } // namespace mojo
1645