1 /*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <android-base/unique_fd.h>
20 #include <cutils/ashmem.h>
21 #include <fmq/EventFlag.h>
22 #include <sys/mman.h>
23 #include <sys/user.h>
24 #include <utils/Log.h>
25 #include <utils/SystemClock.h>
26 #include <atomic>
27 #include <new>
28
29 using android::hardware::kSynchronizedReadWrite;
30 using android::hardware::kUnsynchronizedWrite;
31 using android::hardware::MQFlavor;
32
33 namespace android {
34
35 /* sentinel payload type that indicates the MQ will be used with a mismatching
36 MQDescriptor type, where type safety must be enforced elsewhere because the real
37 element type T is not statically known. This is used to instantiate
38 MessageQueueBase instances for Rust where we cannot generate additional template
39 instantiations across the language boundary. */
40 enum MQErased {};
41
42 template <template <typename, MQFlavor> class MQDescriptorType, typename T, MQFlavor flavor>
43 struct MessageQueueBase {
44 typedef MQDescriptorType<T, flavor> Descriptor;
45 enum Error : int {
46 NONE,
47 POINTER_CORRUPTION, /** Read/write pointers mismatch */
48 };
49
50 /**
51 * @param Desc MQDescriptor describing the FMQ.
52 * @param resetPointers bool indicating whether the read/write pointers
53 * should be reset or not.
54 */
55 MessageQueueBase(const Descriptor& Desc, bool resetPointers = true);
56
57 ~MessageQueueBase();
58
59 /**
60 * This constructor uses Ashmem shared memory to create an FMQ
61 * that can contain a maximum of 'numElementsInQueue' elements of type T.
62 *
63 * @param numElementsInQueue Capacity of the MessageQueue in terms of T.
64 * @param configureEventFlagWord Boolean that specifies if memory should
65 * also be allocated and mapped for an EventFlag word.
66 * @param bufferFd User-supplied file descriptor to map the memory for the ringbuffer
67 * By default, bufferFd=-1 means library will allocate ashmem region for ringbuffer.
68 * MessageQueue takes ownership of the file descriptor.
69 * @param bufferSize size of buffer in bytes that bufferFd represents. This
70 * size must be larger than or equal to (numElementsInQueue * sizeof(T)).
71 * Otherwise, operations will cause out-of-bounds memory access.
72 */
73
MessageQueueBaseMessageQueueBase74 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
75 android::base::unique_fd bufferFd, size_t bufferSize)
76 : MessageQueueBase(numElementsInQueue, configureEventFlagWord, std::move(bufferFd),
77 bufferSize, sizeof(T)) {
78 /* We must not pass sizeof(T) as quantum for MQErased element type. */
79 static_assert(!std::is_same_v<T, MQErased>,
80 "MessageQueueBase<..., MQErased, ...> must be constructed via a"
81 " constructor that accepts a descriptor or a quantum size");
82 };
83
84 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord = false)
85 : MessageQueueBase(numElementsInQueue, configureEventFlagWord, android::base::unique_fd(),
86 0) {}
87
88 /**
89 * @param errorDetected Optional output parameter which indicates
90 * any errors that the client might care about.
91 * @param errorMessage Optional output parameter for a human-readable
92 * error description.
93 *
94 * @return Number of items of type T that can be written into the FMQ
95 * without a read.
96 */
97 size_t availableToWrite(Error* errorDetected = nullptr,
98 std::string* errorMessage = nullptr) const;
99
100 /**
101 * @param errorDetected Optional output parameter which indicates
102 * any errors that the client might care about.
103 * @param errorMessage Optional output parameter for a human-readable
104 * error description.
105 *
106 * @return Number of items of type T that are waiting to be read from the
107 * FMQ.
108 */
109 size_t availableToRead(Error* errorDetected = nullptr,
110 std::string* errorMessage = nullptr) const;
111
112 /**
113 * Returns the size of type T in bytes.
114 *
115 * @return Size of T.
116 */
117 size_t getQuantumSize() const;
118
119 /**
120 * Returns the size of the FMQ in terms of the size of type T.
121 *
122 * @return Number of items of type T that will fit in the FMQ.
123 */
124 size_t getQuantumCount() const;
125
126 /**
127 * @return Whether the FMQ is configured correctly.
128 */
129 bool isValid() const;
130
131 /**
132 * Non-blocking write to FMQ.
133 *
134 * @param data Pointer to the object of type T to be written into the FMQ.
135 *
136 * @return Whether the write was successful.
137 */
138 bool write(const T* data);
139
140 /**
141 * Non-blocking read from FMQ.
142 *
143 * @param data Pointer to the memory where the object read from the FMQ is
144 * copied to.
145 *
146 * @return Whether the read was successful.
147 */
148 bool read(T* data);
149
150 /**
151 * Write some data into the FMQ without blocking.
152 *
153 * @param data Pointer to the array of items of type T.
154 * @param count Number of items in array.
155 *
156 * @return Whether the write was successful.
157 */
158 bool write(const T* data, size_t count);
159
160 /**
161 * Perform a blocking write of 'count' items into the FMQ using EventFlags.
162 * Does not support partial writes.
163 *
164 * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
165 * associated with the FMQ and it is used in that case.
166 *
167 * The application code must ensure that 'evFlag' used by the
168 * reader(s)/writer is based upon the same EventFlag word.
169 *
170 * The method will return false without blocking if any of the following
171 * conditions are true:
172 * - If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
173 * - If the 'readNotification' bit mask is zero.
174 * - If 'count' is greater than the FMQ size.
175 *
176 * If the there is insufficient space available to write into it, the
177 * EventFlag bit mask 'readNotification' is is waited upon.
178 *
179 * This method should only be used with a MessageQueue of the flavor
180 * 'kSynchronizedReadWrite'.
181 *
182 * Upon a successful write, wake is called on 'writeNotification' (if
183 * non-zero).
184 *
185 * @param data Pointer to the array of items of type T.
186 * @param count Number of items in array.
187 * @param readNotification The EventFlag bit mask to wait on if there is not
188 * enough space in FMQ to write 'count' items.
189 * @param writeNotification The EventFlag bit mask to call wake on
190 * a successful write. No wake is called if 'writeNotification' is zero.
191 * @param timeOutNanos Number of nanoseconds after which the blocking
192 * write attempt is aborted.
193 * @param evFlag The EventFlag object to be used for blocking. If nullptr,
194 * it is checked whether the FMQ owns an EventFlag object and that is used
195 * for blocking instead.
196 *
197 * @return Whether the write was successful.
198 */
199 bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
200 uint32_t writeNotification, int64_t timeOutNanos = 0,
201 android::hardware::EventFlag* evFlag = nullptr);
202
203 bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
204
205 /**
206 * Read some data from the FMQ without blocking.
207 *
208 * @param data Pointer to the array to which read data is to be written.
209 * @param count Number of items to be read.
210 *
211 * @return Whether the read was successful.
212 */
213 bool read(T* data, size_t count);
214
215 /**
216 * Perform a blocking read operation of 'count' items from the FMQ. Does not
217 * perform a partial read.
218 *
219 * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
220 * associated with the FMQ and it is used in that case.
221 *
222 * The application code must ensure that 'evFlag' used by the
223 * reader(s)/writer is based upon the same EventFlag word.
224 *
225 * The method will return false without blocking if any of the following
226 * conditions are true:
227 * -If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
228 * -If the 'writeNotification' bit mask is zero.
229 * -If 'count' is greater than the FMQ size.
230 *
231 * This method should only be used with a MessageQueue of the flavor
232 * 'kSynchronizedReadWrite'.
233
234 * If FMQ does not contain 'count' items, the eventFlag bit mask
235 * 'writeNotification' is waited upon. Upon a successful read from the FMQ,
236 * wake is called on 'readNotification' (if non-zero).
237 *
238 * @param data Pointer to the array to which read data is to be written.
239 * @param count Number of items to be read.
240 * @param readNotification The EventFlag bit mask to call wake on after
241 * a successful read. No wake is called if 'readNotification' is zero.
242 * @param writeNotification The EventFlag bit mask to call a wait on
243 * if there is insufficient data in the FMQ to be read.
244 * @param timeOutNanos Number of nanoseconds after which the blocking
245 * read attempt is aborted.
246 * @param evFlag The EventFlag object to be used for blocking.
247 *
248 * @return Whether the read was successful.
249 */
250 bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
251 int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr);
252
253 bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);
254
255 /**
256 * Get a pointer to the MQDescriptor object that describes this FMQ.
257 *
258 * @return Pointer to the MQDescriptor associated with the FMQ.
259 */
getDescMessageQueueBase260 const Descriptor* getDesc() const { return mDesc.get(); }
261
262 /**
263 * Get a pointer to the EventFlag word if there is one associated with this FMQ.
264 *
265 * @return Pointer to an EventFlag word, will return nullptr if not
266 * configured. This method does not transfer ownership. The EventFlag
267 * word will be unmapped by the MessageQueue destructor.
268 */
getEventFlagWordMessageQueueBase269 std::atomic<uint32_t>* getEventFlagWord() const { return mEvFlagWord; }
270
271 /**
272 * Describes a memory region in the FMQ.
273 */
274 struct MemRegion {
MemRegionMessageQueueBase::MemRegion275 MemRegion() : MemRegion(nullptr, 0) {}
276
MemRegionMessageQueueBase::MemRegion277 MemRegion(T* base, size_t size) : address(base), length(size) {}
278
279 MemRegion& operator=(const MemRegion& other) {
280 address = other.address;
281 length = other.length;
282 return *this;
283 }
284
285 /**
286 * Gets a pointer to the base address of the MemRegion.
287 */
getAddressMessageQueueBase::MemRegion288 inline T* getAddress() const { return address; }
289
290 /**
291 * Gets the length of the MemRegion. This would equal to the number
292 * of items of type T that can be read from/written into the MemRegion.
293 */
getLengthMessageQueueBase::MemRegion294 inline size_t getLength() const { return length; }
295
296 /**
297 * Gets the length of the MemRegion in bytes.
298 */
299 template <class U = T>
getLengthInBytesMessageQueueBase::MemRegion300 inline std::enable_if_t<!std::is_same_v<U, MQErased>, size_t> getLengthInBytes() const {
301 return length * kQuantumValue<U>;
302 }
303
304 private:
305 /* Base address */
306 T* address;
307
308 /*
309 * Number of items of type T that can be written to/read from the base
310 * address.
311 */
312 size_t length;
313 };
314
315 /**
316 * Describes the memory regions to be used for a read or write.
317 * The struct contains two MemRegion objects since the FMQ is a ring
318 * buffer and a read or write operation can wrap around. A single message
319 * of type T will never be broken between the two MemRegions.
320 */
321 struct MemTransaction {
MemTransactionMessageQueueBase::MemTransaction322 MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
323
MemTransactionMessageQueueBase::MemTransaction324 MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond)
325 : first(regionFirst), second(regionSecond) {}
326
327 MemTransaction& operator=(const MemTransaction& other) {
328 first = other.first;
329 second = other.second;
330 return *this;
331 }
332
333 /**
334 * Helper method to calculate the address for a particular index for
335 * the MemTransaction object.
336 *
337 * @param idx Index of the slot to be read/written. If the
338 * MemTransaction object is representing the memory region to read/write
339 * N items of type T, the valid range of idx is between 0 and N-1.
340 *
341 * @return Pointer to the slot idx. Will be nullptr for an invalid idx.
342 */
343 T* getSlot(size_t idx);
344
345 /**
346 * Helper method to write 'nMessages' items of type T into the memory
347 * regions described by the object starting from 'startIdx'. This method
348 * uses memcpy() and is not to meant to be used for a zero copy operation.
349 * Partial writes are not supported.
350 *
351 * @param data Pointer to the source buffer.
352 * @param nMessages Number of items of type T.
353 * @param startIdx The slot number to begin the write from. If the
354 * MemTransaction object is representing the memory region to read/write
355 * N items of type T, the valid range of startIdx is between 0 and N-1;
356 *
357 * @return Whether the write operation of size 'nMessages' succeeded.
358 */
359 bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
360
361 /*
362 * Helper method to read 'nMessages' items of type T from the memory
363 * regions described by the object starting from 'startIdx'. This method uses
364 * memcpy() and is not meant to be used for a zero copy operation. Partial reads
365 * are not supported.
366 *
367 * @param data Pointer to the destination buffer.
368 * @param nMessages Number of items of type T.
369 * @param startIdx The slot number to begin the read from. If the
370 * MemTransaction object is representing the memory region to read/write
371 * N items of type T, the valid range of startIdx is between 0 and N-1.
372 *
373 * @return Whether the read operation of size 'nMessages' succeeded.
374 */
375 bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
376
377 /**
378 * Returns a const reference to the first MemRegion in the
379 * MemTransaction object.
380 */
getFirstRegionMessageQueueBase::MemTransaction381 inline const MemRegion& getFirstRegion() const { return first; }
382
383 /**
384 * Returns a const reference to the second MemRegion in the
385 * MemTransaction object.
386 */
getSecondRegionMessageQueueBase::MemTransaction387 inline const MemRegion& getSecondRegion() const { return second; }
388
389 private:
390 friend MessageQueueBase<MQDescriptorType, T, flavor>;
391
392 bool copyToSized(const T* data, size_t startIdx, size_t nMessages, size_t messageSize);
393 bool copyFromSized(T* data, size_t startIdx, size_t nMessages, size_t messageSize);
394
395 /*
396 * Given a start index and the number of messages to be
397 * read/written, this helper method calculates the
398 * number of messages that should should be written to both the first
399 * and second MemRegions and the base addresses to be used for
400 * the read/write operation.
401 *
402 * Returns false if the 'startIdx' and 'nMessages' is
403 * invalid for the MemTransaction object.
404 */
405 bool inline getMemRegionInfo(size_t idx, size_t nMessages, size_t& firstCount,
406 size_t& secondCount, T** firstBaseAddress,
407 T** secondBaseAddress);
408 MemRegion first;
409 MemRegion second;
410 };
411
412 /**
413 * Get a MemTransaction object to write 'nMessages' items of type T.
414 * Once the write is performed using the information from MemTransaction,
415 * the write operation is to be committed using a call to commitWrite().
416 *
417 * @param nMessages Number of messages of type T.
418 * @param Pointer to MemTransaction struct that describes memory to write 'nMessages'
419 * items of type T. If a write of size 'nMessages' is not possible, the base
420 * addresses in the MemTransaction object would be set to nullptr.
421 *
422 * @return Whether it is possible to write 'nMessages' items of type T
423 * into the FMQ.
424 */
425 bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
426
427 /**
428 * Commit a write of size 'nMessages'. To be only used after a call to beginWrite().
429 *
430 * @param nMessages number of messages of type T to be written.
431 *
432 * @return Whether the write operation of size 'nMessages' succeeded.
433 */
434 bool commitWrite(size_t nMessages);
435
436 /**
437 * Get a MemTransaction object to read 'nMessages' items of type T.
438 * Once the read is performed using the information from MemTransaction,
439 * the read operation is to be committed using a call to commitRead().
440 *
441 * @param nMessages Number of messages of type T.
442 * @param pointer to MemTransaction struct that describes memory to read 'nMessages'
443 * items of type T. If a read of size 'nMessages' is not possible, the base
444 * pointers in the MemTransaction object returned will be set to nullptr.
445 *
446 * @return bool Whether it is possible to read 'nMessages' items of type T
447 * from the FMQ.
448 */
449 bool beginRead(size_t nMessages, MemTransaction* memTx) const;
450
451 /**
452 * Commit a read of size 'nMessages'. To be only used after a call to beginRead().
453 * For the unsynchronized flavor of FMQ, this method will return a failure
454 * if a write overflow happened after beginRead() was invoked.
455 *
456 * @param nMessages number of messages of type T to be read.
457 *
458 * @return bool Whether the read operation of size 'nMessages' succeeded.
459 */
460 bool commitRead(size_t nMessages);
461
462 /**
463 * Get the pointer to the ring buffer. Useful for debugging and fuzzing.
464 */
getRingBufferPtrMessageQueueBase465 uint8_t* getRingBufferPtr() const { return mRing; }
466
467 protected:
468 /**
469 * Protected constructor that can manually specify the quantum to use.
470 * The only external consumer of this ctor is ErasedMessageQueue, but the
471 * constructor cannot be private because this is a base class.
472 *
473 * @param quantum Size of the element type, in bytes.
474 * Other parameters have semantics given in the corresponding public ctor.
475 */
476
477 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
478 android::base::unique_fd bufferFd, size_t bufferSize, size_t quantum);
479
480 private:
481 template <class U = T,
482 typename std::enable_if<!std::is_same<U, MQErased>::value, bool>::type = true>
483 static constexpr size_t kQuantumValue = sizeof(T);
484 inline size_t quantum() const;
485 size_t availableToWriteBytes(Error* errorDetected, std::string* errorMessage) const;
486 size_t availableToReadBytes(Error* errorDetected, std::string* errorMessage) const;
487
488 MessageQueueBase(const MessageQueueBase& other) = delete;
489 MessageQueueBase& operator=(const MessageQueueBase& other) = delete;
490
491 void* mapGrantorDescr(uint32_t grantorIdx);
492 void unmapGrantorDescr(void* address, uint32_t grantorIdx);
493 void initMemory(bool resetPointers);
494
495 enum DefaultEventNotification : uint32_t {
496 /*
497 * These are only used internally by the readBlocking()/writeBlocking()
498 * methods and hence once other bit combinations are not required.
499 */
500 FMQ_NOT_FULL = 0x01,
501 FMQ_NOT_EMPTY = 0x02
502 };
503 std::unique_ptr<Descriptor> mDesc;
504 uint8_t* mRing = nullptr;
505 /*
506 * TODO(b/31550092): Change to 32 bit read and write pointer counters.
507 */
508 std::atomic<uint64_t>* mReadPtr = nullptr;
509 std::atomic<uint64_t>* mWritePtr = nullptr;
510
511 std::atomic<uint32_t>* mEvFlagWord = nullptr;
512
513 /*
514 * This EventFlag object will be owned by the FMQ and will have the same
515 * lifetime.
516 */
517 android::hardware::EventFlag* mEventFlag = nullptr;
518
519 const size_t kPageSize = getpagesize();
520 };
521
522 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getSlot(size_t idx)523 T* MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getSlot(size_t idx) {
524 size_t firstRegionLength = first.getLength();
525 size_t secondRegionLength = second.getLength();
526
527 if (idx > firstRegionLength + secondRegionLength) {
528 return nullptr;
529 }
530
531 if (idx < firstRegionLength) {
532 return first.getAddress() + idx;
533 }
534
535 return second.getAddress() + idx - firstRegionLength;
536 }
537
538 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getMemRegionInfo(size_t startIdx,size_t nMessages,size_t & firstCount,size_t & secondCount,T ** firstBaseAddress,T ** secondBaseAddress)539 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getMemRegionInfo(
540 size_t startIdx, size_t nMessages, size_t& firstCount, size_t& secondCount,
541 T** firstBaseAddress, T** secondBaseAddress) {
542 size_t firstRegionLength = first.getLength();
543 size_t secondRegionLength = second.getLength();
544
545 if (startIdx + nMessages > firstRegionLength + secondRegionLength) {
546 /*
547 * Return false if 'nMessages' starting at 'startIdx' cannot be
548 * accommodated by the MemTransaction object.
549 */
550 return false;
551 }
552
553 /* Number of messages to be read/written to the first MemRegion. */
554 firstCount =
555 startIdx < firstRegionLength ? std::min(nMessages, firstRegionLength - startIdx) : 0;
556
557 /* Number of messages to be read/written to the second MemRegion. */
558 secondCount = nMessages - firstCount;
559
560 if (firstCount != 0) {
561 *firstBaseAddress = first.getAddress() + startIdx;
562 }
563
564 if (secondCount != 0) {
565 size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
566 *secondBaseAddress = second.getAddress() + secondStartIdx;
567 }
568
569 return true;
570 }
571
572 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFrom(T * data,size_t startIdx,size_t nMessages)573 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFrom(T* data,
574 size_t startIdx,
575 size_t nMessages) {
576 if constexpr (!std::is_same<T, MQErased>::value) {
577 return copyFromSized(data, startIdx, nMessages, kQuantumValue<T>);
578 } else {
579 /* Compile error. */
580 static_assert(!std::is_same<T, MQErased>::value,
581 "copyFrom without messageSize argument cannot be used with MQErased (use "
582 "copyFromSized)");
583 }
584 }
585
586 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFromSized(T * data,size_t startIdx,size_t nMessages,size_t messageSize)587 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFromSized(
588 T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
589 if (data == nullptr) {
590 return false;
591 }
592
593 size_t firstReadCount = 0, secondReadCount = 0;
594 T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
595
596 if (getMemRegionInfo(startIdx, nMessages, firstReadCount, secondReadCount, &firstBaseAddress,
597 &secondBaseAddress) == false) {
598 /*
599 * Returns false if 'startIdx' and 'nMessages' are invalid for this
600 * MemTransaction object.
601 */
602 return false;
603 }
604
605 if (firstReadCount != 0) {
606 memcpy(data, firstBaseAddress, firstReadCount * messageSize);
607 }
608
609 if (secondReadCount != 0) {
610 memcpy(data + firstReadCount, secondBaseAddress, secondReadCount * messageSize);
611 }
612
613 return true;
614 }
615
616 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyTo(const T * data,size_t startIdx,size_t nMessages)617 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyTo(const T* data,
618 size_t startIdx,
619 size_t nMessages) {
620 if constexpr (!std::is_same<T, MQErased>::value) {
621 return copyToSized(data, startIdx, nMessages, kQuantumValue<T>);
622 } else {
623 /* Compile error. */
624 static_assert(!std::is_same<T, MQErased>::value,
625 "copyTo without messageSize argument cannot be used with MQErased (use "
626 "copyToSized)");
627 }
628 }
629
630 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyToSized(const T * data,size_t startIdx,size_t nMessages,size_t messageSize)631 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyToSized(
632 const T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
633 if (data == nullptr) {
634 return false;
635 }
636
637 size_t firstWriteCount = 0, secondWriteCount = 0;
638 T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
639
640 if (getMemRegionInfo(startIdx, nMessages, firstWriteCount, secondWriteCount, &firstBaseAddress,
641 &secondBaseAddress) == false) {
642 /*
643 * Returns false if 'startIdx' and 'nMessages' are invalid for this
644 * MemTransaction object.
645 */
646 return false;
647 }
648
649 if (firstWriteCount != 0) {
650 memcpy(firstBaseAddress, data, firstWriteCount * messageSize);
651 }
652
653 if (secondWriteCount != 0) {
654 memcpy(secondBaseAddress, data + firstWriteCount, secondWriteCount * messageSize);
655 }
656
657 return true;
658 }
659
660 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
initMemory(bool resetPointers)661 void MessageQueueBase<MQDescriptorType, T, flavor>::initMemory(bool resetPointers) {
662 /*
663 * Verify that the Descriptor contains the minimum number of grantors
664 * the native_handle is valid and T matches quantum size.
665 */
666 if ((mDesc == nullptr) || !mDesc->isHandleValid() ||
667 (mDesc->countGrantors() < hardware::details::kMinGrantorCount)) {
668 return;
669 }
670 if (mDesc->getQuantum() != quantum()) {
671 hardware::details::logError(
672 "Payload size differs between the queue instantiation and the "
673 "MQDescriptor.");
674 return;
675 }
676
677 if (flavor == kSynchronizedReadWrite) {
678 mReadPtr = reinterpret_cast<std::atomic<uint64_t>*>(
679 mapGrantorDescr(hardware::details::READPTRPOS));
680 } else {
681 /*
682 * The unsynchronized write flavor of the FMQ may have multiple readers
683 * and each reader would have their own read pointer counter.
684 */
685 mReadPtr = new (std::nothrow) std::atomic<uint64_t>;
686 }
687 if (mReadPtr == nullptr) goto error;
688
689 mWritePtr = reinterpret_cast<std::atomic<uint64_t>*>(
690 mapGrantorDescr(hardware::details::WRITEPTRPOS));
691 if (mWritePtr == nullptr) goto error;
692
693 if (resetPointers) {
694 mReadPtr->store(0, std::memory_order_release);
695 mWritePtr->store(0, std::memory_order_release);
696 } else if (flavor != kSynchronizedReadWrite) {
697 // Always reset the read pointer.
698 mReadPtr->store(0, std::memory_order_release);
699 }
700
701 mRing = reinterpret_cast<uint8_t*>(mapGrantorDescr(hardware::details::DATAPTRPOS));
702 if (mRing == nullptr) goto error;
703
704 if (mDesc->countGrantors() > hardware::details::EVFLAGWORDPOS) {
705 mEvFlagWord = static_cast<std::atomic<uint32_t>*>(
706 mapGrantorDescr(hardware::details::EVFLAGWORDPOS));
707 if (mEvFlagWord == nullptr) goto error;
708 android::hardware::EventFlag::createEventFlag(mEvFlagWord, &mEventFlag);
709 }
710 return;
711 error:
712 if (mReadPtr) {
713 if (flavor == kSynchronizedReadWrite) {
714 unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
715 } else {
716 delete mReadPtr;
717 }
718 mReadPtr = nullptr;
719 }
720 if (mWritePtr) {
721 unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
722 mWritePtr = nullptr;
723 }
724 if (mRing) {
725 unmapGrantorDescr(mRing, hardware::details::EVFLAGWORDPOS);
726 mRing = nullptr;
727 }
728 }
729
730 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(const Descriptor & Desc,bool resetPointers)731 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(const Descriptor& Desc,
732 bool resetPointers) {
733 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(Desc));
734 if (mDesc == nullptr || mDesc->getSize() == 0) {
735 hardware::details::logError("MQDescriptor is invalid or queue size is 0.");
736 return;
737 }
738
739 initMemory(resetPointers);
740 }
741
742 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(size_t numElementsInQueue,bool configureEventFlagWord,android::base::unique_fd bufferFd,size_t bufferSize,size_t quantum)743 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(size_t numElementsInQueue,
744 bool configureEventFlagWord,
745 android::base::unique_fd bufferFd,
746 size_t bufferSize, size_t quantum) {
747 // Check if the buffer size would not overflow size_t
748 if (numElementsInQueue > SIZE_MAX / quantum) {
749 hardware::details::logError("Requested message queue size too large. Size of elements: " +
750 std::to_string(quantum) +
751 ". Number of elements: " + std::to_string(numElementsInQueue));
752 return;
753 }
754 if (numElementsInQueue == 0) {
755 hardware::details::logError("Requested queue size of 0.");
756 return;
757 }
758 if (bufferFd != -1 && numElementsInQueue * quantum > bufferSize) {
759 hardware::details::logError("The supplied buffer size(" + std::to_string(bufferSize) +
760 ") is smaller than the required size(" +
761 std::to_string(numElementsInQueue * quantum) + ").");
762 return;
763 }
764 /*
765 * The FMQ needs to allocate memory for the ringbuffer as well as for the
766 * read and write pointer counters. If an EventFlag word is to be configured,
767 * we also need to allocate memory for the same/
768 */
769 size_t kQueueSizeBytes = numElementsInQueue * quantum;
770 size_t kMetaDataSize = 2 * sizeof(android::hardware::details::RingBufferPosition);
771
772 if (configureEventFlagWord) {
773 kMetaDataSize += sizeof(std::atomic<uint32_t>);
774 }
775
776 /*
777 * Ashmem memory region size needs to be specified in page-aligned bytes.
778 * kQueueSizeBytes needs to be aligned to word boundary so that all offsets
779 * in the grantorDescriptor will be word aligned.
780 */
781 size_t kAshmemSizePageAligned;
782 if (bufferFd != -1) {
783 // Allocate read counter and write counter only. User-supplied memory will be used for the
784 // ringbuffer.
785 kAshmemSizePageAligned = (kMetaDataSize + kPageSize - 1) & ~(kPageSize - 1);
786 } else {
787 // Allocate ringbuffer, read counter and write counter.
788 kAshmemSizePageAligned = (hardware::details::alignToWordBoundary(kQueueSizeBytes) +
789 kMetaDataSize + kPageSize - 1) &
790 ~(kPageSize - 1);
791 }
792
793 /*
794 * The native handle will contain the fds to be mapped.
795 */
796 int numFds = (bufferFd != -1) ? 2 : 1;
797 native_handle_t* mqHandle = native_handle_create(numFds, 0 /* numInts */);
798 if (mqHandle == nullptr) {
799 return;
800 }
801
802 /*
803 * Create an ashmem region to map the memory.
804 */
805 int ashmemFd = ashmem_create_region("MessageQueue", kAshmemSizePageAligned);
806 ashmem_set_prot_region(ashmemFd, PROT_READ | PROT_WRITE);
807 mqHandle->data[0] = ashmemFd;
808
809 if (bufferFd != -1) {
810 // Use user-supplied file descriptor for fdIndex 1
811 mqHandle->data[1] = bufferFd.get();
812 // release ownership of fd. mqHandle owns it now.
813 if (bufferFd.release() < 0) {
814 hardware::details::logError("Error releasing supplied bufferFd");
815 }
816
817 std::vector<android::hardware::GrantorDescriptor> grantors;
818 grantors.resize(configureEventFlagWord ? hardware::details::kMinGrantorCountForEvFlagSupport
819 : hardware::details::kMinGrantorCount);
820
821 size_t memSize[] = {
822 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for read
823 pointer counter */
824 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for write
825 pointer counter */
826 kQueueSizeBytes, /* memory to be allocated for data buffer */
827 sizeof(std::atomic<uint32_t>) /* memory to be allocated for EventFlag word */
828 };
829
830 for (size_t grantorPos = 0, offset = 0; grantorPos < grantors.size(); grantorPos++) {
831 uint32_t grantorFdIndex;
832 size_t grantorOffset;
833 if (grantorPos == hardware::details::DATAPTRPOS) {
834 grantorFdIndex = 1;
835 grantorOffset = 0;
836 } else {
837 grantorFdIndex = 0;
838 grantorOffset = offset;
839 offset += memSize[grantorPos];
840 }
841 grantors[grantorPos] = {
842 0 /* grantor flags */, grantorFdIndex,
843 static_cast<uint32_t>(hardware::details::alignToWordBoundary(grantorOffset)),
844 memSize[grantorPos]};
845 }
846
847 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow)
848 Descriptor(grantors, mqHandle, quantum));
849 } else {
850 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(
851 kQueueSizeBytes, mqHandle, quantum, configureEventFlagWord));
852 }
853 if (mDesc == nullptr) {
854 native_handle_close(mqHandle);
855 native_handle_delete(mqHandle);
856 return;
857 }
858 initMemory(true);
859 }
860
861 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
~MessageQueueBase()862 MessageQueueBase<MQDescriptorType, T, flavor>::~MessageQueueBase() {
863 if (flavor == kSynchronizedReadWrite && mReadPtr != nullptr) {
864 unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
865 } else if (mReadPtr != nullptr) {
866 delete mReadPtr;
867 }
868 if (mWritePtr != nullptr) {
869 unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
870 }
871 if (mRing != nullptr) {
872 unmapGrantorDescr(mRing, hardware::details::DATAPTRPOS);
873 }
874 if (mEvFlagWord != nullptr) {
875 unmapGrantorDescr(mEvFlagWord, hardware::details::EVFLAGWORDPOS);
876 android::hardware::EventFlag::deleteEventFlag(&mEventFlag);
877 }
878 }
879
880 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data)881 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data) {
882 return write(data, 1);
883 }
884
885 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data)886 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data) {
887 return read(data, 1);
888 }
889
890 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data,size_t nMessages)891 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data, size_t nMessages) {
892 MemTransaction tx;
893 return beginWrite(nMessages, &tx) &&
894 tx.copyToSized(data, 0 /* startIdx */, nMessages, quantum()) && commitWrite(nMessages);
895 }
896
897 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)898 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(
899 const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
900 int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
901 static_assert(flavor == kSynchronizedReadWrite,
902 "writeBlocking can only be used with the "
903 "kSynchronizedReadWrite flavor.");
904 /*
905 * If evFlag is null and the FMQ does not have its own EventFlag object
906 * return false;
907 * If the flavor is kSynchronizedReadWrite and the readNotification
908 * bit mask is zero return false;
909 * If the count is greater than queue size, return false
910 * to prevent blocking until timeOut.
911 */
912 if (evFlag == nullptr) {
913 evFlag = mEventFlag;
914 if (evFlag == nullptr) {
915 hardware::details::logError(
916 "writeBlocking failed: called on MessageQueue with no Eventflag"
917 "configured or provided");
918 return false;
919 }
920 }
921
922 if (readNotification == 0 || (count > getQuantumCount())) {
923 return false;
924 }
925
926 /*
927 * There is no need to wait for a readNotification if there is sufficient
928 * space to write is already present in the FMQ. The latter would be the case when
929 * read operations read more number of messages than write operations write.
930 * In other words, a single large read may clear the FMQ after multiple small
931 * writes. This would fail to clear a pending readNotification bit since
932 * EventFlag bits can only be cleared by a wait() call, however the bit would
933 * be correctly cleared by the next writeBlocking() call.
934 */
935
936 bool result = write(data, count);
937 if (result) {
938 if (writeNotification) {
939 evFlag->wake(writeNotification);
940 }
941 return result;
942 }
943
944 bool shouldTimeOut = timeOutNanos != 0;
945 int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
946
947 while (true) {
948 /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
949 if (shouldTimeOut) {
950 /*
951 * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
952 * to Nanoseconds)
953 */
954 int64_t currentTimeNs = android::elapsedRealtimeNano();
955 /*
956 * Decrement 'timeOutNanos' to account for the time taken to complete the last
957 * iteration of the while loop.
958 */
959 timeOutNanos -= currentTimeNs - prevTimeNanos;
960 prevTimeNanos = currentTimeNs;
961
962 if (timeOutNanos <= 0) {
963 /*
964 * Attempt write in case a context switch happened outside of
965 * evFlag->wait().
966 */
967 result = write(data, count);
968 break;
969 }
970 }
971
972 /*
973 * wait() will return immediately if there was a pending read
974 * notification.
975 */
976 uint32_t efState = 0;
977 status_t status = evFlag->wait(readNotification, &efState, timeOutNanos,
978 true /* retry on spurious wake */);
979
980 if (status != android::TIMED_OUT && status != android::NO_ERROR) {
981 hardware::details::logError("Unexpected error code from EventFlag Wait status " +
982 std::to_string(status));
983 break;
984 }
985
986 if (status == android::TIMED_OUT) {
987 break;
988 }
989
990 /*
991 * If there is still insufficient space to write to the FMQ,
992 * keep waiting for another readNotification.
993 */
994 if ((efState & readNotification) && write(data, count)) {
995 result = true;
996 break;
997 }
998 }
999
1000 if (result && writeNotification != 0) {
1001 evFlag->wake(writeNotification);
1002 }
1003
1004 return result;
1005 }
1006
1007 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,int64_t timeOutNanos)1008 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(const T* data, size_t count,
1009 int64_t timeOutNanos) {
1010 return writeBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1011 }
1012
1013 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)1014 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(
1015 T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
1016 int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
1017 static_assert(flavor == kSynchronizedReadWrite,
1018 "readBlocking can only be used with the "
1019 "kSynchronizedReadWrite flavor.");
1020
1021 /*
1022 * If evFlag is null and the FMQ does not own its own EventFlag object
1023 * return false;
1024 * If the writeNotification bit mask is zero return false;
1025 * If the count is greater than queue size, return false to prevent
1026 * blocking until timeOut.
1027 */
1028 if (evFlag == nullptr) {
1029 evFlag = mEventFlag;
1030 if (evFlag == nullptr) {
1031 hardware::details::logError(
1032 "readBlocking failed: called on MessageQueue with no Eventflag"
1033 "configured or provided");
1034 return false;
1035 }
1036 }
1037
1038 if (writeNotification == 0 || count > getQuantumCount()) {
1039 return false;
1040 }
1041
1042 /*
1043 * There is no need to wait for a write notification if sufficient
1044 * data to read is already present in the FMQ. This would be the
1045 * case when read operations read lesser number of messages than
1046 * a write operation and multiple reads would be required to clear the queue
1047 * after a single write operation. This check would fail to clear a pending
1048 * writeNotification bit since EventFlag bits can only be cleared
1049 * by a wait() call, however the bit would be correctly cleared by the next
1050 * readBlocking() call.
1051 */
1052
1053 bool result = read(data, count);
1054 if (result) {
1055 if (readNotification) {
1056 evFlag->wake(readNotification);
1057 }
1058 return result;
1059 }
1060
1061 bool shouldTimeOut = timeOutNanos != 0;
1062 int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
1063
1064 while (true) {
1065 /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
1066 if (shouldTimeOut) {
1067 /*
1068 * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
1069 * to Nanoseconds)
1070 */
1071 int64_t currentTimeNs = android::elapsedRealtimeNano();
1072 /*
1073 * Decrement 'timeOutNanos' to account for the time taken to complete the last
1074 * iteration of the while loop.
1075 */
1076 timeOutNanos -= currentTimeNs - prevTimeNanos;
1077 prevTimeNanos = currentTimeNs;
1078
1079 if (timeOutNanos <= 0) {
1080 /*
1081 * Attempt read in case a context switch happened outside of
1082 * evFlag->wait().
1083 */
1084 result = read(data, count);
1085 break;
1086 }
1087 }
1088
1089 /*
1090 * wait() will return immediately if there was a pending write
1091 * notification.
1092 */
1093 uint32_t efState = 0;
1094 status_t status = evFlag->wait(writeNotification, &efState, timeOutNanos,
1095 true /* retry on spurious wake */);
1096
1097 if (status != android::TIMED_OUT && status != android::NO_ERROR) {
1098 hardware::details::logError("Unexpected error code from EventFlag Wait status " +
1099 std::to_string(status));
1100 break;
1101 }
1102
1103 if (status == android::TIMED_OUT) {
1104 break;
1105 }
1106
1107 /*
1108 * If the data in FMQ is still insufficient, go back to waiting
1109 * for another write notification.
1110 */
1111 if ((efState & writeNotification) && read(data, count)) {
1112 result = true;
1113 break;
1114 }
1115 }
1116
1117 if (result && readNotification != 0) {
1118 evFlag->wake(readNotification);
1119 }
1120 return result;
1121 }
1122
1123 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,int64_t timeOutNanos)1124 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(T* data, size_t count,
1125 int64_t timeOutNanos) {
1126 return readBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1127 }
1128
1129 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
quantum()1130 inline size_t MessageQueueBase<MQDescriptorType, T, flavor>::quantum() const {
1131 if constexpr (std::is_same<T, MQErased>::value) {
1132 return mDesc->getQuantum();
1133 } else {
1134 return kQuantumValue<T>;
1135 }
1136 }
1137
1138 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWriteBytes(Error * errorDetected,std::string * errorMessage)1139 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes(
1140 Error* errorDetected, std::string* errorMessage) const {
1141 size_t queueSizeBytes = mDesc->getSize();
1142 Error localErrorDetected = Error::NONE;
1143 size_t availableBytes = availableToReadBytes(&localErrorDetected, errorMessage);
1144 if (localErrorDetected != Error::NONE) {
1145 if (errorDetected != nullptr) {
1146 *errorDetected = localErrorDetected;
1147 }
1148 return 0;
1149 }
1150 if (queueSizeBytes < availableBytes) {
1151 std::string errorMsg =
1152 "The write or read pointer has become corrupted. Writing to the queue is no "
1153 "longer possible. Queue size: " +
1154 std::to_string(queueSizeBytes) + ", available: " + std::to_string(availableBytes);
1155 hardware::details::logError(errorMsg);
1156 if (errorDetected != nullptr) {
1157 *errorDetected = Error::POINTER_CORRUPTION;
1158 }
1159 if (errorMessage != nullptr) {
1160 *errorMessage = std::move(errorMsg);
1161 }
1162 return 0;
1163 }
1164 return queueSizeBytes - availableBytes;
1165 }
1166
1167 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWrite(Error * errorDetected,std::string * errorMessage)1168 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite(
1169 Error* errorDetected, std::string* errorMessage) const {
1170 return availableToWriteBytes(errorDetected, errorMessage) / quantum();
1171 }
1172
1173 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToRead(Error * errorDetected,std::string * errorMessage)1174 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead(
1175 Error* errorDetected, std::string* errorMessage) const {
1176 return availableToReadBytes(errorDetected, errorMessage) / quantum();
1177 }
1178
1179 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
beginWrite(size_t nMessages,MemTransaction * result)1180 bool MessageQueueBase<MQDescriptorType, T, flavor>::beginWrite(size_t nMessages,
1181 MemTransaction* result) const {
1182 /*
1183 * If nMessages is greater than size of FMQ or in case of the synchronized
1184 * FMQ flavor, if there is not enough space to write nMessages, then return
1185 * result with null addresses.
1186 */
1187 if ((flavor == kSynchronizedReadWrite && (availableToWrite() < nMessages)) ||
1188 nMessages > getQuantumCount()) {
1189 *result = MemTransaction();
1190 return false;
1191 }
1192
1193 auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1194 if (writePtr % quantum() != 0) {
1195 hardware::details::logError(
1196 "The write pointer has become misaligned. Writing to the queue is no longer "
1197 "possible.");
1198 hardware::details::errorWriteLog(0x534e4554, "184963385");
1199 return false;
1200 }
1201 size_t writeOffset = writePtr % mDesc->getSize();
1202
1203 /*
1204 * From writeOffset, the number of messages that can be written
1205 * contiguously without wrapping around the ring buffer are calculated.
1206 */
1207 size_t contiguousMessages = (mDesc->getSize() - writeOffset) / quantum();
1208
1209 if (contiguousMessages < nMessages) {
1210 /*
1211 * Wrap around is required. Both result.first and result.second are
1212 * populated.
1213 */
1214 *result = MemTransaction(
1215 MemRegion(reinterpret_cast<T*>(mRing + writeOffset), contiguousMessages),
1216 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1217 } else {
1218 /*
1219 * A wrap around is not required to write nMessages. Only result.first
1220 * is populated.
1221 */
1222 *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), nMessages),
1223 MemRegion());
1224 }
1225
1226 return true;
1227 }
1228
1229 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1230 /*
1231 * Disable integer sanitization since integer overflow here is allowed
1232 * and legal.
1233 */
1234 __attribute__((no_sanitize("integer"))) bool
commitWrite(size_t nMessages)1235 MessageQueueBase<MQDescriptorType, T, flavor>::commitWrite(size_t nMessages) {
1236 size_t nBytesWritten = nMessages * quantum();
1237 auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1238 writePtr += nBytesWritten;
1239 mWritePtr->store(writePtr, std::memory_order_release);
1240 /*
1241 * This method cannot fail now since we are only incrementing the writePtr
1242 * counter.
1243 */
1244 return true;
1245 }
1246
1247 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToReadBytes(Error * errorDetected,std::string * errorMessage)1248 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes(
1249 Error* errorDetected, std::string* errorMessage) const {
1250 /*
1251 * This method is invoked by implementations of both read() and write() and
1252 * hence requires a memory_order_acquired load for both mReadPtr and
1253 * mWritePtr.
1254 */
1255 uint64_t writePtr = mWritePtr->load(std::memory_order_acquire);
1256 uint64_t readPtr = mReadPtr->load(std::memory_order_acquire);
1257 if (writePtr < readPtr) {
1258 std::string errorMsg =
1259 "The write or read pointer has become corrupted. Reading from the queue is no "
1260 "longer possible. Write pointer: " +
1261 std::to_string(writePtr) + ", read pointer: " + std::to_string(readPtr);
1262 hardware::details::logError(errorMsg);
1263 if (errorDetected != nullptr) {
1264 *errorDetected = Error::POINTER_CORRUPTION;
1265 }
1266 if (errorMessage != nullptr) {
1267 *errorMessage = std::move(errorMsg);
1268 }
1269 return 0;
1270 }
1271 return writePtr - readPtr;
1272 }
1273
1274 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data,size_t nMessages)1275 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data, size_t nMessages) {
1276 MemTransaction tx;
1277 return beginRead(nMessages, &tx) &&
1278 tx.copyFromSized(data, 0 /* startIdx */, nMessages, quantum()) && commitRead(nMessages);
1279 }
1280
1281 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1282 /*
1283 * Disable integer sanitization since integer overflow here is allowed
1284 * and legal.
1285 */
1286 __attribute__((no_sanitize("integer"))) bool
beginRead(size_t nMessages,MemTransaction * result)1287 MessageQueueBase<MQDescriptorType, T, flavor>::beginRead(size_t nMessages,
1288 MemTransaction* result) const {
1289 *result = MemTransaction();
1290 /*
1291 * If it is detected that the data in the queue was overwritten
1292 * due to the reader process being too slow, the read pointer counter
1293 * is set to the same as the write pointer counter to indicate error
1294 * and the read returns false;
1295 * Need acquire/release memory ordering for mWritePtr.
1296 */
1297 auto writePtr = mWritePtr->load(std::memory_order_acquire);
1298 /*
1299 * A relaxed load is sufficient for mReadPtr since there will be no
1300 * stores to mReadPtr from a different thread.
1301 */
1302 auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1303 if (writePtr % quantum() != 0 || readPtr % quantum() != 0) {
1304 hardware::details::logError(
1305 "The write or read pointer has become misaligned. Reading from the queue is no "
1306 "longer possible.");
1307 hardware::details::errorWriteLog(0x534e4554, "184963385");
1308 return false;
1309 }
1310
1311 if (writePtr - readPtr > mDesc->getSize()) {
1312 mReadPtr->store(writePtr, std::memory_order_release);
1313 return false;
1314 }
1315
1316 size_t nBytesDesired = nMessages * quantum();
1317 /*
1318 * Return if insufficient data to read in FMQ.
1319 */
1320 if (writePtr - readPtr < nBytesDesired) {
1321 return false;
1322 }
1323
1324 size_t readOffset = readPtr % mDesc->getSize();
1325 /*
1326 * From readOffset, the number of messages that can be read contiguously
1327 * without wrapping around the ring buffer are calculated.
1328 */
1329 size_t contiguousMessages = (mDesc->getSize() - readOffset) / quantum();
1330
1331 if (contiguousMessages < nMessages) {
1332 /*
1333 * A wrap around is required. Both result.first and result.second
1334 * are populated.
1335 */
1336 *result = MemTransaction(
1337 MemRegion(reinterpret_cast<T*>(mRing + readOffset), contiguousMessages),
1338 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1339 } else {
1340 /*
1341 * A wrap around is not required. Only result.first need to be
1342 * populated.
1343 */
1344 *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
1345 MemRegion());
1346 }
1347
1348 return true;
1349 }
1350
1351 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1352 /*
1353 * Disable integer sanitization since integer overflow here is allowed
1354 * and legal.
1355 */
1356 __attribute__((no_sanitize("integer"))) bool
commitRead(size_t nMessages)1357 MessageQueueBase<MQDescriptorType, T, flavor>::commitRead(size_t nMessages) {
1358 // TODO: Use a local copy of readPtr to avoid relazed mReadPtr loads.
1359 auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1360 auto writePtr = mWritePtr->load(std::memory_order_acquire);
1361 /*
1362 * If the flavor is unsynchronized, it is possible that a write overflow may
1363 * have occurred between beginRead() and commitRead().
1364 */
1365 if (writePtr - readPtr > mDesc->getSize()) {
1366 mReadPtr->store(writePtr, std::memory_order_release);
1367 return false;
1368 }
1369
1370 size_t nBytesRead = nMessages * quantum();
1371 readPtr += nBytesRead;
1372 mReadPtr->store(readPtr, std::memory_order_release);
1373 return true;
1374 }
1375
1376 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumSize()1377 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumSize() const {
1378 return mDesc->getQuantum();
1379 }
1380
1381 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumCount()1382 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumCount() const {
1383 return mDesc->getSize() / mDesc->getQuantum();
1384 }
1385
1386 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
isValid()1387 bool MessageQueueBase<MQDescriptorType, T, flavor>::isValid() const {
1388 return mRing != nullptr && mReadPtr != nullptr && mWritePtr != nullptr;
1389 }
1390
1391 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
mapGrantorDescr(uint32_t grantorIdx)1392 void* MessageQueueBase<MQDescriptorType, T, flavor>::mapGrantorDescr(uint32_t grantorIdx) {
1393 const native_handle_t* handle = mDesc->handle();
1394 const std::vector<android::hardware::GrantorDescriptor> grantors = mDesc->grantors();
1395 if (handle == nullptr) {
1396 hardware::details::logError("mDesc->handle is null");
1397 return nullptr;
1398 }
1399
1400 if (grantorIdx >= grantors.size()) {
1401 hardware::details::logError(std::string("grantorIdx must be less than ") +
1402 std::to_string(grantors.size()));
1403 return nullptr;
1404 }
1405
1406 int fdIndex = grantors[grantorIdx].fdIndex;
1407 if (fdIndex < 0 || fdIndex >= handle->numFds) {
1408 hardware::details::logError(
1409 std::string("fdIndex (" + std::to_string(fdIndex) + ") from grantor (index " +
1410 std::to_string(grantorIdx) +
1411 ") must be smaller than the number of fds in the handle: " +
1412 std::to_string(handle->numFds)));
1413 return nullptr;
1414 }
1415
1416 /*
1417 * Offset for mmap must be a multiple of kPageSize.
1418 */
1419 if (!hardware::details::isAlignedToWordBoundary(grantors[grantorIdx].offset)) {
1420 hardware::details::logError("Grantor (index " + std::to_string(grantorIdx) +
1421 ") offset needs to be aligned to word boundary but is: " +
1422 std::to_string(grantors[grantorIdx].offset));
1423 return nullptr;
1424 }
1425
1426 /*
1427 * Expect some grantors to be at least a min size
1428 */
1429 for (uint32_t i = 0; i < grantors.size(); i++) {
1430 switch (i) {
1431 case hardware::details::READPTRPOS:
1432 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1433 break;
1434 case hardware::details::WRITEPTRPOS:
1435 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1436 break;
1437 case hardware::details::DATAPTRPOS:
1438 // We don't expect specific data size
1439 break;
1440 case hardware::details::EVFLAGWORDPOS:
1441 if (grantors[i].extent < sizeof(uint32_t)) return nullptr;
1442 break;
1443 default:
1444 // We don't care about unknown grantors
1445 break;
1446 }
1447 }
1448
1449 int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1450 if (grantors[grantorIdx].extent < 0 || grantors[grantorIdx].extent > INT_MAX - kPageSize) {
1451 hardware::details::logError(std::string("Grantor (index " + std::to_string(grantorIdx) +
1452 ") extent value is too large or negative: " +
1453 std::to_string(grantors[grantorIdx].extent)));
1454 return nullptr;
1455 }
1456 int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1457
1458 void* address = mmap(0, mapLength, PROT_READ | PROT_WRITE, MAP_SHARED, handle->data[fdIndex],
1459 mapOffset);
1460 if (address == MAP_FAILED && errno == EPERM && flavor == kUnsynchronizedWrite) {
1461 // If the supplied memory is read-only, it would fail with EPERM.
1462 // Try again to mmap read-only for the kUnsynchronizedWrite case.
1463 // kSynchronizedReadWrite cannot use read-only memory because the
1464 // read pointer is stored in the shared memory as well.
1465 address = mmap(0, mapLength, PROT_READ, MAP_SHARED, handle->data[fdIndex], mapOffset);
1466 }
1467 if (address == MAP_FAILED) {
1468 hardware::details::logError(std::string("mmap failed: ") + std::to_string(errno));
1469 return nullptr;
1470 }
1471 return reinterpret_cast<uint8_t*>(address) + (grantors[grantorIdx].offset - mapOffset);
1472 }
1473
1474 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
unmapGrantorDescr(void * address,uint32_t grantorIdx)1475 void MessageQueueBase<MQDescriptorType, T, flavor>::unmapGrantorDescr(void* address,
1476 uint32_t grantorIdx) {
1477 auto grantors = mDesc->grantors();
1478 if ((address == nullptr) || (grantorIdx >= grantors.size())) {
1479 return;
1480 }
1481
1482 int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1483 int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1484 void* baseAddress =
1485 reinterpret_cast<uint8_t*>(address) - (grantors[grantorIdx].offset - mapOffset);
1486 if (baseAddress) munmap(baseAddress, mapLength);
1487 }
1488
1489 } // namespace android
1490