• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! hdctransfer
16 #![allow(missing_docs)]
17 use std::collections::VecDeque;
18 use std::fs::{self, File, OpenOptions, metadata};
19 use std::io::{Read, Seek, Write, Error};
20 use std::path::PathBuf;
21 #[cfg(not(target_os = "windows"))]
22 use std::path::Path;
23 #[cfg(not(target_os = "windows"))]
24 use std::os::unix::fs::PermissionsExt;
25 use std::sync::Arc;
26 
27 use crate::common::base::Base;
28 use crate::common::hdcfile::FileTaskMap;
29 use crate::config::HdcCommand;
30 use crate::config::TaskMessage;
31 use crate::{config::*, utils};
32 use crate::serializer::native_struct::TransferConfig;
33 use crate::serializer::native_struct::TransferPayload;
34 use crate::serializer::serialize::Serialization;
35 use crate::transfer;
36 #[cfg(not(feature = "host"))]
37 use crate::utils::hdc_log::*;
38 #[cfg(feature = "host")]
39 extern crate ylong_runtime_static as ylong_runtime;
40 use ylong_runtime::sync::Mutex;
41 use ylong_runtime::task::JoinHandle;
42 
43 extern "C" {
LZ4CompressTransfer( data: *const libc::c_char, dataCompress: *mut libc::c_char, data_size: i32, compressCapacity: i32, ) -> i3244     fn LZ4CompressTransfer(
45         data: *const libc::c_char,
46         dataCompress: *mut libc::c_char,
47         data_size: i32,
48         compressCapacity: i32,
49     ) -> i32;
LZ4DeompressTransfer( data: *const libc::c_char, dataDecompress: *mut libc::c_char, data_size: i32, decompressCapacity: i32, ) -> i3250     fn LZ4DeompressTransfer(
51         data: *const libc::c_char,
52         dataDecompress: *mut libc::c_char,
53         data_size: i32,
54         decompressCapacity: i32,
55     ) -> i32;
56 }
57 
58 #[derive(Debug, Default, Clone, PartialEq, Eq)]
59 pub struct HdcTransferBase {
60     pub need_close_notify: bool,
61     pub io_index: u64,
62     pub last_error: u32,
63     pub is_io_finish: bool,
64     pub is_master: bool,
65     pub remote_path: String,
66     pub base_local_path: String,
67     pub local_path: String,
68     pub server_or_daemon: bool,
69     pub task_queue: Vec<String>,
70     pub local_name: String,
71     pub local_tar_raw_path: String,
72     pub is_dir: bool,
73     pub file_size: u64,
74     pub dir_size: u64,
75     pub session_id: u32,
76     pub channel_id: u32,
77     pub index: u64,
78     pub file_cnt: u32,
79     pub is_file_mode_sync: bool,
80     pub file_begin_time: u64,
81     pub dir_begin_time: u64,
82     pub is_local_dir_exsit: Option<bool>,
83     pub empty_dirs: String,
84     pub stop_run: bool,
85     pub command_str: String,
86 
87     pub transfer_config: TransferConfig,
88 }
89 
90 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self91     pub fn new(_session_id: u32, _channel_id: u32) -> Self {
92         Self {
93             need_close_notify: false,
94             io_index: 0,
95             last_error: 0,
96             is_io_finish: false,
97             is_master: false,
98             remote_path: String::new(),
99             base_local_path: String::new(),
100             local_path: String::new(),
101             local_tar_raw_path: String::new(),
102             server_or_daemon: false,
103             task_queue: Vec::<String>::new(),
104             local_name: String::new(),
105             is_dir: false,
106             file_size: 0,
107             dir_size: 0,
108             session_id: _session_id,
109             channel_id: _channel_id,
110             index: 0,
111             file_cnt: 0,
112             is_file_mode_sync: false,
113             file_begin_time: 0,
114             dir_begin_time: 0,
115             is_local_dir_exsit: None,
116             empty_dirs: String::new(),
117             stop_run: false,
118             command_str: String::new(),
119             transfer_config: TransferConfig::default(),
120         }
121     }
122 }
123 
124 #[cfg(not(target_os = "windows"))]
set_file_permission(path: String, mode: u32) -> std::io::Result<()>125 fn set_file_permission(path: String, mode: u32) -> std::io::Result<()> {
126     let perms = std::fs::Permissions::from_mode(mode);
127     fs::set_permissions(std::path::Path::new(&path), perms)
128 }
129 
130 #[cfg(not(target_os = "windows"))]
set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()>131 fn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> {
132     let perms = std::fs::Permissions::from_mode(mode);
133     fs::set_permissions(dir, perms)?;
134 
135     for entry in fs::read_dir(dir)? {
136         let entry = entry?;
137         let entry_path = dir.join(entry.file_name());
138         if entry_path.is_dir() {
139             set_dir_permissions_recursive(&entry_path, mode)?;
140         }
141     }
142     Ok(())
143 }
144 
145 #[allow(unused)]
create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()>146 fn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> {
147     let mut dir_path = std::path::Path::new(&path);
148     while let Some(p) = dir_path.parent() {
149         if p.exists() {
150             break;
151         }
152         dir_path = p;
153     }
154     #[cfg(not(target_os = "windows"))]
155     let exsit = dir_path.exists();
156     std::fs::create_dir_all(path.clone())?;
157     #[cfg(not(target_os = "windows"))]
158     if !exsit {
159         set_dir_permissions_recursive(dir_path, mode)
160     } else {
161         Ok(())
162     }
163     #[cfg(target_os = "windows")]
164     Ok(())
165 }
166 
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, ) -> Result<bool, Error>167 pub fn check_local_path(
168     transfer: &mut HdcTransferBase,
169     _local_path: &str,
170     _optional_name: &str,
171 ) -> Result<bool, Error> {
172     crate::info!(
173         "check_local_path, local_path:{}, optional_name:{}",
174         _local_path,
175         _optional_name
176     );
177     let file = metadata(_local_path);
178     if let Ok(f) = file {
179         if transfer.is_local_dir_exsit.is_none() {
180             transfer.is_local_dir_exsit = Some(true);
181         }
182         transfer.is_dir = f.is_dir();
183         if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
184             transfer
185                 .local_path
186                 .push_str(Base::get_path_sep().to_string().as_str());
187         }
188     } else if transfer.is_local_dir_exsit.is_none() {
189         transfer.is_local_dir_exsit = Some(false);
190     }
191     let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
192     op = op.replace('/', Base::get_path_sep().to_string().as_str());
193 
194     if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
195         transfer
196             .local_path
197             .push_str(Base::get_path_sep().to_string().as_str());
198     }
199 
200     if transfer.local_path.ends_with(Base::get_path_sep()) {
201         let local_dir = transfer
202             .local_path
203             .clone()
204             .replace('/', Base::get_path_sep().to_string().as_str());
205 
206         if let Some(false) = transfer.is_local_dir_exsit {
207             if op.contains(Base::get_path_sep()) {
208                 let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default();
209                 op = op.as_str()[first_sep_index..].to_string();
210                 crate::debug!(
211                     "check_local_path, combine 2 local_dir:{}, op:{}",
212                     local_dir,
213                     op
214                 );
215             }
216         }
217 
218         transfer.local_path = Base::combine(local_dir, op);
219     }
220     crate::debug!(
221         "check_local_path, final transfer.local_path:{}",
222         transfer.local_path
223     );
224     if transfer.local_path.ends_with(Base::get_path_sep()) {
225         match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) {
226             Ok(_) => Ok(true),
227             Err(error) => {
228                 crate::error!("dir create failed, error:{}", &error);
229                 Err(error)
230             },
231         }
232     } else {
233         let last = transfer.local_path.rfind(Base::get_path_sep());
234         match last {
235             Some(index) => {
236                 match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) {
237                     Ok(_) => {
238                         match File::create(transfer.local_path.clone()) {
239                     Ok(_) => {
240                         #[cfg(not(target_os = "windows"))]
241                         set_file_permission(transfer.local_path.clone(), 0o644)?;
242                         Ok(true)
243                             },
244                     Err(error) => {
245                         crate::error!("file create failed, error:{}", &error);
246                         Err(error)
247                             },
248                         }
249                     }
250                 Err(error) => {
251                     crate::error!("dir create failed, error:{}", &error);
252                     Err(error)
253                     },
254                 }
255             }
256             None => {
257                 match File::create(transfer.local_path.clone()) {
258                 Ok(_) => {
259                     #[cfg(not(target_os = "windows"))]
260                     set_file_permission(transfer.local_path.clone(), 0o644)?;
261                     Ok(true)
262                     },
263                 Err(error) => {
264                     crate::error!("file create failed, error:{}", &error);
265                     Err(error)
266                     },
267                 }
268             }
269         }
270     }
271 }
272 
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>273 fn spawn_handler(
274     _command_data: HdcCommand,
275     index: usize,
276     local_path: String,
277     _channel_id_: u32,
278     transfer_config: &TransferConfig,
279 ) -> JoinHandle<(bool, TaskMessage)> {
280     let thread_path_ref = Arc::new(Mutex::new(local_path));
281     let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64);
282     let compress_type = transfer_config.compress_type;
283     let file_size = transfer_config.file_size as u64;
284     ylong_runtime::spawn(async move {
285         let path = thread_path_ref.lock().await;
286         let Ok(mut file) = File::open(&*path) else {
287             crate::debug!("open file failed, path:{}", *path);
288             let _data_message = TaskMessage {
289                 channel_id: _channel_id_,
290                 command: _command_data,
291                 payload: Vec::new(),
292             };
293             return (false, _data_message);
294         };
295         let _ = file.seek(std::io::SeekFrom::Start(pos));
296         let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
297         let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
298         let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
299         let mut read_len = 0usize;
300         let mut package_read_len = (file_size - pos) as usize;
301         if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE {
302             package_read_len = FILE_PACKAGE_PAYLOAD_SIZE;
303         }
304         while read_len < package_read_len {
305             let Ok(single_len) = file.read(&mut buf[read_len..]) else {
306                 crate::debug!("file read failed, path:{}", *path);
307                 break;
308             };
309             read_len += single_len;
310             if single_len == 0 && read_len < package_read_len {
311                 break;
312             }
313         }
314         let transfer_compress_type = match CompressType::try_from(compress_type) {
315             Ok(compress_type) => compress_type,
316             Err(_) => CompressType::None,
317         };
318 
319         let mut header: TransferPayload = TransferPayload {
320             index: pos,
321             compress_type,
322             compress_size: 0,
323             uncompress_size: 0,
324         };
325         header.uncompress_size = read_len as u32;
326         let capacity = read_len as i32;
327 
328         match transfer_compress_type {
329             CompressType::Lz4 => {
330                 let compress_size: i32;
331                 header.compress_type = CompressType::Lz4 as u8;
332                 unsafe {
333                     compress_size = LZ4CompressTransfer(
334                         buf.as_ptr() as *const libc::c_char,
335                         data_buf.as_ptr() as *mut libc::c_char,
336                         capacity,
337                         capacity,
338                     );
339                 }
340                 if compress_size > 0 {
341                     header.compress_size = compress_size as u32;
342                 } else {
343                     header.compress_type = CompressType::None as u8;
344                     header.compress_size = read_len as u32;
345                     data_buf = buf;
346                 }
347             }
348             _ => {
349                 header.compress_type = CompressType::None as u8;
350                 header.compress_size = read_len as u32;
351                 data_buf = buf;
352             }
353         }
354 
355         let head_buffer = header.serialize();
356         total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
357         let data_len = header.compress_size as usize;
358         total.append(&mut data_buf[..data_len].to_vec());
359         let _data_message = TaskMessage {
360             channel_id: _channel_id_,
361             command: _command_data,
362             payload: total,
363         };
364         (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
365     })
366 }
367 
is_dir_link(path: String) -> bool368 fn is_dir_link(path: String) -> bool {
369     let ret = std::fs::read_link(path);
370     match ret {
371         Ok(p) => {
372             crate::debug!("link to file:{}", p.display().to_string());
373             p.exists() && p.is_dir()
374         }
375         Err(e) => {
376             crate::error!("read_link fail:{:#?}", e);
377             false
378         }
379     }
380 }
381 
is_file_access(path: String) -> bool382 fn is_file_access(path: String) -> bool {
383     let file = metadata(path.clone());
384     match file {
385         Ok(f) => {
386             if !f.is_symlink() {
387                 crate::debug!("file is not a link, path:{}", path);
388                 return true;
389             }
390         }
391         Err(_e) => {
392             crate::error!("metadata file is error, path:{}", path);
393             return false;
394         }
395     }
396     let ret = std::fs::read_link(path);
397     match ret {
398         Ok(p) => {
399             crate::debug!("link to file:{}", p.display().to_string());
400             p.exists()
401         }
402         Err(e) => {
403             crate::error!("read_link fail:{:#?}", e);
404             false
405         }
406     }
407 }
408 
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool409 pub async fn read_and_send_data(
410     local_path: &str,
411     session_id: u32,
412     _channel_id_: u32,
413     _file_size: u64,
414     _command_data: HdcCommand,
415     transfer_config: &TransferConfig,
416 ) -> bool {
417     const MAX_WORKER_COUNT: usize = 5;
418     let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
419     if pieces_count == 0 {
420         pieces_count = 1;
421     }
422     let workers_count = if pieces_count > MAX_WORKER_COUNT {
423         MAX_WORKER_COUNT
424     } else {
425         pieces_count
426     };
427     let mut index = 0;
428     let mut queue = VecDeque::new();
429     while index < workers_count {
430         let worker = spawn_handler(
431             _command_data,
432             index,
433             local_path.to_owned(),
434             _channel_id_,
435             transfer_config,
436         );
437         queue.push_back(worker);
438         index += 1;
439     }
440     loop {
441         if queue.is_empty() {
442             crate::debug!("read_and_send_data queue is empty");
443             break;
444         }
445         let Some(handler) = queue.pop_front() else {
446             continue;
447         };
448         let Ok((is_finish, task_message)) = handler.await else {
449             continue;
450         };
451         transfer::put(session_id, task_message).await;
452         if is_finish {
453             crate::debug!("read_and_send_data is finish return false");
454             return false;
455         }
456 
457         if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size {
458             let worker = spawn_handler(
459                 _command_data,
460                 index,
461                 local_path.to_owned(),
462                 _channel_id_,
463                 transfer_config,
464             );
465             queue.push_back(worker);
466             index += 1;
467         }
468     }
469     true
470 }
471 
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool472 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
473     let mut header = TransferPayload {
474         ..Default::default()
475     };
476     let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
477     let file_index = header.index;
478     let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
479     let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
480         Ok(compress_type) => compress_type,
481         Err(_) => CompressType::None,
482     };
483 
484     if let CompressType::Lz4 = compress_type {
485         let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
486         let decompress_size = unsafe {
487             LZ4DeompressTransfer(
488                 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
489                 buf.as_ptr() as *mut libc::c_char,
490                 header.compress_size as i32,
491                 header.uncompress_size as i32,
492             )
493         };
494         if decompress_size > 0 {
495             buffer = buf[..(decompress_size as usize)].to_vec();
496         }
497     }
498 
499     let path = tbase.local_path.clone();
500     let write_buf = buffer.clone();
501     let session_id = tbase.session_id.to_owned();
502     let channel_id = tbase.channel_id.to_owned();
503     utils::spawn(async move {
504         let open_result = OpenOptions::new()
505             .write(true)
506             .create(true)
507             .open(path.clone());
508         match open_result {
509             Ok(mut file) => {
510                 let _ = file.seek(std::io::SeekFrom::Start(file_index));
511                 let write_result = file.write_all(write_buf.as_slice());
512                 match write_result {
513                     Ok(()) => {}
514                     Err(e) => {
515                         let _ = put_last_error(e, session_id, channel_id).await;
516                     }
517                 }
518             }
519             Err(e) => {
520                 let _ = put_last_error(e, session_id, channel_id).await;
521             }
522         }
523     });
524 
525     tbase.index += buffer.len() as u64;
526     crate::debug!(
527         "transfer file [{}] index {} / {}",
528         tbase.local_path.clone(),
529         tbase.index,
530         tbase.file_size
531     );
532     if tbase.index >= tbase.file_size {
533         return true;
534     }
535     false
536 }
537 
put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool538 async fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool {
539     crate::warn!(
540         "put_last_error sesssion_id:{}, channel_id:{}, error:{}",
541         session_id,
542         channel_id,
543         error,
544     );
545     let errno = match error.raw_os_error() {
546         Some(errno) => errno as u32,
547         None => std::i32::MAX as u32
548     };
549     match FileTaskMap::get(session_id, channel_id).await {
550         Some(task) => {
551             let mut task = task.lock().await;
552             task.transfer.last_error = errno;
553         }
554         None => {
555             crate::error!(
556                 "recv_and_write_file get task is none session_id:{},channel_id:{}",
557                 session_id,
558                 channel_id,
559             );
560             return false;
561         }
562     }
563     true
564 }
565 
get_sub_files_resurively(_path: &String) -> Vec<String>566 pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
567     let mut result = Vec::new();
568     let dir_path = PathBuf::from(_path);
569     if !is_file_access(_path.clone()) {
570         crate::error!("file is invalid link, path:{}", _path);
571         return result;
572     }
573     let Ok(dir_list) = fs::read_dir(dir_path) else {
574         crate::error!("read dir fail, path:{}", _path);
575         return result;
576     };
577     for entry in dir_list {
578         let Ok(path) = entry else {
579             continue;
580         };
581         let path = path.path();
582         if is_dir_link(path.clone().display().to_string()) {
583             continue;
584         } else if path.is_file() {
585             result.push(Base::normalized_path(path).display().to_string());
586         } else {
587             let p = path.display().to_string();
588             let mut sub_list = get_sub_files_resurively(&p);
589             result.append(&mut sub_list);
590         }
591     }
592     result.sort();
593     result
594 }
595 
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)596 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
597     let local_path_ = transfer.local_path.clone();
598 
599     read_and_send_data(
600         &local_path_,
601         transfer.session_id,
602         transfer.channel_id,
603         transfer.file_size,
604         _command_data,
605         &transfer.transfer_config,
606     )
607     .await;
608 }
609 
transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool610 pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
611     recv_and_write_file(tbase, _payload)
612 }
613 
transfer_task_finish(channel_id: u32, _session_id: u32)614 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
615     let task_message = TaskMessage {
616         channel_id,
617         command: HdcCommand::KernelChannelClose,
618         payload: [1].to_vec(),
619     };
620     transfer::put(_session_id, task_message).await;
621 }
622 
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)623 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
624     let task_message = TaskMessage {
625         channel_id,
626         command: comamnd_finish,
627         payload: [1].to_vec(),
628     };
629     transfer::put(_session_id, task_message).await;
630 }
631 
close_channel(channel_id: u32)632 pub async fn close_channel(channel_id: u32) {
633     transfer::TcpMap::end(channel_id).await;
634 }
635 
echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel)636 pub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) {
637     #[cfg(feature = "host")]
638     {
639         let echo_level = match level {
640             MessageLevel::Ok => transfer::EchoLevel::OK,
641             MessageLevel::Fail => transfer::EchoLevel::FAIL,
642             MessageLevel::Info => transfer::EchoLevel::INFO,
643         };
644         let _ =
645             transfer::send_channel_msg(channel_id, echo_level, message.to_string())
646                 .await;
647     }
648     #[cfg(not(feature = "host"))]
649     {
650         let mut data = Vec::<u8>::new();
651         data.push(level as u8);
652         data.append(&mut message.as_bytes().to_vec());
653         let echo_message = TaskMessage {
654             channel_id,
655             command: HdcCommand::KernelEcho,
656             payload: data,
657         };
658         transfer::put(_session_id, echo_message).await;
659     }
660 }