1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.mojo.bindings; 6 7 import org.chromium.mojo.system.AsyncWaiter; 8 import org.chromium.mojo.system.AsyncWaiter.Callback; 9 import org.chromium.mojo.system.Core; 10 import org.chromium.mojo.system.MessagePipeHandle; 11 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; 12 import org.chromium.mojo.system.MojoException; 13 import org.chromium.mojo.system.MojoResult; 14 import org.chromium.mojo.system.Pair; 15 import org.chromium.mojo.system.ResultAnd; 16 17 import java.nio.ByteBuffer; 18 import java.util.ArrayList; 19 import java.util.List; 20 import java.util.concurrent.Executor; 21 22 /** 23 * A factory which provides per-thread executors, which enable execution on the thread from which 24 * they were obtained. 25 */ 26 class ExecutorFactory { 27 28 /** 29 * A null buffer which is used to send messages without any data on the PipedExecutor's 30 * signaling handles. 31 */ 32 private static final ByteBuffer NOTIFY_BUFFER = null; 33 34 /** 35 * Implementation of the executor which uses a pair of {@link MessagePipeHandle} for signaling. 36 * The executor will wait asynchronously on one end of a {@link MessagePipeHandle} on the thread 37 * on which it was created. Other threads can call execute with a {@link Runnable}, and the 38 * executor will queue the {@link Runnable} and write a message on the other end of the handle. 39 * This will wake up the executor which is waiting on the handle, which will then dequeue the 40 * {@link Runnable} and execute it on the original thread. 41 */ 42 private static class PipedExecutor implements Executor, Callback { 43 44 /** 45 * The handle which is written to. Access to this object must be protected with |mLock|. 46 */ 47 private final MessagePipeHandle mWriteHandle; 48 /** 49 * The handle which is read from. 50 */ 51 private final MessagePipeHandle mReadHandle; 52 /** 53 * The list of actions left to be run. Access to this object must be protected with |mLock|. 54 */ 55 private final List<Runnable> mPendingActions; 56 /** 57 * Lock protecting access to |mWriteHandle| and |mPendingActions|. 58 */ 59 private final Object mLock; 60 /** 61 * The {@link AsyncWaiter} to get notified of new message availability on |mReadHandle|. 62 */ 63 private final AsyncWaiter mWaiter; 64 65 /** 66 * Constructor. 67 */ PipedExecutor(Core core)68 public PipedExecutor(Core core) { 69 mWaiter = core.getDefaultAsyncWaiter(); 70 assert mWaiter != null; 71 mLock = new Object(); 72 Pair<MessagePipeHandle, MessagePipeHandle> handles = core.createMessagePipe( 73 new MessagePipeHandle.CreateOptions()); 74 mReadHandle = handles.first; 75 mWriteHandle = handles.second; 76 mPendingActions = new ArrayList<Runnable>(); 77 asyncWait(); 78 } 79 80 /** 81 * Asynchronously wait for the next command to arrive. This should only be called on the 82 * executor thread. 83 */ asyncWait()84 private void asyncWait() { 85 mWaiter.asyncWait(mReadHandle, Core.HandleSignals.READABLE, Core.DEADLINE_INFINITE, 86 this); 87 } 88 89 /** 90 * @see Callback#onResult(int) 91 */ 92 @Override onResult(int result)93 public void onResult(int result) { 94 if (result == MojoResult.OK && readNotifyBufferMessage()) { 95 runNextAction(); 96 } else { 97 close(); 98 } 99 } 100 101 /** 102 * @see Callback#onError(MojoException) 103 */ 104 @Override onError(MojoException exception)105 public void onError(MojoException exception) { 106 close(); 107 } 108 109 /** 110 * Close the handles. Should only be called on the executor thread. 111 */ close()112 private void close() { 113 synchronized (mLock) { 114 mWriteHandle.close(); 115 mPendingActions.clear(); 116 } 117 mReadHandle.close(); 118 } 119 120 /** 121 * Read the next message on |mReadHandle|, and return |true| if successful, |false| 122 * otherwise. 123 */ readNotifyBufferMessage()124 private boolean readNotifyBufferMessage() { 125 try { 126 ResultAnd<ReadMessageResult> readMessageResult = 127 mReadHandle.readMessage(NOTIFY_BUFFER, 0, MessagePipeHandle.ReadFlags.NONE); 128 if (readMessageResult.getMojoResult() == MojoResult.OK) { 129 asyncWait(); 130 return true; 131 } 132 } catch (MojoException e) { 133 // Will be closed by the fall back at the end of this method. 134 } 135 return false; 136 } 137 138 /** 139 * Run the next action in the |mPendingActions| queue. 140 */ runNextAction()141 private void runNextAction() { 142 Runnable toRun = null; 143 synchronized (mLock) { 144 toRun = mPendingActions.remove(0); 145 } 146 toRun.run(); 147 } 148 149 /** 150 * Execute the given |command| in the executor thread. This can be called on any thread. 151 * 152 * @see Executor#execute(Runnable) 153 */ 154 @Override execute(Runnable command)155 public void execute(Runnable command) { 156 // Accessing the write handle must be protected by the lock, because it can be closed 157 // from the executor's thread. 158 synchronized (mLock) { 159 if (!mWriteHandle.isValid()) { 160 throw new IllegalStateException( 161 "Trying to execute an action on a closed executor."); 162 } 163 mPendingActions.add(command); 164 mWriteHandle.writeMessage(NOTIFY_BUFFER, null, MessagePipeHandle.WriteFlags.NONE); 165 } 166 } 167 } 168 169 /** 170 * Keep one executor per executor thread. 171 */ 172 private static final ThreadLocal<Executor> EXECUTORS = new ThreadLocal<Executor>(); 173 174 /** 175 * Returns an {@link Executor} that will run all of its actions in the current thread. 176 */ getExecutorForCurrentThread(Core core)177 public static Executor getExecutorForCurrentThread(Core core) { 178 Executor executor = EXECUTORS.get(); 179 if (executor == null) { 180 executor = new PipedExecutor(core); 181 EXECUTORS.set(executor); 182 } 183 return executor; 184 } 185 } 186