• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #include "data_sender_receiver.h"
17 #include "channel_common_definition.h"
18 #include "dtbcollabmgr_log.h"
19 #include "session.h"
20 #include "session_data_header.h"
21 #include "socket.h"
22 #include <chrono>
23 #include <map>
24 #include <memory>
25 #include "securec.h"
26 #include "softbus_error_code.h"
27 
28 namespace OHOS {
29 namespace DistributedCollab {
namespace {
// Protocol version passed to every SessionDataHeader constructed in this file.
constexpr uint16_t PROTOCOL_VERSION = 1;
// File-local tag string; presumably consumed by the HILOG* macros (not visible here).
const std::string TAG = "DSchedCollabDataSenderReceiver";

/*
 * Queries SESSION_OPTION_MAX_SENDBYTES_SIZE for socketId and stores it in value.
 *
 * NOTE: this is a statement macro with hidden control flow. When GetSessionOption
 * does not return ERR_OK it logs and executes `return GET_SESSION_OPTION_FAILED;`
 * in the ENCLOSING function, so it may only be used inside functions returning int32_t.
 * The result variable lives in the macro's own do/while scope.
 */
#define GET_SOFTBUS_SESSION_OPTION(socketId, value, valueSize) \
do { \
    int32_t sessionOptionRet = \
        GetSessionOption(socketId, SESSION_OPTION_MAX_SENDBYTES_SIZE, &(value), valueSize); \
    if (sessionOptionRet != ERR_OK) { \
        HILOGE("GetSessionOption failed, ret: %{public}d, session: %{public}d", \
            sessionOptionRet, socketId); \
        return GET_SESSION_OPTION_FAILED; \
    } \
    HILOGD("GetSessionOption succeeded, session: %{public}d, value: %{public}d", \
        socketId, value); \
} while (0)

}
46 
~DataSenderReceiver()47 DataSenderReceiver::~DataSenderReceiver()
48 {
49     HILOGI("destory data sender receiver for %{public}d", socketId_);
50     DeInit();
51 }
52 
Init()53 void DataSenderReceiver::Init()
54 {
55     std::call_once(initFlag_, [this] {
56         HILOGI("start init data sender receiver for %{public}d", socketId_);
57         eventThread_ = std::thread(&DataSenderReceiver::StartEvent, this);
58         std::unique_lock<std::mutex> lock(eventMutex_);
59         eventCon_.wait(lock, [this] { return eventHandler_ != nullptr; });
60     });
61 }
62 
StartEvent()63 void DataSenderReceiver::StartEvent()
64 {
65     HILOGI("StartEvent start");
66     auto runner = AppExecFwk::EventRunner::Create(false);
67     {
68         std::lock_guard<std::mutex> lock(eventMutex_);
69         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
70     }
71     eventCon_.notify_one();
72     runner->Run();
73     HILOGI("StartEvent end");
74 }
75 
DeInit()76 void DataSenderReceiver::DeInit()
77 {
78     HILOGI("start deinit data sender receiver for %{public}d", socketId_);
79     Shutdown(socketId_);
80     // stop all task
81     if (eventHandler_ != nullptr) {
82         eventHandler_->GetEventRunner()->Stop();
83         if (eventThread_.joinable()) {
84             eventThread_.join();
85         }
86         eventHandler_ = nullptr;
87     } else {
88         HILOGE("eventHandler_ is nullptr");
89     }
90     HILOGI("end");
91 }
92 
SendStreamData(const std::shared_ptr<AVTransStreamData> & sendData)93 int32_t DataSenderReceiver::SendStreamData(const std::shared_ptr<AVTransStreamData>& sendData)
94 {
95     const StreamData data = {
96         .buf = reinterpret_cast<char*>(sendData->StreamData()->Data()),
97         .bufLen = sendData->StreamData()->Size()
98     };
99     cJSON* extInfo = sendData->SerializeStreamDataExt();
100     char* jsonString = cJSON_PrintUnformatted(extInfo);
101     if (jsonString == nullptr) {
102         HILOGE("Failed to generate JSON string.");
103         return ERR_JSON_GENERATION_FAILED;
104     }
105     const StreamData ext = {
106         .buf = jsonString,
107         .bufLen = strlen(jsonString)
108     };
109     const StreamFrameInfo info = {
110         .frameType = 0,
111         .timeStamp = GetNowTimeStampUs(),
112         .seqNum = 0,
113         .seqSubNum = 0,
114         .level = 0,
115         .bitMap = 0,
116         .tvCount = 0
117     };
118     int32_t ret = SendStream(socketId_, &data, &ext, &info);
119     if (ret != SOFTBUS_OK) {
120         HILOGE("send stream data failed, %{public}d", socketId_);
121         cJSON_Delete(extInfo);
122         cJSON_free(jsonString);
123         return ret;
124     }
125     cJSON_Delete(extInfo);
126     cJSON_free(jsonString);
127     return ERR_OK;
128 }
129 
SendMessageData(const std::shared_ptr<AVTransDataBuffer> & sendData)130 int32_t DataSenderReceiver::SendMessageData(const std::shared_ptr<AVTransDataBuffer>& sendData)
131 {
132     HILOGI("start to send message, %{public}u", static_cast<uint32_t>(sendData->Size()));
133     Init();
134     if (eventHandler_ == nullptr) {
135         HILOGE("init eventhandler failed");
136         return NULL_EVENT_HANDLER;
137     }
138     if (sendData->Size() > MAX_SEND_MESSAGE_SIZE) {
139         HILOGE("too large send message");
140         return DATA_SIZE_EXCEED_LIMIT;
141     }
142     int32_t socketId = socketId_;
143     auto func = [this, socketId, sendData]() {
144         uint32_t dataSize = static_cast<uint32_t>(sendData->Size());
145         HILOGI("begin to send msg by softbus %{public}d:%{public}u", socketId, dataSize);
146         int32_t ret = SendMessage(socketId, sendData->Data(), sendData->Size());
147         HILOGI("finish send msg by softbus %{public}d:%{public}u, ret=%{public}d", socketId, dataSize, ret);
148     };
149     if (eventHandler_->PostTask(func, AppExecFwk::EventQueue::Priority::HIGH)) {
150         return ERR_OK;
151     }
152     HILOGI("finish send msg to task queue for %{public}d:%{public}u", socketId,
153         static_cast<uint32_t>(sendData->Size()));
154     return POST_TASK_FAILED;
155 }
156 
SendFileData(const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)157 int32_t DataSenderReceiver::SendFileData(const std::vector<std::string>& sFiles,
158     const std::vector<std::string>& dFiles)
159 {
160     HILOGI("start to send file");
161     const char* sFileList[sFiles.size()];
162     for (size_t i = 0; i < sFiles.size(); ++i) {
163         sFileList[i] = sFiles[i].c_str();
164     }
165     const char* dFileList[dFiles.size()];
166     for (size_t i = 0; i < dFiles.size(); ++i) {
167         dFileList[i] = dFiles[i].c_str();
168     }
169     return SendFile(socketId_, sFileList, dFileList, static_cast<uint32_t>(sFiles.size()));
170 }
171 
SendBytesData(const std::shared_ptr<AVTransDataBuffer> & sendData)172 int32_t DataSenderReceiver::SendBytesData(const std::shared_ptr<AVTransDataBuffer>& sendData)
173 {
174     int32_t dataType = static_cast<int32_t>(ChannelDataType::BYTES);
175     return SendUnpackData(sendData, dataType);
176 }
177 
SendUnpackData(const std::shared_ptr<AVTransDataBuffer> & sendData,const int32_t dataType)178 int32_t DataSenderReceiver::SendUnpackData(const std::shared_ptr<AVTransDataBuffer>& sendData,
179     const int32_t dataType)
180 {
181     HILOGI("start to send bytes");
182     uint32_t maxSendSize = 0;
183     GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
184 
185     if (sendData->Size() + SessionDataHeader::HEADER_LEN <= maxSendSize) {
186         return SendAllPackets(sendData, dataType);
187     }
188     const uint8_t* dataHeader = sendData->Data();
189     const uint8_t* const dataEnd = sendData->Data() + sendData->Size();
190     uint8_t* current = const_cast<uint8_t*>(dataHeader);
191     uint32_t totalLen = sendData->Size();
192     uint32_t packetLen = maxSendSize;
193     uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
194     uint16_t seqNum = 0;
195     uint16_t subSeq = 0;
196     SessionDataHeader headerPara(
197         PROTOCOL_VERSION,
198         FRAG_TYPE::FRAG_START,
199         dataType,
200         seqNum,
201         totalLen,
202         packetLen,
203         payloadLen,
204         subSeq);
205 
206     int32_t ret = DoSendPacket(headerPara, current, payloadLen);
207     if (ret != ERR_OK) {
208         return ret;
209     }
210     current += payloadLen;
211     while (current < dataEnd) {
212         GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
213         headerPara.packetLen_ = maxSendSize;
214         headerPara.payloadLen_ = maxSendSize - SessionDataHeader::HEADER_LEN;
215         headerPara.fragFlag_ = dataEnd - current > payloadLen ? FRAG_TYPE::FRAG_MID : FRAG_TYPE::FRAG_END;
216         headerPara.subSeq_++;
217         ret = DoSendPacket(headerPara, current, payloadLen);
218         if (ret != ERR_OK) {
219             return ret;
220         }
221         current += payloadLen;
222     }
223     HILOGI("finish send all bytes by packet");
224     return ERR_OK;
225 }
226 
SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,const int32_t dataType)227 int32_t DataSenderReceiver::SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,
228     const int32_t dataType)
229 {
230     HILOGI("send all data");
231     uint8_t* current = sendData->Data();
232     uint32_t totalLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
233     uint32_t packetLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
234     uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
235     uint16_t subSeq = 0;
236     uint16_t seqNum = 0;
237 
238     int32_t ret = ERR_OK;
239     SessionDataHeader headerPara(
240         PROTOCOL_VERSION,
241         FRAG_TYPE::FRAG_START_END,
242         dataType,
243         seqNum,
244         totalLen,
245         packetLen,
246         payloadLen,
247         subSeq
248     );
249     ret = DoSendPacket(headerPara, current, payloadLen);
250     if (ret != ERR_OK) {
251         return ret;
252     }
253     HILOGI("finish send all bytes");
254     return ERR_OK;
255 }
256 
DoSendPacket(SessionDataHeader & headerPara,const uint8_t * dataHeader,const uint32_t dataLen)257 int32_t DataSenderReceiver::DoSendPacket(SessionDataHeader& headerPara,
258     const uint8_t* dataHeader, const uint32_t dataLen)
259 {
260     HILOGI("start to send packet by softbus");
261     auto headerBuffer = headerPara.Serialize();
262     auto sendBuffer = std::make_unique<AVTransDataBuffer>(SessionDataHeader::HEADER_LEN + dataLen);
263     uint8_t* header = sendBuffer->Data();
264 
265     int32_t ret = ERR_OK;
266     // copy header
267     ret = memcpy_s(header, sendBuffer->Size(),
268         headerBuffer->Data(), SessionDataHeader::HEADER_LEN);
269     if (ret != EOK) {
270         HILOGE("Write header failed");
271         return WRITE_SESSION_HEADER_FAILED;
272     }
273     // copy data
274     ret = memcpy_s(header + SessionDataHeader::HEADER_LEN,
275         sendBuffer->Size() - SessionDataHeader::HEADER_LEN,
276         dataHeader, dataLen);
277     if (ret != EOK) {
278         HILOGE("Write data failed");
279         return WRITE_SEND_DATA_BUFFER_FAILED;
280     }
281     ret = SendBytes(socketId_, sendBuffer->Data(), sendBuffer->Size());
282     if (ret != SOFTBUS_OK) {
283         HILOGE("Send data buffer failed");
284         return SEND_DATA_BY_SOFTBUS_FAILED;
285     }
286     return ret;
287 }
288 
GetNowTimeStampUs()289 inline int64_t DataSenderReceiver::GetNowTimeStampUs()
290 {
291     std::chrono::microseconds nowUs = std::chrono::duration_cast<std::chrono::microseconds>(
292         std::chrono::system_clock::now().time_since_epoch());
293     return nowUs.count();
294 }
295 
PackRecvPacketData(const uint8_t * header,const uint32_t dataLen)296 int32_t DataSenderReceiver::PackRecvPacketData(const uint8_t* header, const uint32_t dataLen)
297 {
298     auto headerPara = SessionDataHeader::Deserialize(header, dataLen);
299     if (!headerPara) {
300         HILOGE("read session header from buffer failed");
301         return WRITE_SESSION_HEADER_FAILED;
302     }
303     SessionDataHeader& sessionHeader = *headerPara;
304     int32_t ret = CheckRecvSessionHeader(sessionHeader);
305     if (ret != ERR_OK) {
306         HILOGE("check session header failed");
307         return ret;
308     }
309     // pack recv data
310     uint8_t* dataHeader = const_cast<uint8_t*>(header);
311     switch (sessionHeader.fragFlag_) {
312         case FRAG_TYPE::FRAG_START_END:
313             ret = ProcessAllPacketRecv(dataHeader, dataLen, sessionHeader);
314             break;
315         case FRAG_TYPE::FRAG_START:
316             ret = ProcessStartPacketRecv(dataHeader, dataLen, sessionHeader);
317             break;
318         case FRAG_TYPE::FRAG_MID:
319             ret = ProcessMidPacketRecv(dataHeader, dataLen, sessionHeader);
320             break;
321         case FRAG_TYPE::FRAG_END:
322             ret = ProcessEndPacketRecv(dataHeader, dataLen, sessionHeader);
323             break;
324         default:
325             HILOGE("invalid flag type, %{public}d", static_cast<uint32_t>(sessionHeader.fragFlag_));
326             return INVALID_SESSION_HEADER_FLAG_TYPE;
327     }
328     return ret;
329 }
330 
CheckRecvSessionHeader(const SessionDataHeader & headerPara)331 int32_t DataSenderReceiver::CheckRecvSessionHeader(const SessionDataHeader& headerPara)
332 {
333     if (nowSeqNum_ != headerPara.seqNum_) {
334         HILOGE("seq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
335             nowSeqNum_, headerPara.seqNum_, socketId_);
336         return INVALID_SESSION_HEADER_SEQ_NUM;
337     }
338     if (nowSubSeq_ == 0 && headerPara.subSeq_ == 0) {
339         return ERR_OK;
340     }
341     if (nowSubSeq_ + 1 != headerPara.subSeq_) {
342         HILOGE("subSeq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
343             nowSubSeq_, headerPara.subSeq_, socketId_);
344         return INVALID_SESSION_HEADER_SUB_SEQ;
345     }
346     return ERR_OK;
347 }
348 
ProcessAllPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)349 int32_t DataSenderReceiver::ProcessAllPacketRecv(const uint8_t* data, const uint32_t dataLen,
350     const SessionDataHeader& headerPara)
351 {
352     if (packBuffer_ != nullptr || isWaiting_) {
353         HILOGE("recv start data packet but buffer not empty or still waiting");
354         ResetFlag();
355         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
356     }
357     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
358     if (ret != ERR_OK) {
359         HILOGE("write payload to buffer failed");
360         ResetFlag();
361         return WRITE_PAYLOAD_TO_BUFFER_FAILED;
362     }
363     return ERR_OK;
364 }
365 
ProcessStartPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)366 int32_t DataSenderReceiver::ProcessStartPacketRecv(const uint8_t* data, const uint32_t dataLen,
367     const SessionDataHeader& headerPara)
368 {
369     if (packBuffer_ != nullptr || isWaiting_) {
370         HILOGE("recv start data packet but buffer not empty or still waiting");
371         ResetFlag();
372         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
373     }
374     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
375     if (ret != ERR_OK) {
376         ResetFlag();
377         return ret;
378     }
379     isWaiting_ = true;
380     nowSeqNum_ = headerPara.seqNum_;
381     nowSubSeq_ = headerPara.subSeq_;
382     return ERR_OK;
383 }
384 
ProcessMidPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)385 int32_t DataSenderReceiver::ProcessMidPacketRecv(const uint8_t* data,
386     const uint32_t dataLen, const SessionDataHeader& headerPara)
387 {
388     if (packBuffer_ == nullptr || !isWaiting_) {
389         HILOGE("recv mid data packet but buffer empty or end waiting");
390         ResetFlag();
391         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
392     }
393     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
394     if (ret != ERR_OK) {
395         ResetFlag();
396         return ret;
397     }
398     nowSeqNum_ = headerPara.seqNum_;
399     nowSubSeq_ = headerPara.subSeq_;
400     return ERR_OK;
401 }
402 
ProcessEndPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)403 int32_t DataSenderReceiver::ProcessEndPacketRecv(const uint8_t* data,
404     const uint32_t dataLen, const SessionDataHeader& headerPara)
405 {
406     if (packBuffer_ == nullptr || !isWaiting_) {
407         HILOGE("recv end data packet but buffer empty or end waiting");
408         ResetFlag();
409         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
410     }
411     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
412     if (ret != ERR_OK) {
413         ResetFlag();
414         return ret;
415     }
416     isWaiting_ = false;
417     return ret;
418 }
419 
WriteRecvBytesDataToBuffer(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)420 int32_t DataSenderReceiver::WriteRecvBytesDataToBuffer(const uint8_t* data,
421     const uint32_t dataLen, const SessionDataHeader& headerPara)
422 {
423     uint8_t* header = const_cast<uint8_t*>(data);
424     uint8_t* dataHeader = header + (headerPara.packetLen_ - headerPara.payloadLen_);
425     if (packBuffer_ == nullptr) {
426         packBuffer_ = std::make_unique<AVTransDataBuffer>(headerPara.totalLen_);
427         currentPos = packBuffer_->Data();
428     } else if (packBuffer_->Size() != headerPara.totalLen_) {
429         HILOGE("recv session header totalLen inconsistent");
430         return INVALID_SESSION_HEADER_TOTAL_LEN;
431     }
432 
433     int32_t ret = memcpy_s(currentPos,
434         packBuffer_->Size() - (currentPos - packBuffer_->Data()),
435         dataHeader, headerPara.payloadLen_);
436     if (ret != EOK) {
437         HILOGE("write payload to buffer failed");
438         return WRITE_PAYLOAD_TO_BUFFER_FAILED;
439     }
440     currentPos += headerPara.payloadLen_;
441     return ret;
442 }
443 
GetPacketedData()444 std::shared_ptr<AVTransDataBuffer> DataSenderReceiver::GetPacketedData()
445 {
446     if (isDataReady()) {
447         packBuffer_->SetRange(0, currentPos - packBuffer_->Data());
448         auto bytesData = std::shared_ptr<AVTransDataBuffer>(std::move(packBuffer_));
449         ResetFlag();
450         return bytesData;
451     }
452     return nullptr;
453 }
454 
isDataReady()455 inline bool DataSenderReceiver::isDataReady()
456 {
457     return !isWaiting_ && packBuffer_ != nullptr;
458 }
459 
ResetFlag()460 inline void DataSenderReceiver::ResetFlag()
461 {
462     isWaiting_ = false;
463     nowSeqNum_ = 0;
464     nowSubSeq_ = 0;
465     nowTotalLen_ = 0;
466     packBuffer_ = nullptr;
467     currentPos = nullptr;
468 }
469 } // namespace DistributedCollab
470 } // namespace OHOS