• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/proxy_message_pipe_endpoint.h"
6 
7 #include <string.h>
8 
9 #include "base/logging.h"
10 #include "mojo/system/channel.h"
11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/message_pipe_dispatcher.h"
13 
14 namespace mojo {
15 namespace system {
16 
ProxyMessagePipeEndpoint()17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
18     : local_id_(MessageInTransit::kInvalidEndpointId),
19       remote_id_(MessageInTransit::kInvalidEndpointId),
20       is_peer_open_(true) {
21 }
22 
ProxyMessagePipeEndpoint(LocalMessagePipeEndpoint * local_message_pipe_endpoint,bool is_peer_open)23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
24     LocalMessagePipeEndpoint* local_message_pipe_endpoint,
25     bool is_peer_open)
26     : local_id_(MessageInTransit::kInvalidEndpointId),
27       remote_id_(MessageInTransit::kInvalidEndpointId),
28       is_peer_open_(is_peer_open),
29       paused_message_queue_(MessageInTransitQueue::PassContents(),
30                             local_message_pipe_endpoint->message_queue()) {
31   local_message_pipe_endpoint->Close();
32 }
33 
~ProxyMessagePipeEndpoint()34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
35   DCHECK(!is_running());
36   DCHECK(!is_attached());
37   AssertConsistentState();
38   DCHECK(paused_message_queue_.IsEmpty());
39 }
40 
GetType() const41 MessagePipeEndpoint::Type ProxyMessagePipeEndpoint::GetType() const {
42   return kTypeProxy;
43 }
44 
OnPeerClose()45 bool ProxyMessagePipeEndpoint::OnPeerClose() {
46   DCHECK(is_peer_open_);
47 
48   is_peer_open_ = false;
49 
50   // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
51   if (!paused_message_queue_.IsEmpty())
52     return true;
53 
54   if (is_attached()) {
55     if (!is_running()) {
56       // If we're not running yet, we can't be destroyed yet, because we're
57       // still waiting for the "run" message from the other side.
58       return true;
59     }
60 
61     Detach();
62   }
63 
64   return false;
65 }
66 
67 // Note: We may have to enqueue messages even when our (local) peer isn't open
68 // -- it may have been written to and closed immediately, before we were ready.
69 // This case is handled in |Run()| (which will call us).
EnqueueMessage(scoped_ptr<MessageInTransit> message)70 void ProxyMessagePipeEndpoint::EnqueueMessage(
71     scoped_ptr<MessageInTransit> message) {
72   if (is_running()) {
73     message->SerializeAndCloseDispatchers(channel_.get());
74 
75     message->set_source_id(local_id_);
76     message->set_destination_id(remote_id_);
77     if (!channel_->WriteMessage(message.Pass()))
78       LOG(WARNING) << "Failed to write message to channel";
79   } else {
80     paused_message_queue_.AddMessage(message.Pass());
81   }
82 }
83 
Attach(scoped_refptr<Channel> channel,MessageInTransit::EndpointId local_id)84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
85                                       MessageInTransit::EndpointId local_id) {
86   DCHECK(channel);
87   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
88 
89   DCHECK(!is_attached());
90 
91   AssertConsistentState();
92   channel_ = channel;
93   local_id_ = local_id;
94   AssertConsistentState();
95 }
96 
Run(MessageInTransit::EndpointId remote_id)97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
98   // Assertions about arguments:
99   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
100 
101   // Assertions about current state:
102   DCHECK(is_attached());
103   DCHECK(!is_running());
104 
105   AssertConsistentState();
106   remote_id_ = remote_id;
107   AssertConsistentState();
108 
109   while (!paused_message_queue_.IsEmpty())
110     EnqueueMessage(paused_message_queue_.GetMessage());
111 
112   if (is_peer_open_)
113     return true;  // Stay alive.
114 
115   // We were just waiting to die.
116   Detach();
117   return false;
118 }
119 
OnRemove()120 void ProxyMessagePipeEndpoint::OnRemove() {
121   Detach();
122 }
123 
Detach()124 void ProxyMessagePipeEndpoint::Detach() {
125   DCHECK(is_attached());
126 
127   AssertConsistentState();
128   channel_->DetachMessagePipeEndpoint(local_id_, remote_id_);
129   channel_ = NULL;
130   local_id_ = MessageInTransit::kInvalidEndpointId;
131   remote_id_ = MessageInTransit::kInvalidEndpointId;
132   paused_message_queue_.Clear();
133   AssertConsistentState();
134 }
135 
136 #ifndef NDEBUG
AssertConsistentState() const137 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
138   if (is_attached()) {
139     DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
140   } else {  // Not attached.
141     DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
142     DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
143   }
144 }
145 #endif
146 
147 }  // namespace system
148 }  // namespace mojo
149