• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sensor_data_processer.h"
17 
18 #include <cinttypes>
19 #include <sys/prctl.h>
20 #include <sys/socket.h>
21 
22 #ifdef HIVIEWDFX_HISYSEVENT_ENABLE
23 #include "hisysevent.h"
24 #endif // HIVIEWDFX_HISYSEVENT_ENABLE
25 #include "print_sensor_data.h"
26 
27 #undef LOG_TAG
28 #define LOG_TAG "SensorDataProcesser"
29 
30 namespace OHOS {
31 namespace Sensors {
32 using namespace OHOS::HiviewDFX;
33 
namespace {
// Name given to the sensor data reporting thread via prctl(PR_SET_NAME) in DataThread.
const std::string SENSOR_REPORT_THREAD_NAME { "OS_SenProducer" };
} // namespace
37 
SensorDataProcesser(const std::unordered_map<int32_t,Sensor> & sensorMap)38 SensorDataProcesser::SensorDataProcesser(const std::unordered_map<int32_t, Sensor> &sensorMap)
39 {
40     sensorMap_.insert(sensorMap.begin(), sensorMap.end());
41     SEN_HILOGD("sensorMap_.size:%{public}d", int32_t { sensorMap_.size() });
42 }
43 
~SensorDataProcesser()44 SensorDataProcesser::~SensorDataProcesser()
45 {
46     dataCountMap_.clear();
47     sensorMap_.clear();
48 }
49 
SendNoneFifoCacheData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data,uint64_t periodCount)50 void SensorDataProcesser::SendNoneFifoCacheData(std::unordered_map<int32_t, SensorData> &cacheBuf,
51                                                 sptr<SensorBasicDataChannel> &channel, SensorData &data,
52                                                 uint64_t periodCount)
53 {
54     std::vector<SensorData> sendEvents;
55     std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
56     sendEvents.push_back(data);
57     auto dataCountIt = dataCountMap_.find(data.sensorTypeId);
58     if (dataCountIt == dataCountMap_.end()) {
59         std::vector<sptr<FifoCacheData>> channelFifoList;
60         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
61         CHKPV(fifoCacheData);
62         fifoCacheData->SetChannel(channel);
63         channelFifoList.push_back(fifoCacheData);
64         dataCountMap_.insert(std::make_pair(data.sensorTypeId, channelFifoList));
65         SendRawData(cacheBuf, channel, sendEvents);
66         return;
67     }
68     bool channelExist = false;
69     for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
70         auto fifoCacheData = *fifoIt;
71         CHKPC(fifoCacheData);
72         auto fifoChannel = fifoCacheData->GetChannel();
73         if (fifoChannel == nullptr) {
74             fifoIt = dataCountIt->second.erase(fifoIt);
75             continue;
76         }
77         ++fifoIt;
78         if (fifoChannel != channel) {
79             continue;
80         }
81         channelExist = true;
82         uint64_t curCount = fifoCacheData->GetPeriodCount();
83         curCount++;
84         fifoCacheData->SetPeriodCount(curCount);
85         if (periodCount != 0 && fifoCacheData->GetPeriodCount() % periodCount != 0UL) {
86             continue;
87         }
88         SendRawData(cacheBuf, channel, sendEvents);
89         fifoCacheData->SetPeriodCount(0);
90         return;
91     }
92     if (!channelExist) {
93         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
94         CHKPV(fifoCacheData);
95         fifoCacheData->SetChannel(channel);
96         dataCountIt->second.push_back(fifoCacheData);
97         SendRawData(cacheBuf, channel, sendEvents);
98     }
99 }
100 
SendFifoCacheData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data,uint64_t periodCount,uint64_t fifoCount)101 void SensorDataProcesser::SendFifoCacheData(std::unordered_map<int32_t, SensorData> &cacheBuf,
102                                             sptr<SensorBasicDataChannel> &channel, SensorData &data,
103                                             uint64_t periodCount, uint64_t fifoCount)
104 {
105     std::lock_guard<std::mutex> dataCountLock(dataCountMutex_);
106     auto dataCountIt = dataCountMap_.find(data.sensorTypeId);
107     // there is no channelFifoList
108     if (dataCountIt == dataCountMap_.end()) {
109         std::vector<sptr<FifoCacheData>> channelFifoList;
110         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
111         CHKPV(fifoCacheData);
112         fifoCacheData->SetChannel(channel);
113         channelFifoList.push_back(fifoCacheData);
114         dataCountMap_.insert(std::make_pair(data.sensorTypeId, channelFifoList));
115         return;
116     }
117     // find channel in channelFifoList
118     bool channelExist = false;
119     for (auto fifoIt = dataCountIt->second.begin(); fifoIt != dataCountIt->second.end();) {
120         auto fifoData = *fifoIt;
121         CHKPC(fifoData);
122         auto fifoChannel = fifoData->GetChannel();
123         if (fifoChannel == nullptr) {
124             fifoIt = dataCountIt->second.erase(fifoIt);
125             continue;
126         }
127         ++fifoIt;
128         if (fifoChannel != channel) {
129             continue;
130         }
131         channelExist = true;
132         uint64_t curCount = fifoData->GetPeriodCount();
133         curCount++;
134         fifoData->SetPeriodCount(curCount);
135         if (fifoData->GetPeriodCount() % periodCount != 0UL) {
136             continue;
137         }
138         fifoData->SetPeriodCount(0);
139         std::vector<SensorData> fifoDataList = fifoData->GetFifoCacheData();
140         fifoDataList.push_back(data);
141         fifoData->SetFifoCacheData(fifoDataList);
142         if ((fifoData->GetFifoCacheData()).size() != fifoCount) {
143             continue;
144         }
145         SendRawData(cacheBuf, channel, fifoData->GetFifoCacheData());
146         fifoData->InitFifoCache();
147         return;
148     }
149     // cannot find channel in channelFifoList
150     if (!channelExist) {
151         sptr<FifoCacheData> fifoCacheData = new (std::nothrow) FifoCacheData();
152         CHKPV(fifoCacheData);
153         fifoCacheData->SetChannel(channel);
154         dataCountIt->second.push_back(fifoCacheData);
155     }
156 }
157 
ReportData(sptr<SensorBasicDataChannel> & channel,SensorData & data)158 void SensorDataProcesser::ReportData(sptr<SensorBasicDataChannel> &channel, SensorData &data)
159 {
160     CHKPV(channel);
161     int32_t sensorId = data.sensorTypeId;
162     if (sensorId == SENSOR_TYPE_ID_HALL_EXT) {
163         PrintSensorData::GetInstance().PrintSensorDataLog("ReportData", data);
164     }
165     auto &cacheBuf = const_cast<std::unordered_map<int32_t, SensorData> &>(channel->GetDataCacheBuf());
166     if (ReportNotContinuousData(cacheBuf, channel, data)) {
167         return;
168     }
169     uint64_t periodCount = clientInfo_.ComputeBestPeriodCount(sensorId, channel);
170     if (periodCount == 0UL) {
171         SEN_HILOGE("periodCount is zero");
172         return;
173     }
174     auto fifoCount = clientInfo_.ComputeBestFifoCount(sensorId, channel);
175     if (fifoCount <= 1) {
176         SendNoneFifoCacheData(cacheBuf, channel, data, periodCount);
177         return;
178     }
179     SendFifoCacheData(cacheBuf, channel, data, periodCount, fifoCount);
180 }
181 
ReportNotContinuousData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> & channel,SensorData & data)182 bool SensorDataProcesser::ReportNotContinuousData(std::unordered_map<int32_t, SensorData> &cacheBuf,
183                                                   sptr<SensorBasicDataChannel> &channel, SensorData &data)
184 {
185     int32_t sensorId = data.sensorTypeId;
186     std::lock_guard<std::mutex> sensorLock(sensorMutex_);
187     auto sensor = sensorMap_.find(sensorId);
188     if (sensor == sensorMap_.end()) {
189         SEN_HILOGE("Data's sensorId is not supported");
190         return false;
191     }
192     sensor->second.SetFlags(data.mode);
193     if (((SENSOR_ON_CHANGE & sensor->second.GetFlags()) == SENSOR_ON_CHANGE) ||
194         ((SENSOR_ONE_SHOT & sensor->second.GetFlags()) == SENSOR_ONE_SHOT)) {
195         std::vector<SensorData> sendEvents;
196         sendEvents.push_back(data);
197         if (sensorId == SENSOR_TYPE_ID_HALL_EXT) {
198             PrintSensorData::GetInstance().PrintSensorDataLog("ReportNotContinuousData", data);
199         }
200         SendRawData(cacheBuf, channel, sendEvents);
201         return true;
202     }
203     return false;
204 }
205 
SendRawData(std::unordered_map<int32_t,SensorData> & cacheBuf,sptr<SensorBasicDataChannel> channel,std::vector<SensorData> events)206 void SensorDataProcesser::SendRawData(std::unordered_map<int32_t, SensorData> &cacheBuf,
207                                       sptr<SensorBasicDataChannel> channel, std::vector<SensorData> events)
208 {
209     CHKPV(channel);
210     if (events.empty()) {
211         return;
212     }
213     size_t eventSize = events.size();
214     auto ret = channel->SendData(events.data(), eventSize * sizeof(SensorData));
215     if (ret != ERR_OK) {
216         SEN_HILOGE("Send data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
217             ret, events[eventSize - 1].sensorTypeId, events[eventSize - 1].timestamp);
218         int32_t sensorId = events[eventSize - 1].sensorTypeId;
219         cacheBuf[sensorId] = events[eventSize - 1];
220     }
221 }
222 
CacheSensorEvent(const SensorData & data,sptr<SensorBasicDataChannel> & channel)223 int32_t SensorDataProcesser::CacheSensorEvent(const SensorData &data, sptr<SensorBasicDataChannel> &channel)
224 {
225     CHKPR(channel, INVALID_POINTER);
226     int32_t ret = ERR_OK;
227     auto &cacheBuf = const_cast<std::unordered_map<int32_t, SensorData> &>(channel->GetDataCacheBuf());
228     int32_t sensorId = data.sensorTypeId;
229     if (sensorId == SENSOR_TYPE_ID_HALL_EXT) {
230         PrintSensorData::GetInstance().PrintSensorDataLog("CacheSensorEvent", data);
231     }
232     auto cacheEvent = cacheBuf.find(sensorId);
233     if (cacheEvent != cacheBuf.end()) {
234         // Try to send the last failed value, if it still fails, replace the previous cache directly
235         const SensorData &cacheData = cacheEvent->second;
236         ret = channel->SendData(&cacheData, sizeof(SensorData));
237         if (ret != ERR_OK) {
238             SEN_HILOGE("retry send cache data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
239                 ret, cacheData.sensorTypeId, cacheData.timestamp);
240         }
241         ret = channel->SendData(&data, sizeof(SensorData));
242         if (ret != ERR_OK) {
243             SEN_HILOGE("retry send data failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
244                 ret, data.sensorTypeId, data.timestamp);
245             cacheBuf[sensorId] = data;
246         } else {
247             cacheBuf.erase(cacheEvent);
248         }
249     } else {
250         ret = channel->SendData(&data, sizeof(SensorData));
251         if (ret != ERR_OK) {
252             SEN_HILOGE("directly retry failed, ret:%{public}d, sensorId:%{public}d, timestamp:%{public}" PRId64,
253                 ret, data.sensorTypeId, data.timestamp);
254             cacheBuf[sensorId] = data;
255         }
256     }
257     return ret;
258 }
259 
EventFilter(CircularEventBuf & eventsBuf)260 void SensorDataProcesser::EventFilter(CircularEventBuf &eventsBuf)
261 {
262     int32_t sensorId = eventsBuf.circularBuf[eventsBuf.readPos].sensorTypeId;
263     if (sensorId == SENSOR_TYPE_ID_HALL_EXT) {
264         PrintSensorData::GetInstance().PrintSensorDataLog("EventFilter", eventsBuf.circularBuf[eventsBuf.readPos]);
265     }
266     std::vector<sptr<SensorBasicDataChannel>> channelList = clientInfo_.GetSensorChannel(sensorId);
267     for (auto &channel : channelList) {
268         if (!channel->GetSensorStatus()) {
269             SEN_HILOGW("Sensor status is not active");
270             continue;
271         }
272         SendEvents(channel, eventsBuf.circularBuf[eventsBuf.readPos]);
273     }
274 }
275 
ProcessEvents(sptr<ReportDataCallback> dataCallback)276 int32_t SensorDataProcesser::ProcessEvents(sptr<ReportDataCallback> dataCallback)
277 {
278     CHKPR(dataCallback, INVALID_POINTER);
279     std::unique_lock<std::mutex> lk(ISensorHdiConnection::dataMutex_);
280     ISensorHdiConnection::dataCondition_.wait(lk, [this] { return ISensorHdiConnection::dataReady_.load(); });
281     ISensorHdiConnection::dataReady_.store(false);
282     auto &eventsBuf = dataCallback->GetEventData();
283     if (eventsBuf.eventNum <= 0) {
284         SEN_HILOGE("Data cannot be empty");
285         return NO_EVENT;
286     }
287     int32_t eventNum = eventsBuf.eventNum;
288     for (int32_t i = 0; i < eventNum; i++) {
289         EventFilter(eventsBuf);
290         eventsBuf.readPos++;
291         if (eventsBuf.readPos >= CIRCULAR_BUF_LEN) {
292             eventsBuf.readPos = 0;
293         }
294         eventsBuf.eventNum--;
295     }
296     return SUCCESS;
297 }
298 
SendEvents(sptr<SensorBasicDataChannel> & channel,SensorData & data)299 int32_t SensorDataProcesser::SendEvents(sptr<SensorBasicDataChannel> &channel, SensorData &data)
300 {
301     CHKPR(channel, INVALID_POINTER);
302     clientInfo_.UpdateDataQueue(data.sensorTypeId, data);
303     auto &cacheBuf = channel->GetDataCacheBuf();
304     if (cacheBuf.empty()) {
305         ReportData(channel, data);
306     } else {
307         CacheSensorEvent(data, channel);
308     }
309     clientInfo_.StoreEvent(data);
310     return SUCCESS;
311 }
312 
DataThread(sptr<SensorDataProcesser> dataProcesser,sptr<ReportDataCallback> dataCallback)313 int32_t SensorDataProcesser::DataThread(sptr<SensorDataProcesser> dataProcesser, sptr<ReportDataCallback> dataCallback)
314 {
315     CALL_LOG_ENTER;
316     prctl(PR_SET_NAME, SENSOR_REPORT_THREAD_NAME.c_str());
317     do {
318         if (dataProcesser == nullptr || dataCallback == nullptr) {
319             SEN_HILOGE("dataProcesser or dataCallback is nullptr");
320             return INVALID_POINTER;
321         }
322         if (dataProcesser->ProcessEvents(dataCallback) == INVALID_POINTER) {
323             SEN_HILOGE("Callback cannot be null");
324             return INVALID_POINTER;
325         }
326     } while (1);
327 }
328 } // namespace Sensors
329 } // namespace OHOS
330