1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sensor_data_processer.h"
17
18 #include <sys/socket.h>
19 #include <thread>
20
21 #include "hisysevent.h"
22 #include "permission_util.h"
23 #include "securec.h"
24 #include "sensor_basic_data_channel.h"
25 #include "sensor_catalog.h"
26 #include "sensors_errors.h"
27 #include "system_ability_definition.h"
28
29 namespace OHOS {
30 namespace Sensors {
31 using namespace OHOS::HiviewDFX;
32
33 namespace {
34 constexpr HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "SensorDataProcesser" };
35
36 enum {
37 FIRST_INDEX = 1,
38 SECOND_INDEX = 2,
39 THIRD_INDEX = 3,
40 };
41
42 constexpr uint32_t SENSOR_INDEX_SHIFT = 8;
43 constexpr uint32_t SENSOR_TYPE_SHIFT = 16;
44 constexpr uint32_t SENSOR_CATAGORY_SHIFT = 24;
45
46 constexpr uint32_t FLUSH_COMPLETE_ID = ((uint32_t)OTHER << SENSOR_CATAGORY_SHIFT) |
47 ((uint32_t)SENSOR_TYPE_FLUSH << SENSOR_TYPE_SHIFT) |
48 ((uint32_t)FIRST_INDEX << SENSOR_INDEX_SHIFT);
49 } // namespace
50
SensorDataProcesser(const std::unordered_map<uint32_t,Sensor> & sensorMap)51 SensorDataProcesser::SensorDataProcesser(const std::unordered_map<uint32_t, Sensor> &sensorMap)
52 {
53 sensorMap_.insert(sensorMap.begin(), sensorMap.end());
54 SEN_HILOGD("sensorMap_.size:%{public}d", int32_t { sensorMap_.size() });
55 }
56
~SensorDataProcesser()57 SensorDataProcesser::~SensorDataProcesser()
58 {
59 dataCountMap_.clear();
60 sensorMap_.clear();
61 }
62
SendNoneFifoCacheData(std::unordered_map<uint32_t,SensorEvent> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorEvent & event,uint64_t periodCount)63 void SensorDataProcesser::SendNoneFifoCacheData(std::unordered_map<uint32_t, SensorEvent> &cacheBuf,
64 sptr<SensorBasicDataChannel> &channel, SensorEvent &event,
65 uint64_t periodCount)
66 {
67 std::vector<SensorEvent> sendEvents;
68 std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
69 sendEvents.push_back(event);
70 uint32_t sensorId = static_cast<uint32_t>(event.sensorTypeId);
71 auto dataCountIt = dataCountMap_.find(sensorId);
72 if (dataCountIt == dataCountMap_.end()) {
73 std::vector<sptr<FifoCacheData>> channelFifoList;
74 sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
75 CHKPV(fifoCacheData);
76 fifoCacheData->SetChannel(channel);
77 channelFifoList.push_back(fifoCacheData);
78 dataCountMap_.insert(std::make_pair(sensorId, channelFifoList));
79 SendRawData(cacheBuf, channel, sendEvents);
80 return;
81 }
82 bool channelExist = false;
83 for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
84 auto fifoCacheData = *fifoIt;
85 CHKPC(fifoCacheData);
86 auto fifoChannel = fifoCacheData->GetChannel();
87 if (fifoChannel == nullptr) {
88 fifoIt = dataCountIt->second.erase(fifoIt);
89 continue;
90 }
91 ++fifoIt;
92 if (fifoChannel != channel) {
93 continue;
94 }
95 channelExist = true;
96 uint64_t curCount = fifoCacheData->GetPeriodCount();
97 curCount++;
98 fifoCacheData->SetPeriodCount(curCount);
99 if (periodCount != 0 && fifoCacheData->GetPeriodCount() % periodCount != 0UL) {
100 continue;
101 }
102 SendRawData(cacheBuf, channel, sendEvents);
103 fifoCacheData->SetPeriodCount(0);
104 return;
105 }
106 if (!channelExist) {
107 sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
108 CHKPV(fifoCacheData);
109 fifoCacheData->SetChannel(channel);
110 dataCountIt->second.push_back(fifoCacheData);
111 SendRawData(cacheBuf, channel, sendEvents);
112 }
113 }
114
SendFifoCacheData(std::unordered_map<uint32_t,SensorEvent> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorEvent & event,uint64_t periodCount,uint64_t fifoCount)115 void SensorDataProcesser::SendFifoCacheData(std::unordered_map<uint32_t, SensorEvent> &cacheBuf,
116 sptr<SensorBasicDataChannel> &channel, SensorEvent &event,
117 uint64_t periodCount, uint64_t fifoCount)
118 {
119 uint32_t sensorId = static_cast<uint32_t>(event.sensorTypeId);
120 std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
121 auto dataCountIt = dataCountMap_.find(sensorId);
122 // there is no channelFifoList
123 if (dataCountIt == dataCountMap_.end()) {
124 std::vector<sptr<FifoCacheData>> channelFifoList;
125 sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
126 CHKPV(fifoCacheData);
127 fifoCacheData->SetChannel(channel);
128 channelFifoList.push_back(fifoCacheData);
129 dataCountMap_.insert(std::make_pair(sensorId, channelFifoList));
130 return;
131 }
132 // find channel in channelFifoList
133 bool channelExist = false;
134 for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
135 auto fifoData = *fifoIt;
136 CHKPC(fifoData);
137 auto fifoChannel = fifoData->GetChannel();
138 if (fifoChannel == nullptr) {
139 fifoIt = dataCountIt->second.erase(fifoIt);
140 continue;
141 }
142 ++fifoIt;
143 if (fifoChannel != channel) {
144 continue;
145 }
146 channelExist = true;
147 uint64_t curCount = fifoData->GetPeriodCount();
148 curCount++;
149 fifoData->SetPeriodCount(curCount);
150 if (fifoData->GetPeriodCount() % periodCount != 0UL) {
151 continue;
152 }
153 fifoData->SetPeriodCount(0);
154 std::vector<SensorEvent> fifoDataList = fifoData->GetFifoCacheData();
155 fifoDataList.push_back(event);
156 fifoData->SetFifoCacheData(fifoDataList);
157 if ((fifoData->GetFifoCacheData()).size() != fifoCount) {
158 continue;
159 }
160 SendRawData(cacheBuf, channel, fifoData->GetFifoCacheData());
161 fifoData->InitFifoCache();
162 return;
163 }
164 // cannot find channel in channelFifoList
165 if (!channelExist) {
166 sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
167 CHKPV(fifoCacheData);
168 fifoCacheData->SetChannel(channel);
169 dataCountIt->second.push_back(fifoCacheData);
170 }
171 }
172
ReportData(sptr<SensorBasicDataChannel> & channel,SensorEvent & event)173 void SensorDataProcesser::ReportData(sptr<SensorBasicDataChannel> &channel, SensorEvent &event)
174 {
175 CHKPV(channel);
176 uint32_t sensorId = static_cast<uint32_t>(event.sensorTypeId);
177 if (sensorId == FLUSH_COMPLETE_ID) {
178 sensorId = static_cast<uint32_t>(event.sensorTypeId);
179 }
180 auto &cacheBuf = const_cast<std::unordered_map<uint32_t, SensorEvent> &>(channel->GetDataCacheBuf());
181 if (ReportNotContinuousData(cacheBuf, channel, event)) {
182 return;
183 }
184 uint64_t periodCount = clientInfo_.ComputeBestPeriodCount(sensorId, channel);
185 if (periodCount == 0UL) {
186 return;
187 }
188 auto fifoCount = clientInfo_.ComputeBestFifoCount(sensorId, channel);
189 if (fifoCount <= 0) {
190 SendNoneFifoCacheData(cacheBuf, channel, event, periodCount);
191 return;
192 }
193 SendFifoCacheData(cacheBuf, channel, event, periodCount, fifoCount);
194 }
195
ReportNotContinuousData(std::unordered_map<uint32_t,SensorEvent> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorEvent & event)196 bool SensorDataProcesser::ReportNotContinuousData(std::unordered_map<uint32_t, SensorEvent> &cacheBuf,
197 sptr<SensorBasicDataChannel> &channel, SensorEvent &event)
198 {
199 uint32_t sensorId = static_cast<uint32_t>(event.sensorTypeId);
200 if (sensorId == FLUSH_COMPLETE_ID) {
201 sensorId = static_cast<uint32_t>(event.sensorTypeId);
202 }
203 std::lock_guard<std::mutex> sensorLock(sensorMutex_);
204 auto sensor = sensorMap_.find(sensorId);
205 if (sensor == sensorMap_.end()) {
206 SEN_HILOGE("data's sensorId is not supported");
207 return false;
208 }
209 sensor->second.SetFlags(event.mode);
210 if (((SENSOR_ON_CHANGE & sensor->second.GetFlags()) == SENSOR_ON_CHANGE) ||
211 ((SENSOR_ONE_SHOT & sensor->second.GetFlags()) == SENSOR_ONE_SHOT)) {
212 std::vector<SensorEvent> sendEvents;
213 sendEvents.push_back(event);
214 SendRawData(cacheBuf, channel, sendEvents);
215 return true;
216 }
217 return false;
218 }
219
SendRawData(std::unordered_map<uint32_t,SensorEvent> & cacheBuf,sptr<SensorBasicDataChannel> channel,std::vector<SensorEvent> event)220 void SensorDataProcesser::SendRawData(std::unordered_map<uint32_t, SensorEvent> &cacheBuf,
221 sptr<SensorBasicDataChannel> channel, std::vector<SensorEvent> event)
222 {
223 CHKPV(channel);
224 if (event.empty()) {
225 return;
226 }
227 size_t eventSize = event.size();
228 std::vector<TransferSensorEvents> transferEvents;
229 for (size_t i = 0; i < eventSize; i++) {
230 TransferSensorEvents transferEvent = {
231 .sensorTypeId = event[i].sensorTypeId,
232 .version = event[i].version,
233 .timestamp = event[i].timestamp,
234 .option = event[i].option,
235 .mode = event[i].mode,
236 .dataLen = event[i].dataLen
237 };
238 errno_t ret = memcpy_s(transferEvent.data, SENSOR_MAX_LENGTH, event[i].data, event[i].dataLen);
239 if (ret != EOK) {
240 SEN_HILOGE("copy data failed");
241 return;
242 }
243 transferEvents.push_back(transferEvent);
244 }
245 auto ret = channel->SendData(transferEvents.data(), eventSize * sizeof(TransferSensorEvents));
246 if (ret != ERR_OK) {
247 SEN_HILOGE("send data failed, ret:%{public}d", ret);
248 uint32_t sensorId = static_cast<uint32_t>(event[eventSize - 1].sensorTypeId);
249 if (sensorId == FLUSH_COMPLETE_ID) {
250 sensorId = static_cast<uint32_t>(event[eventSize - 1].sensorTypeId);
251 }
252 cacheBuf[sensorId] = event[eventSize - 1];
253 }
254 }
255
CacheSensorEvent(const SensorEvent & event,sptr<SensorBasicDataChannel> & channel)256 int32_t SensorDataProcesser::CacheSensorEvent(const SensorEvent &event, sptr<SensorBasicDataChannel> &channel)
257 {
258 CHKPR(channel, INVALID_POINTER);
259 int32_t ret = ERR_OK;
260 auto &cacheBuf = const_cast<std::unordered_map<uint32_t, SensorEvent> &>(channel->GetDataCacheBuf());
261 uint32_t sensorId = static_cast<uint32_t>(event.sensorTypeId);
262 if (sensorId == FLUSH_COMPLETE_ID) {
263 sensorId = static_cast<uint32_t>(event.sensorTypeId);
264 }
265 auto cacheEvent = cacheBuf.find(sensorId);
266 if (cacheEvent != cacheBuf.end()) {
267 // Try to send the last failed value, if it still fails, replace the previous cache directly
268 ret = channel->SendData(&cacheEvent->second, sizeof(SensorEvent));
269 if (ret != ERR_OK) {
270 SEN_HILOGE("ret:%{public}d", ret);
271 }
272 ret = channel->SendData(&event, sizeof(SensorEvent));
273 if (ret != ERR_OK) {
274 SEN_HILOGE("ret:%{public}d", ret);
275 cacheBuf[sensorId] = event;
276 } else {
277 cacheBuf.erase(cacheEvent);
278 }
279 } else {
280 ret = channel->SendData(&event, sizeof(SensorEvent));
281 if (ret != ERR_OK) {
282 SEN_HILOGE("ret:%{public}d", ret);
283 cacheBuf[sensorId] = event;
284 }
285 }
286 return ret;
287 }
288
EventFilter(CircularEventBuf & eventsBuf)289 void SensorDataProcesser::EventFilter(CircularEventBuf &eventsBuf)
290 {
291 uint32_t realSensorId = 0;
292 uint32_t sensorId = static_cast<uint32_t>(eventsBuf.circularBuf[eventsBuf.readPos].sensorTypeId);
293 std::vector<sptr<SensorBasicDataChannel>> channelList;
294 if (sensorId == FLUSH_COMPLETE_ID) {
295 realSensorId = static_cast<uint32_t>(eventsBuf.circularBuf[eventsBuf.readPos].sensorTypeId);
296 channelList = clientInfo_.GetSensorChannel(realSensorId);
297 } else {
298 channelList = clientInfo_.GetSensorChannel(sensorId);
299 }
300 auto flushInfo = flushInfo_.GetFlushInfo();
301 std::vector<FlushInfo> flushVec;
302 if (sensorId == FLUSH_COMPLETE_ID) {
303 SEN_HILOGD("sensorId:%{public}u", sensorId);
304 auto it = flushInfo.find(realSensorId);
305 if (it != flushInfo.end()) {
306 flushVec = it->second;
307 for (auto &channel : flushVec) {
308 auto flushChannel = channel.flushChannel.promote();
309 if (flushInfo_.IsFlushChannelValid(channelList, flushChannel)) {
310 SendEvents(flushChannel, eventsBuf.circularBuf[eventsBuf.readPos]);
311 flushInfo_.ClearFlushInfoItem(realSensorId);
312 break;
313 } else {
314 // The channel that store in the flushVec has invalid, so erase this channel directly
315 SEN_HILOGD("clear flush info");
316 flushInfo_.ClearFlushInfoItem(realSensorId);
317 }
318 }
319 }
320 } else {
321 for (auto &channel : channelList) {
322 int32_t index = flushInfo_.GetFlushChannelIndex(flushVec, channel);
323 if (index >= 0) {
324 if (flushVec[index].flushFromEnable) {
325 SEN_HILOGI("flushFromEnable");
326 continue;
327 }
328 }
329 /* if has some suspend flush, but this flush come from the flush function rather than enable,
330 so we need to calling GetSensorStatus to decided whether send this event. */
331 if (channel->GetSensorStatus()) {
332 SendEvents(channel, eventsBuf.circularBuf[eventsBuf.readPos]);
333 }
334 }
335 }
336 }
337
ProcessEvents(sptr<ReportDataCallback> dataCallback)338 int32_t SensorDataProcesser::ProcessEvents(sptr<ReportDataCallback> dataCallback)
339 {
340 CHKPR(dataCallback, INVALID_POINTER);
341 std::unique_lock<std::mutex> lk(ISensorHdiConnection::dataMutex_);
342 ISensorHdiConnection::dataCondition_.wait(lk);
343 auto &eventsBuf = dataCallback->GetEventData();
344 if (eventsBuf.eventNum <= 0) {
345 SEN_HILOGE("data cannot be empty");
346 return NO_EVENT;
347 }
348 int32_t eventNum = eventsBuf.eventNum;
349 for (int32_t i = 0; i < eventNum; i++) {
350 EventFilter(eventsBuf);
351 if (eventsBuf.circularBuf[eventsBuf.readPos].data != nullptr) {
352 delete[] eventsBuf.circularBuf[eventsBuf.readPos].data;
353 eventsBuf.circularBuf[eventsBuf.readPos].data = nullptr;
354 }
355 eventsBuf.readPos++;
356 if (eventsBuf.readPos == CIRCULAR_BUF_LEN) {
357 eventsBuf.readPos = 0;
358 }
359 eventsBuf.eventNum--;
360 }
361 return SUCCESS;
362 }
363
SendEvents(sptr<SensorBasicDataChannel> & channel,SensorEvent & event)364 int32_t SensorDataProcesser::SendEvents(sptr<SensorBasicDataChannel> &channel, SensorEvent &event)
365 {
366 CHKPR(channel, INVALID_POINTER);
367 clientInfo_.UpdateDataQueue(event.sensorTypeId, event);
368 auto &cacheBuf = channel->GetDataCacheBuf();
369 if (cacheBuf.empty()) {
370 ReportData(channel, event);
371 } else {
372 CacheSensorEvent(event, channel);
373 }
374 clientInfo_.StoreEvent(event);
375 return SUCCESS;
376 }
377
DataThread(sptr<SensorDataProcesser> dataProcesser,sptr<ReportDataCallback> dataCallback)378 int32_t SensorDataProcesser::DataThread(sptr<SensorDataProcesser> dataProcesser, sptr<ReportDataCallback> dataCallback)
379 {
380 CALL_LOG_ENTER;
381 do {
382 if (dataProcesser->ProcessEvents(dataCallback) == INVALID_POINTER) {
383 SEN_HILOGE("callback cannot be null");
384 return INVALID_POINTER;
385 }
386 } while (1);
387 }
388 } // namespace Sensors
389 } // namespace OHOS
390