• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <android-base/unique_fd.h>
20 #include <cutils/ashmem.h>
21 #include <fmq/EventFlag.h>
22 #include <sys/mman.h>
23 #include <sys/user.h>
24 #include <utils/Log.h>
25 #include <utils/SystemClock.h>
26 #include <atomic>
27 #include <new>
28 
29 using android::hardware::kSynchronizedReadWrite;
30 using android::hardware::kUnsynchronizedWrite;
31 using android::hardware::MQFlavor;
32 
33 namespace android {
34 
/*
 * Sentinel payload type. Instantiating a queue with MQErased signals that the
 * MQ is used with a mismatching MQDescriptor type: the real element type T is
 * not statically known, so type safety must be enforced elsewhere. It exists
 * so that MessageQueueBase can be instantiated for Rust, where additional
 * template instantiations cannot be generated across the language boundary.
 */
enum MQErased {};
41 
42 template <template <typename, MQFlavor> class MQDescriptorType, typename T, MQFlavor flavor>
43 struct MessageQueueBase {
44     typedef MQDescriptorType<T, flavor> Descriptor;
45     enum Error : int {
46         NONE,
47         POINTER_CORRUPTION, /** Read/write pointers mismatch */
48     };
49 
50     /**
51      * @param Desc MQDescriptor describing the FMQ.
52      * @param resetPointers bool indicating whether the read/write pointers
53      * should be reset or not.
54      */
55     MessageQueueBase(const Descriptor& Desc, bool resetPointers = true);
56 
57     ~MessageQueueBase();
58 
59     /**
60      * This constructor uses Ashmem shared memory to create an FMQ
61      * that can contain a maximum of 'numElementsInQueue' elements of type T.
62      *
63      * @param numElementsInQueue Capacity of the MessageQueue in terms of T.
64      * @param configureEventFlagWord Boolean that specifies if memory should
65      * also be allocated and mapped for an EventFlag word.
66      * @param bufferFd User-supplied file descriptor to map the memory for the ringbuffer
67      * By default, bufferFd=-1 means library will allocate ashmem region for ringbuffer.
68      * MessageQueue takes ownership of the file descriptor.
69      * @param bufferSize size of buffer in bytes that bufferFd represents. This
70      * size must be larger than or equal to (numElementsInQueue * sizeof(T)).
71      * Otherwise, operations will cause out-of-bounds memory access.
72      */
73 
MessageQueueBaseMessageQueueBase74     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
75                      android::base::unique_fd bufferFd, size_t bufferSize)
76         : MessageQueueBase(numElementsInQueue, configureEventFlagWord, std::move(bufferFd),
77                            bufferSize, sizeof(T)) {
78         /* We must not pass sizeof(T) as quantum for MQErased element type. */
79         static_assert(!std::is_same_v<T, MQErased>,
80                       "MessageQueueBase<..., MQErased, ...> must be constructed via a"
81                       " constructor that accepts a descriptor or a quantum size");
82     };
83 
84     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord = false)
85         : MessageQueueBase(numElementsInQueue, configureEventFlagWord, android::base::unique_fd(),
86                            0) {}
87 
88     /**
89      * @param errorDetected Optional output parameter which indicates
90      * any errors that the client might care about.
91      * @param errorMessage Optional output parameter for a human-readable
92      * error description.
93      *
94      * @return Number of items of type T that can be written into the FMQ
95      * without a read.
96      */
97     size_t availableToWrite(Error* errorDetected = nullptr,
98                             std::string* errorMessage = nullptr) const;
99 
100     /**
101      * @param errorDetected Optional output parameter which indicates
102      * any errors that the client might care about.
103      * @param errorMessage Optional output parameter for a human-readable
104      * error description.
105      *
106      * @return Number of items of type T that are waiting to be read from the
107      * FMQ.
108      */
109     size_t availableToRead(Error* errorDetected = nullptr,
110                            std::string* errorMessage = nullptr) const;
111 
112     /**
113      * Returns the size of type T in bytes.
114      *
115      * @return Size of T.
116      */
117     size_t getQuantumSize() const;
118 
119     /**
120      * Returns the size of the FMQ in terms of the size of type T.
121      *
122      * @return Number of items of type T that will fit in the FMQ.
123      */
124     size_t getQuantumCount() const;
125 
126     /**
127      * @return Whether the FMQ is configured correctly.
128      */
129     bool isValid() const;
130 
131     /**
132      * Non-blocking write to FMQ.
133      *
134      * @param data Pointer to the object of type T to be written into the FMQ.
135      *
136      * @return Whether the write was successful.
137      */
138     bool write(const T* data);
139 
140     /**
141      * Non-blocking read from FMQ.
142      *
143      * @param data Pointer to the memory where the object read from the FMQ is
144      * copied to.
145      *
146      * @return Whether the read was successful.
147      */
148     bool read(T* data);
149 
150     /**
151      * Write some data into the FMQ without blocking.
152      *
153      * @param data Pointer to the array of items of type T.
154      * @param count Number of items in array.
155      *
156      * @return Whether the write was successful.
157      */
158     bool write(const T* data, size_t count);
159 
160     /**
161      * Perform a blocking write of 'count' items into the FMQ using EventFlags.
162      * Does not support partial writes.
163      *
164      * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
165      * associated with the FMQ and it is used in that case.
166      *
167      * The application code must ensure that 'evFlag' used by the
168      * reader(s)/writer is based upon the same EventFlag word.
169      *
170      * The method will return false without blocking if any of the following
171      * conditions are true:
172      * - If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
173      * - If the 'readNotification' bit mask is zero.
174      * - If 'count' is greater than the FMQ size.
175      *
176      * If the there is insufficient space available to write into it, the
177      * EventFlag bit mask 'readNotification' is is waited upon.
178      *
179      * This method should only be used with a MessageQueue of the flavor
180      * 'kSynchronizedReadWrite'.
181      *
182      * Upon a successful write, wake is called on 'writeNotification' (if
183      * non-zero).
184      *
185      * @param data Pointer to the array of items of type T.
186      * @param count Number of items in array.
187      * @param readNotification The EventFlag bit mask to wait on if there is not
188      * enough space in FMQ to write 'count' items.
189      * @param writeNotification The EventFlag bit mask to call wake on
190      * a successful write. No wake is called if 'writeNotification' is zero.
191      * @param timeOutNanos Number of nanoseconds after which the blocking
192      * write attempt is aborted.
193      * @param evFlag The EventFlag object to be used for blocking. If nullptr,
194      * it is checked whether the FMQ owns an EventFlag object and that is used
195      * for blocking instead.
196      *
197      * @return Whether the write was successful.
198      */
199     bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
200                        uint32_t writeNotification, int64_t timeOutNanos = 0,
201                        android::hardware::EventFlag* evFlag = nullptr);
202 
203     bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
204 
205     /**
206      * Read some data from the FMQ without blocking.
207      *
208      * @param data Pointer to the array to which read data is to be written.
209      * @param count Number of items to be read.
210      *
211      * @return Whether the read was successful.
212      */
213     bool read(T* data, size_t count);
214 
215     /**
216      * Perform a blocking read operation of 'count' items from the FMQ. Does not
217      * perform a partial read.
218      *
219      * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
220      * associated with the FMQ and it is used in that case.
221      *
222      * The application code must ensure that 'evFlag' used by the
223      * reader(s)/writer is based upon the same EventFlag word.
224      *
225      * The method will return false without blocking if any of the following
226      * conditions are true:
227      * -If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
228      * -If the 'writeNotification' bit mask is zero.
229      * -If 'count' is greater than the FMQ size.
230      *
231      * This method should only be used with a MessageQueue of the flavor
232      * 'kSynchronizedReadWrite'.
233 
234      * If FMQ does not contain 'count' items, the eventFlag bit mask
235      * 'writeNotification' is waited upon. Upon a successful read from the FMQ,
236      * wake is called on 'readNotification' (if non-zero).
237      *
238      * @param data Pointer to the array to which read data is to be written.
239      * @param count Number of items to be read.
240      * @param readNotification The EventFlag bit mask to call wake on after
241      * a successful read. No wake is called if 'readNotification' is zero.
242      * @param writeNotification The EventFlag bit mask to call a wait on
243      * if there is insufficient data in the FMQ to be read.
244      * @param timeOutNanos Number of nanoseconds after which the blocking
245      * read attempt is aborted.
246      * @param evFlag The EventFlag object to be used for blocking.
247      *
248      * @return Whether the read was successful.
249      */
250     bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
251                       int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr);
252 
253     bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);
254 
255     /**
256      * Get a pointer to the MQDescriptor object that describes this FMQ.
257      *
258      * @return Pointer to the MQDescriptor associated with the FMQ.
259      */
getDescMessageQueueBase260     const Descriptor* getDesc() const { return mDesc.get(); }
261 
262     /**
263      * Get a pointer to the EventFlag word if there is one associated with this FMQ.
264      *
265      * @return Pointer to an EventFlag word, will return nullptr if not
266      * configured. This method does not transfer ownership. The EventFlag
267      * word will be unmapped by the MessageQueue destructor.
268      */
getEventFlagWordMessageQueueBase269     std::atomic<uint32_t>* getEventFlagWord() const { return mEvFlagWord; }
270 
271     /**
272      * Describes a memory region in the FMQ.
273      */
274     struct MemRegion {
MemRegionMessageQueueBase::MemRegion275         MemRegion() : MemRegion(nullptr, 0) {}
276 
MemRegionMessageQueueBase::MemRegion277         MemRegion(T* base, size_t size) : address(base), length(size) {}
278 
279         MemRegion& operator=(const MemRegion& other) {
280             address = other.address;
281             length = other.length;
282             return *this;
283         }
284 
285         /**
286          * Gets a pointer to the base address of the MemRegion.
287          */
getAddressMessageQueueBase::MemRegion288         inline T* getAddress() const { return address; }
289 
290         /**
291          * Gets the length of the MemRegion. This would equal to the number
292          * of items of type T that can be read from/written into the MemRegion.
293          */
getLengthMessageQueueBase::MemRegion294         inline size_t getLength() const { return length; }
295 
296         /**
297          * Gets the length of the MemRegion in bytes.
298          */
299         template <class U = T>
getLengthInBytesMessageQueueBase::MemRegion300         inline std::enable_if_t<!std::is_same_v<U, MQErased>, size_t> getLengthInBytes() const {
301             return length * kQuantumValue<U>;
302         }
303 
304       private:
305         /* Base address */
306         T* address;
307 
308         /*
309          * Number of items of type T that can be written to/read from the base
310          * address.
311          */
312         size_t length;
313     };
314 
315     /**
316      * Describes the memory regions to be used for a read or write.
317      * The struct contains two MemRegion objects since the FMQ is a ring
318      * buffer and a read or write operation can wrap around. A single message
319      * of type T will never be broken between the two MemRegions.
320      */
321     struct MemTransaction {
MemTransactionMessageQueueBase::MemTransaction322         MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
323 
MemTransactionMessageQueueBase::MemTransaction324         MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond)
325             : first(regionFirst), second(regionSecond) {}
326 
327         MemTransaction& operator=(const MemTransaction& other) {
328             first = other.first;
329             second = other.second;
330             return *this;
331         }
332 
333         /**
334          * Helper method to calculate the address for a particular index for
335          * the MemTransaction object.
336          *
337          * @param idx Index of the slot to be read/written. If the
338          * MemTransaction object is representing the memory region to read/write
339          * N items of type T, the valid range of idx is between 0 and N-1.
340          *
341          * @return Pointer to the slot idx. Will be nullptr for an invalid idx.
342          */
343         T* getSlot(size_t idx);
344 
345         /**
346          * Helper method to write 'nMessages' items of type T into the memory
347          * regions described by the object starting from 'startIdx'. This method
348          * uses memcpy() and is not to meant to be used for a zero copy operation.
349          * Partial writes are not supported.
350          *
351          * @param data Pointer to the source buffer.
352          * @param nMessages Number of items of type T.
353          * @param startIdx The slot number to begin the write from. If the
354          * MemTransaction object is representing the memory region to read/write
355          * N items of type T, the valid range of startIdx is between 0 and N-1;
356          *
357          * @return Whether the write operation of size 'nMessages' succeeded.
358          */
359         bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
360 
361         /*
362          * Helper method to read 'nMessages' items of type T from the memory
363          * regions described by the object starting from 'startIdx'. This method uses
364          * memcpy() and is not meant to be used for a zero copy operation. Partial reads
365          * are not supported.
366          *
367          * @param data Pointer to the destination buffer.
368          * @param nMessages Number of items of type T.
369          * @param startIdx The slot number to begin the read from. If the
370          * MemTransaction object is representing the memory region to read/write
371          * N items of type T, the valid range of startIdx is between 0 and N-1.
372          *
373          * @return Whether the read operation of size 'nMessages' succeeded.
374          */
375         bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
376 
377         /**
378          * Returns a const reference to the first MemRegion in the
379          * MemTransaction object.
380          */
getFirstRegionMessageQueueBase::MemTransaction381         inline const MemRegion& getFirstRegion() const { return first; }
382 
383         /**
384          * Returns a const reference to the second MemRegion in the
385          * MemTransaction object.
386          */
getSecondRegionMessageQueueBase::MemTransaction387         inline const MemRegion& getSecondRegion() const { return second; }
388 
389       private:
390         friend MessageQueueBase<MQDescriptorType, T, flavor>;
391 
392         bool copyToSized(const T* data, size_t startIdx, size_t nMessages, size_t messageSize);
393         bool copyFromSized(T* data, size_t startIdx, size_t nMessages, size_t messageSize);
394 
395         /*
396          * Given a start index and the number of messages to be
397          * read/written, this helper method calculates the
398          * number of messages that should should be written to both the first
399          * and second MemRegions and the base addresses to be used for
400          * the read/write operation.
401          *
402          * Returns false if the 'startIdx' and 'nMessages' is
403          * invalid for the MemTransaction object.
404          */
405         bool inline getMemRegionInfo(size_t idx, size_t nMessages, size_t& firstCount,
406                                      size_t& secondCount, T** firstBaseAddress,
407                                      T** secondBaseAddress);
408         MemRegion first;
409         MemRegion second;
410     };
411 
412     /**
413      * Get a MemTransaction object to write 'nMessages' items of type T.
414      * Once the write is performed using the information from MemTransaction,
415      * the write operation is to be committed using a call to commitWrite().
416      *
417      * @param nMessages Number of messages of type T.
418      * @param Pointer to MemTransaction struct that describes memory to write 'nMessages'
419      * items of type T. If a write of size 'nMessages' is not possible, the base
420      * addresses in the MemTransaction object would be set to nullptr.
421      *
422      * @return Whether it is possible to write 'nMessages' items of type T
423      * into the FMQ.
424      */
425     bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
426 
427     /**
428      * Commit a write of size 'nMessages'. To be only used after a call to beginWrite().
429      *
430      * @param nMessages number of messages of type T to be written.
431      *
432      * @return Whether the write operation of size 'nMessages' succeeded.
433      */
434     bool commitWrite(size_t nMessages);
435 
436     /**
437      * Get a MemTransaction object to read 'nMessages' items of type T.
438      * Once the read is performed using the information from MemTransaction,
439      * the read operation is to be committed using a call to commitRead().
440      *
441      * @param nMessages Number of messages of type T.
442      * @param pointer to MemTransaction struct that describes memory to read 'nMessages'
443      * items of type T. If a read of size 'nMessages' is not possible, the base
444      * pointers in the MemTransaction object returned will be set to nullptr.
445      *
446      * @return bool Whether it is possible to read 'nMessages' items of type T
447      * from the FMQ.
448      */
449     bool beginRead(size_t nMessages, MemTransaction* memTx) const;
450 
451     /**
452      * Commit a read of size 'nMessages'. To be only used after a call to beginRead().
453      * For the unsynchronized flavor of FMQ, this method will return a failure
454      * if a write overflow happened after beginRead() was invoked.
455      *
456      * @param nMessages number of messages of type T to be read.
457      *
458      * @return bool Whether the read operation of size 'nMessages' succeeded.
459      */
460     bool commitRead(size_t nMessages);
461 
462     /**
463      * Get the pointer to the ring buffer. Useful for debugging and fuzzing.
464      */
getRingBufferPtrMessageQueueBase465     uint8_t* getRingBufferPtr() const { return mRing; }
466 
467   protected:
468     /**
469      * Protected constructor that can manually specify the quantum to use.
470      * The only external consumer of this ctor is ErasedMessageQueue, but the
471      * constructor cannot be private because this is a base class.
472      *
473      * @param quantum Size of the element type, in bytes.
474      * Other parameters have semantics given in the corresponding public ctor.
475      */
476 
477     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
478                      android::base::unique_fd bufferFd, size_t bufferSize, size_t quantum);
479 
480   private:
481     template <class U = T,
482               typename std::enable_if<!std::is_same<U, MQErased>::value, bool>::type = true>
483     static constexpr size_t kQuantumValue = sizeof(T);
484     inline size_t quantum() const;
485     size_t availableToWriteBytes(Error* errorDetected, std::string* errorMessage) const;
486     size_t availableToReadBytes(Error* errorDetected, std::string* errorMessage) const;
487 
488     MessageQueueBase(const MessageQueueBase& other) = delete;
489     MessageQueueBase& operator=(const MessageQueueBase& other) = delete;
490 
491     void* mapGrantorDescr(uint32_t grantorIdx);
492     void unmapGrantorDescr(void* address, uint32_t grantorIdx);
493     void initMemory(bool resetPointers);
494 
495     enum DefaultEventNotification : uint32_t {
496         /*
497          * These are only used internally by the readBlocking()/writeBlocking()
498          * methods and hence once other bit combinations are not required.
499          */
500         FMQ_NOT_FULL = 0x01,
501         FMQ_NOT_EMPTY = 0x02
502     };
503     std::unique_ptr<Descriptor> mDesc;
504     uint8_t* mRing = nullptr;
505     /*
506      * TODO(b/31550092): Change to 32 bit read and write pointer counters.
507      */
508     std::atomic<uint64_t>* mReadPtr = nullptr;
509     std::atomic<uint64_t>* mWritePtr = nullptr;
510 
511     std::atomic<uint32_t>* mEvFlagWord = nullptr;
512 
513     /*
514      * This EventFlag object will be owned by the FMQ and will have the same
515      * lifetime.
516      */
517     android::hardware::EventFlag* mEventFlag = nullptr;
518 
519     const size_t kPageSize = getpagesize();
520 };
521 
522 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getSlot(size_t idx)523 T* MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getSlot(size_t idx) {
524     size_t firstRegionLength = first.getLength();
525     size_t secondRegionLength = second.getLength();
526 
527     if (idx > firstRegionLength + secondRegionLength) {
528         return nullptr;
529     }
530 
531     if (idx < firstRegionLength) {
532         return first.getAddress() + idx;
533     }
534 
535     return second.getAddress() + idx - firstRegionLength;
536 }
537 
538 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getMemRegionInfo(size_t startIdx,size_t nMessages,size_t & firstCount,size_t & secondCount,T ** firstBaseAddress,T ** secondBaseAddress)539 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getMemRegionInfo(
540         size_t startIdx, size_t nMessages, size_t& firstCount, size_t& secondCount,
541         T** firstBaseAddress, T** secondBaseAddress) {
542     size_t firstRegionLength = first.getLength();
543     size_t secondRegionLength = second.getLength();
544 
545     if (startIdx + nMessages > firstRegionLength + secondRegionLength) {
546         /*
547          * Return false if 'nMessages' starting at 'startIdx' cannot be
548          * accommodated by the MemTransaction object.
549          */
550         return false;
551     }
552 
553     /* Number of messages to be read/written to the first MemRegion. */
554     firstCount =
555             startIdx < firstRegionLength ? std::min(nMessages, firstRegionLength - startIdx) : 0;
556 
557     /* Number of messages to be read/written to the second MemRegion. */
558     secondCount = nMessages - firstCount;
559 
560     if (firstCount != 0) {
561         *firstBaseAddress = first.getAddress() + startIdx;
562     }
563 
564     if (secondCount != 0) {
565         size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
566         *secondBaseAddress = second.getAddress() + secondStartIdx;
567     }
568 
569     return true;
570 }
571 
572 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFrom(T * data,size_t startIdx,size_t nMessages)573 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFrom(T* data,
574                                                                              size_t startIdx,
575                                                                              size_t nMessages) {
576     if constexpr (!std::is_same<T, MQErased>::value) {
577         return copyFromSized(data, startIdx, nMessages, kQuantumValue<T>);
578     } else {
579         /* Compile error. */
580         static_assert(!std::is_same<T, MQErased>::value,
581                       "copyFrom without messageSize argument cannot be used with MQErased (use "
582                       "copyFromSized)");
583     }
584 }
585 
586 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFromSized(T * data,size_t startIdx,size_t nMessages,size_t messageSize)587 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFromSized(
588         T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
589     if (data == nullptr) {
590         return false;
591     }
592 
593     size_t firstReadCount = 0, secondReadCount = 0;
594     T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
595 
596     if (getMemRegionInfo(startIdx, nMessages, firstReadCount, secondReadCount, &firstBaseAddress,
597                          &secondBaseAddress) == false) {
598         /*
599          * Returns false if 'startIdx' and 'nMessages' are invalid for this
600          * MemTransaction object.
601          */
602         return false;
603     }
604 
605     if (firstReadCount != 0) {
606         memcpy(data, firstBaseAddress, firstReadCount * messageSize);
607     }
608 
609     if (secondReadCount != 0) {
610         memcpy(data + firstReadCount, secondBaseAddress, secondReadCount * messageSize);
611     }
612 
613     return true;
614 }
615 
616 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyTo(const T * data,size_t startIdx,size_t nMessages)617 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyTo(const T* data,
618                                                                            size_t startIdx,
619                                                                            size_t nMessages) {
620     if constexpr (!std::is_same<T, MQErased>::value) {
621         return copyToSized(data, startIdx, nMessages, kQuantumValue<T>);
622     } else {
623         /* Compile error. */
624         static_assert(!std::is_same<T, MQErased>::value,
625                       "copyTo without messageSize argument cannot be used with MQErased (use "
626                       "copyToSized)");
627     }
628 }
629 
630 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyToSized(const T * data,size_t startIdx,size_t nMessages,size_t messageSize)631 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyToSized(
632         const T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
633     if (data == nullptr) {
634         return false;
635     }
636 
637     size_t firstWriteCount = 0, secondWriteCount = 0;
638     T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
639 
640     if (getMemRegionInfo(startIdx, nMessages, firstWriteCount, secondWriteCount, &firstBaseAddress,
641                          &secondBaseAddress) == false) {
642         /*
643          * Returns false if 'startIdx' and 'nMessages' are invalid for this
644          * MemTransaction object.
645          */
646         return false;
647     }
648 
649     if (firstWriteCount != 0) {
650         memcpy(firstBaseAddress, data, firstWriteCount * messageSize);
651     }
652 
653     if (secondWriteCount != 0) {
654         memcpy(secondBaseAddress, data + firstWriteCount, secondWriteCount * messageSize);
655     }
656 
657     return true;
658 }
659 
660 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
initMemory(bool resetPointers)661 void MessageQueueBase<MQDescriptorType, T, flavor>::initMemory(bool resetPointers) {
662     /*
663      * Verify that the Descriptor contains the minimum number of grantors
664      * the native_handle is valid and T matches quantum size.
665      */
666     if ((mDesc == nullptr) || !mDesc->isHandleValid() ||
667         (mDesc->countGrantors() < hardware::details::kMinGrantorCount)) {
668         return;
669     }
670     if (mDesc->getQuantum() != quantum()) {
671         hardware::details::logError(
672                 "Payload size differs between the queue instantiation and the "
673                 "MQDescriptor.");
674         return;
675     }
676 
677     if (flavor == kSynchronizedReadWrite) {
678         mReadPtr = reinterpret_cast<std::atomic<uint64_t>*>(
679                 mapGrantorDescr(hardware::details::READPTRPOS));
680     } else {
681         /*
682          * The unsynchronized write flavor of the FMQ may have multiple readers
683          * and each reader would have their own read pointer counter.
684          */
685         mReadPtr = new (std::nothrow) std::atomic<uint64_t>;
686     }
687     if (mReadPtr == nullptr) goto error;
688 
689     mWritePtr = reinterpret_cast<std::atomic<uint64_t>*>(
690             mapGrantorDescr(hardware::details::WRITEPTRPOS));
691     if (mWritePtr == nullptr) goto error;
692 
693     if (resetPointers) {
694         mReadPtr->store(0, std::memory_order_release);
695         mWritePtr->store(0, std::memory_order_release);
696     } else if (flavor != kSynchronizedReadWrite) {
697         // Always reset the read pointer.
698         mReadPtr->store(0, std::memory_order_release);
699     }
700 
701     mRing = reinterpret_cast<uint8_t*>(mapGrantorDescr(hardware::details::DATAPTRPOS));
702     if (mRing == nullptr) goto error;
703 
704     if (mDesc->countGrantors() > hardware::details::EVFLAGWORDPOS) {
705         mEvFlagWord = static_cast<std::atomic<uint32_t>*>(
706                 mapGrantorDescr(hardware::details::EVFLAGWORDPOS));
707         if (mEvFlagWord == nullptr) goto error;
708         android::hardware::EventFlag::createEventFlag(mEvFlagWord, &mEventFlag);
709     }
710     return;
711 error:
712     if (mReadPtr) {
713         if (flavor == kSynchronizedReadWrite) {
714             unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
715         } else {
716             delete mReadPtr;
717         }
718         mReadPtr = nullptr;
719     }
720     if (mWritePtr) {
721         unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
722         mWritePtr = nullptr;
723     }
724     if (mRing) {
725         unmapGrantorDescr(mRing, hardware::details::EVFLAGWORDPOS);
726         mRing = nullptr;
727     }
728 }
729 
730 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(const Descriptor & Desc,bool resetPointers)731 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(const Descriptor& Desc,
732                                                                 bool resetPointers) {
733     mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(Desc));
734     if (mDesc == nullptr || mDesc->getSize() == 0) {
735         hardware::details::logError("MQDescriptor is invalid or queue size is 0.");
736         return;
737     }
738 
739     initMemory(resetPointers);
740 }
741 
742 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(size_t numElementsInQueue,bool configureEventFlagWord,android::base::unique_fd bufferFd,size_t bufferSize,size_t quantum)743 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(size_t numElementsInQueue,
744                                                                 bool configureEventFlagWord,
745                                                                 android::base::unique_fd bufferFd,
746                                                                 size_t bufferSize, size_t quantum) {
747     // Check if the buffer size would not overflow size_t
748     if (numElementsInQueue > SIZE_MAX / quantum) {
749         hardware::details::logError("Requested message queue size too large. Size of elements: " +
750                                     std::to_string(quantum) +
751                                     ". Number of elements: " + std::to_string(numElementsInQueue));
752         return;
753     }
754     if (numElementsInQueue == 0) {
755         hardware::details::logError("Requested queue size of 0.");
756         return;
757     }
758     if (bufferFd != -1 && numElementsInQueue * quantum > bufferSize) {
759         hardware::details::logError("The supplied buffer size(" + std::to_string(bufferSize) +
760                                     ") is smaller than the required size(" +
761                                     std::to_string(numElementsInQueue * quantum) + ").");
762         return;
763     }
764     /*
765      * The FMQ needs to allocate memory for the ringbuffer as well as for the
766      * read and write pointer counters. If an EventFlag word is to be configured,
767      * we also need to allocate memory for the same/
768      */
769     size_t kQueueSizeBytes = numElementsInQueue * quantum;
770     size_t kMetaDataSize = 2 * sizeof(android::hardware::details::RingBufferPosition);
771 
772     if (configureEventFlagWord) {
773         kMetaDataSize += sizeof(std::atomic<uint32_t>);
774     }
775 
776     /*
777      * Ashmem memory region size needs to be specified in page-aligned bytes.
778      * kQueueSizeBytes needs to be aligned to word boundary so that all offsets
779      * in the grantorDescriptor will be word aligned.
780      */
781     size_t kAshmemSizePageAligned;
782     if (bufferFd != -1) {
783         // Allocate read counter and write counter only. User-supplied memory will be used for the
784         // ringbuffer.
785         kAshmemSizePageAligned = (kMetaDataSize + kPageSize - 1) & ~(kPageSize - 1);
786     } else {
787         // Allocate ringbuffer, read counter and write counter.
788         kAshmemSizePageAligned = (hardware::details::alignToWordBoundary(kQueueSizeBytes) +
789                                   kMetaDataSize + kPageSize - 1) &
790                                  ~(kPageSize - 1);
791     }
792 
793     /*
794      * The native handle will contain the fds to be mapped.
795      */
796     int numFds = (bufferFd != -1) ? 2 : 1;
797     native_handle_t* mqHandle = native_handle_create(numFds, 0 /* numInts */);
798     if (mqHandle == nullptr) {
799         return;
800     }
801 
802     /*
803      * Create an ashmem region to map the memory.
804      */
805     int ashmemFd = ashmem_create_region("MessageQueue", kAshmemSizePageAligned);
806     ashmem_set_prot_region(ashmemFd, PROT_READ | PROT_WRITE);
807     mqHandle->data[0] = ashmemFd;
808 
809     if (bufferFd != -1) {
810         // Use user-supplied file descriptor for fdIndex 1
811         mqHandle->data[1] = bufferFd.get();
812         // release ownership of fd. mqHandle owns it now.
813         if (bufferFd.release() < 0) {
814             hardware::details::logError("Error releasing supplied bufferFd");
815         }
816 
817         std::vector<android::hardware::GrantorDescriptor> grantors;
818         grantors.resize(configureEventFlagWord ? hardware::details::kMinGrantorCountForEvFlagSupport
819                                                : hardware::details::kMinGrantorCount);
820 
821         size_t memSize[] = {
822                 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for read
823                                                                   pointer counter */
824                 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for write
825                                                                   pointer counter */
826                 kQueueSizeBytes,              /* memory to be allocated for data buffer */
827                 sizeof(std::atomic<uint32_t>) /* memory to be allocated for EventFlag word */
828         };
829 
830         for (size_t grantorPos = 0, offset = 0; grantorPos < grantors.size(); grantorPos++) {
831             uint32_t grantorFdIndex;
832             size_t grantorOffset;
833             if (grantorPos == hardware::details::DATAPTRPOS) {
834                 grantorFdIndex = 1;
835                 grantorOffset = 0;
836             } else {
837                 grantorFdIndex = 0;
838                 grantorOffset = offset;
839                 offset += memSize[grantorPos];
840             }
841             grantors[grantorPos] = {
842                     0 /* grantor flags */, grantorFdIndex,
843                     static_cast<uint32_t>(hardware::details::alignToWordBoundary(grantorOffset)),
844                     memSize[grantorPos]};
845         }
846 
847         mDesc = std::unique_ptr<Descriptor>(new (std::nothrow)
848                                                     Descriptor(grantors, mqHandle, quantum));
849     } else {
850         mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(
851                 kQueueSizeBytes, mqHandle, quantum, configureEventFlagWord));
852     }
853     if (mDesc == nullptr) {
854         native_handle_close(mqHandle);
855         native_handle_delete(mqHandle);
856         return;
857     }
858     initMemory(true);
859 }
860 
861 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
~MessageQueueBase()862 MessageQueueBase<MQDescriptorType, T, flavor>::~MessageQueueBase() {
863     if (flavor == kSynchronizedReadWrite && mReadPtr != nullptr) {
864         unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
865     } else if (mReadPtr != nullptr) {
866         delete mReadPtr;
867     }
868     if (mWritePtr != nullptr) {
869         unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
870     }
871     if (mRing != nullptr) {
872         unmapGrantorDescr(mRing, hardware::details::DATAPTRPOS);
873     }
874     if (mEvFlagWord != nullptr) {
875         unmapGrantorDescr(mEvFlagWord, hardware::details::EVFLAGWORDPOS);
876         android::hardware::EventFlag::deleteEventFlag(&mEventFlag);
877     }
878 }
879 
880 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data)881 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data) {
882     return write(data, 1);
883 }
884 
885 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data)886 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data) {
887     return read(data, 1);
888 }
889 
890 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data,size_t nMessages)891 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data, size_t nMessages) {
892     MemTransaction tx;
893     return beginWrite(nMessages, &tx) &&
894            tx.copyToSized(data, 0 /* startIdx */, nMessages, quantum()) && commitWrite(nMessages);
895 }
896 
897 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)898 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(
899         const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
900         int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
901     static_assert(flavor == kSynchronizedReadWrite,
902                   "writeBlocking can only be used with the "
903                   "kSynchronizedReadWrite flavor.");
904     /*
905      * If evFlag is null and the FMQ does not have its own EventFlag object
906      * return false;
907      * If the flavor is kSynchronizedReadWrite and the readNotification
908      * bit mask is zero return false;
909      * If the count is greater than queue size, return false
910      * to prevent blocking until timeOut.
911      */
912     if (evFlag == nullptr) {
913         evFlag = mEventFlag;
914         if (evFlag == nullptr) {
915             hardware::details::logError(
916                     "writeBlocking failed: called on MessageQueue with no Eventflag"
917                     "configured or provided");
918             return false;
919         }
920     }
921 
922     if (readNotification == 0 || (count > getQuantumCount())) {
923         return false;
924     }
925 
926     /*
927      * There is no need to wait for a readNotification if there is sufficient
928      * space to write is already present in the FMQ. The latter would be the case when
929      * read operations read more number of messages than write operations write.
930      * In other words, a single large read may clear the FMQ after multiple small
931      * writes. This would fail to clear a pending readNotification bit since
932      * EventFlag bits can only be cleared by a wait() call, however the bit would
933      * be correctly cleared by the next writeBlocking() call.
934      */
935 
936     bool result = write(data, count);
937     if (result) {
938         if (writeNotification) {
939             evFlag->wake(writeNotification);
940         }
941         return result;
942     }
943 
944     bool shouldTimeOut = timeOutNanos != 0;
945     int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
946 
947     while (true) {
948         /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
949         if (shouldTimeOut) {
950             /*
951              * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
952              * to Nanoseconds)
953              */
954             int64_t currentTimeNs = android::elapsedRealtimeNano();
955             /*
956              * Decrement 'timeOutNanos' to account for the time taken to complete the last
957              * iteration of the while loop.
958              */
959             timeOutNanos -= currentTimeNs - prevTimeNanos;
960             prevTimeNanos = currentTimeNs;
961 
962             if (timeOutNanos <= 0) {
963                 /*
964                  * Attempt write in case a context switch happened outside of
965                  * evFlag->wait().
966                  */
967                 result = write(data, count);
968                 break;
969             }
970         }
971 
972         /*
973          * wait() will return immediately if there was a pending read
974          * notification.
975          */
976         uint32_t efState = 0;
977         status_t status = evFlag->wait(readNotification, &efState, timeOutNanos,
978                                        true /* retry on spurious wake */);
979 
980         if (status != android::TIMED_OUT && status != android::NO_ERROR) {
981             hardware::details::logError("Unexpected error code from EventFlag Wait status " +
982                                         std::to_string(status));
983             break;
984         }
985 
986         if (status == android::TIMED_OUT) {
987             break;
988         }
989 
990         /*
991          * If there is still insufficient space to write to the FMQ,
992          * keep waiting for another readNotification.
993          */
994         if ((efState & readNotification) && write(data, count)) {
995             result = true;
996             break;
997         }
998     }
999 
1000     if (result && writeNotification != 0) {
1001         evFlag->wake(writeNotification);
1002     }
1003 
1004     return result;
1005 }
1006 
1007 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,int64_t timeOutNanos)1008 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(const T* data, size_t count,
1009                                                                   int64_t timeOutNanos) {
1010     return writeBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1011 }
1012 
1013 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)1014 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(
1015         T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
1016         int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
1017     static_assert(flavor == kSynchronizedReadWrite,
1018                   "readBlocking can only be used with the "
1019                   "kSynchronizedReadWrite flavor.");
1020 
1021     /*
1022      * If evFlag is null and the FMQ does not own its own EventFlag object
1023      * return false;
1024      * If the writeNotification bit mask is zero return false;
1025      * If the count is greater than queue size, return false to prevent
1026      * blocking until timeOut.
1027      */
1028     if (evFlag == nullptr) {
1029         evFlag = mEventFlag;
1030         if (evFlag == nullptr) {
1031             hardware::details::logError(
1032                     "readBlocking failed: called on MessageQueue with no Eventflag"
1033                     "configured or provided");
1034             return false;
1035         }
1036     }
1037 
1038     if (writeNotification == 0 || count > getQuantumCount()) {
1039         return false;
1040     }
1041 
1042     /*
1043      * There is no need to wait for a write notification if sufficient
1044      * data to read is already present in the FMQ. This would be the
1045      * case when read operations read lesser number of messages than
1046      * a write operation and multiple reads would be required to clear the queue
1047      * after a single write operation. This check would fail to clear a pending
1048      * writeNotification bit since EventFlag bits can only be cleared
1049      * by a wait() call, however the bit would be correctly cleared by the next
1050      * readBlocking() call.
1051      */
1052 
1053     bool result = read(data, count);
1054     if (result) {
1055         if (readNotification) {
1056             evFlag->wake(readNotification);
1057         }
1058         return result;
1059     }
1060 
1061     bool shouldTimeOut = timeOutNanos != 0;
1062     int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
1063 
1064     while (true) {
1065         /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
1066         if (shouldTimeOut) {
1067             /*
1068              * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
1069              * to Nanoseconds)
1070              */
1071             int64_t currentTimeNs = android::elapsedRealtimeNano();
1072             /*
1073              * Decrement 'timeOutNanos' to account for the time taken to complete the last
1074              * iteration of the while loop.
1075              */
1076             timeOutNanos -= currentTimeNs - prevTimeNanos;
1077             prevTimeNanos = currentTimeNs;
1078 
1079             if (timeOutNanos <= 0) {
1080                 /*
1081                  * Attempt read in case a context switch happened outside of
1082                  * evFlag->wait().
1083                  */
1084                 result = read(data, count);
1085                 break;
1086             }
1087         }
1088 
1089         /*
1090          * wait() will return immediately if there was a pending write
1091          * notification.
1092          */
1093         uint32_t efState = 0;
1094         status_t status = evFlag->wait(writeNotification, &efState, timeOutNanos,
1095                                        true /* retry on spurious wake */);
1096 
1097         if (status != android::TIMED_OUT && status != android::NO_ERROR) {
1098             hardware::details::logError("Unexpected error code from EventFlag Wait status " +
1099                                         std::to_string(status));
1100             break;
1101         }
1102 
1103         if (status == android::TIMED_OUT) {
1104             break;
1105         }
1106 
1107         /*
1108          * If the data in FMQ is still insufficient, go back to waiting
1109          * for another write notification.
1110          */
1111         if ((efState & writeNotification) && read(data, count)) {
1112             result = true;
1113             break;
1114         }
1115     }
1116 
1117     if (result && readNotification != 0) {
1118         evFlag->wake(readNotification);
1119     }
1120     return result;
1121 }
1122 
1123 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,int64_t timeOutNanos)1124 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(T* data, size_t count,
1125                                                                  int64_t timeOutNanos) {
1126     return readBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1127 }
1128 
1129 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
quantum()1130 inline size_t MessageQueueBase<MQDescriptorType, T, flavor>::quantum() const {
1131     if constexpr (std::is_same<T, MQErased>::value) {
1132         return mDesc->getQuantum();
1133     } else {
1134         return kQuantumValue<T>;
1135     }
1136 }
1137 
1138 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWriteBytes(Error * errorDetected,std::string * errorMessage)1139 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes(
1140         Error* errorDetected, std::string* errorMessage) const {
1141     size_t queueSizeBytes = mDesc->getSize();
1142     Error localErrorDetected = Error::NONE;
1143     size_t availableBytes = availableToReadBytes(&localErrorDetected, errorMessage);
1144     if (localErrorDetected != Error::NONE) {
1145         if (errorDetected != nullptr) {
1146             *errorDetected = localErrorDetected;
1147         }
1148         return 0;
1149     }
1150     if (queueSizeBytes < availableBytes) {
1151         std::string errorMsg =
1152                 "The write or read pointer has become corrupted. Writing to the queue is no "
1153                 "longer possible. Queue size: " +
1154                 std::to_string(queueSizeBytes) + ", available: " + std::to_string(availableBytes);
1155         hardware::details::logError(errorMsg);
1156         if (errorDetected != nullptr) {
1157             *errorDetected = Error::POINTER_CORRUPTION;
1158         }
1159         if (errorMessage != nullptr) {
1160             *errorMessage = std::move(errorMsg);
1161         }
1162         return 0;
1163     }
1164     return queueSizeBytes - availableBytes;
1165 }
1166 
1167 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWrite(Error * errorDetected,std::string * errorMessage)1168 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite(
1169         Error* errorDetected, std::string* errorMessage) const {
1170     return availableToWriteBytes(errorDetected, errorMessage) / quantum();
1171 }
1172 
1173 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToRead(Error * errorDetected,std::string * errorMessage)1174 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead(
1175         Error* errorDetected, std::string* errorMessage) const {
1176     return availableToReadBytes(errorDetected, errorMessage) / quantum();
1177 }
1178 
1179 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
beginWrite(size_t nMessages,MemTransaction * result)1180 bool MessageQueueBase<MQDescriptorType, T, flavor>::beginWrite(size_t nMessages,
1181                                                                MemTransaction* result) const {
1182     /*
1183      * If nMessages is greater than size of FMQ or in case of the synchronized
1184      * FMQ flavor, if there is not enough space to write nMessages, then return
1185      * result with null addresses.
1186      */
1187     if ((flavor == kSynchronizedReadWrite && (availableToWrite() < nMessages)) ||
1188         nMessages > getQuantumCount()) {
1189         *result = MemTransaction();
1190         return false;
1191     }
1192 
1193     auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1194     if (writePtr % quantum() != 0) {
1195         hardware::details::logError(
1196                 "The write pointer has become misaligned. Writing to the queue is no longer "
1197                 "possible.");
1198         hardware::details::errorWriteLog(0x534e4554, "184963385");
1199         return false;
1200     }
1201     size_t writeOffset = writePtr % mDesc->getSize();
1202 
1203     /*
1204      * From writeOffset, the number of messages that can be written
1205      * contiguously without wrapping around the ring buffer are calculated.
1206      */
1207     size_t contiguousMessages = (mDesc->getSize() - writeOffset) / quantum();
1208 
1209     if (contiguousMessages < nMessages) {
1210         /*
1211          * Wrap around is required. Both result.first and result.second are
1212          * populated.
1213          */
1214         *result = MemTransaction(
1215                 MemRegion(reinterpret_cast<T*>(mRing + writeOffset), contiguousMessages),
1216                 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1217     } else {
1218         /*
1219          * A wrap around is not required to write nMessages. Only result.first
1220          * is populated.
1221          */
1222         *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), nMessages),
1223                                  MemRegion());
1224     }
1225 
1226     return true;
1227 }
1228 
1229 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1230 /*
1231  * Disable integer sanitization since integer overflow here is allowed
1232  * and legal.
1233  */
1234 __attribute__((no_sanitize("integer"))) bool
commitWrite(size_t nMessages)1235 MessageQueueBase<MQDescriptorType, T, flavor>::commitWrite(size_t nMessages) {
1236     size_t nBytesWritten = nMessages * quantum();
1237     auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1238     writePtr += nBytesWritten;
1239     mWritePtr->store(writePtr, std::memory_order_release);
1240     /*
1241      * This method cannot fail now since we are only incrementing the writePtr
1242      * counter.
1243      */
1244     return true;
1245 }
1246 
1247 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToReadBytes(Error * errorDetected,std::string * errorMessage)1248 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes(
1249         Error* errorDetected, std::string* errorMessage) const {
1250     /*
1251      * This method is invoked by implementations of both read() and write() and
1252      * hence requires a memory_order_acquired load for both mReadPtr and
1253      * mWritePtr.
1254      */
1255     uint64_t writePtr = mWritePtr->load(std::memory_order_acquire);
1256     uint64_t readPtr = mReadPtr->load(std::memory_order_acquire);
1257     if (writePtr < readPtr) {
1258         std::string errorMsg =
1259                 "The write or read pointer has become corrupted. Reading from the queue is no "
1260                 "longer possible. Write pointer: " +
1261                 std::to_string(writePtr) + ", read pointer: " + std::to_string(readPtr);
1262         hardware::details::logError(errorMsg);
1263         if (errorDetected != nullptr) {
1264             *errorDetected = Error::POINTER_CORRUPTION;
1265         }
1266         if (errorMessage != nullptr) {
1267             *errorMessage = std::move(errorMsg);
1268         }
1269         return 0;
1270     }
1271     return writePtr - readPtr;
1272 }
1273 
1274 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data,size_t nMessages)1275 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data, size_t nMessages) {
1276     MemTransaction tx;
1277     return beginRead(nMessages, &tx) &&
1278            tx.copyFromSized(data, 0 /* startIdx */, nMessages, quantum()) && commitRead(nMessages);
1279 }
1280 
1281 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1282 /*
1283  * Disable integer sanitization since integer overflow here is allowed
1284  * and legal.
1285  */
1286 __attribute__((no_sanitize("integer"))) bool
beginRead(size_t nMessages,MemTransaction * result)1287 MessageQueueBase<MQDescriptorType, T, flavor>::beginRead(size_t nMessages,
1288                                                          MemTransaction* result) const {
1289     *result = MemTransaction();
1290     /*
1291      * If it is detected that the data in the queue was overwritten
1292      * due to the reader process being too slow, the read pointer counter
1293      * is set to the same as the write pointer counter to indicate error
1294      * and the read returns false;
1295      * Need acquire/release memory ordering for mWritePtr.
1296      */
1297     auto writePtr = mWritePtr->load(std::memory_order_acquire);
1298     /*
1299      * A relaxed load is sufficient for mReadPtr since there will be no
1300      * stores to mReadPtr from a different thread.
1301      */
1302     auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1303     if (writePtr % quantum() != 0 || readPtr % quantum() != 0) {
1304         hardware::details::logError(
1305                 "The write or read pointer has become misaligned. Reading from the queue is no "
1306                 "longer possible.");
1307         hardware::details::errorWriteLog(0x534e4554, "184963385");
1308         return false;
1309     }
1310 
1311     if (writePtr - readPtr > mDesc->getSize()) {
1312         mReadPtr->store(writePtr, std::memory_order_release);
1313         return false;
1314     }
1315 
1316     size_t nBytesDesired = nMessages * quantum();
1317     /*
1318      * Return if insufficient data to read in FMQ.
1319      */
1320     if (writePtr - readPtr < nBytesDesired) {
1321         return false;
1322     }
1323 
1324     size_t readOffset = readPtr % mDesc->getSize();
1325     /*
1326      * From readOffset, the number of messages that can be read contiguously
1327      * without wrapping around the ring buffer are calculated.
1328      */
1329     size_t contiguousMessages = (mDesc->getSize() - readOffset) / quantum();
1330 
1331     if (contiguousMessages < nMessages) {
1332         /*
1333          * A wrap around is required. Both result.first and result.second
1334          * are populated.
1335          */
1336         *result = MemTransaction(
1337                 MemRegion(reinterpret_cast<T*>(mRing + readOffset), contiguousMessages),
1338                 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1339     } else {
1340         /*
1341          * A wrap around is not required. Only result.first need to be
1342          * populated.
1343          */
1344         *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
1345                                  MemRegion());
1346     }
1347 
1348     return true;
1349 }
1350 
1351 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1352 /*
1353  * Disable integer sanitization since integer overflow here is allowed
1354  * and legal.
1355  */
1356 __attribute__((no_sanitize("integer"))) bool
commitRead(size_t nMessages)1357 MessageQueueBase<MQDescriptorType, T, flavor>::commitRead(size_t nMessages) {
1358     // TODO: Use a local copy of readPtr to avoid relazed mReadPtr loads.
1359     auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1360     auto writePtr = mWritePtr->load(std::memory_order_acquire);
1361     /*
1362      * If the flavor is unsynchronized, it is possible that a write overflow may
1363      * have occurred between beginRead() and commitRead().
1364      */
1365     if (writePtr - readPtr > mDesc->getSize()) {
1366         mReadPtr->store(writePtr, std::memory_order_release);
1367         return false;
1368     }
1369 
1370     size_t nBytesRead = nMessages * quantum();
1371     readPtr += nBytesRead;
1372     mReadPtr->store(readPtr, std::memory_order_release);
1373     return true;
1374 }
1375 
1376 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumSize()1377 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumSize() const {
1378     return mDesc->getQuantum();
1379 }
1380 
1381 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumCount()1382 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumCount() const {
1383     return mDesc->getSize() / mDesc->getQuantum();
1384 }
1385 
1386 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
isValid()1387 bool MessageQueueBase<MQDescriptorType, T, flavor>::isValid() const {
1388     return mRing != nullptr && mReadPtr != nullptr && mWritePtr != nullptr;
1389 }
1390 
1391 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
mapGrantorDescr(uint32_t grantorIdx)1392 void* MessageQueueBase<MQDescriptorType, T, flavor>::mapGrantorDescr(uint32_t grantorIdx) {
1393     const native_handle_t* handle = mDesc->handle();
1394     const std::vector<android::hardware::GrantorDescriptor> grantors = mDesc->grantors();
1395     if (handle == nullptr) {
1396         hardware::details::logError("mDesc->handle is null");
1397         return nullptr;
1398     }
1399 
1400     if (grantorIdx >= grantors.size()) {
1401         hardware::details::logError(std::string("grantorIdx must be less than ") +
1402                                     std::to_string(grantors.size()));
1403         return nullptr;
1404     }
1405 
1406     int fdIndex = grantors[grantorIdx].fdIndex;
1407     if (fdIndex < 0 || fdIndex >= handle->numFds) {
1408         hardware::details::logError(
1409                 std::string("fdIndex (" + std::to_string(fdIndex) + ") from grantor (index " +
1410                             std::to_string(grantorIdx) +
1411                             ") must be smaller than the number of fds in the handle: " +
1412                             std::to_string(handle->numFds)));
1413         return nullptr;
1414     }
1415 
1416     /*
1417      * Offset for mmap must be a multiple of kPageSize.
1418      */
1419     if (!hardware::details::isAlignedToWordBoundary(grantors[grantorIdx].offset)) {
1420         hardware::details::logError("Grantor (index " + std::to_string(grantorIdx) +
1421                                     ") offset needs to be aligned to word boundary but is: " +
1422                                     std::to_string(grantors[grantorIdx].offset));
1423         return nullptr;
1424     }
1425 
1426     /*
1427      * Expect some grantors to be at least a min size
1428      */
1429     for (uint32_t i = 0; i < grantors.size(); i++) {
1430         switch (i) {
1431             case hardware::details::READPTRPOS:
1432                 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1433                 break;
1434             case hardware::details::WRITEPTRPOS:
1435                 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1436                 break;
1437             case hardware::details::DATAPTRPOS:
1438                 // We don't expect specific data size
1439                 break;
1440             case hardware::details::EVFLAGWORDPOS:
1441                 if (grantors[i].extent < sizeof(uint32_t)) return nullptr;
1442                 break;
1443             default:
1444                 // We don't care about unknown grantors
1445                 break;
1446         }
1447     }
1448 
1449     int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1450     if (grantors[grantorIdx].extent < 0 || grantors[grantorIdx].extent > INT_MAX - kPageSize) {
1451         hardware::details::logError(std::string("Grantor (index " + std::to_string(grantorIdx) +
1452                                                 ") extent value is too large or negative: " +
1453                                                 std::to_string(grantors[grantorIdx].extent)));
1454         return nullptr;
1455     }
1456     int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1457 
1458     void* address = mmap(0, mapLength, PROT_READ | PROT_WRITE, MAP_SHARED, handle->data[fdIndex],
1459                          mapOffset);
1460     if (address == MAP_FAILED && errno == EPERM && flavor == kUnsynchronizedWrite) {
1461         // If the supplied memory is read-only, it would fail with EPERM.
1462         // Try again to mmap read-only for the kUnsynchronizedWrite case.
1463         // kSynchronizedReadWrite cannot use read-only memory because the
1464         // read pointer is stored in the shared memory as well.
1465         address = mmap(0, mapLength, PROT_READ, MAP_SHARED, handle->data[fdIndex], mapOffset);
1466     }
1467     if (address == MAP_FAILED) {
1468         hardware::details::logError(std::string("mmap failed: ") + std::to_string(errno));
1469         return nullptr;
1470     }
1471     return reinterpret_cast<uint8_t*>(address) + (grantors[grantorIdx].offset - mapOffset);
1472 }
1473 
1474 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
unmapGrantorDescr(void * address,uint32_t grantorIdx)1475 void MessageQueueBase<MQDescriptorType, T, flavor>::unmapGrantorDescr(void* address,
1476                                                                       uint32_t grantorIdx) {
1477     auto grantors = mDesc->grantors();
1478     if ((address == nullptr) || (grantorIdx >= grantors.size())) {
1479         return;
1480     }
1481 
1482     int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1483     int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1484     void* baseAddress =
1485             reinterpret_cast<uint8_t*>(address) - (grantors[grantorIdx].offset - mapOffset);
1486     if (baseAddress) munmap(baseAddress, mapLength);
1487 }
1488 
1489 }  // namespace android
1490