1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "data_sender_receiver.h"
17 #include "channel_common_definition.h"
18 #include "dtbcollabmgr_log.h"
19 #include "session.h"
20 #include "session_data_header.h"
21 #include "socket.h"
22 #include <chrono>
23 #include <map>
24 #include <memory>
25 #include "securec.h"
26 #include "softbus_error_code.h"
27
28 namespace OHOS {
29 namespace DistributedCollab {
namespace {
// Protocol version passed to every SessionDataHeader constructed in this file.
static constexpr uint16_t PROTOCOL_VERSION = 1;
// Log tag for this translation unit (presumably picked up by the HILOG* macros; confirm in dtbcollabmgr_log.h).
static const std::string TAG = "DSchedCollabDataSenderReceiver";

/*
 * Reads SESSION_OPTION_MAX_SENDBYTES_SIZE for `socketId` into `value` (an lvalue of `valueSize` bytes).
 *
 * NOTE: on failure the macro executes `return GET_SESSION_OPTION_FAILED;` in the CALLING function,
 * so it can only be used inside functions that return an int32_t error code.
 *
 * Every argument is parenthesized and the temporary is named `optionRet`, so the macro neither
 * mis-parses compound argument expressions nor shadows a caller variable called `ret`.
 * The value is logged as unsigned because it is a byte count.
 */
#define GET_SOFTBUS_SESSION_OPTION(socketId, value, valueSize)                                   \
    do {                                                                                         \
        int32_t optionRet = GetSessionOption((socketId), SESSION_OPTION_MAX_SENDBYTES_SIZE,      \
            &(value), (valueSize));                                                              \
        if (optionRet != ERR_OK) {                                                               \
            HILOGE("GetSessionOption failed, ret: %{public}d, session: %{public}d",              \
                optionRet, (socketId));                                                          \
            return GET_SESSION_OPTION_FAILED;                                                    \
        }                                                                                        \
        HILOGD("GetSessionOption succeeded, session: %{public}d, value: %{public}u",             \
            (socketId), static_cast<uint32_t>(value));                                           \
    } while (0)

}
46
~DataSenderReceiver()47 DataSenderReceiver::~DataSenderReceiver()
48 {
49 HILOGI("destory data sender receiver for %{public}d", socketId_);
50 DeInit();
51 }
52
Init()53 void DataSenderReceiver::Init()
54 {
55 std::call_once(initFlag_, [this] {
56 HILOGI("start init data sender receiver for %{public}d", socketId_);
57 eventThread_ = std::thread(&DataSenderReceiver::StartEvent, this);
58 std::unique_lock<std::mutex> lock(eventMutex_);
59 eventCon_.wait(lock, [this] { return eventHandler_ != nullptr; });
60 });
61 }
62
StartEvent()63 void DataSenderReceiver::StartEvent()
64 {
65 HILOGI("StartEvent start");
66 auto runner = AppExecFwk::EventRunner::Create(false);
67 {
68 std::lock_guard<std::mutex> lock(eventMutex_);
69 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
70 }
71 eventCon_.notify_one();
72 runner->Run();
73 HILOGI("StartEvent end");
74 }
75
DeInit()76 void DataSenderReceiver::DeInit()
77 {
78 HILOGI("start deinit data sender receiver for %{public}d", socketId_);
79 Shutdown(socketId_);
80 // stop all task
81 if (eventHandler_ != nullptr) {
82 eventHandler_->GetEventRunner()->Stop();
83 if (eventThread_.joinable()) {
84 eventThread_.join();
85 }
86 eventHandler_ = nullptr;
87 } else {
88 HILOGE("eventHandler_ is nullptr");
89 }
90 HILOGI("end");
91 }
92
SendStreamData(const std::shared_ptr<AVTransStreamData> & sendData)93 int32_t DataSenderReceiver::SendStreamData(const std::shared_ptr<AVTransStreamData>& sendData)
94 {
95 const StreamData data = {
96 .buf = reinterpret_cast<char*>(sendData->StreamData()->Data()),
97 .bufLen = sendData->StreamData()->Size()
98 };
99 cJSON* extInfo = sendData->SerializeStreamDataExt();
100 char* jsonString = cJSON_PrintUnformatted(extInfo);
101 if (jsonString == nullptr) {
102 HILOGE("Failed to generate JSON string.");
103 return ERR_JSON_GENERATION_FAILED;
104 }
105 const StreamData ext = {
106 .buf = jsonString,
107 .bufLen = strlen(jsonString)
108 };
109 const StreamFrameInfo info = {
110 .frameType = 0,
111 .timeStamp = GetNowTimeStampUs(),
112 .seqNum = 0,
113 .seqSubNum = 0,
114 .level = 0,
115 .bitMap = 0,
116 .tvCount = 0
117 };
118 int32_t ret = SendStream(socketId_, &data, &ext, &info);
119 if (ret != SOFTBUS_OK) {
120 HILOGE("send stream data failed, %{public}d", socketId_);
121 cJSON_Delete(extInfo);
122 cJSON_free(jsonString);
123 return ret;
124 }
125 cJSON_Delete(extInfo);
126 cJSON_free(jsonString);
127 return ERR_OK;
128 }
129
SendMessageData(const std::shared_ptr<AVTransDataBuffer> & sendData)130 int32_t DataSenderReceiver::SendMessageData(const std::shared_ptr<AVTransDataBuffer>& sendData)
131 {
132 HILOGI("start to send message, %{public}u", static_cast<uint32_t>(sendData->Size()));
133 Init();
134 if (eventHandler_ == nullptr) {
135 HILOGE("init eventhandler failed");
136 return NULL_EVENT_HANDLER;
137 }
138 if (sendData->Size() > MAX_SEND_MESSAGE_SIZE) {
139 HILOGE("too large send message");
140 return DATA_SIZE_EXCEED_LIMIT;
141 }
142 int32_t socketId = socketId_;
143 auto func = [this, socketId, sendData]() {
144 uint32_t dataSize = static_cast<uint32_t>(sendData->Size());
145 HILOGI("begin to send msg by softbus %{public}d:%{public}u", socketId, dataSize);
146 int32_t ret = SendMessage(socketId, sendData->Data(), sendData->Size());
147 HILOGI("finish send msg by softbus %{public}d:%{public}u, ret=%{public}d", socketId, dataSize, ret);
148 };
149 if (eventHandler_->PostTask(func, AppExecFwk::EventQueue::Priority::HIGH)) {
150 return ERR_OK;
151 }
152 HILOGI("finish send msg to task queue for %{public}d:%{public}u", socketId,
153 static_cast<uint32_t>(sendData->Size()));
154 return POST_TASK_FAILED;
155 }
156
SendFileData(const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)157 int32_t DataSenderReceiver::SendFileData(const std::vector<std::string>& sFiles,
158 const std::vector<std::string>& dFiles)
159 {
160 HILOGI("start to send file");
161 const char* sFileList[sFiles.size()];
162 for (size_t i = 0; i < sFiles.size(); ++i) {
163 sFileList[i] = sFiles[i].c_str();
164 }
165 const char* dFileList[dFiles.size()];
166 for (size_t i = 0; i < dFiles.size(); ++i) {
167 dFileList[i] = dFiles[i].c_str();
168 }
169 return SendFile(socketId_, sFileList, dFileList, static_cast<uint32_t>(sFiles.size()));
170 }
171
SendBytesData(const std::shared_ptr<AVTransDataBuffer> & sendData)172 int32_t DataSenderReceiver::SendBytesData(const std::shared_ptr<AVTransDataBuffer>& sendData)
173 {
174 int32_t dataType = static_cast<int32_t>(ChannelDataType::BYTES);
175 return SendUnpackData(sendData, dataType);
176 }
177
SendUnpackData(const std::shared_ptr<AVTransDataBuffer> & sendData,const int32_t dataType)178 int32_t DataSenderReceiver::SendUnpackData(const std::shared_ptr<AVTransDataBuffer>& sendData,
179 const int32_t dataType)
180 {
181 HILOGI("start to send bytes");
182 uint32_t maxSendSize = 0;
183 GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
184
185 if (sendData->Size() + SessionDataHeader::HEADER_LEN <= maxSendSize) {
186 return SendAllPackets(sendData, dataType);
187 }
188 const uint8_t* dataHeader = sendData->Data();
189 const uint8_t* const dataEnd = sendData->Data() + sendData->Size();
190 uint8_t* current = const_cast<uint8_t*>(dataHeader);
191 uint32_t totalLen = sendData->Size();
192 uint32_t packetLen = maxSendSize;
193 uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
194 uint16_t seqNum = 0;
195 uint16_t subSeq = 0;
196 SessionDataHeader headerPara(
197 PROTOCOL_VERSION,
198 FRAG_TYPE::FRAG_START,
199 dataType,
200 seqNum,
201 totalLen,
202 packetLen,
203 payloadLen,
204 subSeq);
205
206 int32_t ret = DoSendPacket(headerPara, current, payloadLen);
207 if (ret != ERR_OK) {
208 return ret;
209 }
210 current += payloadLen;
211 while (current < dataEnd) {
212 GET_SOFTBUS_SESSION_OPTION(socketId_, maxSendSize, static_cast<uint32_t>(sizeof(maxSendSize)));
213 headerPara.packetLen_ = maxSendSize;
214 headerPara.payloadLen_ = maxSendSize - SessionDataHeader::HEADER_LEN;
215 headerPara.fragFlag_ = dataEnd - current > payloadLen ? FRAG_TYPE::FRAG_MID : FRAG_TYPE::FRAG_END;
216 headerPara.subSeq_++;
217 ret = DoSendPacket(headerPara, current, payloadLen);
218 if (ret != ERR_OK) {
219 return ret;
220 }
221 current += payloadLen;
222 }
223 HILOGI("finish send all bytes by packet");
224 return ERR_OK;
225 }
226
SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,const int32_t dataType)227 int32_t DataSenderReceiver::SendAllPackets(const std::shared_ptr<AVTransDataBuffer> sendData,
228 const int32_t dataType)
229 {
230 HILOGI("send all data");
231 uint8_t* current = sendData->Data();
232 uint32_t totalLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
233 uint32_t packetLen = sendData->Size() + SessionDataHeader::HEADER_LEN;
234 uint32_t payloadLen = packetLen - SessionDataHeader::HEADER_LEN;
235 uint16_t subSeq = 0;
236 uint16_t seqNum = 0;
237
238 int32_t ret = ERR_OK;
239 SessionDataHeader headerPara(
240 PROTOCOL_VERSION,
241 FRAG_TYPE::FRAG_START_END,
242 dataType,
243 seqNum,
244 totalLen,
245 packetLen,
246 payloadLen,
247 subSeq
248 );
249 ret = DoSendPacket(headerPara, current, payloadLen);
250 if (ret != ERR_OK) {
251 return ret;
252 }
253 HILOGI("finish send all bytes");
254 return ERR_OK;
255 }
256
DoSendPacket(SessionDataHeader & headerPara,const uint8_t * dataHeader,const uint32_t dataLen)257 int32_t DataSenderReceiver::DoSendPacket(SessionDataHeader& headerPara,
258 const uint8_t* dataHeader, const uint32_t dataLen)
259 {
260 HILOGI("start to send packet by softbus");
261 auto headerBuffer = headerPara.Serialize();
262 auto sendBuffer = std::make_unique<AVTransDataBuffer>(SessionDataHeader::HEADER_LEN + dataLen);
263 uint8_t* header = sendBuffer->Data();
264
265 int32_t ret = ERR_OK;
266 // copy header
267 ret = memcpy_s(header, sendBuffer->Size(),
268 headerBuffer->Data(), SessionDataHeader::HEADER_LEN);
269 if (ret != EOK) {
270 HILOGE("Write header failed");
271 return WRITE_SESSION_HEADER_FAILED;
272 }
273 // copy data
274 ret = memcpy_s(header + SessionDataHeader::HEADER_LEN,
275 sendBuffer->Size() - SessionDataHeader::HEADER_LEN,
276 dataHeader, dataLen);
277 if (ret != EOK) {
278 HILOGE("Write data failed");
279 return WRITE_SEND_DATA_BUFFER_FAILED;
280 }
281 ret = SendBytes(socketId_, sendBuffer->Data(), sendBuffer->Size());
282 if (ret != SOFTBUS_OK) {
283 HILOGE("Send data buffer failed");
284 return SEND_DATA_BY_SOFTBUS_FAILED;
285 }
286 return ret;
287 }
288
GetNowTimeStampUs()289 inline int64_t DataSenderReceiver::GetNowTimeStampUs()
290 {
291 std::chrono::microseconds nowUs = std::chrono::duration_cast<std::chrono::microseconds>(
292 std::chrono::system_clock::now().time_since_epoch());
293 return nowUs.count();
294 }
295
PackRecvPacketData(const uint8_t * header,const uint32_t dataLen)296 int32_t DataSenderReceiver::PackRecvPacketData(const uint8_t* header, const uint32_t dataLen)
297 {
298 auto headerPara = SessionDataHeader::Deserialize(header, dataLen);
299 if (!headerPara) {
300 HILOGE("read session header from buffer failed");
301 return WRITE_SESSION_HEADER_FAILED;
302 }
303 SessionDataHeader& sessionHeader = *headerPara;
304 int32_t ret = CheckRecvSessionHeader(sessionHeader);
305 if (ret != ERR_OK) {
306 HILOGE("check session header failed");
307 return ret;
308 }
309 // pack recv data
310 uint8_t* dataHeader = const_cast<uint8_t*>(header);
311 switch (sessionHeader.fragFlag_) {
312 case FRAG_TYPE::FRAG_START_END:
313 ret = ProcessAllPacketRecv(dataHeader, dataLen, sessionHeader);
314 break;
315 case FRAG_TYPE::FRAG_START:
316 ret = ProcessStartPacketRecv(dataHeader, dataLen, sessionHeader);
317 break;
318 case FRAG_TYPE::FRAG_MID:
319 ret = ProcessMidPacketRecv(dataHeader, dataLen, sessionHeader);
320 break;
321 case FRAG_TYPE::FRAG_END:
322 ret = ProcessEndPacketRecv(dataHeader, dataLen, sessionHeader);
323 break;
324 default:
325 HILOGE("invalid flag type, %{public}d", static_cast<uint32_t>(sessionHeader.fragFlag_));
326 return INVALID_SESSION_HEADER_FLAG_TYPE;
327 }
328 return ret;
329 }
330
CheckRecvSessionHeader(const SessionDataHeader & headerPara)331 int32_t DataSenderReceiver::CheckRecvSessionHeader(const SessionDataHeader& headerPara)
332 {
333 if (nowSeqNum_ != headerPara.seqNum_) {
334 HILOGE("seq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
335 nowSeqNum_, headerPara.seqNum_, socketId_);
336 return INVALID_SESSION_HEADER_SEQ_NUM;
337 }
338 if (nowSubSeq_ == 0 && headerPara.subSeq_ == 0) {
339 return ERR_OK;
340 }
341 if (nowSubSeq_ + 1 != headerPara.subSeq_) {
342 HILOGE("subSeq error, nowSeq: %{public}d, actualSeq: %{public}d, sessionId: %{public}d",
343 nowSubSeq_, headerPara.subSeq_, socketId_);
344 return INVALID_SESSION_HEADER_SUB_SEQ;
345 }
346 return ERR_OK;
347 }
348
ProcessAllPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)349 int32_t DataSenderReceiver::ProcessAllPacketRecv(const uint8_t* data, const uint32_t dataLen,
350 const SessionDataHeader& headerPara)
351 {
352 if (packBuffer_ != nullptr || isWaiting_) {
353 HILOGE("recv start data packet but buffer not empty or still waiting");
354 ResetFlag();
355 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
356 }
357 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
358 if (ret != ERR_OK) {
359 HILOGE("write payload to buffer failed");
360 ResetFlag();
361 return WRITE_PAYLOAD_TO_BUFFER_FAILED;
362 }
363 return ERR_OK;
364 }
365
ProcessStartPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)366 int32_t DataSenderReceiver::ProcessStartPacketRecv(const uint8_t* data, const uint32_t dataLen,
367 const SessionDataHeader& headerPara)
368 {
369 if (packBuffer_ != nullptr || isWaiting_) {
370 HILOGE("recv start data packet but buffer not empty or still waiting");
371 ResetFlag();
372 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
373 }
374 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
375 if (ret != ERR_OK) {
376 ResetFlag();
377 return ret;
378 }
379 isWaiting_ = true;
380 nowSeqNum_ = headerPara.seqNum_;
381 nowSubSeq_ = headerPara.subSeq_;
382 return ERR_OK;
383 }
384
ProcessMidPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)385 int32_t DataSenderReceiver::ProcessMidPacketRecv(const uint8_t* data,
386 const uint32_t dataLen, const SessionDataHeader& headerPara)
387 {
388 if (packBuffer_ == nullptr || !isWaiting_) {
389 HILOGE("recv mid data packet but buffer empty or end waiting");
390 ResetFlag();
391 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
392 }
393 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
394 if (ret != ERR_OK) {
395 ResetFlag();
396 return ret;
397 }
398 nowSeqNum_ = headerPara.seqNum_;
399 nowSubSeq_ = headerPara.subSeq_;
400 return ERR_OK;
401 }
402
ProcessEndPacketRecv(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)403 int32_t DataSenderReceiver::ProcessEndPacketRecv(const uint8_t* data,
404 const uint32_t dataLen, const SessionDataHeader& headerPara)
405 {
406 if (packBuffer_ == nullptr || !isWaiting_) {
407 HILOGE("recv end data packet but buffer empty or end waiting");
408 ResetFlag();
409 return FLAG_TYPE_NOT_MATCH_BUFFER_STATE;
410 }
411 int32_t ret = WriteRecvBytesDataToBuffer(data, dataLen, headerPara);
412 if (ret != ERR_OK) {
413 ResetFlag();
414 return ret;
415 }
416 isWaiting_ = false;
417 return ret;
418 }
419
WriteRecvBytesDataToBuffer(const uint8_t * data,const uint32_t dataLen,const SessionDataHeader & headerPara)420 int32_t DataSenderReceiver::WriteRecvBytesDataToBuffer(const uint8_t* data,
421 const uint32_t dataLen, const SessionDataHeader& headerPara)
422 {
423 uint8_t* header = const_cast<uint8_t*>(data);
424 uint8_t* dataHeader = header + (headerPara.packetLen_ - headerPara.payloadLen_);
425 if (packBuffer_ == nullptr) {
426 packBuffer_ = std::make_unique<AVTransDataBuffer>(headerPara.totalLen_);
427 currentPos = packBuffer_->Data();
428 } else if (packBuffer_->Size() != headerPara.totalLen_) {
429 HILOGE("recv session header totalLen inconsistent");
430 return INVALID_SESSION_HEADER_TOTAL_LEN;
431 }
432
433 int32_t ret = memcpy_s(currentPos,
434 packBuffer_->Size() - (currentPos - packBuffer_->Data()),
435 dataHeader, headerPara.payloadLen_);
436 if (ret != EOK) {
437 HILOGE("write payload to buffer failed");
438 return WRITE_PAYLOAD_TO_BUFFER_FAILED;
439 }
440 currentPos += headerPara.payloadLen_;
441 return ret;
442 }
443
GetPacketedData()444 std::shared_ptr<AVTransDataBuffer> DataSenderReceiver::GetPacketedData()
445 {
446 if (isDataReady()) {
447 packBuffer_->SetRange(0, currentPos - packBuffer_->Data());
448 auto bytesData = std::shared_ptr<AVTransDataBuffer>(std::move(packBuffer_));
449 ResetFlag();
450 return bytesData;
451 }
452 return nullptr;
453 }
454
isDataReady()455 inline bool DataSenderReceiver::isDataReady()
456 {
457 return !isWaiting_ && packBuffer_ != nullptr;
458 }
459
ResetFlag()460 inline void DataSenderReceiver::ResetFlag()
461 {
462 isWaiting_ = false;
463 nowSeqNum_ = 0;
464 nowSubSeq_ = 0;
465 nowTotalLen_ = 0;
466 packBuffer_ = nullptr;
467 currentPos = nullptr;
468 }
469 } // namespace DistributedCollab
470 } // namespace OHOS