• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sensor_data_processer.h"
17 
18 #include <cinttypes>
19 #include <sys/prctl.h>
20 #include <sys/socket.h>
21 #include <thread>
22 
23 #include "hisysevent.h"
24 #include "permission_util.h"
25 #include "securec.h"
26 #include "sensor_basic_data_channel.h"
27 #include "sensor_errors.h"
28 #include "system_ability_definition.h"
29 
30 #undef LOG_TAG
31 #define LOG_TAG "SensorDataProcesser"
32 
33 namespace OHOS {
34 namespace Sensors {
35 using namespace OHOS::HiviewDFX;
36 
namespace {
// Name applied to the reporting thread; DataThread hands it to prctl(PR_SET_NAME).
const std::string SENSOR_REPORT_THREAD_NAME { "OS_SenProducer" };
} // namespace
40 
SensorDataProcesser(const std::unordered_map<int32_t,Sensor> & sensorMap)41 SensorDataProcesser::SensorDataProcesser(const std::unordered_map<int32_t, Sensor> &sensorMap)
42 {
43     sensorMap_.insert(sensorMap.begin(), sensorMap.end());
44     SEN_HILOGD("sensorMap_.size:%{public}d", int32_t { sensorMap_.size() });
45 }
46 
~SensorDataProcesser()47 SensorDataProcesser::~SensorDataProcesser()
48 {
49     dataCountMap_.clear();
50     sensorMap_.clear();
51 }
52 
SendNoneFifoCacheData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data,uint64_t periodCount)53 void SensorDataProcesser::SendNoneFifoCacheData(std::unordered_map<int32_t, SensorData> &cacheBuf,
54                                                 sptr<SensorBasicDataChannel> &channel, SensorData &data,
55                                                 uint64_t periodCount)
56 {
57     std::vector<SensorData> sendEvents;
58     std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
59     sendEvents.push_back(data);
60     auto dataCountIt = dataCountMap_.find(data.sensorTypeId);
61     if (dataCountIt == dataCountMap_.end()) {
62         std::vector<sptr<FifoCacheData>> channelFifoList;
63         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
64         CHKPV(fifoCacheData);
65         fifoCacheData->SetChannel(channel);
66         channelFifoList.push_back(fifoCacheData);
67         dataCountMap_.insert(std::make_pair(data.sensorTypeId, channelFifoList));
68         SendRawData(cacheBuf, channel, sendEvents);
69         return;
70     }
71     bool channelExist = false;
72     for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
73         auto fifoCacheData = *fifoIt;
74         CHKPC(fifoCacheData);
75         auto fifoChannel = fifoCacheData->GetChannel();
76         if (fifoChannel == nullptr) {
77             fifoIt = dataCountIt->second.erase(fifoIt);
78             continue;
79         }
80         ++fifoIt;
81         if (fifoChannel != channel) {
82             continue;
83         }
84         channelExist = true;
85         uint64_t curCount = fifoCacheData->GetPeriodCount();
86         curCount++;
87         fifoCacheData->SetPeriodCount(curCount);
88         if (periodCount != 0 && fifoCacheData->GetPeriodCount() % periodCount != 0UL) {
89             continue;
90         }
91         SendRawData(cacheBuf, channel, sendEvents);
92         fifoCacheData->SetPeriodCount(0);
93         return;
94     }
95     if (!channelExist) {
96         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
97         CHKPV(fifoCacheData);
98         fifoCacheData->SetChannel(channel);
99         dataCountIt->second.push_back(fifoCacheData);
100         SendRawData(cacheBuf, channel, sendEvents);
101     }
102 }
103 
SendFifoCacheData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data,uint64_t periodCount,uint64_t fifoCount)104 void SensorDataProcesser::SendFifoCacheData(std::unordered_map<int32_t, SensorData> &cacheBuf,
105                                             sptr<SensorBasicDataChannel> &channel, SensorData &data,
106                                             uint64_t periodCount, uint64_t fifoCount)
107 {
108     std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
109     auto dataCountIt = dataCountMap_.find(data.sensorTypeId);
110     // there is no channelFifoList
111     if (dataCountIt == dataCountMap_.end()) {
112         std::vector<sptr<FifoCacheData>> channelFifoList;
113         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
114         CHKPV(fifoCacheData);
115         fifoCacheData->SetChannel(channel);
116         channelFifoList.push_back(fifoCacheData);
117         dataCountMap_.insert(std::make_pair(data.sensorTypeId, channelFifoList));
118         return;
119     }
120     // find channel in channelFifoList
121     bool channelExist = false;
122     for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
123         auto fifoData = *fifoIt;
124         CHKPC(fifoData);
125         auto fifoChannel = fifoData->GetChannel();
126         if (fifoChannel == nullptr) {
127             fifoIt = dataCountIt->second.erase(fifoIt);
128             continue;
129         }
130         ++fifoIt;
131         if (fifoChannel != channel) {
132             continue;
133         }
134         channelExist = true;
135         uint64_t curCount = fifoData->GetPeriodCount();
136         curCount++;
137         fifoData->SetPeriodCount(curCount);
138         if (fifoData->GetPeriodCount() % periodCount != 0UL) {
139             continue;
140         }
141         fifoData->SetPeriodCount(0);
142         std::vector<SensorData> fifoDataList = fifoData->GetFifoCacheData();
143         fifoDataList.push_back(data);
144         fifoData->SetFifoCacheData(fifoDataList);
145         if ((fifoData->GetFifoCacheData()).size() != fifoCount) {
146             continue;
147         }
148         SendRawData(cacheBuf, channel, fifoData->GetFifoCacheData());
149         fifoData->InitFifoCache();
150         return;
151     }
152     // cannot find channel in channelFifoList
153     if (!channelExist) {
154         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
155         CHKPV(fifoCacheData);
156         fifoCacheData->SetChannel(channel);
157         dataCountIt->second.push_back(fifoCacheData);
158     }
159 }
160 
ReportData(sptr<SensorBasicDataChannel> & channel,SensorData & data)161 void SensorDataProcesser::ReportData(sptr<SensorBasicDataChannel> &channel, SensorData &data)
162 {
163     CHKPV(channel);
164     int32_t sensorId = data.sensorTypeId;
165     auto &cacheBuf = const_cast<std::unordered_map<int32_t, SensorData> &>(channel->GetDataCacheBuf());
166     if (ReportNotContinuousData(cacheBuf, channel, data)) {
167         return;
168     }
169     uint64_t periodCount = clientInfo_.ComputeBestPeriodCount(sensorId, channel);
170     if (periodCount == 0UL) {
171         return;
172     }
173     auto fifoCount = clientInfo_.ComputeBestFifoCount(sensorId, channel);
174     if (fifoCount <= 1) {
175         SendNoneFifoCacheData(cacheBuf, channel, data, periodCount);
176         return;
177     }
178     SendFifoCacheData(cacheBuf, channel, data, periodCount, fifoCount);
179 }
180 
ReportNotContinuousData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data)181 bool SensorDataProcesser::ReportNotContinuousData(std::unordered_map<int32_t, SensorData> &cacheBuf,
182                                                   sptr<SensorBasicDataChannel> &channel, SensorData &data)
183 {
184     int32_t sensorId = data.sensorTypeId;
185     std::lock_guard<std::mutex> sensorLock(sensorMutex_);
186     auto sensor = sensorMap_.find(sensorId);
187     if (sensor == sensorMap_.end()) {
188         SEN_HILOGE("Data's sensorId is not supported");
189         return false;
190     }
191     sensor->second.SetFlags(data.mode);
192     if (((SENSOR_ON_CHANGE & sensor->second.GetFlags()) == SENSOR_ON_CHANGE) ||
193         ((SENSOR_ONE_SHOT & sensor->second.GetFlags()) == SENSOR_ONE_SHOT)) {
194         std::vector<SensorData> sendEvents;
195         sendEvents.push_back(data);
196         SendRawData(cacheBuf, channel, sendEvents);
197         return true;
198     }
199     return false;
200 }
201 
SendRawData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> channel,std::vector<SensorData> events)202 void SensorDataProcesser::SendRawData(std::unordered_map<int32_t, SensorData> &cacheBuf,
203                                       sptr<SensorBasicDataChannel> channel, std::vector<SensorData> events)
204 {
205     CHKPV(channel);
206     if (events.empty()) {
207         return;
208     }
209     size_t eventSize = events.size();
210     auto ret = channel->SendData(events.data(), eventSize * sizeof(SensorData));
211     if (ret != ERR_OK) {
212         SEN_HILOGE("Send data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
213             ret, events[eventSize - 1].sensorTypeId, events[eventSize - 1].timestamp);
214         int32_t sensorId = events[eventSize - 1].sensorTypeId;
215         cacheBuf[sensorId] = events[eventSize - 1];
216     }
217 }
218 
CacheSensorEvent(const SensorData & data,sptr<SensorBasicDataChannel> & channel)219 int32_t SensorDataProcesser::CacheSensorEvent(const SensorData &data, sptr<SensorBasicDataChannel> &channel)
220 {
221     CHKPR(channel, INVALID_POINTER);
222     int32_t ret = ERR_OK;
223     auto &cacheBuf = const_cast<std::unordered_map<int32_t, SensorData> &>(channel->GetDataCacheBuf());
224     int32_t sensorId = data.sensorTypeId;
225     auto cacheEvent = cacheBuf.find(sensorId);
226     if (cacheEvent != cacheBuf.end()) {
227         // Try to send the last failed value, if it still fails, replace the previous cache directly
228         const SensorData &cacheData = cacheEvent->second;
229         ret = channel->SendData(&cacheData, sizeof(SensorData));
230         if (ret != ERR_OK) {
231             SEN_HILOGE("retry send cache data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
232                 ret, cacheData.sensorTypeId, cacheData.timestamp);
233         }
234         ret = channel->SendData(&data, sizeof(SensorData));
235         if (ret != ERR_OK) {
236             SEN_HILOGE("retry send data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
237                 ret, data.sensorTypeId, data.timestamp);
238             cacheBuf[sensorId] = data;
239         } else {
240             cacheBuf.erase(cacheEvent);
241         }
242     } else {
243         ret = channel->SendData(&data, sizeof(SensorData));
244         if (ret != ERR_OK) {
245             SEN_HILOGE("directly retry failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
246                 ret, data.sensorTypeId, data.timestamp);
247             cacheBuf[sensorId] = data;
248         }
249     }
250     return ret;
251 }
252 
EventFilter(CircularEventBuf & eventsBuf)253 void SensorDataProcesser::EventFilter(CircularEventBuf &eventsBuf)
254 {
255     int32_t sensorId = eventsBuf.circularBuf[eventsBuf.readPos].sensorTypeId;
256     std::vector<sptr<SensorBasicDataChannel>> channelList = clientInfo_.GetSensorChannel(sensorId);
257     for (auto &channel : channelList) {
258         if (channel->GetSensorStatus()) {
259             SendEvents(channel, eventsBuf.circularBuf[eventsBuf.readPos]);
260         }
261     }
262 }
263 
ProcessEvents(sptr<ReportDataCallback> dataCallback)264 int32_t SensorDataProcesser::ProcessEvents(sptr<ReportDataCallback> dataCallback)
265 {
266     CHKPR(dataCallback, INVALID_POINTER);
267     std::unique_lock<std::mutex> lk(ISensorHdiConnection::dataMutex_);
268     ISensorHdiConnection::dataCondition_.wait(lk);
269     auto &eventsBuf = dataCallback->GetEventData();
270     if (eventsBuf.eventNum <= 0) {
271         SEN_HILOGE("Data cannot be empty");
272         return NO_EVENT;
273     }
274     int32_t eventNum = eventsBuf.eventNum;
275     for (int32_t i = 0; i < eventNum; i++) {
276         EventFilter(eventsBuf);
277 
278         eventsBuf.readPos++;
279         if (eventsBuf.readPos == CIRCULAR_BUF_LEN) {
280             eventsBuf.readPos = 0;
281         }
282         eventsBuf.eventNum--;
283     }
284     return SUCCESS;
285 }
286 
SendEvents(sptr<SensorBasicDataChannel> & channel,SensorData & data)287 int32_t SensorDataProcesser::SendEvents(sptr<SensorBasicDataChannel> &channel, SensorData &data)
288 {
289     CHKPR(channel, INVALID_POINTER);
290     clientInfo_.UpdateDataQueue(data.sensorTypeId, data);
291     auto &cacheBuf = channel->GetDataCacheBuf();
292     if (cacheBuf.empty()) {
293         ReportData(channel, data);
294     } else {
295         CacheSensorEvent(data, channel);
296     }
297     clientInfo_.StoreEvent(data);
298     return SUCCESS;
299 }
300 
DataThread(sptr<SensorDataProcesser> dataProcesser,sptr<ReportDataCallback> dataCallback)301 int32_t SensorDataProcesser::DataThread(sptr<SensorDataProcesser> dataProcesser, sptr<ReportDataCallback> dataCallback)
302 {
303     CALL_LOG_ENTER;
304     prctl(PR_SET_NAME, SENSOR_REPORT_THREAD_NAME.c_str());
305     do {
306         if (dataProcesser == nullptr || dataCallback == nullptr) {
307             SEN_HILOGE("dataProcesser or dataCallback is nullptr");
308             return INVALID_POINTER;
309         }
310         if (dataProcesser->ProcessEvents(dataCallback) == INVALID_POINTER) {
311             SEN_HILOGE("Callback cannot be null");
312             return INVALID_POINTER;
313         }
314     } while (1);
315 }
316 } // namespace Sensors
317 } // namespace OHOS
318