1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/connector.h"
6
7 #include <assert.h>
8 #include <stdlib.h>
9
10 #include "mojo/public/cpp/bindings/error_handler.h"
11
12 namespace mojo {
13 namespace internal {
14
15 // ----------------------------------------------------------------------------
16
Connector(ScopedMessagePipeHandle message_pipe,const MojoAsyncWaiter * waiter)17 Connector::Connector(ScopedMessagePipeHandle message_pipe,
18 const MojoAsyncWaiter* waiter)
19 : error_handler_(NULL),
20 waiter_(waiter),
21 message_pipe_(message_pipe.Pass()),
22 incoming_receiver_(NULL),
23 async_wait_id_(0),
24 error_(false),
25 drop_writes_(false),
26 enforce_errors_from_incoming_receiver_(true),
27 destroyed_flag_(NULL) {
28 // Even though we don't have an incoming receiver, we still want to monitor
29 // the message pipe to know if is closed or encounters an error.
30 WaitToReadMore();
31 }
32
~Connector()33 Connector::~Connector() {
34 if (destroyed_flag_)
35 *destroyed_flag_ = true;
36
37 if (async_wait_id_)
38 waiter_->CancelWait(async_wait_id_);
39 }
40
CloseMessagePipe()41 void Connector::CloseMessagePipe() {
42 Close(message_pipe_.Pass());
43 }
44
PassMessagePipe()45 ScopedMessagePipeHandle Connector::PassMessagePipe() {
46 if (async_wait_id_) {
47 waiter_->CancelWait(async_wait_id_);
48 async_wait_id_ = 0;
49 }
50 return message_pipe_.Pass();
51 }
52
Accept(Message * message)53 bool Connector::Accept(Message* message) {
54 assert(message_pipe_.is_valid());
55
56 if (error_)
57 return false;
58
59 if (drop_writes_)
60 return true;
61
62 MojoResult rv = WriteMessageRaw(
63 message_pipe_.get(),
64 message->data(),
65 message->data_num_bytes(),
66 message->mutable_handles()->empty() ? NULL :
67 reinterpret_cast<const MojoHandle*>(
68 &message->mutable_handles()->front()),
69 static_cast<uint32_t>(message->mutable_handles()->size()),
70 MOJO_WRITE_MESSAGE_FLAG_NONE);
71
72 switch (rv) {
73 case MOJO_RESULT_OK:
74 // The handles were successfully transferred, so we don't need the message
75 // to track their lifetime any longer.
76 message->mutable_handles()->clear();
77 break;
78 case MOJO_RESULT_FAILED_PRECONDITION:
79 // There's no point in continuing to write to this pipe since the other
80 // end is gone. Avoid writing any future messages. Hide write failures
81 // from the caller since we'd like them to continue consuming any backlog
82 // of incoming messages before regarding the message pipe as closed.
83 drop_writes_ = true;
84 break;
85 default:
86 // This particular write was rejected, presumably because of bad input.
87 // The pipe is not necessarily in a bad state.
88 return false;
89 }
90 return true;
91 }
92
93 // static
CallOnHandleReady(void * closure,MojoResult result)94 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
95 Connector* self = static_cast<Connector*>(closure);
96 self->OnHandleReady(result);
97 }
98
OnHandleReady(MojoResult result)99 void Connector::OnHandleReady(MojoResult result) {
100 assert(async_wait_id_ != 0);
101 async_wait_id_ = 0;
102
103 if (result == MOJO_RESULT_OK) {
104 // Return immediately if |this| was destroyed. Do not touch any members!
105 if (!ReadMore())
106 return;
107 } else {
108 error_ = true;
109 }
110
111 if (error_ && error_handler_)
112 error_handler_->OnConnectionError();
113 }
114
WaitToReadMore()115 void Connector::WaitToReadMore() {
116 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
117 MOJO_HANDLE_SIGNAL_READABLE,
118 MOJO_DEADLINE_INDEFINITE,
119 &Connector::CallOnHandleReady,
120 this);
121 }
122
ReadMore()123 bool Connector::ReadMore() {
124 while (true) {
125 bool receiver_result = false;
126
127 // Detect if |this| was destroyed during message dispatch. Allow for the
128 // possibility of re-entering ReadMore() through message dispatch.
129 bool was_destroyed_during_dispatch = false;
130 bool* previous_destroyed_flag = destroyed_flag_;
131 destroyed_flag_ = &was_destroyed_during_dispatch;
132
133 MojoResult rv = ReadAndDispatchMessage(
134 message_pipe_.get(), incoming_receiver_, &receiver_result);
135
136 if (was_destroyed_during_dispatch) {
137 if (previous_destroyed_flag)
138 *previous_destroyed_flag = true; // Propagate flag.
139 return false;
140 }
141 destroyed_flag_ = previous_destroyed_flag;
142
143 if (rv == MOJO_RESULT_SHOULD_WAIT) {
144 WaitToReadMore();
145 break;
146 }
147 if (rv != MOJO_RESULT_OK ||
148 (enforce_errors_from_incoming_receiver_ && !receiver_result)) {
149 error_ = true;
150 break;
151 }
152 }
153 return true;
154 }
155
156 } // namespace internal
157 } // namespace mojo
158