Lines Matching refs:port
29 bool CanAcceptMoreMessages(const Port* port) { in CanAcceptMoreMessages() argument
32 uint64_t next_sequence_num = port->message_queue.next_sequence_num(); in CanAcceptMoreMessages()
33 if (port->state == Port::kClosed) in CanAcceptMoreMessages()
35 if (port->peer_closed || port->remove_proxy_on_last_message) { in CanAcceptMoreMessages()
36 if (port->last_sequence_num_to_receive == next_sequence_num - 1) in CanAcceptMoreMessages()
46 explicit LockedPort(Port* port) : port_(port) { in LockedPort() argument
105 scoped_refptr<Port> port = GetPort(port_name); in GetPort() local
106 if (!port) in GetPort()
109 *port_ref = PortRef(port_name, std::move(port)); in GetPort()
117 scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum, in CreateUninitializedPort() local
119 int rv = AddPortWithName(port_name, port); in CreateUninitializedPort()
123 *port_ref = PortRef(port_name, std::move(port)); in CreateUninitializedPort()
130 Port* port = port_ref.port(); in InitializePort() local
133 base::AutoLock lock(port->lock); in InitializePort()
134 if (port->state != Port::kUninitialized) in InitializePort()
137 port->state = Port::kReceiving; in InitializePort()
138 port->peer_node_name = peer_node_name; in InitializePort()
139 port->peer_port_name = peer_port_name; in InitializePort()
171 Port* port = port_ref.port(); in SetUserData() local
173 base::AutoLock lock(port->lock); in SetUserData()
174 if (port->state == Port::kClosed) in SetUserData()
177 port->user_data = std::move(user_data); in SetUserData()
184 Port* port = port_ref.port(); in GetUserData() local
186 base::AutoLock lock(port->lock); in GetUserData()
187 if (port->state == Port::kClosed) in GetUserData()
190 *user_data = port->user_data; in GetUserData()
202 Port* port = port_ref.port(); in ClosePort() local
208 base::AutoLock lock(port->lock); in ClosePort()
209 if (port->state == Port::kUninitialized) { in ClosePort()
215 if (port->state != Port::kReceiving) in ClosePort()
218 port->state = Port::kClosed; in ClosePort()
223 data.last_sequence_num = port->next_sequence_num_to_send - 1; in ClosePort()
225 peer_node_name = port->peer_node_name; in ClosePort()
226 peer_port_name = port->peer_port_name; in ClosePort()
230 port->message_queue.GetReferencedPorts(&referenced_port_names); in ClosePort()
251 Port* port = port_ref.port(); in GetStatus() local
253 base::AutoLock lock(port->lock); in GetStatus()
255 if (port->state != Port::kReceiving) in GetStatus()
258 port_status->has_messages = port->message_queue.HasNextMessage(); in GetStatus()
259 port_status->receiving_messages = CanAcceptMoreMessages(port); in GetStatus()
260 port_status->peer_closed = port->peer_closed; in GetStatus()
275 Port* port = port_ref.port(); in GetMessageIf() local
277 base::AutoLock lock(port->lock); in GetMessageIf()
281 if (port->state != Port::kReceiving) in GetMessageIf()
286 if (!CanAcceptMoreMessages(port)) in GetMessageIf()
289 port->message_queue.GetNextMessageIf(std::move(selector), message); in GetMessageIf()
321 PortRef port; in SendMessage() local
322 if (GetPort(message->ports()[i], &port) == OK) in SendMessage()
323 ClosePort(port); in SendMessage()
358 Port* port = port_ref.port(); in MergePorts() local
361 base::AutoLock lock(port->lock); in MergePorts()
369 WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name, in MergePorts()
380 Port* port0 = port0_ref.port(); in MergeLocalPorts()
381 Port* port1 = port1_ref.port(); in MergeLocalPorts()
434 scoped_refptr<Port> port = GetPort(port_name); in OnUserMessage() local
451 if (port) { in OnUserMessage()
455 base::AutoLock lock(port->lock); in OnUserMessage()
459 if (CanAcceptMoreMessages(port.get())) { in OnUserMessage()
461 port->message_queue.AcceptMessage(std::move(message), &has_next_message); in OnUserMessage()
463 if (port->state == Port::kBuffering) { in OnUserMessage()
465 } else if (port->state == Port::kProxying) { in OnUserMessage()
472 int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name); in OnUserMessage()
476 MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name); in OnUserMessage()
493 PortRef port_ref(port_name, port); in OnUserMessage()
501 scoped_refptr<Port> port = GetPort(port_name); in OnPortAccepted() local
502 if (!port) in OnPortAccepted()
507 << port->peer_port_name << "@" << port->peer_node_name; in OnPortAccepted()
509 return BeginProxying(PortRef(port_name, port)); in OnPortAccepted()
531 scoped_refptr<Port> port = GetPort(port_name); in OnObserveProxy() local
532 if (!port) { in OnObserveProxy()
555 base::AutoLock lock(port->lock); in OnObserveProxy()
557 if (port->peer_node_name == event.proxy_node_name && in OnObserveProxy()
558 port->peer_port_name == event.proxy_port_name) { in OnObserveProxy()
559 if (port->state == Port::kReceiving) { in OnObserveProxy()
560 port->peer_node_name = event.proxy_to_node_name; in OnObserveProxy()
561 port->peer_port_name = event.proxy_to_port_name; in OnObserveProxy()
564 ack.last_sequence_num = port->next_sequence_num_to_send - 1; in OnObserveProxy()
588 port->send_on_proxy_removal.reset( in OnObserveProxy()
599 port->peer_node_name, in OnObserveProxy()
600 NewInternalMessage(port->peer_port_name, in OnObserveProxy()
613 scoped_refptr<Port> port = GetPort(port_name); in OnObserveProxyAck() local
614 if (!port) in OnObserveProxyAck()
619 base::AutoLock lock(port->lock); in OnObserveProxyAck()
621 if (port->state != Port::kProxying) in OnObserveProxyAck()
626 InitiateProxyRemoval(LockedPort(port.get()), port_name); in OnObserveProxyAck()
632 port->remove_proxy_on_last_message = true; in OnObserveProxyAck()
633 port->last_sequence_num_to_receive = last_sequence_num; in OnObserveProxyAck()
635 TryRemoveProxy(PortRef(port_name, port)); in OnObserveProxyAck()
642 scoped_refptr<Port> port = GetPort(port_name); in OnObserveClosure() local
643 if (!port) in OnObserveClosure()
657 base::AutoLock lock(port->lock); in OnObserveClosure()
659 port->peer_closed = true; in OnObserveClosure()
660 port->last_sequence_num_to_receive = last_sequence_num; in OnObserveClosure()
663 << " (state=" << port->state << ") pointing to " in OnObserveClosure()
664 << port->peer_port_name << "@" << port->peer_node_name in OnObserveClosure()
671 if (port->state == Port::kReceiving) { in OnObserveClosure()
681 forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1; in OnObserveClosure()
689 port->remove_proxy_on_last_message = true; in OnObserveClosure()
690 if (port->state == Port::kProxying) in OnObserveClosure()
696 << port->peer_port_name << "@" << port->peer_node_name in OnObserveClosure()
700 peer_node_name = port->peer_node_name; in OnObserveClosure()
701 peer_port_name = port->peer_port_name; in OnObserveClosure()
704 TryRemoveProxy(PortRef(port_name, port)); in OnObserveClosure()
712 PortRef port_ref(port_name, port); in OnObserveClosure()
720 scoped_refptr<Port> port = GetPort(port_name); in OnMergePort() local
723 << (port ? port->state : -1) << ") merging with proxy " in OnMergePort()
739 } else if (port) { in OnMergePort()
743 base::AutoLock lock(port->lock); in OnMergePort()
745 if (port->state != Port::kReceiving) { in OnMergePort()
755 PortRef port0_ref(port_name, port); in OnMergePort()
788 const scoped_refptr<Port>& port) { in AddPortWithName() argument
791 if (!ports_.insert(std::make_pair(port_name, port)).second) in AddPortWithName()
830 Port* port = port_ref.port(); in SendMessageInternal() local
836 base::AutoLock lock(port->lock); in SendMessageInternal()
838 if (port->state != Port::kReceiving) in SendMessageInternal()
841 if (port->peer_closed) in SendMessageInternal()
844 int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get()); in SendMessageInternal()
853 peer_node_name = port->peer_node_name; in SendMessageInternal()
872 Port* port0 = port0_ref.port(); in MergePorts_Locked()
873 Port* port1 = port1_ref.port(); in MergePorts_Locked()
949 void Node::WillSendPort(const LockedPort& port, in WillSendPort() argument
953 port->lock.AssertAcquired(); in WillSendPort()
962 DCHECK(port->state == Port::kReceiving); in WillSendPort()
963 port->state = Port::kBuffering; in WillSendPort()
967 if (port->peer_closed) in WillSendPort()
968 port->remove_proxy_on_last_message = true; in WillSendPort()
972 port_descriptor->peer_node_name = port->peer_node_name; in WillSendPort()
973 port_descriptor->peer_port_name = port->peer_port_name; in WillSendPort()
976 port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send; in WillSendPort()
978 port->message_queue.next_sequence_num(); in WillSendPort()
980 port->last_sequence_num_to_receive; in WillSendPort()
981 port_descriptor->peer_closed = port->peer_closed; in WillSendPort()
985 port->peer_node_name = to_node_name; in WillSendPort()
986 port->peer_port_name = new_port_name; in WillSendPort()
991 scoped_refptr<Port> port = make_scoped_refptr( in AcceptPort() local
994 port->state = Port::kReceiving; in AcceptPort()
995 port->peer_node_name = port_descriptor.peer_node_name; in AcceptPort()
996 port->peer_port_name = port_descriptor.peer_port_name; in AcceptPort()
997 port->last_sequence_num_to_receive = in AcceptPort()
999 port->peer_closed = port_descriptor.peer_closed; in AcceptPort()
1002 << port->peer_closed << "; last_sequence_num_to_receive=" in AcceptPort()
1003 << port->last_sequence_num_to_receive << "]"; in AcceptPort()
1007 port->message_queue.set_signalable(false); in AcceptPort()
1009 int rv = AddPortWithName(port_name, port); in AcceptPort()
1021 int Node::WillSendMessage_Locked(const LockedPort& port, in WillSendMessage_Locked() argument
1025 port->lock.AssertAcquired(); in WillSendMessage_Locked()
1034 *sequence_num = port->next_sequence_num_to_send++; in WillSendMessage_Locked()
1061 else if (message->ports()[i] == port->peer_port_name) in WillSendMessage_Locked()
1069 port->next_sequence_num_to_send--; in WillSendMessage_Locked()
1080 port->peer_node_name, in WillSendMessage_Locked()
1094 << " to " << port->peer_port_name << "@" << port->peer_node_name; in WillSendMessage_Locked()
1097 GetMutableEventHeader(message)->port_name = port->peer_port_name; in WillSendMessage_Locked()
1101 int Node::BeginProxying_Locked(const LockedPort& port, in BeginProxying_Locked() argument
1104 port->lock.AssertAcquired(); in BeginProxying_Locked()
1106 if (port->state != Port::kBuffering) in BeginProxying_Locked()
1109 port->state = Port::kProxying; in BeginProxying_Locked()
1111 int rv = ForwardMessages_Locked(LockedPort(port), port_name); in BeginProxying_Locked()
1119 if (port->remove_proxy_on_last_message) { in BeginProxying_Locked()
1120 MaybeRemoveProxy_Locked(LockedPort(port), port_name); in BeginProxying_Locked()
1124 data.last_sequence_num = port->last_sequence_num_to_receive; in BeginProxying_Locked()
1126 port->peer_node_name, in BeginProxying_Locked()
1127 NewInternalMessage(port->peer_port_name, in BeginProxying_Locked()
1130 InitiateProxyRemoval(LockedPort(port), port_name); in BeginProxying_Locked()
1137 Port* port = port_ref.port(); in BeginProxying() local
1140 base::AutoLock lock(port->lock); in BeginProxying()
1142 if (port->state != Port::kBuffering) in BeginProxying()
1145 port->state = Port::kProxying; in BeginProxying()
1147 int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name()); in BeginProxying()
1156 base::AutoLock lock(port->lock); in BeginProxying()
1157 if (port->state != Port::kProxying) in BeginProxying()
1160 should_remove = port->remove_proxy_on_last_message; in BeginProxying()
1164 data.last_sequence_num = port->last_sequence_num_to_receive; in BeginProxying()
1165 peer_node_name = port->peer_node_name; in BeginProxying()
1166 closure_message = NewInternalMessage(port->peer_port_name, in BeginProxying()
1169 InitiateProxyRemoval(LockedPort(port), port_ref.name()); in BeginProxying()
1181 int Node::ForwardMessages_Locked(const LockedPort& port, in ForwardMessages_Locked() argument
1184 port->lock.AssertAcquired(); in ForwardMessages_Locked()
1188 port->message_queue.GetNextMessageIf(nullptr, &message); in ForwardMessages_Locked()
1192 int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get()); in ForwardMessages_Locked()
1196 delegate_->ForwardMessage(port->peer_node_name, std::move(message)); in ForwardMessages_Locked()
1201 void Node::InitiateProxyRemoval(const LockedPort& port, in InitiateProxyRemoval() argument
1203 port->lock.AssertAcquired(); in InitiateProxyRemoval()
1213 data.proxy_to_node_name = port->peer_node_name; in InitiateProxyRemoval()
1214 data.proxy_to_port_name = port->peer_port_name; in InitiateProxyRemoval()
1217 port->peer_node_name, in InitiateProxyRemoval()
1218 NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data)); in InitiateProxyRemoval()
1221 void Node::MaybeRemoveProxy_Locked(const LockedPort& port, in MaybeRemoveProxy_Locked() argument
1225 port->lock.AssertAcquired(); in MaybeRemoveProxy_Locked()
1227 DCHECK(port->state == Port::kProxying); in MaybeRemoveProxy_Locked()
1230 if (!port->remove_proxy_on_last_message) in MaybeRemoveProxy_Locked()
1233 if (!CanAcceptMoreMessages(port.get())) { in MaybeRemoveProxy_Locked()
1237 if (port->send_on_proxy_removal) { in MaybeRemoveProxy_Locked()
1238 NodeName to_node = port->send_on_proxy_removal->first; in MaybeRemoveProxy_Locked()
1239 ScopedMessage& message = port->send_on_proxy_removal->second; in MaybeRemoveProxy_Locked()
1242 port->send_on_proxy_removal.reset(); in MaybeRemoveProxy_Locked()
1251 Port* port = port_ref.port(); in TryRemoveProxy() local
1256 base::AutoLock lock(port->lock); in TryRemoveProxy()
1259 if (port->state == Port::kClosed) in TryRemoveProxy()
1262 DCHECK(port->state == Port::kProxying); in TryRemoveProxy()
1265 if (!port->remove_proxy_on_last_message) in TryRemoveProxy()
1268 if (!CanAcceptMoreMessages(port)) { in TryRemoveProxy()
1272 if (port->send_on_proxy_removal) { in TryRemoveProxy()
1273 to_node = port->send_on_proxy_removal->first; in TryRemoveProxy()
1274 msg = std::move(port->send_on_proxy_removal->second); in TryRemoveProxy()
1275 port->send_on_proxy_removal.reset(); in TryRemoveProxy()
1304 Port* port = iter->second.get(); in DestroyAllPortsWithPeer() local
1306 base::AutoLock port_lock(port->lock); in DestroyAllPortsWithPeer()
1308 if (port->peer_node_name == node_name && in DestroyAllPortsWithPeer()
1310 port->peer_port_name == port_name)) { in DestroyAllPortsWithPeer()
1311 if (!port->peer_closed) { in DestroyAllPortsWithPeer()
1316 port->peer_closed = true; in DestroyAllPortsWithPeer()
1317 port->last_sequence_num_to_receive = in DestroyAllPortsWithPeer()
1318 port->message_queue.next_sequence_num() - 1; in DestroyAllPortsWithPeer()
1320 if (port->state == Port::kReceiving) in DestroyAllPortsWithPeer()
1321 ports_to_notify.push_back(PortRef(iter->first, port)); in DestroyAllPortsWithPeer()
1330 if (port->state != Port::kReceiving) { in DestroyAllPortsWithPeer()
1346 for (const auto& port : ports_to_notify) in DestroyAllPortsWithPeer() local
1347 delegate_->PortStatusChanged(port); in DestroyAllPortsWithPeer()