• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "InputConsumerNoResampling"
18 #define ATRACE_TAG ATRACE_TAG_INPUT
19 
20 #include <inttypes.h>
21 #include <set>
22 
23 #include <android-base/logging.h>
24 #include <android-base/properties.h>
25 #include <android-base/stringprintf.h>
26 #include <cutils/properties.h>
27 #include <ftl/enum.h>
28 #include <utils/Trace.h>
29 
30 #include <com_android_input_flags.h>
31 #include <input/InputConsumerNoResampling.h>
32 #include <input/PrintTools.h>
33 #include <input/TraceTools.h>
34 
35 namespace android {
36 
37 namespace {
38 
39 using std::chrono::nanoseconds;
40 
41 using android::base::Result;
42 
43 /**
44  * Log debug messages relating to the consumer end of the transport channel.
45  * Enable this via "adb shell setprop log.tag.InputTransportConsumer DEBUG" (requires restart)
46  */
47 const bool DEBUG_TRANSPORT_CONSUMER =
48         __android_log_is_loggable(ANDROID_LOG_DEBUG, LOG_TAG "Consumer", ANDROID_LOG_INFO);
49 
createKeyEvent(const InputMessage & msg)50 std::unique_ptr<KeyEvent> createKeyEvent(const InputMessage& msg) {
51     std::unique_ptr<KeyEvent> event = std::make_unique<KeyEvent>();
52     event->initialize(msg.body.key.eventId, msg.body.key.deviceId, msg.body.key.source,
53                       ui::LogicalDisplayId{msg.body.key.displayId}, msg.body.key.hmac,
54                       msg.body.key.action, msg.body.key.flags, msg.body.key.keyCode,
55                       msg.body.key.scanCode, msg.body.key.metaState, msg.body.key.repeatCount,
56                       msg.body.key.downTime, msg.body.key.eventTime);
57     return event;
58 }
59 
createFocusEvent(const InputMessage & msg)60 std::unique_ptr<FocusEvent> createFocusEvent(const InputMessage& msg) {
61     std::unique_ptr<FocusEvent> event = std::make_unique<FocusEvent>();
62     event->initialize(msg.body.focus.eventId, msg.body.focus.hasFocus);
63     return event;
64 }
65 
createCaptureEvent(const InputMessage & msg)66 std::unique_ptr<CaptureEvent> createCaptureEvent(const InputMessage& msg) {
67     std::unique_ptr<CaptureEvent> event = std::make_unique<CaptureEvent>();
68     event->initialize(msg.body.capture.eventId, msg.body.capture.pointerCaptureEnabled);
69     return event;
70 }
71 
createDragEvent(const InputMessage & msg)72 std::unique_ptr<DragEvent> createDragEvent(const InputMessage& msg) {
73     std::unique_ptr<DragEvent> event = std::make_unique<DragEvent>();
74     event->initialize(msg.body.drag.eventId, msg.body.drag.x, msg.body.drag.y,
75                       msg.body.drag.isExiting);
76     return event;
77 }
78 
createMotionEvent(const InputMessage & msg)79 std::unique_ptr<MotionEvent> createMotionEvent(const InputMessage& msg) {
80     std::unique_ptr<MotionEvent> event = std::make_unique<MotionEvent>();
81     const uint32_t pointerCount = msg.body.motion.pointerCount;
82     std::vector<PointerProperties> pointerProperties;
83     pointerProperties.reserve(pointerCount);
84     std::vector<PointerCoords> pointerCoords;
85     pointerCoords.reserve(pointerCount);
86     for (uint32_t i = 0; i < pointerCount; i++) {
87         pointerProperties.push_back(msg.body.motion.pointers[i].properties);
88         pointerCoords.push_back(msg.body.motion.pointers[i].coords);
89     }
90 
91     ui::Transform transform;
92     transform.set({msg.body.motion.dsdx, msg.body.motion.dtdx, msg.body.motion.tx,
93                    msg.body.motion.dtdy, msg.body.motion.dsdy, msg.body.motion.ty, 0, 0, 1});
94     ui::Transform displayTransform;
95     displayTransform.set({msg.body.motion.dsdxRaw, msg.body.motion.dtdxRaw, msg.body.motion.txRaw,
96                           msg.body.motion.dtdyRaw, msg.body.motion.dsdyRaw, msg.body.motion.tyRaw,
97                           0, 0, 1});
98     event->initialize(msg.body.motion.eventId, msg.body.motion.deviceId, msg.body.motion.source,
99                       ui::LogicalDisplayId{msg.body.motion.displayId}, msg.body.motion.hmac,
100                       msg.body.motion.action, msg.body.motion.actionButton, msg.body.motion.flags,
101                       msg.body.motion.edgeFlags, msg.body.motion.metaState,
102                       msg.body.motion.buttonState, msg.body.motion.classification, transform,
103                       msg.body.motion.xPrecision, msg.body.motion.yPrecision,
104                       msg.body.motion.xCursorPosition, msg.body.motion.yCursorPosition,
105                       displayTransform, msg.body.motion.downTime, msg.body.motion.eventTime,
106                       pointerCount, pointerProperties.data(), pointerCoords.data());
107     return event;
108 }
109 
addSample(MotionEvent & event,const InputMessage & msg)110 void addSample(MotionEvent& event, const InputMessage& msg) {
111     uint32_t pointerCount = msg.body.motion.pointerCount;
112     std::vector<PointerCoords> pointerCoords;
113     pointerCoords.reserve(pointerCount);
114     for (uint32_t i = 0; i < pointerCount; i++) {
115         pointerCoords.push_back(msg.body.motion.pointers[i].coords);
116     }
117 
118     // TODO(b/329770983): figure out if it's safe to combine events with mismatching metaState
119     event.setMetaState(event.getMetaState() | msg.body.motion.metaState);
120     event.addSample(msg.body.motion.eventTime, pointerCoords.data(), msg.body.motion.eventId);
121 }
122 
createTouchModeEvent(const InputMessage & msg)123 std::unique_ptr<TouchModeEvent> createTouchModeEvent(const InputMessage& msg) {
124     std::unique_ptr<TouchModeEvent> event = std::make_unique<TouchModeEvent>();
125     event->initialize(msg.body.touchMode.eventId, msg.body.touchMode.isInTouchMode);
126     return event;
127 }
128 
outboundMessageToString(const InputMessage & outboundMsg)129 std::string outboundMessageToString(const InputMessage& outboundMsg) {
130     switch (outboundMsg.header.type) {
131         case InputMessage::Type::FINISHED: {
132             return android::base::StringPrintf("  Finish: seq=%" PRIu32 " handled=%s",
133                                                outboundMsg.header.seq,
134                                                toString(outboundMsg.body.finished.handled));
135         }
136         case InputMessage::Type::TIMELINE: {
137             return android::base::
138                     StringPrintf("  Timeline: inputEventId=%" PRId32 " gpuCompletedTime=%" PRId64
139                                  ", presentTime=%" PRId64,
140                                  outboundMsg.body.timeline.eventId,
141                                  outboundMsg.body.timeline
142                                          .graphicsTimeline[GraphicsTimeline::GPU_COMPLETED_TIME],
143                                  outboundMsg.body.timeline
144                                          .graphicsTimeline[GraphicsTimeline::PRESENT_TIME]);
145         }
146         default: {
147             LOG(FATAL) << "Outbound message must be FINISHED or TIMELINE, got "
148                        << ftl::enum_string(outboundMsg.header.type);
149             return "Unreachable";
150         }
151     }
152 }
153 
createFinishedMessage(uint32_t seq,bool handled,nsecs_t consumeTime)154 InputMessage createFinishedMessage(uint32_t seq, bool handled, nsecs_t consumeTime) {
155     InputMessage msg;
156     msg.header.type = InputMessage::Type::FINISHED;
157     msg.header.seq = seq;
158     msg.body.finished.handled = handled;
159     msg.body.finished.consumeTime = consumeTime;
160     return msg;
161 }
162 
createTimelineMessage(int32_t inputEventId,nsecs_t gpuCompletedTime,nsecs_t presentTime)163 InputMessage createTimelineMessage(int32_t inputEventId, nsecs_t gpuCompletedTime,
164                                    nsecs_t presentTime) {
165     InputMessage msg;
166     msg.header.type = InputMessage::Type::TIMELINE;
167     msg.header.seq = 0;
168     msg.body.timeline.eventId = inputEventId;
169     msg.body.timeline.graphicsTimeline[GraphicsTimeline::GPU_COMPLETED_TIME] = gpuCompletedTime;
170     msg.body.timeline.graphicsTimeline[GraphicsTimeline::PRESENT_TIME] = presentTime;
171     return msg;
172 }
173 
174 } // namespace
175 
176 // --- InputConsumerNoResampling ---
177 
InputConsumerNoResampling(const std::shared_ptr<InputChannel> & channel,sp<Looper> looper,InputConsumerCallbacks & callbacks,std::function<std::unique_ptr<Resampler> ()> resamplerCreator)178 InputConsumerNoResampling::InputConsumerNoResampling(
179         const std::shared_ptr<InputChannel>& channel, sp<Looper> looper,
180         InputConsumerCallbacks& callbacks,
181         std::function<std::unique_ptr<Resampler>()> resamplerCreator)
182       : mChannel{channel},
183         mLooper{looper},
184         mCallbacks{callbacks},
185         mResamplerCreator{std::move(resamplerCreator)},
186         mFdEvents(0) {
187     LOG_ALWAYS_FATAL_IF(mLooper == nullptr);
188     mCallback = sp<LooperEventCallback>::make(
189             std::bind(&InputConsumerNoResampling::handleReceiveCallback, this,
190                       std::placeholders::_1));
191     // In the beginning, there are no pending outbounds events; we only care about receiving
192     // incoming data.
193     setFdEvents(ALOOPER_EVENT_INPUT);
194 }
195 
~InputConsumerNoResampling()196 InputConsumerNoResampling::~InputConsumerNoResampling() {
197     ensureCalledOnLooperThread(__func__);
198     // If there are any remaining unread batches, send an ack for them and don't deliver
199     // them to callbacks.
200     for (auto& [_, batches] : mBatches) {
201         while (!batches.empty()) {
202             finishInputEvent(batches.front().header.seq, /*handled=*/false);
203             batches.pop();
204         }
205     }
206 
207     while (!mOutboundQueue.empty()) {
208         processOutboundEvents();
209         // This is our last chance to ack the events. If we don't ack them here, we will get an ANR,
210         // so keep trying to send the events as long as they are present in the queue.
211     }
212     // However, it is still up to the app to finish any events that have already been delivered
213     // to the callbacks. If we wanted to change that behaviour and auto-finish all unfinished events
214     // that were already sent to callbacks, we could potentially loop through "mConsumeTimes"
215     // instead. We can't use "mBatchedSequenceNumbers" for this purpose, because it only contains
216     // batchable (i.e., ACTION_MOVE) events that were sent to the callbacks.
217     const size_t unfinishedEvents = mConsumeTimes.size();
218     LOG_IF(INFO, unfinishedEvents != 0)
219             << getName() << " has " << unfinishedEvents << " unfinished event(s)";
220     // Remove the fd from epoll, so that Looper does not call 'handleReceiveCallback' anymore.
221     // This must be done at the end of the destructor; otherwise, some of the other functions may
222     // call 'setFdEvents' as a side-effect, thus adding the fd back to the epoll set of the looper.
223     setFdEvents(0);
224 }
225 
handleReceiveCallback(int events)226 int InputConsumerNoResampling::handleReceiveCallback(int events) {
227     // Allowed return values of this function as documented in LooperCallback::handleEvent
228     constexpr int REMOVE_CALLBACK = 0;
229     constexpr int KEEP_CALLBACK = 1;
230 
231     if (events & (ALOOPER_EVENT_ERROR | ALOOPER_EVENT_HANGUP)) {
232         // This error typically occurs when the publisher has closed the input channel
233         // as part of removing a window or finishing an IME session, in which case
234         // the consumer will soon be disposed as well.
235         if (DEBUG_TRANSPORT_CONSUMER) {
236             LOG(INFO) << "The channel was hung up or an error occurred: " << mChannel->getName();
237         }
238         return REMOVE_CALLBACK;
239     }
240 
241     int handledEvents = 0;
242     if (events & ALOOPER_EVENT_INPUT) {
243         handleMessages(readAllMessages());
244         handledEvents |= ALOOPER_EVENT_INPUT;
245     }
246 
247     if (events & ALOOPER_EVENT_OUTPUT) {
248         processOutboundEvents();
249         handledEvents |= ALOOPER_EVENT_OUTPUT;
250     }
251     if (handledEvents != events) {
252         LOG(FATAL) << "Mismatch: handledEvents=" << handledEvents << ", events=" << events;
253     }
254     return KEEP_CALLBACK;
255 }
256 
processOutboundEvents()257 void InputConsumerNoResampling::processOutboundEvents() {
258     while (!mOutboundQueue.empty()) {
259         const InputMessage& outboundMsg = mOutboundQueue.front();
260 
261         const status_t result = mChannel->sendMessage(&outboundMsg);
262         if (result == OK) {
263             if (outboundMsg.header.type == InputMessage::Type::FINISHED) {
264                 ATRACE_ASYNC_END("InputConsumer processing", /*cookie=*/outboundMsg.header.seq);
265             }
266             // Successful send. Erase the entry and keep trying to send more
267             mOutboundQueue.pop();
268             continue;
269         }
270 
271         // Publisher is busy, try again later. Keep this entry (do not erase)
272         if (result == WOULD_BLOCK) {
273             setFdEvents(ALOOPER_EVENT_INPUT | ALOOPER_EVENT_OUTPUT);
274             return; // try again later
275         }
276 
277         if (result == DEAD_OBJECT) {
278             // If there's no one to receive events in the channel, there's no point in sending them.
279             // Drop all outbound events.
280             LOG(INFO) << "Channel " << mChannel->getName() << " died. Dropping outbound event "
281                       << outboundMsg;
282             mOutboundQueue.pop();
283             setFdEvents(0);
284             continue;
285         }
286         // Some other error. Give up
287         LOG(FATAL) << "Failed to send outbound event on channel '" << mChannel->getName()
288                    << "'.  status=" << statusToString(result) << "(" << result << ")";
289     }
290 
291     // The queue is now empty. Tell looper there's no more output to expect.
292     setFdEvents(ALOOPER_EVENT_INPUT);
293 }
294 
finishInputEvent(uint32_t seq,bool handled)295 void InputConsumerNoResampling::finishInputEvent(uint32_t seq, bool handled) {
296     ensureCalledOnLooperThread(__func__);
297     mOutboundQueue.push(createFinishedMessage(seq, handled, popConsumeTime(seq)));
298     // also produce finish events for all batches for this seq (if any)
299     const auto it = mBatchedSequenceNumbers.find(seq);
300     if (it != mBatchedSequenceNumbers.end()) {
301         for (uint32_t subSeq : it->second) {
302             mOutboundQueue.push(createFinishedMessage(subSeq, handled, popConsumeTime(subSeq)));
303         }
304         mBatchedSequenceNumbers.erase(it);
305     }
306     processOutboundEvents();
307 }
308 
probablyHasInput() const309 bool InputConsumerNoResampling::probablyHasInput() const {
310     // Ideally, this would only be allowed to run on the looper thread, and in production, it will.
311     // However, for testing, it's convenient to call this while the looper thread is blocked, so
312     // we do not call ensureCalledOnLooperThread here.
313     return (!mBatches.empty()) || mChannel->probablyHasInput();
314 }
315 
reportTimeline(int32_t inputEventId,nsecs_t gpuCompletedTime,nsecs_t presentTime)316 void InputConsumerNoResampling::reportTimeline(int32_t inputEventId, nsecs_t gpuCompletedTime,
317                                                nsecs_t presentTime) {
318     ensureCalledOnLooperThread(__func__);
319     mOutboundQueue.push(createTimelineMessage(inputEventId, gpuCompletedTime, presentTime));
320     processOutboundEvents();
321 }
322 
popConsumeTime(uint32_t seq)323 nsecs_t InputConsumerNoResampling::popConsumeTime(uint32_t seq) {
324     auto it = mConsumeTimes.find(seq);
325     // Consume time will be missing if either 'finishInputEvent' is called twice, or if it was
326     // called for the wrong (synthetic?) input event. Either way, it is a bug that should be fixed.
327     LOG_ALWAYS_FATAL_IF(it == mConsumeTimes.end(), "Could not find consume time for seq=%" PRIu32,
328                         seq);
329     nsecs_t consumeTime = it->second;
330     mConsumeTimes.erase(it);
331     return consumeTime;
332 }
333 
setFdEvents(int events)334 void InputConsumerNoResampling::setFdEvents(int events) {
335     if (mFdEvents != events) {
336         mFdEvents = events;
337         if (events != 0) {
338             mLooper->addFd(mChannel->getFd(), 0, events, mCallback, nullptr);
339         } else {
340             mLooper->removeFd(mChannel->getFd());
341         }
342     }
343 }
344 
handleMessages(std::vector<InputMessage> && messages)345 void InputConsumerNoResampling::handleMessages(std::vector<InputMessage>&& messages) {
346     for (const InputMessage& msg : messages) {
347         if (msg.header.type == InputMessage::Type::MOTION) {
348             const int32_t action = msg.body.motion.action;
349             const DeviceId deviceId = msg.body.motion.deviceId;
350             const int32_t source = msg.body.motion.source;
351             const bool batchableEvent = (action == AMOTION_EVENT_ACTION_MOVE ||
352                                          action == AMOTION_EVENT_ACTION_HOVER_MOVE) &&
353                     (isFromSource(source, AINPUT_SOURCE_CLASS_POINTER) ||
354                      isFromSource(source, AINPUT_SOURCE_CLASS_JOYSTICK));
355 
356             const bool canResample = (mResamplerCreator != nullptr) &&
357                     (isFromSource(source, AINPUT_SOURCE_CLASS_POINTER));
358             if (canResample) {
359                 if (action == AMOTION_EVENT_ACTION_DOWN) {
360                     if (std::unique_ptr<Resampler> resampler = mResamplerCreator();
361                         resampler != nullptr) {
362                         const auto [_, inserted] =
363                                 mResamplers.insert(std::pair(deviceId, std::move(resampler)));
364                         LOG_IF(WARNING, !inserted) << deviceId << "already exists in mResamplers";
365                     }
366                 }
367             }
368 
369             if (batchableEvent) {
370                 // add it to batch
371                 mBatches[deviceId].emplace(msg);
372             } else {
373                 // consume all pending batches for this device immediately
374                 consumeBatchedInputEvents(deviceId, /*requestedFrameTime=*/
375                                           std::numeric_limits<nsecs_t>::max());
376                 if (canResample &&
377                     (action == AMOTION_EVENT_ACTION_UP || action == AMOTION_EVENT_ACTION_CANCEL)) {
378                     LOG_IF(INFO, mResamplers.erase(deviceId) == 0)
379                             << deviceId << "does not exist in mResamplers";
380                 }
381                 handleMessage(msg);
382             }
383         } else {
384             // Non-motion events shouldn't force the consumption of pending batched events
385             handleMessage(msg);
386         }
387     }
388     // At the end of this, if we still have pending batches, notify the receiver about it.
389 
390     // We need to carefully notify the InputConsumerCallbacks about the pending batch. The receiver
391     // could choose to consume all events when notified about the batch. That means that the
392     // "mBatches" variable could change when 'InputConsumerCallbacks::onBatchedInputEventPending' is
393     // invoked. We also can't notify the InputConsumerCallbacks in a while loop until mBatches is
394     // empty, because the receiver could choose to not consume the batch immediately.
395     std::set<int32_t> pendingBatchSources;
396     for (const auto& [_, pendingMessages] : mBatches) {
397         // Assume that all messages for a given device has the same source.
398         pendingBatchSources.insert(pendingMessages.front().body.motion.source);
399     }
400     for (const int32_t source : pendingBatchSources) {
401         const bool sourceStillRemaining =
402                 std::any_of(mBatches.begin(), mBatches.end(), [=](const auto& pair) {
403                     return pair.second.front().body.motion.source == source;
404                 });
405         if (sourceStillRemaining) {
406             mCallbacks.onBatchedInputEventPending(source);
407         }
408     }
409 }
410 
readAllMessages()411 std::vector<InputMessage> InputConsumerNoResampling::readAllMessages() {
412     std::vector<InputMessage> messages;
413     while (true) {
414         android::base::Result<InputMessage> result = mChannel->receiveMessage();
415         if (result.ok()) {
416             const InputMessage& msg = *result;
417             const auto [_, inserted] =
418                     mConsumeTimes.emplace(msg.header.seq, systemTime(SYSTEM_TIME_MONOTONIC));
419             LOG_ALWAYS_FATAL_IF(!inserted, "Already have a consume time for seq=%" PRIu32,
420                                 msg.header.seq);
421 
422             // Trace the event processing timeline - event was just read from the socket
423             // TODO(b/329777420): distinguish between multiple instances of InputConsumer
424             // in the same process.
425             ATRACE_ASYNC_BEGIN("InputConsumer processing", /*cookie=*/msg.header.seq);
426             messages.push_back(msg);
427         } else { // !result.ok()
428             switch (result.error().code()) {
429                 case WOULD_BLOCK: {
430                     return messages;
431                 }
432                 case DEAD_OBJECT: {
433                     LOG(FATAL) << "Got a dead object for " << mChannel->getName();
434                     break;
435                 }
436                 case BAD_VALUE: {
437                     LOG(FATAL) << "Got a bad value for " << mChannel->getName();
438                     break;
439                 }
440                 default: {
441                     LOG(FATAL) << "Unexpected error: " << result.error().message();
442                     break;
443                 }
444             }
445         }
446     }
447 }
448 
handleMessage(const InputMessage & msg) const449 void InputConsumerNoResampling::handleMessage(const InputMessage& msg) const {
450     switch (msg.header.type) {
451         case InputMessage::Type::KEY: {
452             std::unique_ptr<KeyEvent> keyEvent = createKeyEvent(msg);
453             mCallbacks.onKeyEvent(std::move(keyEvent), msg.header.seq);
454             break;
455         }
456 
457         case InputMessage::Type::MOTION: {
458             std::unique_ptr<MotionEvent> motionEvent = createMotionEvent(msg);
459             mCallbacks.onMotionEvent(std::move(motionEvent), msg.header.seq);
460             break;
461         }
462 
463         case InputMessage::Type::FINISHED:
464         case InputMessage::Type::TIMELINE: {
465             LOG(FATAL) << "Consumed a " << ftl::enum_string(msg.header.type)
466                        << " message, which should never be seen by InputConsumer on "
467                        << mChannel->getName();
468             break;
469         }
470 
471         case InputMessage::Type::FOCUS: {
472             std::unique_ptr<FocusEvent> focusEvent = createFocusEvent(msg);
473             mCallbacks.onFocusEvent(std::move(focusEvent), msg.header.seq);
474             break;
475         }
476 
477         case InputMessage::Type::CAPTURE: {
478             std::unique_ptr<CaptureEvent> captureEvent = createCaptureEvent(msg);
479             mCallbacks.onCaptureEvent(std::move(captureEvent), msg.header.seq);
480             break;
481         }
482 
483         case InputMessage::Type::DRAG: {
484             std::unique_ptr<DragEvent> dragEvent = createDragEvent(msg);
485             mCallbacks.onDragEvent(std::move(dragEvent), msg.header.seq);
486             break;
487         }
488 
489         case InputMessage::Type::TOUCH_MODE: {
490             std::unique_ptr<TouchModeEvent> touchModeEvent = createTouchModeEvent(msg);
491             mCallbacks.onTouchModeEvent(std::move(touchModeEvent), msg.header.seq);
492             break;
493         }
494     }
495 }
496 
497 std::pair<std::unique_ptr<MotionEvent>, std::optional<uint32_t>>
createBatchedMotionEvent(const std::optional<nsecs_t> requestedFrameTime,std::queue<InputMessage> & messages)498 InputConsumerNoResampling::createBatchedMotionEvent(const std::optional<nsecs_t> requestedFrameTime,
499                                                     std::queue<InputMessage>& messages) {
500     std::unique_ptr<MotionEvent> motionEvent;
501     std::optional<uint32_t> firstSeqForBatch;
502 
503     LOG_IF(FATAL, messages.empty()) << "messages queue is empty!";
504     const DeviceId deviceId = messages.front().body.motion.deviceId;
505     const auto resampler = mResamplers.find(deviceId);
506     const nanoseconds resampleLatency = (resampler != mResamplers.cend())
507             ? resampler->second->getResampleLatency()
508             : nanoseconds{0};
509     // When batching is not enabled, we want to consume all events. That's equivalent to having an
510     // infinite requestedFrameTime.
511     const nanoseconds adjustedFrameTime = (requestedFrameTime.has_value())
512             ? (nanoseconds{*requestedFrameTime} - resampleLatency)
513             : nanoseconds{std::numeric_limits<nsecs_t>::max()};
514 
515     while (!messages.empty() &&
516            (messages.front().body.motion.eventTime <= adjustedFrameTime.count())) {
517         if (motionEvent == nullptr) {
518             motionEvent = createMotionEvent(messages.front());
519             firstSeqForBatch = messages.front().header.seq;
520             const auto [_, inserted] = mBatchedSequenceNumbers.insert({*firstSeqForBatch, {}});
521             LOG_IF(FATAL, !inserted)
522                     << "The sequence " << messages.front().header.seq << " was already present!";
523         } else {
524             addSample(*motionEvent, messages.front());
525             mBatchedSequenceNumbers[*firstSeqForBatch].push_back(messages.front().header.seq);
526         }
527         messages.pop();
528     }
529 
530     // Check if resampling should be performed.
531     InputMessage* futureSample = nullptr;
532     if (!messages.empty()) {
533         futureSample = &messages.front();
534     }
535     if ((motionEvent != nullptr) && (resampler != mResamplers.cend()) &&
536         (requestedFrameTime.has_value())) {
537         resampler->second->resampleMotionEvent(nanoseconds{*requestedFrameTime}, *motionEvent,
538                                                futureSample);
539     }
540 
541     return std::make_pair(std::move(motionEvent), firstSeqForBatch);
542 }
543 
consumeBatchedInputEvents(std::optional<DeviceId> deviceId,std::optional<nsecs_t> requestedFrameTime)544 bool InputConsumerNoResampling::consumeBatchedInputEvents(
545         std::optional<DeviceId> deviceId, std::optional<nsecs_t> requestedFrameTime) {
546     ensureCalledOnLooperThread(__func__);
547     bool producedEvents = false;
548 
549     for (auto deviceIdIter = (deviceId.has_value()) ? (mBatches.find(*deviceId))
550                                                     : (mBatches.begin());
551          deviceIdIter != mBatches.cend(); ++deviceIdIter) {
552         std::queue<InputMessage>& messages = deviceIdIter->second;
553         auto [motion, firstSeqForBatch] = createBatchedMotionEvent(requestedFrameTime, messages);
554         if (motion != nullptr) {
555             LOG_ALWAYS_FATAL_IF(!firstSeqForBatch.has_value());
556             mCallbacks.onMotionEvent(std::move(motion), *firstSeqForBatch);
557             producedEvents = true;
558         } else {
559             // This is OK, it just means that the requestedFrameTime is too old (all events that we
560             // have pending are in the future of the requestedFrameTime). Maybe print a warning? If
561             // there are multiple devices active though, this might be normal and can just be
562             // ignored, unless none of them resulted in any consumption (in that case, this function
563             // would already return "false" so we could just leave it up to the caller).
564         }
565 
566         if (deviceId.has_value()) {
567             // We already consumed events for this device. Break here to prevent iterating over the
568             // other devices.
569             break;
570         }
571     }
572     std::erase_if(mBatches, [](const auto& pair) { return pair.second.empty(); });
573     return producedEvents;
574 }
575 
consumeBatchedInputEvents(std::optional<nsecs_t> requestedFrameTime)576 bool InputConsumerNoResampling::consumeBatchedInputEvents(
577         std::optional<nsecs_t> requestedFrameTime) {
578     return consumeBatchedInputEvents(/*deviceId=*/std::nullopt, requestedFrameTime);
579 }
580 
ensureCalledOnLooperThread(const char * func) const581 void InputConsumerNoResampling::ensureCalledOnLooperThread(const char* func) const {
582     sp<Looper> callingThreadLooper = Looper::getForThread();
583     if (callingThreadLooper != mLooper) {
584         LOG(FATAL) << "The function " << func << " can only be called on the looper thread";
585     }
586 }
587 
dump() const588 std::string InputConsumerNoResampling::dump() const {
589     ensureCalledOnLooperThread(__func__);
590     std::string out;
591     if (mOutboundQueue.empty()) {
592         out += "mOutboundQueue: <empty>\n";
593     } else {
594         out += "mOutboundQueue:\n";
595         // Make a copy of mOutboundQueue for printing destructively. Unfortunately std::queue
596         // doesn't provide a good way to iterate over the entire container.
597         std::queue<InputMessage> tmpQueue = mOutboundQueue;
598         while (!tmpQueue.empty()) {
599             out += std::string("  ") + outboundMessageToString(tmpQueue.front()) + "\n";
600             tmpQueue.pop();
601         }
602     }
603 
604     if (mBatches.empty()) {
605         out += "mBatches: <empty>\n";
606     } else {
607         out += "mBatches:\n";
608         for (const auto& [deviceId, messages] : mBatches) {
609             out += "  Device id ";
610             out += std::to_string(deviceId);
611             out += ":\n";
612             // Make a copy of mOutboundQueue for printing destructively. Unfortunately std::queue
613             // doesn't provide a good way to iterate over the entire container.
614             std::queue<InputMessage> tmpQueue = messages;
615             while (!tmpQueue.empty()) {
616                 LOG_ALWAYS_FATAL_IF(tmpQueue.front().header.type != InputMessage::Type::MOTION);
617                 std::unique_ptr<MotionEvent> motion = createMotionEvent(tmpQueue.front());
618                 out += std::string("    ") + streamableToString(*motion) + "\n";
619                 tmpQueue.pop();
620             }
621         }
622     }
623 
624     return out;
625 }
626 
627 } // namespace android
628