1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "data_sender_receiver.h"
17 #include "channel_common_definition.h"
18 #include "dtbcollabmgr_log.h"
19 #include "session.h"
20 #include "session_data_header.h"
21 #include "socket.h"
22 #include <chrono>
23 #include <map>
24 #include <memory>
25 #include "securec.h"
26 #include "softbus_error_code.h"
27
28 namespace OHOS {
29 namespace DistributedCollab {
namespace {
// Protocol version passed to every SessionDataHeader constructed in this file.
static constexpr uint16_t PROTOCOL_VERSION = 1;
// Tag string for this translation unit. It is not referenced directly in the visible code
// (presumably consumed by the HILOG* macros - confirm).
static const std::string TAG = "DSchedCollabDataSenderReceiver";

/*
 * Queries SESSION_OPTION_MAX_SENDBYTES_SIZE of `socketId` into `value`.
 *
 * NOTE: on failure this macro executes `return GET_SESSION_OPTION_FAILED;` in the
 * ENCLOSING function, so it may only be used inside functions returning int32_t.
 *
 * - `value` must be an unsigned 32-bit lvalue: it is logged with %{public}u.
 * - The internal result variable is named `sessionOptionRet` so that it does not
 *   shadow a `ret` variable of the calling function.
 * - Every argument is parenthesized so that expressions can be passed safely.
 */
#define GET_SOFTBUS_SESSION_OPTION(socketId, value, valueSize)                                   \
    do {                                                                                         \
        int32_t sessionOptionRet = GetSessionOption((socketId),                                  \
            SESSION_OPTION_MAX_SENDBYTES_SIZE, &(value), (valueSize));                           \
        if (sessionOptionRet != ERR_OK) {                                                        \
            HILOGE("GetSessionOption failed, ret: %{public}d, session: %{public}d",              \
                sessionOptionRet, (socketId));                                                   \
            return GET_SESSION_OPTION_FAILED;                                                    \
        }                                                                                        \
        HILOGD("GetSessionOption succeeded, session: %{public}d, value: %{public}u",             \
            (socketId), (value));                                                                \
    } while (0)

}
46
SendStreamData(const std::shared_ptr<AVTransStreamData> & sendData)47 int32_t DataSenderReceiver::SendStreamData(const std::shared_ptr<AVTransStreamData>& sendData)
48 {
49 const StreamData data = {
50 .buf = reinterpret_cast<char*>(sendData->StreamData()->Data()),
51 .bufLen = sendData->StreamData()->Size()
52 };
53 cJSON* extInfo = sendData->SerializeStreamDataExt();
54 char* jsonString = cJSON_PrintUnformatted(extInfo);
55 if (jsonString == nullptr) {
56 HILOGE("Failed to generate JSON string.");
57 return ERR_JSON_GENERATION_FAILED;
58 }
59 const StreamData ext = {
60 .buf = jsonString,
61 .bufLen = strlen(jsonString)
62 };
63 const StreamFrameInfo info = {
64 .frameType = 0,
65 .timeStamp = GetNowTimeStampUs(),
66 .seqNum = 0,
67 .seqSubNum = 0,
68 .level = 0,
69 .bitMap = 0,
70 .tvCount = 0
71 };
72 int32_t ret = SendStream(socketId_, &data, &ext, &info);
73 if (ret != SOFTBUS_OK) {
74 HILOGE("send stream data failed, %{public}d", socketId_);
75 cJSON_Delete(extInfo);
76 cJSON_free(jsonString);
77 return ret;
78 }
79 cJSON_Delete(extInfo);
80 cJSON_free(jsonString);
81 return ERR_OK;
82 }
83
SendMessageData(const std::shared_ptr<AVTransDataBuffer> & sendData)84 int32_t DataSenderReceiver::SendMessageData(const std::shared_ptr<AVTransDataBuffer>& sendData)
85 {
86 HILOGI("start to send message, %{public}u", static_cast<uint32_t>(sendData->Size()));
87 if (sendData->Size() > MAX_SEND_MESSAGE_SIZE) {
88 HILOGE("too large send message");
89 return DATA_SIZE_EXCEED_LIMIT;
90 }
91 return SendMessage(socketId_, sendData->Data(), sendData->Size());
92 }
93
SendFileData(const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)94 int32_t DataSenderReceiver::SendFileData(const std::vector<std::string>& sFiles,
95 const std::vector<std::string>& dFiles)
96 {
97 HILOGI("start to send file");
98 const char* sFileList[sFiles.size()];
99 for (size_t i = 0; i < sFiles.size(); ++i) {
100 sFileList[i] = sFiles[i].c_str();
101 }
102 const char* dFileList[dFiles.size()];
103 for (size_t i = 0; i < dFiles.size(); ++i) {
104 dFileList[i] = dFiles[i].c_str();
105 }
106 return SendFile(socketId_, sFileList, dFileList, static_cast<uint32_t>(sFiles.size()));
107 }
108
SendBytesData(const std::shared_ptr<AVTransDataBuffer> & sendData)109 int32_t DataSenderReceiver::SendBytesData(const std::shared_ptr<AVTransDataBuffer>& sendData)
110 {
111 int32_t dataType = static_cast<int32_t>(ChannelDataType::BYTES);
112 return SendUnpackData(sendData, dataType);
113 }
114
SendUnpackData(const std::shared_ptr<AVTransDataBuffer> & sendData,const int32_t dataType)115 int32_t DataSenderReceiver::SendUnpackData(const std::shared_ptr<AVTransDataBuffer>& sendData,
116 const int32_t dataType)
117 {
118 HILOGI("start to send bytes");
119 uint32_t maxSendSize = 0;
120 GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
121
122 if (sendData->Size() + SessionDataHeader::HEADER_LEN <= maxSendSize) {
123 return SendAllPackets(sendData, dataType);
124 }
125 const uint8_t* dataHeader = sendData->Data();
126 const uint8_t* const dataEnd = sendData->Data() + sendData->Size();
127 uint8_t* current = const_cast<uint8_t*>(dataHeader);
128 uint32_t totalLen = sendData->Size();
129 uint32_t packetLen = maxSendSize;
130 uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
131 uint16_t seqNum = 0;
132 uint16_t subSeq = 0;
133 SessionDataHeader headerPara(
134 PROTOCOL_VERSION,
135 FRAG_TYPE::FRAG_START,
136 dataType,
137 seqNum,
138 totalLen,
139 packetLen,
140 payloadLen,
141 subSeq);
142
143 int32_t ret = DoSendPacket(headerPara, current, payloadLen);
144 if (ret != ERR_OK) {
145 return ret;
146 }
147 current += payloadLen;
148 while (current < dataEnd) {
149 GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
150 headerPara.packetLen_ = maxSendSize;
151 headerPara.payloadLen_ = maxSendSize - SessionDataHeader::HEADER_LEN;
152 headerPara.fragFlag_ = dataEnd - current > payloadLen ? FRAG_TYPE::FRAG_MID : FRAG_TYPE::FRAG_END;
153 headerPara.subSeq_++;
154 ret = DoSendPacket(headerPara, current, payloadLen);
155 if (ret != ERR_OK) {
156 return ret;
157 }
158 current += payloadLen;
159 }
160 HILOGI("finish send all bytes by packet");
161 return ERR_OK;
162 }
163
SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,const int32_t dataType)164 int32_t DataSenderReceiver::SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,
165 const int32_t dataType)
166 {
167 HILOGI("send all data");
168 uint8_t* current = sendData->Data();
169 uint32_t totalLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
170 uint32_t packetLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
171 uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
172 uint16_t subSeq = 0;
173 uint16_t seqNum = 0;
174
175 int32_t ret = ERR_OK;
176 SessionDataHeader headerPara(
177 PROTOCOL_VERSION,
178 FRAG_TYPE::FRAG_START_END,
179 dataType,
180 seqNum,
181 totalLen,
182 packetLen,
183 payloadLen,
184 subSeq
185 );
186 ret = DoSendPacket(headerPara, current, payloadLen);
187 if (ret != ERR_OK) {
188 return ret;
189 }
190 HILOGI("finish send all bytes");
191 return ERR_OK;
192 }
193
DoSendPacket(SessionDataHeader & headerPara,const uint8_t * dataHeader,const uint32_t dataLen)194 int32_t DataSenderReceiver::DoSendPacket(SessionDataHeader& headerPara,
195 const uint8_t* dataHeader, const uint32_t dataLen)
196 {
197 HILOGI("start to send packet by softbus");
198 auto headerBuffer = headerPara.Serialize();
199 auto sendBuffer = std::make_unique<AVTransDataBuffer>(SessionDataHeader::HEADER_LEN + dataLen);
200 uint8_t* header = sendBuffer->Data();
201
202 int32_t ret = ERR_OK;
203 // copy header
204 ret = memcpy_s(header, sendBuffer->Size(),
205 headerBuffer->Data(), SessionDataHeader::HEADER_LEN);
206 if (ret != EOK) {
207 HILOGE("Write header failed");
208 return WRITE_SESSION_HEADER_FAILED;
209 }
210 // copy data
211 ret = memcpy_s(header + SessionDataHeader::HEADER_LEN,
212 sendBuffer->Size() - SessionDataHeader::HEADER_LEN,
213 dataHeader, dataLen);
214 if (ret != EOK) {
215 HILOGE("Write data failed");
216 return WRITE_SEND_DATA_BUFFER_FAILED;
217 }
218 ret = SendBytes(socketId_, sendBuffer->Data(), sendBuffer->Size());
219 if (ret != SOFTBUS_OK) {
220 HILOGE("Send data buffer failed");
221 return SEND_DATA_BY_SOFTBUS_FAILED;
222 }
223 return ret;
224 }
225
GetNowTimeStampUs()226 inline int64_t DataSenderReceiver::GetNowTimeStampUs()
227 {
228 std::chrono::microseconds nowUs = std::chrono::duration_cast<std::chrono::microseconds>(
229 std::chrono::system_clock::now().time_since_epoch());
230 return nowUs.count();
231 }
232
PackRecvPacketData(const uint8_t * header,const uint32_t dataLen)233 int32_t DataSenderReceiver::PackRecvPacketData(const uint8_t* header, const uint32_t dataLen)
234 {
235 auto headerPara = SessionDataHeader::Deserialize(header, dataLen);
236 if (!headerPara) {
237 HILOGE("read session header from buffer failed");
238 return WRITE_SESSION_HEADER_FAILED;
239 }
240 SessionDataHeader& sessionHeader = *headerPara;
241 int32_t ret = CheckRecvSessionHeader(sessionHeader);
242 if (ret != ERR_OK) {
243 HILOGE("check session header failed");
244 return ret;
245 }
246 // pack recv data
247 uint8_t* dataHeader = const_cast<uint8_t*>(header);
248 switch (sessionHeader.fragFlag_) {
249 case FRAG_TYPE::FRAG_START_END:
250 ret = ProcessAllPacketRecv(dataHeader, dataLen, sessionHeader);
251 break;
252 case FRAG_TYPE::FRAG_START:
253 ret = ProcessStartPacketRecv(dataHeader, dataLen, sessionHeader);
254 break;
255 case FRAG_TYPE::FRAG_MID:
256 ret = ProcessMidPacketRecv(dataHeader, dataLen, sessionHeader);
257 break;
258 case FRAG_TYPE::FRAG_END:
259 ret = ProcessEndPacketRecv(dataHeader, dataLen, sessionHeader);
260 break;
261 default:
262 HILOGE("invalid flag type, %{public}d", static_cast<uint32_t>(sessionHeader.fragFlag_));
263 return INVALID_SESSION_HEADER_FLAG_TYPE;
264 }
265 return ret;
266 }
267
CheckRecvSessionHeader(const SessionDataHeader & headerPara)268 int32_t DataSenderReceiver::CheckRecvSessionHeader(const SessionDataHeader& headerPara)
269 {
270 if (nowSeqNum_ != headerPara.seqNum_) {
271 HILOGE("seq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
272 nowSeqNum_, headerPara.seqNum_, socketId_);
273 return INVALID_SESSION_HEADER_SEQ_NUM;
274 }
275 if (nowSubSeq_ == 0 && headerPara.subSeq_ == 0) {
276 return ERR_OK;
277 }
278 if (nowSubSeq_ + 1 != headerPara.subSeq_) {
279 HILOGE("subSeq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
280 nowSubSeq_, headerPara.subSeq_, socketId_);
281 return INVALID_SESSION_HEADER_SUB_SEQ;
282 }
283 return ERR_OK;
284 }
285
ProcessAllPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)286 int32_t DataSenderReceiver::ProcessAllPacketRecv(const uint8_t* data, const uint32_t dataLen,
287 const SessionDataHeader& headerPara)
288 {
289 if (packBuffer_ != nullptr || isWaiting_) {
290 HILOGE("recv start data packet but buffer not empty or still waiting");
291 ResetFlag();
292 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
293 }
294 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
295 if (ret != ERR_OK) {
296 HILOGE("write payload to buffer failed");
297 ResetFlag();
298 return WRITE_PAYLOAD_TO_BUFFER_FAILED;
299 }
300 return ERR_OK;
301 }
302
ProcessStartPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)303 int32_t DataSenderReceiver::ProcessStartPacketRecv(const uint8_t* data, const uint32_t dataLen,
304 const SessionDataHeader& headerPara)
305 {
306 if (packBuffer_ != nullptr || isWaiting_) {
307 HILOGE("recv start data packet but buffer not empty or still waiting");
308 ResetFlag();
309 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
310 }
311 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
312 if (ret != ERR_OK) {
313 ResetFlag();
314 return ret;
315 }
316 isWaiting_ = true;
317 nowSeqNum_ = headerPara.seqNum_;
318 nowSubSeq_ = headerPara.subSeq_;
319 return ERR_OK;
320 }
321
ProcessMidPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)322 int32_t DataSenderReceiver::ProcessMidPacketRecv(const uint8_t* data,
323 const uint32_t dataLen, const SessionDataHeader& headerPara)
324 {
325 if (packBuffer_ == nullptr || !isWaiting_) {
326 HILOGE("recv mid data packet but buffer empty or end waiting");
327 ResetFlag();
328 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
329 }
330 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
331 if (ret != ERR_OK) {
332 ResetFlag();
333 return ret;
334 }
335 nowSeqNum_ = headerPara.seqNum_;
336 nowSubSeq_ = headerPara.subSeq_;
337 return ERR_OK;
338 }
339
ProcessEndPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)340 int32_t DataSenderReceiver::ProcessEndPacketRecv(const uint8_t* data,
341 const uint32_t dataLen, const SessionDataHeader& headerPara)
342 {
343 if (packBuffer_ == nullptr || !isWaiting_) {
344 HILOGE("recv end data packet but buffer empty or end waiting");
345 ResetFlag();
346 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
347 }
348 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
349 if (ret != ERR_OK) {
350 ResetFlag();
351 return ret;
352 }
353 isWaiting_ = false;
354 return ret;
355 }
356
WriteRecvBytesDataToBuffer(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)357 int32_t DataSenderReceiver::WriteRecvBytesDataToBuffer(const uint8_t* data,
358 const uint32_t dataLen, const SessionDataHeader& headerPara)
359 {
360 uint8_t* header = const_cast<uint8_t*>(data);
361 uint8_t* dataHeader = header + (headerPara.packetLen_ - headerPara.payloadLen_);
362 if (packBuffer_ == nullptr) {
363 packBuffer_ = std::make_unique<AVTransDataBuffer>(headerPara.totalLen_);
364 currentPos = packBuffer_->Data();
365 } else if (packBuffer_->Size() != headerPara.totalLen_) {
366 HILOGE("recv session header totalLen inconsistent");
367 return INVALID_SESSION_HEADER_TOTAL_LEN;
368 }
369
370 int32_t ret = memcpy_s(currentPos,
371 packBuffer_->Size() - (currentPos - packBuffer_->Data()),
372 dataHeader, headerPara.payloadLen_);
373 if (ret != EOK) {
374 HILOGE("write payload to buffer failed");
375 return WRITE_PAYLOAD_TO_BUFFER_FAILED;
376 }
377 currentPos += headerPara.payloadLen_;
378 return ret;
379 }
380
GetPacketedData()381 std::shared_ptr<AVTransDataBuffer> DataSenderReceiver::GetPacketedData()
382 {
383 if (isDataReady()) {
384 packBuffer_->SetRange(0, currentPos - packBuffer_->Data());
385 auto bytesData = std::shared_ptr<AVTransDataBuffer>(std::move(packBuffer_));
386 ResetFlag();
387 return bytesData;
388 }
389 return nullptr;
390 }
391
isDataReady()392 inline bool DataSenderReceiver::isDataReady()
393 {
394 return !isWaiting_ && packBuffer_ != nullptr;
395 }
396
ResetFlag()397 inline void DataSenderReceiver::ResetFlag()
398 {
399 isWaiting_ = false;
400 nowSeqNum_ = 0;
401 nowSubSeq_ = 0;
402 nowTotalLen_ = 0;
403 packBuffer_ = nullptr;
404 currentPos = nullptr;
405 }
406 } // namespace DistributedCollab
407 } // namespace OHOS