• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #include "data_sender_receiver.h"
17 #include "channel_common_definition.h"
18 #include "dtbcollabmgr_log.h"
19 #include "session.h"
20 #include "session_data_header.h"
21 #include "socket.h"
22 #include <chrono>
23 #include <map>
24 #include <memory>
25 #include "securec.h"
26 #include "softbus_error_code.h"
27 
28 namespace OHOS {
29 namespace DistributedCollab {
namespace {
    // Protocol version passed as the first argument of every SessionDataHeader built in this file.
    static constexpr uint16_t PROTOCOL_VERSION = 1;
    // Tag string for this translation unit (presumably consumed by the HILOG* macros - confirm).
    static const std::string TAG = "DSchedCollabDataSenderReceiver";

// Queries SESSION_OPTION_MAX_SENDBYTES_SIZE for `socketId` and stores the result in `value`
// (`valueSize` is the size of `value` in bytes).
//
// WARNING: this macro contains a hidden `return`. When GetSessionOption does not return ERR_OK,
// the *calling* function returns GET_SESSION_OPTION_FAILED, so the macro may only be expanded
// inside functions whose return type accepts that value.
//
// Every argument is parenthesised and the internal result variable has a macro-specific name so
// that the expansion neither shadows nor captures a caller-side variable named `ret`.
#define GET_SOFTBUS_SESSION_OPTION(socketId, value, valueSize)                                  \
do {                                                                                            \
    int32_t sessionOptionRet_ = GetSessionOption((socketId), SESSION_OPTION_MAX_SENDBYTES_SIZE, \
        &(value), (valueSize));                                                                 \
    if (sessionOptionRet_ != ERR_OK) {                                                          \
        HILOGE("GetSessionOption failed, ret: %{public}d, session: %{public}d",                 \
            sessionOptionRet_, (socketId));                                                     \
        return GET_SESSION_OPTION_FAILED;                                                       \
    }                                                                                           \
    HILOGD("GetSessionOption succeeded, session: %{public}d, value: %{public}d",                \
        (socketId), (value));                                                                   \
} while (0)

}
46 
SendStreamData(const std::shared_ptr<AVTransStreamData> & sendData)47 int32_t DataSenderReceiver::SendStreamData(const std::shared_ptr<AVTransStreamData>& sendData)
48 {
49     const StreamData data = {
50         .buf = reinterpret_cast<char*>(sendData->StreamData()->Data()),
51         .bufLen = sendData->StreamData()->Size()
52     };
53     cJSON* extInfo = sendData->SerializeStreamDataExt();
54     char* jsonString = cJSON_PrintUnformatted(extInfo);
55     if (jsonString == nullptr) {
56         HILOGE("Failed to generate JSON string.");
57         return ERR_JSON_GENERATION_FAILED;
58     }
59     const StreamData ext = {
60         .buf = jsonString,
61         .bufLen = strlen(jsonString)
62     };
63     const StreamFrameInfo info = {
64         .frameType = 0,
65         .timeStamp = GetNowTimeStampUs(),
66         .seqNum = 0,
67         .seqSubNum = 0,
68         .level = 0,
69         .bitMap = 0,
70         .tvCount = 0
71     };
72     int32_t ret = SendStream(socketId_, &data, &ext, &info);
73     if (ret != SOFTBUS_OK) {
74         HILOGE("send stream data failed, %{public}d", socketId_);
75         cJSON_Delete(extInfo);
76         cJSON_free(jsonString);
77         return ret;
78     }
79     cJSON_Delete(extInfo);
80     cJSON_free(jsonString);
81     return ERR_OK;
82 }
83 
SendMessageData(const std::shared_ptr<AVTransDataBuffer> & sendData)84 int32_t DataSenderReceiver::SendMessageData(const std::shared_ptr<AVTransDataBuffer>& sendData)
85 {
86     HILOGI("start to send message, %{public}u", static_cast<uint32_t>(sendData->Size()));
87     if (sendData->Size() > MAX_SEND_MESSAGE_SIZE) {
88         HILOGE("too large send message");
89         return DATA_SIZE_EXCEED_LIMIT;
90     }
91     return SendMessage(socketId_, sendData->Data(), sendData->Size());
92 }
93 
SendFileData(const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)94 int32_t DataSenderReceiver::SendFileData(const std::vector<std::string>& sFiles,
95     const std::vector<std::string>& dFiles)
96 {
97     HILOGI("start to send file");
98     const char* sFileList[sFiles.size()];
99     for (size_t i = 0; i < sFiles.size(); ++i) {
100         sFileList[i] = sFiles[i].c_str();
101     }
102     const char* dFileList[dFiles.size()];
103     for (size_t i = 0; i < dFiles.size(); ++i) {
104         dFileList[i] = dFiles[i].c_str();
105     }
106     return SendFile(socketId_, sFileList, dFileList, static_cast<uint32_t>(sFiles.size()));
107 }
108 
SendBytesData(const std::shared_ptr<AVTransDataBuffer> & sendData)109 int32_t DataSenderReceiver::SendBytesData(const std::shared_ptr<AVTransDataBuffer>& sendData)
110 {
111     int32_t dataType = static_cast<int32_t>(ChannelDataType::BYTES);
112     return SendUnpackData(sendData, dataType);
113 }
114 
SendUnpackData(const std::shared_ptr<AVTransDataBuffer> & sendData,const int32_t dataType)115 int32_t DataSenderReceiver::SendUnpackData(const std::shared_ptr<AVTransDataBuffer>& sendData,
116     const int32_t dataType)
117 {
118     HILOGI("start to send bytes");
119     uint32_t maxSendSize = 0;
120     GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
121 
122     if (sendData->Size() + SessionDataHeader::HEADER_LEN <= maxSendSize) {
123         return SendAllPackets(sendData, dataType);
124     }
125     const uint8_t* dataHeader = sendData->Data();
126     const uint8_t* const dataEnd = sendData->Data() + sendData->Size();
127     uint8_t* current = const_cast<uint8_t*>(dataHeader);
128     uint32_t totalLen = sendData->Size();
129     uint32_t packetLen = maxSendSize;
130     uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
131     uint16_t seqNum = 0;
132     uint16_t subSeq = 0;
133     SessionDataHeader headerPara(
134         PROTOCOL_VERSION,
135         FRAG_TYPE::FRAG_START,
136         dataType,
137         seqNum,
138         totalLen,
139         packetLen,
140         payloadLen,
141         subSeq);
142 
143     int32_t ret = DoSendPacket(headerPara, current, payloadLen);
144     if (ret != ERR_OK) {
145         return ret;
146     }
147     current += payloadLen;
148     while (current < dataEnd) {
149         GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
150         headerPara.packetLen_ = maxSendSize;
151         headerPara.payloadLen_ = maxSendSize - SessionDataHeader::HEADER_LEN;
152         headerPara.fragFlag_ = dataEnd - current > payloadLen ? FRAG_TYPE::FRAG_MID : FRAG_TYPE::FRAG_END;
153         headerPara.subSeq_++;
154         ret = DoSendPacket(headerPara, current, payloadLen);
155         if (ret != ERR_OK) {
156             return ret;
157         }
158         current += payloadLen;
159     }
160     HILOGI("finish send all bytes by packet");
161     return ERR_OK;
162 }
163 
SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,const int32_t dataType)164 int32_t DataSenderReceiver::SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,
165     const int32_t dataType)
166 {
167     HILOGI("send all data");
168     uint8_t* current = sendData->Data();
169     uint32_t totalLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
170     uint32_t packetLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
171     uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
172     uint16_t subSeq = 0;
173     uint16_t seqNum = 0;
174 
175     int32_t ret = ERR_OK;
176     SessionDataHeader headerPara(
177         PROTOCOL_VERSION,
178         FRAG_TYPE::FRAG_START_END,
179         dataType,
180         seqNum,
181         totalLen,
182         packetLen,
183         payloadLen,
184         subSeq
185     );
186     ret = DoSendPacket(headerPara, current, payloadLen);
187     if (ret != ERR_OK) {
188         return ret;
189     }
190     HILOGI("finish send all bytes");
191     return ERR_OK;
192 }
193 
DoSendPacket(SessionDataHeader & headerPara,const uint8_t * dataHeader,const uint32_t dataLen)194 int32_t DataSenderReceiver::DoSendPacket(SessionDataHeader& headerPara,
195     const uint8_t* dataHeader, const uint32_t dataLen)
196 {
197     HILOGI("start to send packet by softbus");
198     auto headerBuffer = headerPara.Serialize();
199     auto sendBuffer = std::make_unique<AVTransDataBuffer>(SessionDataHeader::HEADER_LEN + dataLen);
200     uint8_t* header = sendBuffer->Data();
201 
202     int32_t ret = ERR_OK;
203     // copy header
204     ret = memcpy_s(header, sendBuffer->Size(),
205         headerBuffer->Data(), SessionDataHeader::HEADER_LEN);
206     if (ret != EOK) {
207         HILOGE("Write header failed");
208         return WRITE_SESSION_HEADER_FAILED;
209     }
210     // copy data
211     ret = memcpy_s(header + SessionDataHeader::HEADER_LEN,
212         sendBuffer->Size() - SessionDataHeader::HEADER_LEN,
213             dataHeader, dataLen);
214     if (ret != EOK) {
215         HILOGE("Write data failed");
216         return WRITE_SEND_DATA_BUFFER_FAILED;
217     }
218     ret = SendBytes(socketId_, sendBuffer->Data(), sendBuffer->Size());
219     if (ret != SOFTBUS_OK) {
220         HILOGE("Send data buffer failed");
221         return SEND_DATA_BY_SOFTBUS_FAILED;
222     }
223     return ret;
224 }
225 
GetNowTimeStampUs()226 inline int64_t DataSenderReceiver::GetNowTimeStampUs()
227 {
228     std::chrono::microseconds nowUs = std::chrono::duration_cast<std::chrono::microseconds>(
229         std::chrono::system_clock::now().time_since_epoch());
230     return nowUs.count();
231 }
232 
PackRecvPacketData(const uint8_t * header,const uint32_t dataLen)233 int32_t DataSenderReceiver::PackRecvPacketData(const uint8_t* header, const uint32_t dataLen)
234 {
235     auto headerPara = SessionDataHeader::Deserialize(header, dataLen);
236     if (!headerPara) {
237         HILOGE("read session header from buffer failed");
238         return WRITE_SESSION_HEADER_FAILED;
239     }
240     SessionDataHeader& sessionHeader = *headerPara;
241     int32_t ret = CheckRecvSessionHeader(sessionHeader);
242     if (ret != ERR_OK) {
243         HILOGE("check session header failed");
244         return ret;
245     }
246     // pack recv data
247     uint8_t* dataHeader = const_cast<uint8_t*>(header);
248     switch (sessionHeader.fragFlag_) {
249         case FRAG_TYPE::FRAG_START_END:
250             ret = ProcessAllPacketRecv(dataHeader, dataLen, sessionHeader);
251             break;
252         case FRAG_TYPE::FRAG_START:
253             ret = ProcessStartPacketRecv(dataHeader, dataLen, sessionHeader);
254             break;
255         case FRAG_TYPE::FRAG_MID:
256             ret = ProcessMidPacketRecv(dataHeader, dataLen, sessionHeader);
257             break;
258         case FRAG_TYPE::FRAG_END:
259             ret = ProcessEndPacketRecv(dataHeader, dataLen, sessionHeader);
260             break;
261         default:
262             HILOGE("invalid flag type, %{public}d", static_cast<uint32_t>(sessionHeader.fragFlag_));
263             return INVALID_SESSION_HEADER_FLAG_TYPE;
264     }
265     return ret;
266 }
267 
CheckRecvSessionHeader(const SessionDataHeader & headerPara)268 int32_t DataSenderReceiver::CheckRecvSessionHeader(const SessionDataHeader& headerPara)
269 {
270     if (nowSeqNum_ != headerPara.seqNum_) {
271         HILOGE("seq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
272             nowSeqNum_, headerPara.seqNum_, socketId_);
273         return INVALID_SESSION_HEADER_SEQ_NUM;
274     }
275     if (nowSubSeq_ == 0 && headerPara.subSeq_ == 0) {
276         return ERR_OK;
277     }
278     if (nowSubSeq_ + 1 != headerPara.subSeq_) {
279         HILOGE("subSeq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
280             nowSubSeq_, headerPara.subSeq_, socketId_);
281         return INVALID_SESSION_HEADER_SUB_SEQ;
282     }
283     return ERR_OK;
284 }
285 
ProcessAllPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)286 int32_t DataSenderReceiver::ProcessAllPacketRecv(const uint8_t* data, const uint32_t dataLen,
287     const SessionDataHeader& headerPara)
288 {
289     if (packBuffer_ != nullptr || isWaiting_) {
290         HILOGE("recv start data packet but buffer not empty or still waiting");
291         ResetFlag();
292         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
293     }
294     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
295     if (ret != ERR_OK) {
296         HILOGE("write payload to buffer failed");
297         ResetFlag();
298         return WRITE_PAYLOAD_TO_BUFFER_FAILED;
299     }
300     return ERR_OK;
301 }
302 
ProcessStartPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)303 int32_t DataSenderReceiver::ProcessStartPacketRecv(const uint8_t* data, const uint32_t dataLen,
304     const SessionDataHeader& headerPara)
305 {
306     if (packBuffer_ != nullptr || isWaiting_) {
307         HILOGE("recv start data packet but buffer not empty or still waiting");
308         ResetFlag();
309         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
310     }
311     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
312     if (ret != ERR_OK) {
313         ResetFlag();
314         return ret;
315     }
316     isWaiting_ = true;
317     nowSeqNum_ = headerPara.seqNum_;
318     nowSubSeq_ = headerPara.subSeq_;
319     return ERR_OK;
320 }
321 
ProcessMidPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)322 int32_t DataSenderReceiver::ProcessMidPacketRecv(const uint8_t* data,
323     const uint32_t dataLen, const SessionDataHeader& headerPara)
324 {
325     if (packBuffer_ == nullptr || !isWaiting_) {
326         HILOGE("recv mid data packet but buffer empty or end waiting");
327         ResetFlag();
328         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
329     }
330     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
331     if (ret != ERR_OK) {
332         ResetFlag();
333         return ret;
334     }
335     nowSeqNum_ = headerPara.seqNum_;
336     nowSubSeq_ = headerPara.subSeq_;
337     return ERR_OK;
338 }
339 
ProcessEndPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)340 int32_t DataSenderReceiver::ProcessEndPacketRecv(const uint8_t* data,
341     const uint32_t dataLen, const SessionDataHeader& headerPara)
342 {
343     if (packBuffer_ == nullptr || !isWaiting_) {
344         HILOGE("recv end data packet but buffer empty or end waiting");
345         ResetFlag();
346         return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
347     }
348     int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
349     if (ret != ERR_OK) {
350         ResetFlag();
351         return ret;
352     }
353     isWaiting_ = false;
354     return ret;
355 }
356 
WriteRecvBytesDataToBuffer(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)357 int32_t DataSenderReceiver::WriteRecvBytesDataToBuffer(const uint8_t* data,
358     const uint32_t dataLen, const SessionDataHeader& headerPara)
359 {
360     uint8_t* header = const_cast<uint8_t*>(data);
361     uint8_t* dataHeader = header + (headerPara.packetLen_ - headerPara.payloadLen_);
362     if (packBuffer_ == nullptr) {
363         packBuffer_ = std::make_unique<AVTransDataBuffer>(headerPara.totalLen_);
364         currentPos = packBuffer_->Data();
365     } else if (packBuffer_->Size() != headerPara.totalLen_) {
366         HILOGE("recv session header totalLen inconsistent");
367         return INVALID_SESSION_HEADER_TOTAL_LEN;
368     }
369 
370     int32_t ret = memcpy_s(currentPos,
371         packBuffer_->Size() - (currentPos - packBuffer_->Data()),
372         dataHeader, headerPara.payloadLen_);
373     if (ret != EOK) {
374         HILOGE("write payload to buffer failed");
375         return WRITE_PAYLOAD_TO_BUFFER_FAILED;
376     }
377     currentPos += headerPara.payloadLen_;
378     return ret;
379 }
380 
GetPacketedData()381 std::shared_ptr<AVTransDataBuffer> DataSenderReceiver::GetPacketedData()
382 {
383     if (isDataReady()) {
384         packBuffer_->SetRange(0, currentPos - packBuffer_->Data());
385         auto bytesData = std::shared_ptr<AVTransDataBuffer>(std::move(packBuffer_));
386         ResetFlag();
387         return bytesData;
388     }
389     return nullptr;
390 }
391 
isDataReady()392 inline bool DataSenderReceiver::isDataReady()
393 {
394     return !isWaiting_ && packBuffer_ != nullptr;
395 }
396 
ResetFlag()397 inline void DataSenderReceiver::ResetFlag()
398 {
399     isWaiting_ = false;
400     nowSeqNum_ = 0;
401     nowSubSeq_ = 0;
402     nowTotalLen_ = 0;
403     packBuffer_ = nullptr;
404     currentPos = nullptr;
405 }
406 } // namespace DistributedCollab
407 } // namespace OHOS