// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include #include #include #include #include #include #include "base/logging.h" #include "base/rand_util.h" #include "mojo/edk/system/ports/event.h" #include "mojo/edk/system/ports/node.h" #include "mojo/edk/system/ports/node_delegate.h" #include "testing/gtest/include/gtest/gtest.h" namespace mojo { namespace edk { namespace ports { namespace test { namespace { void LogMessage(const Message* message) { std::stringstream ports; for (size_t i = 0; i < message->num_ports(); ++i) { if (i > 0) ports << ","; ports << message->ports()[i]; } DVLOG(1) << "message: \"" << static_cast(message->payload_bytes()) << "\" ports=[" << ports.str() << "]"; } void ClosePortsInMessage(Node* node, Message* message) { for (size_t i = 0; i < message->num_ports(); ++i) { PortRef port; ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port)); EXPECT_EQ(OK, node->ClosePort(port)); } } class TestMessage : public Message { public: static ScopedMessage NewUserMessage(size_t num_payload_bytes, size_t num_ports) { return ScopedMessage(new TestMessage(num_payload_bytes, num_ports)); } TestMessage(size_t num_payload_bytes, size_t num_ports) : Message(num_payload_bytes, num_ports) { start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes]; InitializeUserMessageHeader(start_); } TestMessage(size_t num_header_bytes, size_t num_payload_bytes, size_t num_ports_bytes) : Message(num_header_bytes, num_payload_bytes, num_ports_bytes) { start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes]; } ~TestMessage() override { delete[] start_; } }; struct Task { Task(NodeName node_name, ScopedMessage message) : node_name(node_name), message(std::move(message)), priority(base::RandUint64()) { } NodeName node_name; ScopedMessage message; uint64_t priority; }; struct TaskComparator { bool operator()(const Task* a, const Task* b) { return a->priority < b->priority; } }; const size_t kMaxNodes = 3; std::priority_queue, TaskComparator> task_queue; Node* node_map[kMaxNodes]; Node* GetNode(const NodeName& name) { return node_map[name.v1]; } void SetNode(const NodeName& name, Node* node) { node_map[name.v1] = node; } void PumpTasks() { while (!task_queue.empty()) { Task* task = task_queue.top(); task_queue.pop(); Node* node = GetNode(task->node_name); if (node) node->AcceptMessage(std::move(task->message)); delete task; } } void PumpUntilTask(EventType type) { while (!task_queue.empty()) { Task* task = task_queue.top(); const EventHeader* header = GetEventHeader(*task->message); if (header->type == type) return; task_queue.pop(); Node* node = GetNode(task->node_name); if (node) node->AcceptMessage(std::move(task->message)); delete task; } } void DiscardPendingTasks() { while (!task_queue.empty()) { Task* task = task_queue.top(); task_queue.pop(); delete task; } } int SendStringMessage(Node* node, const PortRef& port, const std::string& s) { size_t size = s.size() + 1; ScopedMessage message = TestMessage::NewUserMessage(size, 0); memcpy(message->mutable_payload_bytes(), s.data(), size); return node->SendMessage(port, std::move(message)); } int SendStringMessageWithPort(Node* node, const PortRef& port, const std::string& s, const PortName& sent_port_name) { size_t size = s.size() + 1; ScopedMessage message = TestMessage::NewUserMessage(size, 1); memcpy(message->mutable_payload_bytes(), s.data(), size); message->mutable_ports()[0] = sent_port_name; return node->SendMessage(port, std::move(message)); } int SendStringMessageWithPort(Node* node, const PortRef& port, const std::string& s, const PortRef& sent_port) { return SendStringMessageWithPort(node, port, s, sent_port.name()); } const char* ToString(const ScopedMessage& message) { return static_cast(message->payload_bytes()); } class TestNodeDelegate : public NodeDelegate { public: explicit TestNodeDelegate(const NodeName& node_name) : node_name_(node_name), drop_messages_(false), read_messages_(true), save_messages_(false) { } void set_drop_messages(bool value) { drop_messages_ = value; } void set_read_messages(bool value) { read_messages_ = value; } void set_save_messages(bool value) { save_messages_ = value; } bool GetSavedMessage(ScopedMessage* message) { if (saved_messages_.empty()) { message->reset(); return false; } *message = std::move(saved_messages_.front()); saved_messages_.pop(); return true; } void GenerateRandomPortName(PortName* port_name) override { static uint64_t next_port_name = 1; port_name->v1 = next_port_name++; port_name->v2 = 0; } void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override { message->reset(new TestMessage(num_header_bytes, 0, 0)); } void ForwardMessage(const NodeName& node_name, ScopedMessage message) override { if (drop_messages_) { DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to " << node_name; ClosePortsInMessage(GetNode(node_name), message.get()); return; } DVLOG(1) << "ForwardMessage from node " << node_name_ << " to " << node_name; task_queue.push(new Task(node_name, std::move(message))); } void BroadcastMessage(ScopedMessage message) override { for (size_t i = 0; i < kMaxNodes; ++i) { Node* node = node_map[i]; // Broadcast doesn't deliver to the local node. if (node && node != GetNode(node_name_)) { // NOTE: We only need to support broadcast of events, which have no // payload or ports bytes. ScopedMessage new_message( new TestMessage(message->num_header_bytes(), 0, 0)); memcpy(new_message->mutable_header_bytes(), message->header_bytes(), message->num_header_bytes()); node->AcceptMessage(std::move(new_message)); } } } void PortStatusChanged(const PortRef& port) override { DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; if (!read_messages_) return; Node* node = GetNode(node_name_); for (;;) { ScopedMessage message; int rv = node->GetMessage(port, &message); EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); if (rv == ERROR_PORT_PEER_CLOSED || !message) break; if (save_messages_) { SaveMessage(std::move(message)); } else { LogMessage(message.get()); for (size_t i = 0; i < message->num_ports(); ++i) { std::stringstream buf; buf << "got port: " << message->ports()[i]; PortRef received_port; node->GetPort(message->ports()[i], &received_port); SendStringMessage(node, received_port, buf.str()); // Avoid leaking these ports. node->ClosePort(received_port); } } } } private: void SaveMessage(ScopedMessage message) { saved_messages_.emplace(std::move(message)); } std::queue saved_messages_; NodeName node_name_; bool drop_messages_; bool read_messages_; bool save_messages_; }; class PortsTest : public testing::Test { public: void SetUp() override { DiscardPendingTasks(); SetNode(NodeName(0, 1), nullptr); SetNode(NodeName(1, 1), nullptr); SetNode(NodeName(2, 1), nullptr); } }; } // namespace TEST_F(PortsTest, Basic1) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); SetNode(node1_name, &node1); // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); // Transfer a port from node0 to node1. PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.ClosePort(x1)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, Basic2) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); SetNode(node1_name, &node1); // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); PortRef b0, b1; EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1)); EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again")); // This may cause a SendMessage(b1) failure. EXPECT_EQ(OK, node0.ClosePort(b0)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.ClosePort(x1)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, Basic3) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); SetNode(node1_name, &node1); // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); // Transfer a port from node0 to node1. PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again")); // Transfer a0 as well. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0)); PortRef b0, b1; EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1)); EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz")); // This may cause a SendMessage(b1) failure. EXPECT_EQ(OK, node0.ClosePort(b0)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.ClosePort(x1)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, LostConnectionToNode1) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); SetNode(node1_name, &node1); // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); // Transfer port to node1 and simulate a lost connection to node1. Dropping // events from node1 is how we simulate the lost connection. node1_delegate.set_drop_messages(true); PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1)); PumpTasks(); EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); PumpTasks(); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.ClosePort(x1)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, LostConnectionToNode2) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); node1_delegate.set_read_messages(false); PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1)); PumpTasks(); node1_delegate.set_drop_messages(true); EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); PumpTasks(); ScopedMessage message; EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); EXPECT_FALSE(message); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.GetMessage(x1, &message)); EXPECT_TRUE(message); ClosePortsInMessage(&node1, message.get()); EXPECT_EQ(OK, node1.ClosePort(x1)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { // Tests that a proxy gets cleaned up when its indirect peer lives on a lost // node. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; NodeName node2_name(2, 1); TestNodeDelegate node2_delegate(node2_name); Node node2(node2_name, &node2_delegate); node_map[2] = &node2; node1_delegate.set_save_messages(true); // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); EXPECT_EQ(OK, node1.CreateUninitializedPort(&C)); EXPECT_EQ(OK, node2.CreateUninitializedPort(&D)); EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name())); EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name())); // Create E-F and send F over A to node 1. PortRef E, F; EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F)); PumpTasks(); ScopedMessage message; ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F)); // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 // will trivially become aware of the loss, and this test verifies that the // port A on node 0 will eventually also become aware of it. EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F)); node_map[2] = nullptr; EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name)); PumpTasks(); // Port F should be gone. EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F)); // Port E should have detected peer closure despite the fact that there is // no longer a continuous route from F to E over which the event could travel. PortStatus status; EXPECT_EQ(OK, node0.GetStatus(E, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(B)); EXPECT_EQ(OK, node1.ClosePort(C)); EXPECT_EQ(OK, node0.ClosePort(E)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { // Tests that a proxy gets cleaned up when its direct peer lives on a lost // node and it's predecessor lives on the same node. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; node1_delegate.set_save_messages(true); // Create A-B spanning nodes 0 and 1. PortRef A, B; EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); // Create C-D and send D over A to node 1. PortRef C, D; EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D)); // Pump tasks until the start of port collapse for port D, which should become // a proxy. PumpUntilTask(EventType::kObserveProxy); ScopedMessage message; ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); PumpTasks(); // Port C should have detected peer closure. PortStatus status; EXPECT_EQ(OK, node0.GetStatus(C, &status)); EXPECT_TRUE(status.peer_closed); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(B)); EXPECT_EQ(OK, node0.ClosePort(C)); EXPECT_EQ(OK, node1.ClosePort(E)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, GetMessage1) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); ScopedMessage message; EXPECT_EQ(OK, node0.GetMessage(a0, &message)); EXPECT_FALSE(message); EXPECT_EQ(OK, node0.ClosePort(a1)); EXPECT_EQ(OK, node0.GetMessage(a0, &message)); EXPECT_FALSE(message); PumpTasks(); EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); EXPECT_FALSE(message); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, GetMessage2) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_read_messages(false); PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1")); ScopedMessage message; EXPECT_EQ(OK, node0.GetMessage(a0, &message)); ASSERT_TRUE(message); EXPECT_EQ(0, strcmp("1", ToString(message))); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(a1)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, GetMessage3) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_read_messages(false); PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); const char* kStrings[] = { "1", "2", "3" }; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i])); ScopedMessage message; for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) { EXPECT_EQ(OK, node0.GetMessage(a0, &message)); ASSERT_TRUE(message); EXPECT_EQ(0, strcmp(kStrings[i], ToString(message))); DVLOG(1) << "got " << kStrings[i]; } EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(a1)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, Delegation1) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; node0_delegate.set_save_messages(true); node1_delegate.set_save_messages(true); // Setup pipe between node0 and node1. PortRef x0, x1; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); // In this test, we send a message to a port that has been moved. PortRef a0, a1; EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1)); PumpTasks(); ScopedMessage message; ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); // This is "a1" from the point of view of node1. PortName a2_name = message->ports()[0]; EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name)); PumpTasks(); EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello")); PumpTasks(); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); // This is "a2" from the point of view of node1. PortName a3_name = message->ports()[0]; PortRef a3; EXPECT_EQ(OK, node0.GetPort(a3_name, &a3)); EXPECT_EQ(0, strcmp("a2", ToString(message))); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(0u, message->num_ports()); EXPECT_EQ(0, strcmp("hello", ToString(message))); EXPECT_EQ(OK, node0.ClosePort(a0)); EXPECT_EQ(OK, node0.ClosePort(a3)); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_EQ(OK, node1.ClosePort(x1)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, Delegation2) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); SetNode(node0_name, &node0); NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; node0_delegate.set_save_messages(true); node1_delegate.set_save_messages(true); for (int i = 0; i < 10; ++i) { // Setup pipe a<->b between node0 and node1. PortRef A, B; EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); PortRef C, D; EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); PortRef E, F; EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); // Pass D over A to B. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D)); // Pass F over C to D. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F)); // This message should find its way to node1. EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello")); PumpTasks(); EXPECT_EQ(OK, node0.ClosePort(C)); EXPECT_EQ(OK, node0.ClosePort(E)); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(B)); for (;;) { ScopedMessage message; if (node1_delegate.GetSavedMessage(&message)) { ClosePortsInMessage(&node1, message.get()); if (strcmp("hello", ToString(message)) == 0) break; } else { ASSERT_TRUE(false); // "hello" message not delivered! break; } } PumpTasks(); // Because ClosePort may have generated tasks. } EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendUninitialized) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; PortRef x0; EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, SendStringMessage(&node0, x0, "oops")); EXPECT_EQ(OK, node0.ClosePort(x0)); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendFailure) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); // Try to send A over itself. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, SendStringMessageWithPort(&node0, A, "oops", A)); // Try to send B over A. EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, SendStringMessageWithPort(&node0, A, "nope", B)); // B should be closed immediately. EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); PumpTasks(); // There should have been no messages accepted. ScopedMessage message; EXPECT_FALSE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(OK, node0.ClosePort(A)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, DontLeakUnreceivedPorts) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_read_messages(false); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); PortRef C, D; EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); PumpTasks(); EXPECT_EQ(OK, node0.ClosePort(C)); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node0.ClosePort(B)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); PortRef C, D; EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); ScopedMessage message; EXPECT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); EXPECT_TRUE(node0.CanShutdownCleanly(true)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(true)); EXPECT_FALSE(node0.CanShutdownCleanly(false)); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node0.ClosePort(B)); EXPECT_EQ(OK, node0.ClosePort(C)); EXPECT_EQ(OK, node0.ClosePort(E)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, ProxyCollapse1) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and receive it as C. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); // Send C and receive it as D. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); // Send D and receive it as E. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.ClosePort(X)); EXPECT_EQ(OK, node0.ClosePort(Y)); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node0.ClosePort(E)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, ProxyCollapse2) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_save_messages(true); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); PortRef X, Y; EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); ScopedMessage message; // Send B and receive it as C. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); // Send A and receive it as D. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); // At this point we have a scenario with: // // D -> [B] -> C -> [A] // // Ensure that the proxies can collapse. EXPECT_EQ(OK, node0.ClosePort(X)); EXPECT_EQ(OK, node0.ClosePort(Y)); EXPECT_EQ(OK, node0.ClosePort(C)); EXPECT_EQ(OK, node0.ClosePort(D)); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendWithClosedPeer) { // This tests that if a port is sent when its peer is already known to be // closed, the newly created port will be aware of that peer closure, and the // proxy will eventually collapse. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_read_messages(false); // Send a message from A to B, then close A. PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); EXPECT_EQ(OK, node0.ClosePort(A)); PumpTasks(); // Now send B over X-Y as new port C. PortRef X, Y; EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); node0_delegate.set_read_messages(true); node0_delegate.set_save_messages(true); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); EXPECT_EQ(OK, node0.ClosePort(X)); EXPECT_EQ(OK, node0.ClosePort(Y)); ScopedMessage message; ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); PumpTasks(); // C should receive the message originally sent to B, and it should also be // aware of A's closure. ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); PortStatus status; EXPECT_EQ(OK, node0.GetStatus(C, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); node0.ClosePort(C); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, SendWithClosedPeerSent) { // This tests that if a port is closed while some number of proxies are still // routing messages (directly or indirectly) to it, that the peer port is // eventually notified of the closure, and the dead-end proxies will // eventually be removed. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; node0_delegate.set_save_messages(true); PortRef X, Y; EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); PortRef A, B; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); ScopedMessage message; // Send A as new port C. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef C; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); // Send C as new port D. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef D; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); node0_delegate.set_read_messages(false); // Send a message to B through D, then close D. EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); EXPECT_EQ(OK, node0.ClosePort(D)); PumpTasks(); // Now send B as new port E. node0_delegate.set_read_messages(true); EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); EXPECT_EQ(OK, node0.ClosePort(X)); EXPECT_EQ(OK, node0.ClosePort(Y)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); PumpTasks(); // E should receive the message originally sent to B, and it should also be // aware of D's closure. ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); PortStatus status; EXPECT_EQ(OK, node0.GetStatus(E, &status)); EXPECT_FALSE(status.receiving_messages); EXPECT_FALSE(status.has_messages); EXPECT_TRUE(status.peer_closed); node0.ClosePort(E); PumpTasks(); EXPECT_TRUE(node0.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePorts) { NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); node0_delegate.set_read_messages(false); node1_delegate.set_save_messages(true); // Write a message on A. EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); PumpTasks(); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); PumpTasks(); // Expect only two receiving ports to be left after pumping tasks. EXPECT_TRUE(node0.CanShutdownCleanly(true)); EXPECT_TRUE(node1.CanShutdownCleanly(true)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePortWithClosedPeer1) { // This tests that the right thing happens when initiating a merge on a port // whose peer has already been closed. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); node0_delegate.set_read_messages(false); node1_delegate.set_save_messages(true); // Write a message on A. EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); PumpTasks(); // Close A. EXPECT_EQ(OK, node0.ClosePort(A)); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); PumpTasks(); // Expect only one receiving port to be left after pumping tasks. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(true)); // Expect D to have received the message sent on A. ScopedMessage message; ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); EXPECT_EQ(OK, node1.ClosePort(D)); // No more ports should be open. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePortWithClosedPeer2) { // This tests that the right thing happens when merging into a port whose peer // has already been closed. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); node0_delegate.set_save_messages(true); node1_delegate.set_read_messages(false); // Write a message on D. EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); PumpTasks(); // Close D. EXPECT_EQ(OK, node1.ClosePort(D)); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); PumpTasks(); // Expect only one receiving port to be left after pumping tasks. EXPECT_TRUE(node0.CanShutdownCleanly(true)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); // Expect A to have received the message sent on D. ScopedMessage message; ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); EXPECT_EQ(OK, node0.ClosePort(A)); // No more ports should be open. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePortsWithClosedPeers) { // This tests that no residual ports are left behind if two ports are merged // when both of their peers have been closed. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); node0_delegate.set_save_messages(true); node1_delegate.set_read_messages(false); // Close A and D. EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(D)); PumpTasks(); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); PumpTasks(); // Expect everything to have gone away. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePortsWithMovedPeers) { // This tests that no ports can be merged successfully even if their peers // are moved around. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; node0_delegate.set_save_messages(true); node1_delegate.set_read_messages(false); // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); // Set up another pair X-Y for moving ports on node0. PortRef X, Y; EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); ScopedMessage message; // Move A to new port E. EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node0.ClosePort(X)); EXPECT_EQ(OK, node0.ClosePort(Y)); node0_delegate.set_read_messages(false); // Write messages on E and D. EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey")); EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi")); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); node0_delegate.set_read_messages(true); node1_delegate.set_read_messages(true); node1_delegate.set_save_messages(true); PumpTasks(); // Expect to receive D's message on E and E's message on D. ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hi", ToString(message))); ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); EXPECT_EQ(0, strcmp("hey", ToString(message))); // Close E and D. EXPECT_EQ(OK, node0.ClosePort(E)); EXPECT_EQ(OK, node1.ClosePort(D)); PumpTasks(); // Expect everything to have gone away. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } TEST_F(PortsTest, MergePortsFailsGracefully) { // This tests that the system remains in a well-defined state if something // goes wrong during port merge. NodeName node0_name(0, 1); TestNodeDelegate node0_delegate(node0_name); Node node0(node0_name, &node0_delegate); node_map[0] = &node0; NodeName node1_name(1, 1); TestNodeDelegate node1_delegate(node1_name); Node node1(node1_name, &node1_delegate); node_map[1] = &node1; // Setup two independent port pairs, A-B on node0 and C-D on node1. PortRef A, B, C, D; EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); PumpTasks(); // Initiate a merge between B and C. EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); // Move C to a new port E. This is dumb and nobody should do it, but it's // possible. MergePorts will fail as a result because C won't be in a // receiving state when the event arrives at node1, so B should be closed. ScopedMessage message; PortRef X, Y; EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y)); node1_delegate.set_save_messages(true); EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C)); ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); ASSERT_EQ(1u, message->num_ports()); PortRef E; ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E)); EXPECT_EQ(OK, node1.ClosePort(X)); EXPECT_EQ(OK, node1.ClosePort(Y)); // C goes away as a result of normal proxy removal. PumpTasks(); EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C)); // B should have been closed cleanly. EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); // Close A, D, and E. EXPECT_EQ(OK, node0.ClosePort(A)); EXPECT_EQ(OK, node1.ClosePort(D)); EXPECT_EQ(OK, node1.ClosePort(E)); PumpTasks(); // Expect everything to have gone away. EXPECT_TRUE(node0.CanShutdownCleanly(false)); EXPECT_TRUE(node1.CanShutdownCleanly(false)); } } // namespace test } // namespace ports } // namespace edk } // namespace mojo