• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 
7 #include <stddef.h>
8 
9 #include "mojo/public/cpp/bindings/error_handler.h"
10 #include "mojo/public/cpp/environment/logging.h"
11 
12 namespace mojo {
13 namespace internal {
14 
15 // ----------------------------------------------------------------------------
16 
Connector(ScopedMessagePipeHandle message_pipe,const MojoAsyncWaiter * waiter)17 Connector::Connector(ScopedMessagePipeHandle message_pipe,
18                      const MojoAsyncWaiter* waiter)
19     : error_handler_(NULL),
20       waiter_(waiter),
21       message_pipe_(message_pipe.Pass()),
22       incoming_receiver_(NULL),
23       async_wait_id_(0),
24       error_(false),
25       drop_writes_(false),
26       enforce_errors_from_incoming_receiver_(true),
27       destroyed_flag_(NULL) {
28   // Even though we don't have an incoming receiver, we still want to monitor
29   // the message pipe to know if is closed or encounters an error.
30   WaitToReadMore();
31 }
32 
~Connector()33 Connector::~Connector() {
34   if (destroyed_flag_)
35     *destroyed_flag_ = true;
36 
37   CancelWait();
38 }
39 
CloseMessagePipe()40 void Connector::CloseMessagePipe() {
41   CancelWait();
42   Close(message_pipe_.Pass());
43 }
44 
PassMessagePipe()45 ScopedMessagePipeHandle Connector::PassMessagePipe() {
46   CancelWait();
47   return message_pipe_.Pass();
48 }
49 
WaitForIncomingMessage()50 bool Connector::WaitForIncomingMessage() {
51   if (error_)
52     return false;
53 
54   MojoResult rv = Wait(message_pipe_.get(),
55                        MOJO_HANDLE_SIGNAL_READABLE,
56                        MOJO_DEADLINE_INDEFINITE);
57   if (rv != MOJO_RESULT_OK) {
58     NotifyError();
59     return false;
60   }
61   mojo_ignore_result(ReadSingleMessage(&rv));
62   return (rv == MOJO_RESULT_OK);
63 }
64 
Accept(Message * message)65 bool Connector::Accept(Message* message) {
66   MOJO_CHECK(message_pipe_.is_valid());
67 
68   if (error_)
69     return false;
70 
71   if (drop_writes_)
72     return true;
73 
74   MojoResult rv = WriteMessageRaw(
75       message_pipe_.get(),
76       message->data(),
77       message->data_num_bytes(),
78       message->mutable_handles()->empty() ? NULL :
79           reinterpret_cast<const MojoHandle*>(
80               &message->mutable_handles()->front()),
81       static_cast<uint32_t>(message->mutable_handles()->size()),
82       MOJO_WRITE_MESSAGE_FLAG_NONE);
83 
84   switch (rv) {
85     case MOJO_RESULT_OK:
86       // The handles were successfully transferred, so we don't need the message
87       // to track their lifetime any longer.
88       message->mutable_handles()->clear();
89       break;
90     case MOJO_RESULT_FAILED_PRECONDITION:
91       // There's no point in continuing to write to this pipe since the other
92       // end is gone. Avoid writing any future messages. Hide write failures
93       // from the caller since we'd like them to continue consuming any backlog
94       // of incoming messages before regarding the message pipe as closed.
95       drop_writes_ = true;
96       break;
97     case MOJO_RESULT_BUSY:
98       // We'd get a "busy" result if one of the message's handles is:
99       //   - |message_pipe_|'s own handle;
100       //   - simultaneously being used on another thread; or
101       //   - in a "busy" state that prohibits it from being transferred (e.g.,
102       //     a data pipe handle in the middle of a two-phase read/write,
103       //     regardless of which thread that two-phase read/write is happening
104       //     on).
105       // TODO(vtl): I wonder if this should be a |MOJO_DCHECK()|. (But, until
106       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
107       // rather than hanging.)
108       MOJO_CHECK(false) << "Race condition or other bug detected";
109       return false;
110     default:
111       // This particular write was rejected, presumably because of bad input.
112       // The pipe is not necessarily in a bad state.
113       return false;
114   }
115   return true;
116 }
117 
118 // static
CallOnHandleReady(void * closure,MojoResult result)119 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
120   Connector* self = static_cast<Connector*>(closure);
121   self->OnHandleReady(result);
122 }
123 
OnHandleReady(MojoResult result)124 void Connector::OnHandleReady(MojoResult result) {
125   MOJO_CHECK(async_wait_id_ != 0);
126   async_wait_id_ = 0;
127   if (result != MOJO_RESULT_OK) {
128     NotifyError();
129     return;
130   }
131   ReadAllAvailableMessages();
132   // At this point, this object might have been deleted. Return.
133 }
134 
WaitToReadMore()135 void Connector::WaitToReadMore() {
136   MOJO_CHECK(!async_wait_id_);
137   async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
138                                       MOJO_HANDLE_SIGNAL_READABLE,
139                                       MOJO_DEADLINE_INDEFINITE,
140                                       &Connector::CallOnHandleReady,
141                                       this);
142 }
143 
ReadSingleMessage(MojoResult * read_result)144 bool Connector::ReadSingleMessage(MojoResult* read_result) {
145   bool receiver_result = false;
146 
147   // Detect if |this| was destroyed during message dispatch. Allow for the
148   // possibility of re-entering ReadMore() through message dispatch.
149   bool was_destroyed_during_dispatch = false;
150   bool* previous_destroyed_flag = destroyed_flag_;
151   destroyed_flag_ = &was_destroyed_during_dispatch;
152 
153   MojoResult rv = ReadAndDispatchMessage(
154       message_pipe_.get(), incoming_receiver_, &receiver_result);
155   if (read_result)
156     *read_result = rv;
157 
158   if (was_destroyed_during_dispatch) {
159     if (previous_destroyed_flag)
160       *previous_destroyed_flag = true;  // Propagate flag.
161     return false;
162   }
163   destroyed_flag_ = previous_destroyed_flag;
164 
165   if (rv == MOJO_RESULT_SHOULD_WAIT)
166     return true;
167 
168   if (rv != MOJO_RESULT_OK ||
169       (enforce_errors_from_incoming_receiver_ && !receiver_result)) {
170     NotifyError();
171     return false;
172   }
173   return true;
174 }
175 
ReadAllAvailableMessages()176 void Connector::ReadAllAvailableMessages() {
177   while (!error_) {
178     MojoResult rv;
179 
180     // Return immediately if |this| was destroyed. Do not touch any members!
181     if (!ReadSingleMessage(&rv))
182       return;
183 
184     if (rv == MOJO_RESULT_SHOULD_WAIT) {
185       WaitToReadMore();
186       break;
187     }
188   }
189 }
190 
CancelWait()191 void Connector::CancelWait() {
192   if (!async_wait_id_)
193     return;
194 
195   waiter_->CancelWait(async_wait_id_);
196   async_wait_id_ = 0;
197 }
198 
NotifyError()199 void Connector::NotifyError() {
200   error_ = true;
201   CancelWait();
202   if (error_handler_)
203     error_handler_->OnConnectionError();
204 }
205 
206 }  // namespace internal
207 }  // namespace mojo
208