/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <android-base/unique_fd.h>
#include <binder/IBinder.h>
#include <binder/RpcTransport.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>

#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>
28 
29 namespace android {
30 
// Forward declarations: these types are only named (as pointer, reference or
// template arguments) by the declarations in this header.
class Parcel;
class RpcServer;
class RpcSocketAddress;
class RpcState;
class RpcTransport;
class FdTrigger;
37 
// Wire protocol version numbers.
// NOTE(review): the meanings below are inferred from the identifiers only;
// confirm against the RPC wire protocol implementation.
//
// `inline` gives each constant a single definition shared by every translation
// unit that includes this header.

// Presumably the number the next protocol revision will be assigned.
inline constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 1;
// Presumably a reserved value identifying an experimental protocol revision.
inline constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
// Presumably the protocol revision currently in use.
inline constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION = 0;
41 
42 /**
43  * This represents a session (group of connections) between a client
44  * and a server. Multiple connections are needed for multiple parallel "binder"
45  * calls which may also have nested calls.
46  */
47 class RpcSession final : public virtual RefBase {
48 public:
49     static constexpr size_t kDefaultMaxOutgoingThreads = 10;
50 
51     // Create an RpcSession with default configuration (raw sockets).
52     static sp<RpcSession> make();
53 
54     // Create an RpcSession with the given configuration. |serverRpcCertificateFormat| and
55     // |serverCertificate| must have values or be nullopt simultaneously. If they have values, set
56     // server certificate.
57     static sp<RpcSession> make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
58 
59     /**
60      * Set the maximum number of incoming threads allowed to be made (for things like callbacks).
61      * By default, this is 0. This must be called before setting up this connection as a client.
62      * Server sessions will inherits this value from RpcServer.
63      *
64      * If this is called, 'shutdown' on this session must also be called.
65      * Otherwise, a threadpool will leak.
66      *
67      * TODO(b/189955605): start these dynamically
68      */
69     void setMaxIncomingThreads(size_t threads);
70     size_t getMaxIncomingThreads();
71 
72     /**
73      * Set the maximum number of outgoing threads allowed to be made.
74      * By default, this is |kDefaultMaxOutgoingThreads|. This must be called before setting up this
75      * connection as a client.
76      *
77      * This limits the number of outgoing threads on top of the remote peer setting. This RpcSession
78      * will only instantiate |min(maxOutgoingThreads, remoteMaxThreads)| outgoing threads, where
79      * |remoteMaxThreads| can be retrieved from the remote peer via |getRemoteMaxThreads()|.
80      */
81     void setMaxOutgoingThreads(size_t threads);
82     size_t getMaxOutgoingThreads();
83 
84     /**
85      * By default, the minimum of the supported versions of the client and the
86      * server will be used. Usually, this API should only be used for debugging.
87      */
88     [[nodiscard]] bool setProtocolVersion(uint32_t version);
89     std::optional<uint32_t> getProtocolVersion();
90 
91     /**
92      * This should be called once per thread, matching 'join' in the remote
93      * process.
94      */
95     [[nodiscard]] status_t setupUnixDomainClient(const char* path);
96 
97     /**
98      * Connects to an RPC server at the CVD & port.
99      */
100     [[nodiscard]] status_t setupVsockClient(unsigned int cvd, unsigned int port);
101 
102     /**
103      * Connects to an RPC server at the given address and port.
104      */
105     [[nodiscard]] status_t setupInetClient(const char* addr, unsigned int port);
106 
107     /**
108      * Starts talking to an RPC server which has already been connected to. This
109      * is expected to be used when another process has permission to connect to
110      * a binder RPC service, but this process only has permission to talk to
111      * that service.
112      *
113      * For convenience, if 'fd' is -1, 'request' will be called.
114      *
115      * For future compatibility, 'request' should not reference any stack data.
116      */
117     [[nodiscard]] status_t setupPreconnectedClient(base::unique_fd fd,
118                                                    std::function<base::unique_fd()>&& request);
119 
120     /**
121      * For debugging!
122      *
123      * Sets up an empty connection. All queries to this connection which require a
124      * response will never be satisfied. All data sent here will be
125      * unceremoniously cast down the bottomless pit, /dev/null.
126      */
127     [[nodiscard]] status_t addNullDebuggingClient();
128 
129     /**
130      * Query the other side of the session for the root object hosted by that
131      * process's RpcServer (if one exists)
132      */
133     sp<IBinder> getRootObject();
134 
135     /**
136      * Query the other side of the session for the maximum number of threads
137      * it supports (maximum number of concurrent non-nested synchronous transactions)
138      */
139     [[nodiscard]] status_t getRemoteMaxThreads(size_t* maxThreads);
140 
141     /**
142      * See RpcTransportCtx::getCertificate
143      */
144     std::vector<uint8_t> getCertificate(RpcCertificateFormat);
145 
146     /**
147      * Shuts down the service.
148      *
149      * For client sessions, wait can be true or false. For server sessions,
150      * waiting is not currently supported (will abort).
151      *
152      * Warning: this is currently not active/nice (the server isn't told we're
153      * shutting down). Being nicer to the server could potentially make it
154      * reclaim resources faster.
155      *
156      * If this is called w/ 'wait' true, then this will wait for shutdown to
157      * complete before returning. This will hang if it is called from the
158      * session threadpool (when processing received calls).
159      */
160     [[nodiscard]] bool shutdownAndWait(bool wait);
161 
162     [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
163                                     Parcel* reply, uint32_t flags);
164 
165     /**
166      * Generally, you should not call this, unless you are testing error
167      * conditions, as this is called automatically by BpBinders when they are
168      * deleted (this is also why a raw pointer is used here)
169      */
170     [[nodiscard]] status_t sendDecStrong(const BpBinder* binder);
171 
172     ~RpcSession();
173 
174     /**
175      * Server if this session is created as part of a server (symmetrical to
176      * client servers). Otherwise, nullptr.
177      */
178     sp<RpcServer> server();
179 
180     // internal only
state()181     const std::unique_ptr<RpcState>& state() { return mRpcBinderState; }
182 
183 private:
184     friend sp<RpcSession>;
185     friend RpcServer;
186     friend RpcState;
187     explicit RpcSession(std::unique_ptr<RpcTransportCtx> ctx);
188 
189     // for 'target', see RpcState::sendDecStrongToTarget
190     [[nodiscard]] status_t sendDecStrongToTarget(uint64_t address, size_t target);
191 
192     class EventListener : public virtual RefBase {
193     public:
194         virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
195         virtual void onSessionIncomingThreadEnded() = 0;
196     };
197 
198     class WaitForShutdownListener : public EventListener {
199     public:
200         void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
201         void onSessionIncomingThreadEnded() override;
202         void waitForShutdown(std::unique_lock<std::mutex>& lock, const sp<RpcSession>& session);
203 
204     private:
205         std::condition_variable mCv;
206     };
207     friend WaitForShutdownListener;
208 
209     struct RpcConnection : public RefBase {
210         std::unique_ptr<RpcTransport> rpcTransport;
211 
212         // whether this or another thread is currently using this fd to make
213         // or receive transactions.
214         std::optional<pid_t> exclusiveTid;
215 
216         bool allowNested = false;
217     };
218 
219     [[nodiscard]] status_t readId();
220 
221     // A thread joining a server must always call these functions in order, and
222     // cleanup is only programmed once into join. These are in separate
223     // functions in order to allow for different locks to be taken during
224     // different parts of setup.
225     //
226     // transfer ownership of thread (usually done while a lock is taken on the
227     // structure which originally owns the thread)
228     void preJoinThreadOwnership(std::thread thread);
229     // pass FD to thread and read initial connection information
230     struct PreJoinSetupResult {
231         // Server connection object associated with this
232         sp<RpcConnection> connection;
233         // Status of setup
234         status_t status;
235     };
236     PreJoinSetupResult preJoinSetup(std::unique_ptr<RpcTransport> rpcTransport);
237     // join on thread passed to preJoinThreadOwnership
238     static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
239 
240     [[nodiscard]] status_t setupClient(
241             const std::function<status_t(const std::vector<uint8_t>& sessionId, bool incoming)>&
242                     connectAndInit);
243     [[nodiscard]] status_t setupSocketClient(const RpcSocketAddress& address);
244     [[nodiscard]] status_t setupOneSocketConnection(const RpcSocketAddress& address,
245                                                     const std::vector<uint8_t>& sessionId,
246                                                     bool incoming);
247     [[nodiscard]] status_t initAndAddConnection(base::unique_fd fd,
248                                                 const std::vector<uint8_t>& sessionId,
249                                                 bool incoming);
250     [[nodiscard]] status_t addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport);
251     [[nodiscard]] status_t addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport,
252                                                  bool init);
253     [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
254                                     const wp<RpcSession::EventListener>& eventListener,
255                                     const std::vector<uint8_t>& sessionId,
256                                     const sp<IBinder>& sessionSpecificRoot);
257     sp<RpcConnection> assignIncomingConnectionToThisThread(
258             std::unique_ptr<RpcTransport> rpcTransport);
259     [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
260 
261     [[nodiscard]] status_t initShutdownTrigger();
262 
263     enum class ConnectionUse {
264         CLIENT,
265         CLIENT_ASYNC,
266         CLIENT_REFCOUNT,
267     };
268 
269     // Object representing exclusive access to a connection.
270     class ExclusiveConnection {
271     public:
272         [[nodiscard]] static status_t find(const sp<RpcSession>& session, ConnectionUse use,
273                                            ExclusiveConnection* connection);
274 
275         ~ExclusiveConnection();
get()276         const sp<RpcConnection>& get() { return mConnection; }
277 
278     private:
279         static void findConnection(pid_t tid, sp<RpcConnection>* exclusive,
280                                    sp<RpcConnection>* available,
281                                    std::vector<sp<RpcConnection>>& sockets,
282                                    size_t socketsIndexHint);
283 
284         sp<RpcSession> mSession; // avoid deallocation
285         sp<RpcConnection> mConnection;
286 
287         // whether this is being used for a nested transaction (being on the same
288         // thread guarantees we won't write in the middle of a message, the way
289         // the wire protocol is constructed guarantees this is safe).
290         bool mReentrant = false;
291     };
292 
293     const std::unique_ptr<RpcTransportCtx> mCtx;
294 
295     // On the other side of a session, for each of mOutgoing here, there should
296     // be one of mIncoming on the other side (and vice versa).
297     //
298     // For the simplest session, a single server with one client, you would
299     // have:
300     //  - the server has a single 'mIncoming' and a thread listening on this
301     //  - the client has a single 'mOutgoing' and makes calls to this
302     //  - here, when the client makes a call, the server can call back into it
303     //    (nested calls), but outside of this, the client will only ever read
304     //    calls from the server when it makes a call itself.
305     //
306     // For a more complicated case, the client might itself open up a thread to
307     // serve calls to the server at all times (e.g. if it hosts a callback)
308 
309     wp<RpcServer> mForServer; // maybe null, for client sessions
310     sp<WaitForShutdownListener> mShutdownListener; // used for client sessions
311     wp<EventListener> mEventListener; // mForServer if server, mShutdownListener if client
312 
313     // session-specific root object (if a different root is used for each
314     // session)
315     sp<IBinder> mSessionSpecificRootObject;
316 
317     std::vector<uint8_t> mId;
318 
319     std::unique_ptr<FdTrigger> mShutdownTrigger;
320 
321     std::unique_ptr<RpcState> mRpcBinderState;
322 
323     std::mutex mMutex; // for all below
324 
325     size_t mMaxIncomingThreads = 0;
326     size_t mMaxOutgoingThreads = kDefaultMaxOutgoingThreads;
327     std::optional<uint32_t> mProtocolVersion;
328 
329     std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
330 
331     struct ThreadState {
332         size_t mWaitingThreads = 0;
333         // hint index into clients, ++ when sending an async transaction
334         size_t mOutgoingOffset = 0;
335         std::vector<sp<RpcConnection>> mOutgoing;
336         size_t mMaxIncoming = 0;
337         std::vector<sp<RpcConnection>> mIncoming;
338         std::map<std::thread::id, std::thread> mThreads;
339     } mConnections;
340 };
341 
342 } // namespace android
343