• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "BurstUtils.h"
18 
19 #include <android-base/logging.h>
20 #include <android-base/properties.h>
21 #include <android/hardware/neuralnetworks/1.0/types.h>
22 #include <android/hardware/neuralnetworks/1.1/types.h>
23 #include <android/hardware/neuralnetworks/1.2/types.h>
24 #include <fmq/MessageQueue.h>
25 #include <hidl/MQDescriptor.h>
26 #include <nnapi/Result.h>
27 #include <nnapi/Types.h>
28 #include <nnapi/hal/1.0/ProtectCallback.h>
29 
#include <atomic>
#include <chrono>
#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
37 
38 namespace android::hardware::neuralnetworks::V1_2::utils {
39 namespace {
40 
41 constexpr V1_2::Timing kNoTiming = {std::numeric_limits<uint64_t>::max(),
42                                     std::numeric_limits<uint64_t>::max()};
43 
// Returns the polling time window, in microseconds, associated with `property`.
//
// Only builds compiled with NN_DEBUGGABLE query the property (through base::GetIntProperty,
// with 0 as both the default and the minimum accepted value). Every other build ignores the
// property name and always returns a window of 0 microseconds.
std::chrono::microseconds getPollingTimeWindow(const std::string& property) {
    constexpr int32_t kFallbackWindow = 0;
#ifdef NN_DEBUGGABLE
    constexpr int32_t kSmallestWindow = 0;
    const int32_t configuredWindow =
            base::GetIntProperty(property, kFallbackWindow, kSmallestWindow);
    return std::chrono::microseconds(configuredWindow);
#else
    // The property is intentionally unused outside of debuggable builds.
    static_cast<void>(property);
    return std::chrono::microseconds(kFallbackWindow);
#endif  // NN_DEBUGGABLE
}
56 
57 }  // namespace
58 
getBurstControllerPollingTimeWindow()59 std::chrono::microseconds getBurstControllerPollingTimeWindow() {
60     return getPollingTimeWindow("debug.nn.burst-controller-polling-window");
61 }
62 
getBurstServerPollingTimeWindow()63 std::chrono::microseconds getBurstServerPollingTimeWindow() {
64     return getPollingTimeWindow("debug.nn.burst-server-polling-window");
65 }
66 
67 // serialize a request into a packet
serialize(const V1_0::Request & request,V1_2::MeasureTiming measure,const std::vector<int32_t> & slots)68 std::vector<FmqRequestDatum> serialize(const V1_0::Request& request, V1_2::MeasureTiming measure,
69                                        const std::vector<int32_t>& slots) {
70     // count how many elements need to be sent for a request
71     size_t count = 2 + request.inputs.size() + request.outputs.size() + slots.size();
72     for (const auto& input : request.inputs) {
73         count += input.dimensions.size();
74     }
75     for (const auto& output : request.outputs) {
76         count += output.dimensions.size();
77     }
78     CHECK_LE(count, std::numeric_limits<uint32_t>::max());
79 
80     // create buffer to temporarily store elements
81     std::vector<FmqRequestDatum> data;
82     data.reserve(count);
83 
84     // package packetInfo
85     data.emplace_back();
86     data.back().packetInformation(
87             {.packetSize = static_cast<uint32_t>(count),
88              .numberOfInputOperands = static_cast<uint32_t>(request.inputs.size()),
89              .numberOfOutputOperands = static_cast<uint32_t>(request.outputs.size()),
90              .numberOfPools = static_cast<uint32_t>(slots.size())});
91 
92     // package input data
93     for (const auto& input : request.inputs) {
94         // package operand information
95         data.emplace_back();
96         data.back().inputOperandInformation(
97                 {.hasNoValue = input.hasNoValue,
98                  .location = input.location,
99                  .numberOfDimensions = static_cast<uint32_t>(input.dimensions.size())});
100 
101         // package operand dimensions
102         for (uint32_t dimension : input.dimensions) {
103             data.emplace_back();
104             data.back().inputOperandDimensionValue(dimension);
105         }
106     }
107 
108     // package output data
109     for (const auto& output : request.outputs) {
110         // package operand information
111         data.emplace_back();
112         data.back().outputOperandInformation(
113                 {.hasNoValue = output.hasNoValue,
114                  .location = output.location,
115                  .numberOfDimensions = static_cast<uint32_t>(output.dimensions.size())});
116 
117         // package operand dimensions
118         for (uint32_t dimension : output.dimensions) {
119             data.emplace_back();
120             data.back().outputOperandDimensionValue(dimension);
121         }
122     }
123 
124     // package pool identifier
125     for (int32_t slot : slots) {
126         data.emplace_back();
127         data.back().poolIdentifier(slot);
128     }
129 
130     // package measureTiming
131     data.emplace_back();
132     data.back().measureTiming(measure);
133 
134     CHECK_EQ(data.size(), count);
135 
136     // return packet
137     return data;
138 }
139 
140 // serialize result
serialize(V1_0::ErrorStatus errorStatus,const std::vector<V1_2::OutputShape> & outputShapes,V1_2::Timing timing)141 std::vector<FmqResultDatum> serialize(V1_0::ErrorStatus errorStatus,
142                                       const std::vector<V1_2::OutputShape>& outputShapes,
143                                       V1_2::Timing timing) {
144     // count how many elements need to be sent for a request
145     size_t count = 2 + outputShapes.size();
146     for (const auto& outputShape : outputShapes) {
147         count += outputShape.dimensions.size();
148     }
149 
150     // create buffer to temporarily store elements
151     std::vector<FmqResultDatum> data;
152     data.reserve(count);
153 
154     // package packetInfo
155     data.emplace_back();
156     data.back().packetInformation({.packetSize = static_cast<uint32_t>(count),
157                                    .errorStatus = errorStatus,
158                                    .numberOfOperands = static_cast<uint32_t>(outputShapes.size())});
159 
160     // package output shape data
161     for (const auto& operand : outputShapes) {
162         // package operand information
163         data.emplace_back();
164         data.back().operandInformation(
165                 {.isSufficient = operand.isSufficient,
166                  .numberOfDimensions = static_cast<uint32_t>(operand.dimensions.size())});
167 
168         // package operand dimensions
169         for (uint32_t dimension : operand.dimensions) {
170             data.emplace_back();
171             data.back().operandDimensionValue(dimension);
172         }
173     }
174 
175     // package executionTiming
176     data.emplace_back();
177     data.back().executionTiming(timing);
178 
179     CHECK_EQ(data.size(), count);
180 
181     // return result
182     return data;
183 }
184 
185 // deserialize request
deserialize(const std::vector<FmqRequestDatum> & data)186 nn::Result<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>> deserialize(
187         const std::vector<FmqRequestDatum>& data) {
188     using discriminator = FmqRequestDatum::hidl_discriminator;
189 
190     size_t index = 0;
191 
192     // validate packet information
193     if (index >= data.size() ||
194         data.at(index).getDiscriminator() != discriminator::packetInformation) {
195         return NN_ERROR() << "FMQ Request packet ill-formed";
196     }
197 
198     // unpackage packet information
199     const FmqRequestDatum::PacketInformation& packetInfo = data.at(index).packetInformation();
200     index++;
201     const uint32_t packetSize = packetInfo.packetSize;
202     const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
203     const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
204     const uint32_t numberOfPools = packetInfo.numberOfPools;
205 
206     // verify packet size
207     if (data.size() != packetSize) {
208         return NN_ERROR() << "FMQ Request packet ill-formed";
209     }
210 
211     // unpackage input operands
212     std::vector<V1_0::RequestArgument> inputs;
213     inputs.reserve(numberOfInputOperands);
214     for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
215         // validate input operand information
216         if (index >= data.size() ||
217             data.at(index).getDiscriminator() != discriminator::inputOperandInformation) {
218             return NN_ERROR() << "FMQ Request packet ill-formed";
219         }
220 
221         // unpackage operand information
222         const FmqRequestDatum::OperandInformation& operandInfo =
223                 data.at(index).inputOperandInformation();
224         index++;
225         const bool hasNoValue = operandInfo.hasNoValue;
226         const V1_0::DataLocation location = operandInfo.location;
227         const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
228 
229         // unpackage operand dimensions
230         std::vector<uint32_t> dimensions;
231         dimensions.reserve(numberOfDimensions);
232         for (size_t i = 0; i < numberOfDimensions; ++i) {
233             // validate dimension
234             if (index >= data.size() ||
235                 data.at(index).getDiscriminator() != discriminator::inputOperandDimensionValue) {
236                 return NN_ERROR() << "FMQ Request packet ill-formed";
237             }
238 
239             // unpackage dimension
240             const uint32_t dimension = data.at(index).inputOperandDimensionValue();
241             index++;
242 
243             // store result
244             dimensions.push_back(dimension);
245         }
246 
247         // store result
248         inputs.push_back(
249                 {.hasNoValue = hasNoValue, .location = location, .dimensions = dimensions});
250     }
251 
252     // unpackage output operands
253     std::vector<V1_0::RequestArgument> outputs;
254     outputs.reserve(numberOfOutputOperands);
255     for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
256         // validate output operand information
257         if (index >= data.size() ||
258             data.at(index).getDiscriminator() != discriminator::outputOperandInformation) {
259             return NN_ERROR() << "FMQ Request packet ill-formed";
260         }
261 
262         // unpackage operand information
263         const FmqRequestDatum::OperandInformation& operandInfo =
264                 data.at(index).outputOperandInformation();
265         index++;
266         const bool hasNoValue = operandInfo.hasNoValue;
267         const V1_0::DataLocation location = operandInfo.location;
268         const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
269 
270         // unpackage operand dimensions
271         std::vector<uint32_t> dimensions;
272         dimensions.reserve(numberOfDimensions);
273         for (size_t i = 0; i < numberOfDimensions; ++i) {
274             // validate dimension
275             if (index >= data.size() ||
276                 data.at(index).getDiscriminator() != discriminator::outputOperandDimensionValue) {
277                 return NN_ERROR() << "FMQ Request packet ill-formed";
278             }
279 
280             // unpackage dimension
281             const uint32_t dimension = data.at(index).outputOperandDimensionValue();
282             index++;
283 
284             // store result
285             dimensions.push_back(dimension);
286         }
287 
288         // store result
289         outputs.push_back(
290                 {.hasNoValue = hasNoValue, .location = location, .dimensions = dimensions});
291     }
292 
293     // unpackage pools
294     std::vector<int32_t> slots;
295     slots.reserve(numberOfPools);
296     for (size_t pool = 0; pool < numberOfPools; ++pool) {
297         // validate input operand information
298         if (index >= data.size() ||
299             data.at(index).getDiscriminator() != discriminator::poolIdentifier) {
300             return NN_ERROR() << "FMQ Request packet ill-formed";
301         }
302 
303         // unpackage operand information
304         const int32_t poolId = data.at(index).poolIdentifier();
305         index++;
306 
307         // store result
308         slots.push_back(poolId);
309     }
310 
311     // validate measureTiming
312     if (index >= data.size() || data.at(index).getDiscriminator() != discriminator::measureTiming) {
313         return NN_ERROR() << "FMQ Request packet ill-formed";
314     }
315 
316     // unpackage measureTiming
317     const V1_2::MeasureTiming measure = data.at(index).measureTiming();
318     index++;
319 
320     // validate packet information
321     if (index != packetSize) {
322         return NN_ERROR() << "FMQ Request packet ill-formed";
323     }
324 
325     // return request
326     V1_0::Request request = {.inputs = inputs, .outputs = outputs, .pools = {}};
327     return std::make_tuple(std::move(request), std::move(slots), measure);
328 }
329 
330 // deserialize a packet into the result
deserialize(const std::vector<FmqResultDatum> & data)331 nn::Result<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>> deserialize(
332         const std::vector<FmqResultDatum>& data) {
333     using discriminator = FmqResultDatum::hidl_discriminator;
334     size_t index = 0;
335 
336     // validate packet information
337     if (index >= data.size() ||
338         data.at(index).getDiscriminator() != discriminator::packetInformation) {
339         return NN_ERROR() << "FMQ Result packet ill-formed";
340     }
341 
342     // unpackage packet information
343     const FmqResultDatum::PacketInformation& packetInfo = data.at(index).packetInformation();
344     index++;
345     const uint32_t packetSize = packetInfo.packetSize;
346     const V1_0::ErrorStatus errorStatus = packetInfo.errorStatus;
347     const uint32_t numberOfOperands = packetInfo.numberOfOperands;
348 
349     // verify packet size
350     if (data.size() != packetSize) {
351         return NN_ERROR() << "FMQ Result packet ill-formed";
352     }
353 
354     // unpackage operands
355     std::vector<V1_2::OutputShape> outputShapes;
356     outputShapes.reserve(numberOfOperands);
357     for (size_t operand = 0; operand < numberOfOperands; ++operand) {
358         // validate operand information
359         if (index >= data.size() ||
360             data.at(index).getDiscriminator() != discriminator::operandInformation) {
361             return NN_ERROR() << "FMQ Result packet ill-formed";
362         }
363 
364         // unpackage operand information
365         const FmqResultDatum::OperandInformation& operandInfo = data.at(index).operandInformation();
366         index++;
367         const bool isSufficient = operandInfo.isSufficient;
368         const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
369 
370         // unpackage operand dimensions
371         std::vector<uint32_t> dimensions;
372         dimensions.reserve(numberOfDimensions);
373         for (size_t i = 0; i < numberOfDimensions; ++i) {
374             // validate dimension
375             if (index >= data.size() ||
376                 data.at(index).getDiscriminator() != discriminator::operandDimensionValue) {
377                 return NN_ERROR() << "FMQ Result packet ill-formed";
378             }
379 
380             // unpackage dimension
381             const uint32_t dimension = data.at(index).operandDimensionValue();
382             index++;
383 
384             // store result
385             dimensions.push_back(dimension);
386         }
387 
388         // store result
389         outputShapes.push_back({.dimensions = dimensions, .isSufficient = isSufficient});
390     }
391 
392     // validate execution timing
393     if (index >= data.size() ||
394         data.at(index).getDiscriminator() != discriminator::executionTiming) {
395         return NN_ERROR() << "FMQ Result packet ill-formed";
396     }
397 
398     // unpackage execution timing
399     const V1_2::Timing timing = data.at(index).executionTiming();
400     index++;
401 
402     // validate packet information
403     if (index != packetSize) {
404         return NN_ERROR() << "FMQ Result packet ill-formed";
405     }
406 
407     // return result
408     return std::make_tuple(errorStatus, std::move(outputShapes), timing);
409 }
410 
411 // RequestChannelSender methods
412 
413 nn::GeneralResult<
414         std::pair<std::unique_ptr<RequestChannelSender>, const MQDescriptorSync<FmqRequestDatum>*>>
create(size_t channelLength)415 RequestChannelSender::create(size_t channelLength) {
416     auto requestChannelSender =
417             std::make_unique<RequestChannelSender>(PrivateConstructorTag{}, channelLength);
418     if (!requestChannelSender->mFmqRequestChannel.isValid()) {
419         return NN_ERROR() << "Unable to create RequestChannelSender";
420     }
421 
422     const MQDescriptorSync<FmqRequestDatum>* descriptor =
423             requestChannelSender->mFmqRequestChannel.getDesc();
424     return std::make_pair(std::move(requestChannelSender), descriptor);
425 }
426 
// Constructs the sender and its request queue. `channelLength` is forwarded to the queue's
// constructor, and the queue is asked to configure an event flag word. The tag parameter is
// unnamed and unused; it only selects this constructor.
RequestChannelSender::RequestChannelSender(PrivateConstructorTag /*tag*/, size_t channelLength)
    : mFmqRequestChannel(channelLength, /*configureEventFlagWord=*/true) {}
429 
send(const V1_0::Request & request,V1_2::MeasureTiming measure,const std::vector<int32_t> & slots)430 nn::Result<void> RequestChannelSender::send(const V1_0::Request& request,
431                                             V1_2::MeasureTiming measure,
432                                             const std::vector<int32_t>& slots) {
433     const std::vector<FmqRequestDatum> serialized = serialize(request, measure, slots);
434     return sendPacket(serialized);
435 }
436 
sendPacket(const std::vector<FmqRequestDatum> & packet)437 nn::Result<void> RequestChannelSender::sendPacket(const std::vector<FmqRequestDatum>& packet) {
438     if (!mValid) {
439         return NN_ERROR() << "FMQ object is invalid";
440     }
441 
442     if (packet.size() > mFmqRequestChannel.availableToWrite()) {
443         return NN_ERROR()
444                << "RequestChannelSender::sendPacket -- packet size exceeds size available in FMQ";
445     }
446 
447     // Always send the packet with "blocking" because this signals the futex and unblocks the
448     // consumer if it is waiting on the futex.
449     const bool success = mFmqRequestChannel.writeBlocking(packet.data(), packet.size());
450     if (!success) {
451         return NN_ERROR()
452                << "RequestChannelSender::sendPacket -- FMQ's writeBlocking returned an error";
453     }
454 
455     return {};
456 }
457 
// Marks this sender as invalid. After this, sendPacket() returns an "FMQ object is invalid" error
// instead of writing to the queue.
void RequestChannelSender::notifyAsDeadObject() {
    mValid = false;
}
461 
462 // RequestChannelReceiver methods
463 
create(const MQDescriptorSync<FmqRequestDatum> & requestChannel,std::chrono::microseconds pollingTimeWindow)464 nn::GeneralResult<std::unique_ptr<RequestChannelReceiver>> RequestChannelReceiver::create(
465         const MQDescriptorSync<FmqRequestDatum>& requestChannel,
466         std::chrono::microseconds pollingTimeWindow) {
467     auto requestChannelReceiver = std::make_unique<RequestChannelReceiver>(
468             PrivateConstructorTag{}, requestChannel, pollingTimeWindow);
469 
470     if (!requestChannelReceiver->mFmqRequestChannel.isValid()) {
471         return NN_ERROR() << "Unable to create RequestChannelReceiver";
472     }
473     if (requestChannelReceiver->mFmqRequestChannel.getEventFlagWord() == nullptr) {
474         return NN_ERROR()
475                << "RequestChannelReceiver::create was passed an MQDescriptor without an EventFlag";
476     }
477 
478     return requestChannelReceiver;
479 }
480 
// Constructs the receiver: the request queue is built from the supplied descriptor, and the
// polling window is stored for use by getPacketBlocking(). The tag parameter is unnamed and
// unused; it only selects this constructor.
RequestChannelReceiver::RequestChannelReceiver(
        PrivateConstructorTag /*tag*/, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
        std::chrono::microseconds pollingTimeWindow)
    : mFmqRequestChannel(requestChannel), kPollingTimeWindow(pollingTimeWindow) {}
485 
486 nn::Result<std::tuple<V1_0::Request, std::vector<int32_t>, V1_2::MeasureTiming>>
getBlocking()487 RequestChannelReceiver::getBlocking() {
488     const auto packet = NN_TRY(getPacketBlocking());
489     return deserialize(packet);
490 }
491 
invalidate()492 void RequestChannelReceiver::invalidate() {
493     mTeardown = true;
494 
495     // force unblock
496     // ExecutionBurstServer is by default waiting on a request packet. If the client process
497     // destroys its burst object, the server may still be waiting on the futex. This force unblock
498     // wakes up any thread waiting on the futex.
499     const auto data = serialize(V1_0::Request{}, V1_2::MeasureTiming::NO, {});
500     mFmqRequestChannel.writeBlocking(data.data(), data.size());
501 }
502 
getPacketBlocking()503 nn::Result<std::vector<FmqRequestDatum>> RequestChannelReceiver::getPacketBlocking() {
504     if (mTeardown) {
505         return NN_ERROR() << "FMQ object is being torn down";
506     }
507 
508     // First spend time polling if results are available in FMQ instead of waiting on the futex.
509     // Polling is more responsive (yielding lower latencies), but can take up more power, so only
510     // poll for a limited period of time.
511 
512     auto& getCurrentTime = std::chrono::high_resolution_clock::now;
513     const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
514 
515     while (getCurrentTime() < timeToStopPolling) {
516         // if class is being torn down, immediately return
517         if (mTeardown.load(std::memory_order_relaxed)) {
518             return NN_ERROR() << "FMQ object is being torn down";
519         }
520 
521         // Check if data is available. If it is, immediately retrieve it and return.
522         const size_t available = mFmqRequestChannel.availableToRead();
523         if (available > 0) {
524             std::vector<FmqRequestDatum> packet(available);
525             const bool success = mFmqRequestChannel.readBlocking(packet.data(), available);
526             if (!success) {
527                 return NN_ERROR() << "Error receiving packet";
528             }
529             return packet;
530         }
531 
532         std::this_thread::yield();
533     }
534 
535     // If we get to this point, we either stopped polling because it was taking too long or polling
536     // was not allowed. Instead, perform a blocking call which uses a futex to save power.
537 
538     // wait for request packet and read first element of request packet
539     FmqRequestDatum datum;
540     bool success = mFmqRequestChannel.readBlocking(&datum, 1);
541 
542     // retrieve remaining elements
543     // NOTE: all of the data is already available at this point, so there's no need to do a blocking
544     // wait to wait for more data. This is known because in FMQ, all writes are published (made
545     // available) atomically. Currently, the producer always publishes the entire packet in one
546     // function call, so if the first element of the packet is available, the remaining elements are
547     // also available.
548     const size_t count = mFmqRequestChannel.availableToRead();
549     std::vector<FmqRequestDatum> packet(count + 1);
550     std::memcpy(&packet.front(), &datum, sizeof(datum));
551     success &= mFmqRequestChannel.read(packet.data() + 1, count);
552 
553     // terminate loop
554     if (mTeardown) {
555         return NN_ERROR() << "FMQ object is being torn down";
556     }
557 
558     // ensure packet was successfully received
559     if (!success) {
560         return NN_ERROR() << "Error receiving packet";
561     }
562 
563     return packet;
564 }
565 
566 // ResultChannelSender methods
567 
create(const MQDescriptorSync<FmqResultDatum> & resultChannel)568 nn::GeneralResult<std::unique_ptr<ResultChannelSender>> ResultChannelSender::create(
569         const MQDescriptorSync<FmqResultDatum>& resultChannel) {
570     auto resultChannelSender =
571             std::make_unique<ResultChannelSender>(PrivateConstructorTag{}, resultChannel);
572 
573     if (!resultChannelSender->mFmqResultChannel.isValid()) {
574         return NN_ERROR() << "Unable to create RequestChannelSender";
575     }
576     if (resultChannelSender->mFmqResultChannel.getEventFlagWord() == nullptr) {
577         return NN_ERROR()
578                << "ResultChannelSender::create was passed an MQDescriptor without an EventFlag";
579     }
580 
581     return resultChannelSender;
582 }
583 
// Constructs the sender; the result queue is built from the supplied descriptor. The tag
// parameter is unnamed and unused; it only selects this constructor.
ResultChannelSender::ResultChannelSender(PrivateConstructorTag /*tag*/,
                                         const MQDescriptorSync<FmqResultDatum>& resultChannel)
    : mFmqResultChannel(resultChannel) {}
587 
send(V1_0::ErrorStatus errorStatus,const std::vector<V1_2::OutputShape> & outputShapes,V1_2::Timing timing)588 void ResultChannelSender::send(V1_0::ErrorStatus errorStatus,
589                                const std::vector<V1_2::OutputShape>& outputShapes,
590                                V1_2::Timing timing) {
591     const std::vector<FmqResultDatum> serialized = serialize(errorStatus, outputShapes, timing);
592     sendPacket(serialized);
593 }
594 
sendPacket(const std::vector<FmqResultDatum> & packet)595 void ResultChannelSender::sendPacket(const std::vector<FmqResultDatum>& packet) {
596     if (packet.size() > mFmqResultChannel.availableToWrite()) {
597         LOG(ERROR)
598                 << "ResultChannelSender::sendPacket -- packet size exceeds size available in FMQ";
599         const std::vector<FmqResultDatum> errorPacket =
600                 serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
601 
602         // Always send the packet with "blocking" because this signals the futex and unblocks the
603         // consumer if it is waiting on the futex.
604         mFmqResultChannel.writeBlocking(errorPacket.data(), errorPacket.size());
605     } else {
606         // Always send the packet with "blocking" because this signals the futex and unblocks the
607         // consumer if it is waiting on the futex.
608         mFmqResultChannel.writeBlocking(packet.data(), packet.size());
609     }
610 }
611 
612 // ResultChannelReceiver methods
613 
614 nn::GeneralResult<
615         std::pair<std::unique_ptr<ResultChannelReceiver>, const MQDescriptorSync<FmqResultDatum>*>>
create(size_t channelLength,std::chrono::microseconds pollingTimeWindow)616 ResultChannelReceiver::create(size_t channelLength, std::chrono::microseconds pollingTimeWindow) {
617     auto resultChannelReceiver = std::make_unique<ResultChannelReceiver>(
618             PrivateConstructorTag{}, channelLength, pollingTimeWindow);
619     if (!resultChannelReceiver->mFmqResultChannel.isValid()) {
620         return NN_ERROR() << "Unable to create ResultChannelReceiver";
621     }
622 
623     const MQDescriptorSync<FmqResultDatum>* descriptor =
624             resultChannelReceiver->mFmqResultChannel.getDesc();
625     return std::make_pair(std::move(resultChannelReceiver), descriptor);
626 }
627 
// Constructs the receiver and its result queue. `channelLength` is forwarded to the queue's
// constructor, and the queue is asked to configure an event flag word. The polling window is
// stored for use by getPacketBlocking(). The tag parameter is unnamed and unused; it only selects
// this constructor.
ResultChannelReceiver::ResultChannelReceiver(PrivateConstructorTag /*tag*/, size_t channelLength,
                                             std::chrono::microseconds pollingTimeWindow)
    : mFmqResultChannel(channelLength, /*configureEventFlagWord=*/true),
      kPollingTimeWindow(pollingTimeWindow) {}
632 
633 nn::Result<std::tuple<V1_0::ErrorStatus, std::vector<V1_2::OutputShape>, V1_2::Timing>>
getBlocking()634 ResultChannelReceiver::getBlocking() {
635     const auto packet = NN_TRY(getPacketBlocking());
636     return deserialize(packet);
637 }
638 
notifyAsDeadObject()639 void ResultChannelReceiver::notifyAsDeadObject() {
640     mValid = false;
641 
642     // force unblock
643     // ExecutionBurstController waits on a result packet after sending a request. If the driver
644     // containing ExecutionBurstServer crashes, the controller may be waiting on the futex. This
645     // force unblock wakes up any thread waiting on the futex.
646     const auto data = serialize(V1_0::ErrorStatus::GENERAL_FAILURE, {}, kNoTiming);
647     mFmqResultChannel.writeBlocking(data.data(), data.size());
648 }
649 
getPacketBlocking()650 nn::Result<std::vector<FmqResultDatum>> ResultChannelReceiver::getPacketBlocking() {
651     if (!mValid) {
652         return NN_ERROR() << "FMQ object is invalid";
653     }
654 
655     // First spend time polling if results are available in FMQ instead of waiting on the futex.
656     // Polling is more responsive (yielding lower latencies), but can take up more power, so only
657     // poll for a limited period of time.
658 
659     auto& getCurrentTime = std::chrono::high_resolution_clock::now;
660     const auto timeToStopPolling = getCurrentTime() + kPollingTimeWindow;
661 
662     while (getCurrentTime() < timeToStopPolling) {
663         // if class is being torn down, immediately return
664         if (!mValid.load(std::memory_order_relaxed)) {
665             return NN_ERROR() << "FMQ object is invalid";
666         }
667 
668         // Check if data is available. If it is, immediately retrieve it and return.
669         const size_t available = mFmqResultChannel.availableToRead();
670         if (available > 0) {
671             std::vector<FmqResultDatum> packet(available);
672             const bool success = mFmqResultChannel.readBlocking(packet.data(), available);
673             if (!success) {
674                 return NN_ERROR() << "Error receiving packet";
675             }
676             return packet;
677         }
678 
679         std::this_thread::yield();
680     }
681 
682     // If we get to this point, we either stopped polling because it was taking too long or polling
683     // was not allowed. Instead, perform a blocking call which uses a futex to save power.
684 
685     // wait for result packet and read first element of result packet
686     FmqResultDatum datum;
687     bool success = mFmqResultChannel.readBlocking(&datum, 1);
688 
689     // retrieve remaining elements
690     // NOTE: all of the data is already available at this point, so there's no need to do a blocking
691     // wait to wait for more data. This is known because in FMQ, all writes are published (made
692     // available) atomically. Currently, the producer always publishes the entire packet in one
693     // function call, so if the first element of the packet is available, the remaining elements are
694     // also available.
695     const size_t count = mFmqResultChannel.availableToRead();
696     std::vector<FmqResultDatum> packet(count + 1);
697     std::memcpy(&packet.front(), &datum, sizeof(datum));
698     success &= mFmqResultChannel.read(packet.data() + 1, count);
699 
700     if (!mValid) {
701         return NN_ERROR() << "FMQ object is invalid";
702     }
703 
704     // ensure packet was successfully received
705     if (!success) {
706         return NN_ERROR() << "Error receiving packet";
707     }
708 
709     return packet;
710 }
711 
712 }  // namespace android::hardware::neuralnetworks::V1_2::utils
713