• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! hdctransfer
16 #![allow(missing_docs)]
17 
18 use std::collections::VecDeque;
19 
20 use crate::common::base::Base;
21 use crate::config::HdcCommand;
22 use crate::config::TaskMessage;
23 use crate::config::*;
24 use crate::serializer::native_struct::TransferConfig;
25 use crate::serializer::native_struct::TransferPayload;
26 use crate::serializer::serialize::Serialization;
27 use crate::transfer;
28 use std::fs::metadata;
29 use std::fs::OpenOptions;
30 use std::fs::{self, create_dir_all, File};
31 use std::io::{Read, Seek, Write};
32 use std::path::PathBuf;
33 use std::sync::Arc;
34 use ylong_runtime::sync::Mutex;
35 use ylong_runtime::task::JoinHandle;
36 
37 extern "C" {
LZ4CompressTransfer( data: *const libc::c_char, dataCompress: *mut libc::c_char, data_size: i32, compressCapacity: i32, ) -> i3238     fn LZ4CompressTransfer(
39         data: *const libc::c_char,
40         dataCompress: *mut libc::c_char,
41         data_size: i32,
42         compressCapacity: i32,
43     ) -> i32;
LZ4DeompressTransfer( data: *const libc::c_char, dataDecompress: *mut libc::c_char, data_size: i32, decompressCapacity: i32, ) -> i3244     fn LZ4DeompressTransfer(
45         data: *const libc::c_char,
46         dataDecompress: *mut libc::c_char,
47         data_size: i32,
48         decompressCapacity: i32,
49     ) -> i32;
50 }
51 
52 #[derive(Debug, Default, Clone, PartialEq, Eq)]
53 pub struct HdcTransferBase {
54     pub need_close_notify: bool,
55     pub io_index: u64,
56     pub last_error: u32,
57     pub is_io_finish: bool,
58     pub is_master: bool,
59     pub remote_path: String,
60     pub base_local_path: String,
61     pub local_path: String,
62     pub server_or_daemon: bool,
63     pub task_queue: Vec<String>,
64     pub local_name: String,
65     pub is_dir: bool,
66     pub file_size: u64,
67     pub dir_size: u64,
68     pub session_id: u32,
69     pub channel_id: u32,
70     pub index: u64,
71     pub file_cnt: u32,
72     pub is_file_mode_sync: bool,
73     pub file_begin_time: u64,
74     pub dir_begin_time: u64,
75 
76     pub transfer_config: TransferConfig,
77 }
78 
79 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self80     pub fn new(_session_id: u32, _channel_id: u32) -> Self {
81         Self {
82             need_close_notify: false,
83             io_index: 0,
84             last_error: 0,
85             is_io_finish: false,
86             is_master: false,
87             remote_path: String::new(),
88             base_local_path: String::new(),
89             local_path: String::new(),
90             server_or_daemon: false,
91             task_queue: Vec::<String>::new(),
92             local_name: String::new(),
93             is_dir: false,
94             file_size: 0,
95             dir_size: 0,
96             session_id: _session_id,
97             channel_id: _channel_id,
98             index: 0,
99             file_cnt: 0,
100             is_file_mode_sync: false,
101             file_begin_time: 0,
102             dir_begin_time: 0,
103             transfer_config: TransferConfig::default(),
104         }
105     }
106 }
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, _error: &mut str, ) -> bool107 pub fn check_local_path(
108     transfer: &mut HdcTransferBase,
109     _local_path: &str,
110     _optional_name: &str,
111     _error: &mut str,
112 ) -> bool {
113     let file = metadata(_local_path);
114     if let Ok(f) = file {
115         transfer.is_dir = f.is_dir();
116         if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
117             transfer
118                 .local_path
119                 .push_str(Base::get_path_sep().to_string().as_str());
120         }
121     }
122     let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
123     op = op.replace('/', Base::get_path_sep().to_string().as_str());
124 
125     if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
126         transfer
127             .local_path
128             .push_str(Base::get_path_sep().to_string().as_str());
129     }
130 
131     if transfer.local_path.ends_with(Base::get_path_sep()) {
132         transfer.local_path.push_str(op.as_str());
133     }
134     if transfer.local_path.ends_with(Base::get_path_sep()) {
135         create_dir_all(transfer.local_path.clone()).is_ok()
136     } else {
137         let last = transfer.local_path.rfind(Base::get_path_sep());
138         match last {
139             Some(index) => {
140                 let result = create_dir_all(&transfer.local_path[0..index]);
141                 if result.is_ok() {
142                     File::create(transfer.local_path.clone()).is_ok()
143                 } else {
144                     false
145                 }
146             }
147             None => File::create(transfer.local_path.clone()).is_ok(),
148         }
149     }
150 }
151 
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>152 fn spawn_handler(
153     _command_data: HdcCommand,
154     index: usize,
155     local_path: String,
156     _channel_id_: u32,
157     transfer_config: &TransferConfig,
158 ) -> JoinHandle<(bool, TaskMessage)> {
159     let thread_path_ref = Arc::new(Mutex::new(local_path));
160     let pos = (index * FILE_PACKAGE_PAYLOAD_SIZE) as u64;
161     let compress_type = transfer_config.compress_type;
162     ylong_runtime::spawn(async move {
163         let path = thread_path_ref.lock().await;
164         let mut file = File::open(&*path).unwrap();
165         let _ = file.seek(std::io::SeekFrom::Start(pos));
166         let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
167         let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
168         let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
169         let read_len = file.read(&mut buf[..]).unwrap();
170         let transfer_compress_type = match CompressType::try_from(compress_type) {
171             Ok(compress_type) => compress_type,
172             Err(_) => CompressType::None,
173         };
174 
175         let mut header: TransferPayload = TransferPayload {
176             index: pos,
177             compress_type,
178             compress_size: 0,
179             uncompress_size: 0,
180         };
181         header.uncompress_size = read_len as u32;
182         let capacity = read_len as i32;
183 
184         match transfer_compress_type {
185             CompressType::Lz4 => {
186                 let compress_size: i32;
187                 header.compress_type = CompressType::Lz4 as u8;
188                 unsafe {
189                     compress_size = LZ4CompressTransfer(
190                         buf.as_ptr() as *const libc::c_char,
191                         data_buf.as_ptr() as *mut libc::c_char,
192                         capacity,
193                         capacity,
194                     );
195                 }
196                 if compress_size > 0 {
197                     header.compress_size = compress_size as u32;
198                 } else {
199                     header.compress_type = CompressType::None as u8;
200                     header.compress_size = read_len as u32;
201                     data_buf = buf;
202                 }
203             }
204             _ => {
205                 header.compress_type = CompressType::None as u8;
206                 header.compress_size = read_len as u32;
207                 data_buf = buf;
208             }
209         }
210 
211         let head_buffer = header.serialize();
212         total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
213         let data_len = header.compress_size as usize;
214         total.append(&mut data_buf[..data_len].to_vec());
215         let _data_message = TaskMessage {
216             channel_id: _channel_id_,
217             command: _command_data,
218             payload: total,
219         };
220         (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
221     })
222 }
223 
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool224 pub async fn read_and_send_data(
225     local_path: &str,
226     session_id: u32,
227     _channel_id_: u32,
228     _file_size: u64,
229     _command_data: HdcCommand,
230     transfer_config: &TransferConfig,
231 ) -> bool {
232     const MAX_WORKER_COUNT: usize = 5;
233     let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
234     if pieces_count == 0 {
235         pieces_count = 1;
236     }
237     let workers_count = if pieces_count > MAX_WORKER_COUNT {
238         MAX_WORKER_COUNT
239     } else {
240         pieces_count
241     };
242     let mut index = 0;
243     let mut queue = VecDeque::new();
244     while index < workers_count {
245         let worker = spawn_handler(
246             _command_data,
247             index,
248             local_path.to_owned(),
249             _channel_id_,
250             transfer_config,
251         );
252         queue.push_back(worker);
253         index += 1;
254     }
255     loop {
256         if queue.is_empty() {
257             break;
258         }
259         let handler = queue.pop_front();
260         let handler = handler.unwrap();
261         let (is_finish, task_message) = handler.await.unwrap();
262         transfer::put(session_id, task_message).await;
263         if is_finish {
264             return false;
265         }
266 
267         if ((index * FILE_PACKAGE_PAYLOAD_SIZE) as u64) < _file_size {
268             let worker = spawn_handler(
269                 _command_data,
270                 index,
271                 local_path.to_owned(),
272                 _channel_id_,
273                 transfer_config,
274             );
275             queue.push_back(worker);
276             index += 1;
277         }
278     }
279     true
280 }
281 
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool282 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
283     let mut header = TransferPayload {
284         ..Default::default()
285     };
286     let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
287     let file_index = header.index;
288     let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
289     let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
290         Ok(compress_type) => compress_type,
291         Err(_) => CompressType::None,
292     };
293 
294     if let CompressType::Lz4 = compress_type {
295         let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
296         let decompress_size = unsafe {
297             LZ4DeompressTransfer(
298                 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
299                 buf.as_ptr() as *mut libc::c_char,
300                 header.compress_size as i32,
301                 header.uncompress_size as i32,
302             )
303         };
304         if decompress_size > 0 {
305             buffer = buf[..(decompress_size as usize)].to_vec();
306         }
307     }
308 
309     let path = tbase.local_path.clone();
310     let write_buf = buffer.clone();
311     ylong_runtime::spawn(async move {
312         let mut file = OpenOptions::new()
313             .write(true)
314             .create(true)
315             .open(path)
316             .unwrap();
317         let _ = file.seek(std::io::SeekFrom::Start(file_index));
318         file.write_all(write_buf.as_slice()).unwrap();
319     });
320 
321     tbase.index += buffer.len() as u64;
322     if tbase.index >= tbase.file_size {
323         return true;
324     }
325     false
326 }
327 
get_sub_files_resurively(_path: &String) -> Vec<String>328 pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
329     let mut result = Vec::new();
330     let dir_path = PathBuf::from(_path);
331     for entry in fs::read_dir(dir_path).unwrap() {
332         let path = entry.unwrap().path();
333         if path.is_file() {
334             result.push(path.display().to_string());
335         } else {
336             let p = path.display().to_string();
337             let mut sub_list = get_sub_files_resurively(&p);
338             result.append(&mut sub_list);
339         }
340     }
341     result.sort();
342     result
343 }
344 
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)345 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
346     let local_path_ = transfer.local_path.clone();
347 
348     read_and_send_data(
349         &local_path_,
350         transfer.session_id,
351         transfer.channel_id,
352         transfer.file_size,
353         _command_data,
354         &transfer.transfer_config,
355     )
356     .await;
357 }
358 
transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool359 pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
360     recv_and_write_file(tbase, _payload)
361 }
362 
transfer_task_finish(channel_id: u32, _session_id: u32)363 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
364     let task_message = TaskMessage {
365         channel_id,
366         command: HdcCommand::KernelChannelClose,
367         payload: [1].to_vec(),
368     };
369     transfer::put(_session_id, task_message).await;
370 }
371 
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)372 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
373     let task_message = TaskMessage {
374         channel_id,
375         command: comamnd_finish,
376         payload: [1].to_vec(),
377     };
378     transfer::put(_session_id, task_message).await;
379 }
380 
close_channel(channel_id: u32)381 pub async fn close_channel(channel_id: u32) {
382     transfer::TcpMap::end(channel_id).await;
383 }
384 
echo_client(session_id: u32, channel_id: u32, payload: Vec<u8>)385 pub async fn echo_client(session_id: u32, channel_id: u32, payload: Vec<u8>) {
386     let echo_message = TaskMessage {
387         channel_id,
388         command: HdcCommand::KernelEchoRaw,
389         payload,
390     };
391     transfer::put(session_id, echo_message).await;
392 }
393