• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/channel_endpoint.h"
6 
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/message_pipe.h"
10 
11 namespace mojo {
12 namespace system {
13 
ChannelEndpoint(MessagePipe * message_pipe,unsigned port)14 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe, unsigned port)
15     : state_(STATE_NORMAL),
16       message_pipe_(message_pipe),
17       port_(port),
18       channel_(),
19       local_id_(MessageInTransit::kInvalidEndpointId),
20       remote_id_(MessageInTransit::kInvalidEndpointId) {
21   DCHECK(message_pipe_.get());
22   DCHECK(port_ == 0 || port_ == 1);
23 }
24 
TakeMessages(MessageInTransitQueue * message_queue)25 void ChannelEndpoint::TakeMessages(MessageInTransitQueue* message_queue) {
26   DCHECK(paused_message_queue_.IsEmpty());
27   paused_message_queue_.Swap(message_queue);
28 }
29 
EnqueueMessage(scoped_ptr<MessageInTransit> message)30 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
31   DCHECK(message);
32 
33   base::AutoLock locker(lock_);
34 
35   if (!channel_ || remote_id_ == MessageInTransit::kInvalidEndpointId) {
36     // We may reach here if we haven't been attached or run yet.
37     // TODO(vtl): We may also reach here if the channel is shut down early for
38     // some reason (with live message pipes on it). We can't check |state_| yet,
39     // until it's protected under lock, but in this case we should return false
40     // (and not enqueue any messages).
41     paused_message_queue_.AddMessage(message.Pass());
42     return true;
43   }
44 
45   // TODO(vtl): Currently, this only works in the "running" case.
46   DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
47 
48   return WriteMessageNoLock(message.Pass());
49 }
50 
DetachFromMessagePipe()51 void ChannelEndpoint::DetachFromMessagePipe() {
52   // TODO(vtl): Once |message_pipe_| is under |lock_|, we should null it out
53   // here. For now, get the channel to do so for us.
54 
55   scoped_refptr<Channel> channel;
56   {
57     base::AutoLock locker(lock_);
58     if (!channel_)
59       return;
60     DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
61     // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
62     // here as well.
63     channel = channel_;
64   }
65   // Don't call this under |lock_|, since it'll call us back.
66   // TODO(vtl): This seems pretty suboptimal.
67   channel->DetachMessagePipeEndpoint(local_id_, remote_id_);
68 }
69 
AttachToChannel(Channel * channel,MessageInTransit::EndpointId local_id)70 void ChannelEndpoint::AttachToChannel(Channel* channel,
71                                       MessageInTransit::EndpointId local_id) {
72   DCHECK(channel);
73   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
74 
75   base::AutoLock locker(lock_);
76   DCHECK(!channel_);
77   DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
78   channel_ = channel;
79   local_id_ = local_id;
80 }
81 
Run(MessageInTransit::EndpointId remote_id)82 void ChannelEndpoint::Run(MessageInTransit::EndpointId remote_id) {
83   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
84 
85   base::AutoLock locker(lock_);
86   DCHECK(channel_);
87   DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
88   remote_id_ = remote_id;
89 
90   while (!paused_message_queue_.IsEmpty()) {
91     LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
92         << "Failed to write enqueue message to channel";
93   }
94 }
95 
DetachFromChannel()96 void ChannelEndpoint::DetachFromChannel() {
97   base::AutoLock locker(lock_);
98   DCHECK(channel_);
99   DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
100   // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should valid
101   // here as well.
102   channel_ = nullptr;
103   local_id_ = MessageInTransit::kInvalidEndpointId;
104   remote_id_ = MessageInTransit::kInvalidEndpointId;
105 }
106 
~ChannelEndpoint()107 ChannelEndpoint::~ChannelEndpoint() {
108   DCHECK(!channel_);
109   DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
110   DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
111 }
112 
WriteMessageNoLock(scoped_ptr<MessageInTransit> message)113 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
114   DCHECK(message);
115 
116   lock_.AssertAcquired();
117 
118   DCHECK(channel_);
119   DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
120   DCHECK_NE(remote_id_, MessageInTransit::kInvalidEndpointId);
121 
122   message->SerializeAndCloseDispatchers(channel_);
123   message->set_source_id(local_id_);
124   message->set_destination_id(remote_id_);
125   return channel_->WriteMessage(message.Pass());
126 }
127 
128 }  // namespace system
129 }  // namespace mojo
130