1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! hdctransfer
16 #![allow(missing_docs)]
17
18 use std::collections::VecDeque;
19
20 use crate::common::base::Base;
21 use crate::config::HdcCommand;
22 use crate::config::TaskMessage;
23 use crate::config::*;
24 use crate::serializer::native_struct::TransferConfig;
25 use crate::serializer::native_struct::TransferPayload;
26 use crate::serializer::serialize::Serialization;
27 use crate::transfer;
28 use std::fs::metadata;
29 use std::fs::OpenOptions;
30 use std::fs::{self, create_dir_all, File};
31 use std::io::{Read, Seek, Write};
32 use std::path::PathBuf;
33 use std::sync::Arc;
34 use ylong_runtime::sync::Mutex;
35 use ylong_runtime::task::JoinHandle;
36
extern "C" {
    /// Native LZ4 compression entry point.
    ///
    /// As called in this file: `data` points at `data_size` input bytes,
    /// `dataCompress` at an output buffer of `compressCapacity` bytes, and a
    /// return value greater than zero is treated as the compressed length.
    // NOTE(review): the authoritative contract is the native implementation — confirm there.
    fn LZ4CompressTransfer(
        data: *const libc::c_char,
        dataCompress: *mut libc::c_char,
        data_size: i32,
        compressCapacity: i32,
    ) -> i32;
    /// Native LZ4 decompression entry point (the symbol name is spelled
    /// `Deompress` on the native side and must stay that way to link).
    ///
    /// As called in this file: `data` points at `data_size` compressed bytes,
    /// `dataDecompress` at an output buffer of `decompressCapacity` bytes, and
    /// a return value greater than zero is treated as the decompressed length.
    // NOTE(review): the authoritative contract is the native implementation — confirm there.
    fn LZ4DeompressTransfer(
        data: *const libc::c_char,
        dataDecompress: *mut libc::c_char,
        data_size: i32,
        decompressCapacity: i32,
    ) -> i32;
}
51
/// State carried for one file transfer on a session/channel pair.
///
/// Only the fields with a comment below are read or written in this file;
/// the remaining ones are presumably maintained by the code that owns the
/// transfer — verify against the callers.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HdcTransferBase {
    pub need_close_notify: bool,
    pub io_index: u64,
    pub last_error: u32,
    pub is_io_finish: bool,
    pub is_master: bool,
    pub remote_path: String,
    pub base_local_path: String,
    /// Local file path; `check_local_path` completes it, `transfer_begin`
    /// reads from it and `recv_and_write_file` writes to it.
    pub local_path: String,
    pub server_or_daemon: bool,
    pub task_queue: Vec<String>,
    pub local_name: String,
    /// Set by `check_local_path` when the probed path is a directory.
    pub is_dir: bool,
    /// Byte count used as the chunking bound when sending and as the
    /// completion threshold when receiving.
    pub file_size: u64,
    pub dir_size: u64,
    /// Session on which this transfer's messages are queued.
    pub session_id: u32,
    /// Channel id stamped on outgoing messages.
    pub channel_id: u32,
    /// Running total of payload bytes accepted by `recv_and_write_file`.
    pub index: u64,
    pub file_cnt: u32,
    pub is_file_mode_sync: bool,
    pub file_begin_time: u64,
    pub dir_begin_time: u64,

    /// Transfer options; this file only reads `compress_type` from it.
    pub transfer_config: TransferConfig,
}
78
79 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self80 pub fn new(_session_id: u32, _channel_id: u32) -> Self {
81 Self {
82 need_close_notify: false,
83 io_index: 0,
84 last_error: 0,
85 is_io_finish: false,
86 is_master: false,
87 remote_path: String::new(),
88 base_local_path: String::new(),
89 local_path: String::new(),
90 server_or_daemon: false,
91 task_queue: Vec::<String>::new(),
92 local_name: String::new(),
93 is_dir: false,
94 file_size: 0,
95 dir_size: 0,
96 session_id: _session_id,
97 channel_id: _channel_id,
98 index: 0,
99 file_cnt: 0,
100 is_file_mode_sync: false,
101 file_begin_time: 0,
102 dir_begin_time: 0,
103 transfer_config: TransferConfig::default(),
104 }
105 }
106 }
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, _error: &mut str, ) -> bool107 pub fn check_local_path(
108 transfer: &mut HdcTransferBase,
109 _local_path: &str,
110 _optional_name: &str,
111 _error: &mut str,
112 ) -> bool {
113 let file = metadata(_local_path);
114 if let Ok(f) = file {
115 transfer.is_dir = f.is_dir();
116 if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
117 transfer
118 .local_path
119 .push_str(Base::get_path_sep().to_string().as_str());
120 }
121 }
122 let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
123 op = op.replace('/', Base::get_path_sep().to_string().as_str());
124
125 if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
126 transfer
127 .local_path
128 .push_str(Base::get_path_sep().to_string().as_str());
129 }
130
131 if transfer.local_path.ends_with(Base::get_path_sep()) {
132 transfer.local_path.push_str(op.as_str());
133 }
134 if transfer.local_path.ends_with(Base::get_path_sep()) {
135 create_dir_all(transfer.local_path.clone()).is_ok()
136 } else {
137 let last = transfer.local_path.rfind(Base::get_path_sep());
138 match last {
139 Some(index) => {
140 let result = create_dir_all(&transfer.local_path[0..index]);
141 if result.is_ok() {
142 File::create(transfer.local_path.clone()).is_ok()
143 } else {
144 false
145 }
146 }
147 None => File::create(transfer.local_path.clone()).is_ok(),
148 }
149 }
150 }
151
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>152 fn spawn_handler(
153 _command_data: HdcCommand,
154 index: usize,
155 local_path: String,
156 _channel_id_: u32,
157 transfer_config: &TransferConfig,
158 ) -> JoinHandle<(bool, TaskMessage)> {
159 let thread_path_ref = Arc::new(Mutex::new(local_path));
160 let pos = (index * FILE_PACKAGE_PAYLOAD_SIZE) as u64;
161 let compress_type = transfer_config.compress_type;
162 ylong_runtime::spawn(async move {
163 let path = thread_path_ref.lock().await;
164 let mut file = File::open(&*path).unwrap();
165 let _ = file.seek(std::io::SeekFrom::Start(pos));
166 let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
167 let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
168 let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
169 let read_len = file.read(&mut buf[..]).unwrap();
170 let transfer_compress_type = match CompressType::try_from(compress_type) {
171 Ok(compress_type) => compress_type,
172 Err(_) => CompressType::None,
173 };
174
175 let mut header: TransferPayload = TransferPayload {
176 index: pos,
177 compress_type,
178 compress_size: 0,
179 uncompress_size: 0,
180 };
181 header.uncompress_size = read_len as u32;
182 let capacity = read_len as i32;
183
184 match transfer_compress_type {
185 CompressType::Lz4 => {
186 let compress_size: i32;
187 header.compress_type = CompressType::Lz4 as u8;
188 unsafe {
189 compress_size = LZ4CompressTransfer(
190 buf.as_ptr() as *const libc::c_char,
191 data_buf.as_ptr() as *mut libc::c_char,
192 capacity,
193 capacity,
194 );
195 }
196 if compress_size > 0 {
197 header.compress_size = compress_size as u32;
198 } else {
199 header.compress_type = CompressType::None as u8;
200 header.compress_size = read_len as u32;
201 data_buf = buf;
202 }
203 }
204 _ => {
205 header.compress_type = CompressType::None as u8;
206 header.compress_size = read_len as u32;
207 data_buf = buf;
208 }
209 }
210
211 let head_buffer = header.serialize();
212 total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
213 let data_len = header.compress_size as usize;
214 total.append(&mut data_buf[..data_len].to_vec());
215 let _data_message = TaskMessage {
216 channel_id: _channel_id_,
217 command: _command_data,
218 payload: total,
219 };
220 (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
221 })
222 }
223
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool224 pub async fn read_and_send_data(
225 local_path: &str,
226 session_id: u32,
227 _channel_id_: u32,
228 _file_size: u64,
229 _command_data: HdcCommand,
230 transfer_config: &TransferConfig,
231 ) -> bool {
232 const MAX_WORKER_COUNT: usize = 5;
233 let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
234 if pieces_count == 0 {
235 pieces_count = 1;
236 }
237 let workers_count = if pieces_count > MAX_WORKER_COUNT {
238 MAX_WORKER_COUNT
239 } else {
240 pieces_count
241 };
242 let mut index = 0;
243 let mut queue = VecDeque::new();
244 while index < workers_count {
245 let worker = spawn_handler(
246 _command_data,
247 index,
248 local_path.to_owned(),
249 _channel_id_,
250 transfer_config,
251 );
252 queue.push_back(worker);
253 index += 1;
254 }
255 loop {
256 if queue.is_empty() {
257 break;
258 }
259 let handler = queue.pop_front();
260 let handler = handler.unwrap();
261 let (is_finish, task_message) = handler.await.unwrap();
262 transfer::put(session_id, task_message).await;
263 if is_finish {
264 return false;
265 }
266
267 if ((index * FILE_PACKAGE_PAYLOAD_SIZE) as u64) < _file_size {
268 let worker = spawn_handler(
269 _command_data,
270 index,
271 local_path.to_owned(),
272 _channel_id_,
273 transfer_config,
274 );
275 queue.push_back(worker);
276 index += 1;
277 }
278 }
279 true
280 }
281
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool282 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
283 let mut header = TransferPayload {
284 ..Default::default()
285 };
286 let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
287 let file_index = header.index;
288 let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
289 let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
290 Ok(compress_type) => compress_type,
291 Err(_) => CompressType::None,
292 };
293
294 if let CompressType::Lz4 = compress_type {
295 let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
296 let decompress_size = unsafe {
297 LZ4DeompressTransfer(
298 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
299 buf.as_ptr() as *mut libc::c_char,
300 header.compress_size as i32,
301 header.uncompress_size as i32,
302 )
303 };
304 if decompress_size > 0 {
305 buffer = buf[..(decompress_size as usize)].to_vec();
306 }
307 }
308
309 let path = tbase.local_path.clone();
310 let write_buf = buffer.clone();
311 ylong_runtime::spawn(async move {
312 let mut file = OpenOptions::new()
313 .write(true)
314 .create(true)
315 .open(path)
316 .unwrap();
317 let _ = file.seek(std::io::SeekFrom::Start(file_index));
318 file.write_all(write_buf.as_slice()).unwrap();
319 });
320
321 tbase.index += buffer.len() as u64;
322 if tbase.index >= tbase.file_size {
323 return true;
324 }
325 false
326 }
327
/// Returns the paths of all regular files below `_path`, sorted.
///
/// Sub-directories are walked recursively. Directories or entries that cannot
/// be read (missing path, permission error, entry removed while listing) are
/// skipped instead of aborting, and entries that are neither a regular file
/// nor a directory are ignored. A missing or unreadable `_path` therefore
/// yields an empty list.
///
/// Paths are rendered with `Path::display`, so non-UTF-8 components are
/// converted lossily.
// The `&String` parameter is kept so existing callers are unaffected.
#[allow(clippy::ptr_arg)]
pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
    /// Appends every regular file below `dir` to `found`, in listing order.
    fn collect_files(dir: &std::path::Path, found: &mut Vec<String>) {
        let entries = match fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(_) => return,
        };
        for entry in entries {
            let path = match entry {
                Ok(entry) => entry.path(),
                Err(_) => continue,
            };
            if path.is_file() {
                found.push(path.display().to_string());
            } else if path.is_dir() {
                collect_files(&path, found);
            }
        }
    }

    let mut result = Vec::new();
    collect_files(&PathBuf::from(_path), &mut result);
    // One sort at the top gives the same order as sorting at every level.
    result.sort();
    result
}
344
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)345 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
346 let local_path_ = transfer.local_path.clone();
347
348 read_and_send_data(
349 &local_path_,
350 transfer.session_id,
351 transfer.channel_id,
352 transfer.file_size,
353 _command_data,
354 &transfer.transfer_config,
355 )
356 .await;
357 }
358
/// Handles one received data packet by delegating to `recv_and_write_file`
/// and returning its result unchanged.
pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
    recv_and_write_file(tbase, _payload)
}
362
transfer_task_finish(channel_id: u32, _session_id: u32)363 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
364 let task_message = TaskMessage {
365 channel_id,
366 command: HdcCommand::KernelChannelClose,
367 payload: [1].to_vec(),
368 };
369 transfer::put(_session_id, task_message).await;
370 }
371
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)372 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
373 let task_message = TaskMessage {
374 channel_id,
375 command: comamnd_finish,
376 payload: [1].to_vec(),
377 };
378 transfer::put(_session_id, task_message).await;
379 }
380
/// Ends the channel identified by `channel_id` by awaiting
/// `transfer::TcpMap::end`.
pub async fn close_channel(channel_id: u32) {
    transfer::TcpMap::end(channel_id).await;
}
384
echo_client(session_id: u32, channel_id: u32, payload: Vec<u8>)385 pub async fn echo_client(session_id: u32, channel_id: u32, payload: Vec<u8>) {
386 let echo_message = TaskMessage {
387 channel_id,
388 command: HdcCommand::KernelEchoRaw,
389 payload,
390 };
391 transfer::put(session_id, echo_message).await;
392 }
393