• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 package org.chromium.mojo.bindings;
6 
7 import org.chromium.mojo.system.AsyncWaiter;
8 import org.chromium.mojo.system.Core;
9 import org.chromium.mojo.system.MessagePipeHandle;
10 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult;
11 import org.chromium.mojo.system.MojoException;
12 import org.chromium.mojo.system.MojoResult;
13 import org.chromium.mojo.system.ResultAnd;
14 
15 import java.nio.ByteBuffer;
16 
17 /**
18  * A {@link Connector} owns a {@link MessagePipeHandle} and will send any received messages to the
19  * registered {@link MessageReceiver}. It also acts as a {@link MessageReceiver} and will send any
20  * message through the handle.
21  * <p>
22  * The method |start| must be called before the {@link Connector} will start listening to incoming
23  * messages.
24  */
25 public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle> {
26 
27     /**
28      * The callback that is notified when the state of the owned handle changes.
29      */
30     private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCallback();
31 
32     /**
33      * The owned message pipe.
34      */
35     private final MessagePipeHandle mMessagePipeHandle;
36 
37     /**
38      * A waiter which is notified when a new message is available on the owned message pipe.
39      */
40     private final AsyncWaiter mAsyncWaiter;
41 
42     /**
43      * The {@link MessageReceiver} to which received messages are sent.
44      */
45     private MessageReceiver mIncomingMessageReceiver;
46 
47     /**
48      * The Cancellable for the current wait. Is |null| when not currently waiting for new messages.
49      */
50     private AsyncWaiter.Cancellable mCancellable;
51 
52     /**
53      * The error handler to notify of errors.
54      */
55     private ConnectionErrorHandler mErrorHandler;
56 
57     /**
58      * Create a new connector over a |messagePipeHandle|. The created connector will use the default
59      * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeHandle|.
60      */
Connector(MessagePipeHandle messagePipeHandle)61     public Connector(MessagePipeHandle messagePipeHandle) {
62         this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle));
63     }
64 
65     /**
66      * Create a new connector over a |messagePipeHandle| using the given {@link AsyncWaiter} to get
67      * notified of changes on the handle.
68      */
Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter)69     public Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) {
70         mCancellable = null;
71         mMessagePipeHandle = messagePipeHandle;
72         mAsyncWaiter = asyncWaiter;
73     }
74 
75     /**
76      * Set the {@link MessageReceiver} that will receive message from the owned message pipe.
77      */
setIncomingMessageReceiver(MessageReceiver incomingMessageReceiver)78     public void setIncomingMessageReceiver(MessageReceiver incomingMessageReceiver) {
79         mIncomingMessageReceiver = incomingMessageReceiver;
80     }
81 
82     /**
83      * Set the {@link ConnectionErrorHandler} that will be notified of errors on the owned message
84      * pipe.
85      */
setErrorHandler(ConnectionErrorHandler errorHandler)86     public void setErrorHandler(ConnectionErrorHandler errorHandler) {
87         mErrorHandler = errorHandler;
88     }
89 
90     /**
91      * Start listening for incoming messages.
92      */
start()93     public void start() {
94         assert mCancellable == null;
95         registerAsyncWaiterForRead();
96     }
97 
98     /**
99      * @see MessageReceiver#accept(Message)
100      */
101     @Override
accept(Message message)102     public boolean accept(Message message) {
103         try {
104             mMessagePipeHandle.writeMessage(message.getData(),
105                     message.getHandles(), MessagePipeHandle.WriteFlags.NONE);
106             return true;
107         } catch (MojoException e) {
108             onError(e);
109             return false;
110         }
111     }
112 
113     /**
114      * Pass the owned handle of the connector. After this, the connector is disconnected. It cannot
115      * accept new message and it isn't listening to the handle anymore.
116      *
117      * @see org.chromium.mojo.bindings.HandleOwner#passHandle()
118      */
119     @Override
passHandle()120     public MessagePipeHandle passHandle() {
121         cancelIfActive();
122         MessagePipeHandle handle = mMessagePipeHandle.pass();
123         if (mIncomingMessageReceiver != null) {
124             mIncomingMessageReceiver.close();
125         }
126         return handle;
127     }
128 
129     /**
130      * @see java.io.Closeable#close()
131      */
132     @Override
close()133     public void close() {
134         cancelIfActive();
135         mMessagePipeHandle.close();
136         if (mIncomingMessageReceiver != null) {
137             MessageReceiver incomingMessageReceiver = mIncomingMessageReceiver;
138             mIncomingMessageReceiver = null;
139             incomingMessageReceiver.close();
140         }
141     }
142 
143     private class AsyncWaiterCallback implements AsyncWaiter.Callback {
144 
145         /**
146          * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
147          */
148         @Override
onResult(int result)149         public void onResult(int result) {
150             Connector.this.onAsyncWaiterResult(result);
151         }
152 
153         /**
154          * @see org.chromium.mojo.system.AsyncWaiter.Callback#onError(MojoException)
155          */
156         @Override
onError(MojoException exception)157         public void onError(MojoException exception) {
158             mCancellable = null;
159             Connector.this.onError(exception);
160         }
161 
162     }
163 
164     /**
165      * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
166      */
onAsyncWaiterResult(int result)167     private void onAsyncWaiterResult(int result) {
168         mCancellable = null;
169         if (result == MojoResult.OK) {
170             readOutstandingMessages();
171         } else {
172             onError(new MojoException(result));
173         }
174     }
175 
onError(MojoException exception)176     private void onError(MojoException exception) {
177         close();
178         assert mCancellable == null;
179         if (mErrorHandler != null) {
180             mErrorHandler.onConnectionError(exception);
181         }
182     }
183 
184     /**
185      * Register to be called back when a new message is available on the owned message pipe.
186      */
registerAsyncWaiterForRead()187     private void registerAsyncWaiterForRead() {
188         assert mCancellable == null;
189         if (mAsyncWaiter != null) {
190             mCancellable = mAsyncWaiter.asyncWait(mMessagePipeHandle, Core.HandleSignals.READABLE,
191                     Core.DEADLINE_INFINITE, mAsyncWaiterCallback);
192         } else {
193             onError(new MojoException(MojoResult.INVALID_ARGUMENT));
194         }
195     }
196 
197     /**
198      * Read all available messages on the owned message pipe.
199      */
readOutstandingMessages()200     private void readOutstandingMessages() {
201         ResultAnd<Boolean> result;
202         do {
203             try {
204                 result = readAndDispatchMessage(mMessagePipeHandle, mIncomingMessageReceiver);
205             } catch (MojoException e) {
206                 onError(e);
207                 return;
208             }
209         } while (result.getValue());
210         if (result.getMojoResult() == MojoResult.SHOULD_WAIT) {
211             registerAsyncWaiterForRead();
212         } else {
213             onError(new MojoException(result.getMojoResult()));
214         }
215     }
216 
cancelIfActive()217     private void cancelIfActive() {
218         if (mCancellable != null) {
219             mCancellable.cancel();
220             mCancellable = null;
221         }
222     }
223 
224     /**
225      * Read a message, and pass it to the given |MessageReceiver| if not null. If the
226      * |MessageReceiver| is null, the message is lost.
227      *
228      * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can
229      *            be <code>null</code>, in which case the message is discarded.
230      */
readAndDispatchMessage( MessagePipeHandle handle, MessageReceiver receiver)231     static ResultAnd<Boolean> readAndDispatchMessage(
232             MessagePipeHandle handle, MessageReceiver receiver) {
233         // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance.
234         ResultAnd<ReadMessageResult> result =
235                 handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE);
236         if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) {
237             return new ResultAnd<Boolean>(result.getMojoResult(), false);
238         }
239         ReadMessageResult readResult = result.getValue();
240         assert readResult != null;
241         ByteBuffer buffer = ByteBuffer.allocateDirect(readResult.getMessageSize());
242         result = handle.readMessage(
243                 buffer, readResult.getHandlesCount(), MessagePipeHandle.ReadFlags.NONE);
244         if (receiver != null && result.getMojoResult() == MojoResult.OK) {
245             boolean accepted = receiver.accept(new Message(buffer, result.getValue().getHandles()));
246             return new ResultAnd<Boolean>(result.getMojoResult(), accepted);
247         }
248         return new ResultAnd<Boolean>(result.getMojoResult(), false);
249     }
250 }
251