1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/proxy_message_pipe_endpoint.h"
6
7 #include <string.h>
8
9 #include "base/logging.h"
10 #include "mojo/system/channel.h"
11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/message_pipe_dispatcher.h"
13
14 namespace mojo {
15 namespace system {
16
ProxyMessagePipeEndpoint()17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
18 : local_id_(MessageInTransit::kInvalidEndpointId),
19 remote_id_(MessageInTransit::kInvalidEndpointId),
20 is_peer_open_(true) {
21 }
22
ProxyMessagePipeEndpoint(LocalMessagePipeEndpoint * local_message_pipe_endpoint,bool is_peer_open)23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
24 LocalMessagePipeEndpoint* local_message_pipe_endpoint,
25 bool is_peer_open)
26 : local_id_(MessageInTransit::kInvalidEndpointId),
27 remote_id_(MessageInTransit::kInvalidEndpointId),
28 is_peer_open_(is_peer_open),
29 paused_message_queue_(MessageInTransitQueue::PassContents(),
30 local_message_pipe_endpoint->message_queue()) {
31 local_message_pipe_endpoint->Close();
32 }
33
~ProxyMessagePipeEndpoint()34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
35 DCHECK(!is_running());
36 DCHECK(!is_attached());
37 AssertConsistentState();
38 DCHECK(paused_message_queue_.IsEmpty());
39 }
40
GetType() const41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
42 return kTypeProxy;
43 }
44
OnPeerClose()45 bool ProxyMessagePipeEndpoint::OnPeerClose() {
46 DCHECK(is_peer_open_);
47
48 is_peer_open_ = false;
49
50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
51 if (!paused_message_queue_.IsEmpty())
52 return true;
53
54 if (is_attached()) {
55 if (!is_running()) {
56 // If we're not running yet, we can't be destroyed yet, because we're
57 // still waiting for the "run" message from the other side.
58 return true;
59 }
60
61 Detach();
62 }
63
64 return false;
65 }
66
67 // Note: We may have to enqueue messages even when our (local) peer isn't open
68 // -- it may have been written to and closed immediately, before we were ready.
69 // This case is handled in |Run()| (which will call us).
EnqueueMessage(scoped_ptr<MessageInTransit> message)70 void ProxyMessagePipeEndpoint::EnqueueMessage(
71 scoped_ptr<MessageInTransit> message) {
72 if (is_running()) {
73 message->SerializeAndCloseDispatchers(channel_.get());
74
75 message->set_source_id(local_id_);
76 message->set_destination_id(remote_id_);
77 if (!channel_->WriteMessage(message.Pass()))
78 LOG(WARNING) << "Failed to write message to channel";
79 } else {
80 paused_message_queue_.AddMessage(message.Pass());
81 }
82 }
83
Attach(scoped_refptr<Channel> channel,MessageInTransit::EndpointId local_id)84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
85 MessageInTransit::EndpointId local_id) {
86 DCHECK(channel);
87 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
88
89 DCHECK(!is_attached());
90
91 AssertConsistentState();
92 channel_ = channel;
93 local_id_ = local_id;
94 AssertConsistentState();
95 }
96
Run(MessageInTransit::EndpointId remote_id)97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
98 // Assertions about arguments:
99 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
100
101 // Assertions about current state:
102 DCHECK(is_attached());
103 DCHECK(!is_running());
104
105 AssertConsistentState();
106 remote_id_ = remote_id;
107 AssertConsistentState();
108
109 while (!paused_message_queue_.IsEmpty())
110 EnqueueMessage(paused_message_queue_.GetMessage());
111
112 if (is_peer_open_)
113 return true; // Stay alive.
114
115 // We were just waiting to die.
116 Detach();
117 return false;
118 }
119
OnRemove()120 void ProxyMessagePipeEndpoint::OnRemove() {
121 Detach();
122 }
123
Detach()124 void ProxyMessagePipeEndpoint::Detach() {
125 DCHECK(is_attached());
126
127 AssertConsistentState();
128 channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
129 channel_ = NULL;
130 local_id_ = MessageInTransit::kInvalidEndpointId;
131 remote_id_ = MessageInTransit::kInvalidEndpointId;
132 paused_message_queue_.Clear();
133 AssertConsistentState();
134 }
135
136 #ifndef NDEBUG
AssertConsistentState() const137 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
138 if (is_attached()) {
139 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
140 } else { // Not attached.
141 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
142 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
143 }
144 }
145 #endif
146
147 } // namespace system
148 } // namespace mojo
149