• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 //! hdctransfer
16 #![allow(missing_docs)]
17 use std::collections::VecDeque;
18 use std::fs::{self, File, OpenOptions, metadata};
19 use std::io::{Read, Seek, Write, Error};
20 use std::path::PathBuf;
21 use std::path::Path;
22 use std::os::unix::fs::PermissionsExt;
23 use std::sync::Arc;
24 
25 use crate::common::base::Base;
26 use crate::common::hdcfile::FileTaskMap;
27 use crate::config::HdcCommand;
28 use crate::config::TaskMessage;
29 use crate::{config::*, utils};
30 use crate::serializer::native_struct::TransferConfig;
31 use crate::serializer::native_struct::TransferPayload;
32 use crate::serializer::serialize::Serialization;
33 use crate::transfer;
34 #[cfg(not(feature = "host"))]
35 use crate::utils::hdc_log::*;
36 
37 use ylong_runtime::sync::Mutex;
38 use ylong_runtime::task::JoinHandle;
39 
40 extern "C" {
LZ4CompressTransfer( data: *const libc::c_char, dataCompress: *mut libc::c_char, data_size: i32, compressCapacity: i32, ) -> i3241     fn LZ4CompressTransfer(
42         data: *const libc::c_char,
43         dataCompress: *mut libc::c_char,
44         data_size: i32,
45         compressCapacity: i32,
46     ) -> i32;
LZ4DeompressTransfer( data: *const libc::c_char, dataDecompress: *mut libc::c_char, data_size: i32, decompressCapacity: i32, ) -> i3247     fn LZ4DeompressTransfer(
48         data: *const libc::c_char,
49         dataDecompress: *mut libc::c_char,
50         data_size: i32,
51         decompressCapacity: i32,
52     ) -> i32;
53 }
54 
55 #[derive(Debug, Default, Clone, PartialEq, Eq)]
56 pub struct HdcTransferBase {
57     pub need_close_notify: bool,
58     pub io_index: u64,
59     pub last_error: u32,
60     pub is_io_finish: bool,
61     pub is_master: bool,
62     pub remote_path: String,
63     pub base_local_path: String,
64     pub local_path: String,
65     pub server_or_daemon: bool,
66     pub task_queue: Vec<String>,
67     pub local_name: String,
68     pub is_dir: bool,
69     pub file_size: u64,
70     pub dir_size: u64,
71     pub session_id: u32,
72     pub channel_id: u32,
73     pub index: u64,
74     pub file_cnt: u32,
75     pub is_file_mode_sync: bool,
76     pub file_begin_time: u64,
77     pub dir_begin_time: u64,
78     pub is_local_dir_exsit: Option<bool>,
79     pub empty_dirs: String,
80     pub stop_run: bool,
81     pub command_str: String,
82 
83     pub transfer_config: TransferConfig,
84 }
85 
86 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self87     pub fn new(_session_id: u32, _channel_id: u32) -> Self {
88         Self {
89             need_close_notify: false,
90             io_index: 0,
91             last_error: 0,
92             is_io_finish: false,
93             is_master: false,
94             remote_path: String::new(),
95             base_local_path: String::new(),
96             local_path: String::new(),
97             server_or_daemon: false,
98             task_queue: Vec::<String>::new(),
99             local_name: String::new(),
100             is_dir: false,
101             file_size: 0,
102             dir_size: 0,
103             session_id: _session_id,
104             channel_id: _channel_id,
105             index: 0,
106             file_cnt: 0,
107             is_file_mode_sync: false,
108             file_begin_time: 0,
109             dir_begin_time: 0,
110             is_local_dir_exsit: None,
111             empty_dirs: String::new(),
112             stop_run: false,
113             command_str: String::new(),
114             transfer_config: TransferConfig::default(),
115         }
116     }
117 }
118 
set_file_permission(path: String, mode: u32) -> std::io::Result<()>119 fn set_file_permission(path: String, mode: u32) -> std::io::Result<()> {
120     let perms = std::fs::Permissions::from_mode(mode);
121     fs::set_permissions(std::path::Path::new(&path), perms)
122 }
123 
set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()>124 fn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> {
125     let perms = std::fs::Permissions::from_mode(mode);
126     fs::set_permissions(dir, perms)?;
127 
128     for entry in fs::read_dir(dir)? {
129         let entry = entry?;
130         let entry_path = dir.join(entry.file_name());
131         if entry_path.is_dir() {
132             set_dir_permissions_recursive(&entry_path, mode)?;
133         }
134     }
135     Ok(())
136 }
137 
create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()>138 fn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> {
139     let mut dir_path = std::path::Path::new(&path);
140     while let Some(p) = dir_path.parent() {
141         if p.exists() {
142             break;
143         }
144         dir_path = p;
145     }
146     let exsit = dir_path.exists();
147     std::fs::create_dir_all(path.clone())?;
148     if !exsit {
149         set_dir_permissions_recursive(dir_path, mode)
150     } else {
151         Ok(())
152     }
153 }
154 
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, ) -> Result<bool, Error>155 pub fn check_local_path(
156     transfer: &mut HdcTransferBase,
157     _local_path: &str,
158     _optional_name: &str,
159 ) -> Result<bool, Error> {
160     crate::info!(
161         "check_local_path, local_path:{}, optional_name:{}",
162         _local_path,
163         _optional_name
164     );
165     let file = metadata(_local_path);
166     if let Ok(f) = file {
167         if transfer.is_local_dir_exsit.is_none() {
168             transfer.is_local_dir_exsit = Some(true);
169         }
170         transfer.is_dir = f.is_dir();
171         if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
172             transfer
173                 .local_path
174                 .push_str(Base::get_path_sep().to_string().as_str());
175         }
176     } else if transfer.is_local_dir_exsit.is_none() {
177         transfer.is_local_dir_exsit = Some(false);
178     }
179     let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
180     op = op.replace('/', Base::get_path_sep().to_string().as_str());
181 
182     if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
183         transfer
184             .local_path
185             .push_str(Base::get_path_sep().to_string().as_str());
186     }
187 
188     if transfer.local_path.ends_with(Base::get_path_sep()) {
189         let local_dir = transfer
190             .local_path
191             .clone()
192             .replace('/', Base::get_path_sep().to_string().as_str());
193 
194         if let Some(false) = transfer.is_local_dir_exsit {
195             if op.contains(Base::get_path_sep()) {
196                 let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default();
197                 op = op.as_str()[first_sep_index..].to_string();
198                 crate::debug!(
199                     "check_local_path, combine 2 local_dir:{}, op:{}",
200                     local_dir,
201                     op
202                 );
203             }
204         }
205 
206         transfer.local_path = Base::combine(local_dir, op);
207     }
208     crate::debug!(
209         "check_local_path, final transfer.local_path:{}",
210         transfer.local_path
211     );
212     if transfer.local_path.ends_with(Base::get_path_sep()) {
213         match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) {
214             Ok(_) => Ok(true),
215             Err(error) => {
216                 crate::error!("dir create failed, error:{}", &error);
217                 Err(error)
218             },
219         }
220     } else {
221         let last = transfer.local_path.rfind(Base::get_path_sep());
222         match last {
223             Some(index) => {
224                 match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) {
225                     Ok(_) => {
226                         match File::create(transfer.local_path.clone()) {
227                     Ok(_) => {
228                         #[cfg(not(target_os = "windows"))]
229                         set_file_permission(transfer.local_path.clone(), 0o644)?;
230                         Ok(true)
231                             },
232                     Err(error) => {
233                         crate::error!("file create failed, error:{}", &error);
234                         Err(error)
235                             },
236                         }
237                     }
238                 Err(error) => {
239                     crate::error!("dir create failed, error:{}", &error);
240                     Err(error)
241                     },
242                 }
243             }
244             None => {
245                 match File::create(transfer.local_path.clone()) {
246                 Ok(_) => {
247                     #[cfg(not(target_os = "windows"))]
248                     set_file_permission(transfer.local_path.clone(), 0o644)?;
249                     Ok(true)
250                     },
251                 Err(error) => {
252                     crate::error!("file create failed, error:{}", &error);
253                     Err(error)
254                     },
255                 }
256             }
257         }
258     }
259 }
260 
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>261 fn spawn_handler(
262     _command_data: HdcCommand,
263     index: usize,
264     local_path: String,
265     _channel_id_: u32,
266     transfer_config: &TransferConfig,
267 ) -> JoinHandle<(bool, TaskMessage)> {
268     let thread_path_ref = Arc::new(Mutex::new(local_path));
269     let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64);
270     let compress_type = transfer_config.compress_type;
271     let file_size = transfer_config.file_size as u64;
272     ylong_runtime::spawn(async move {
273         let path = thread_path_ref.lock().await;
274         let Ok(mut file) = File::open(&*path) else {
275             crate::debug!("open file failed, path:{}", *path);
276             let _data_message = TaskMessage {
277                 channel_id: _channel_id_,
278                 command: _command_data,
279                 payload: Vec::new(),
280             };
281             return (false, _data_message);
282         };
283         let _ = file.seek(std::io::SeekFrom::Start(pos));
284         let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
285         let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
286         let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
287         let mut read_len = 0usize;
288         let mut package_read_len = (file_size - pos) as usize;
289         if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE {
290             package_read_len = FILE_PACKAGE_PAYLOAD_SIZE;
291         }
292         while read_len < package_read_len {
293             let Ok(single_len) = file.read(&mut buf[read_len..]) else {
294                 crate::debug!("file read failed, path:{}", *path);
295                 break;
296             };
297             read_len += single_len;
298             if single_len == 0 && read_len < package_read_len {
299                 break;
300             }
301         }
302         let transfer_compress_type = match CompressType::try_from(compress_type) {
303             Ok(compress_type) => compress_type,
304             Err(_) => CompressType::None,
305         };
306 
307         let mut header: TransferPayload = TransferPayload {
308             index: pos,
309             compress_type,
310             compress_size: 0,
311             uncompress_size: 0,
312         };
313         header.uncompress_size = read_len as u32;
314         let capacity = read_len as i32;
315 
316         match transfer_compress_type {
317             CompressType::Lz4 => {
318                 let compress_size: i32;
319                 header.compress_type = CompressType::Lz4 as u8;
320                 unsafe {
321                     compress_size = LZ4CompressTransfer(
322                         buf.as_ptr() as *const libc::c_char,
323                         data_buf.as_ptr() as *mut libc::c_char,
324                         capacity,
325                         capacity,
326                     );
327                 }
328                 if compress_size > 0 {
329                     header.compress_size = compress_size as u32;
330                 } else {
331                     header.compress_type = CompressType::None as u8;
332                     header.compress_size = read_len as u32;
333                     data_buf = buf;
334                 }
335             }
336             _ => {
337                 header.compress_type = CompressType::None as u8;
338                 header.compress_size = read_len as u32;
339                 data_buf = buf;
340             }
341         }
342 
343         let head_buffer = header.serialize();
344         total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
345         let data_len = header.compress_size as usize;
346         total.append(&mut data_buf[..data_len].to_vec());
347         let _data_message = TaskMessage {
348             channel_id: _channel_id_,
349             command: _command_data,
350             payload: total,
351         };
352         (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
353     })
354 }
355 
is_dir_link(path: String) -> bool356 fn is_dir_link(path: String) -> bool {
357     let ret = std::fs::read_link(path);
358     match ret {
359         Ok(p) => {
360             crate::debug!("link to file:{}", p.display().to_string());
361             p.exists() && p.is_dir()
362         }
363         Err(e) => {
364             crate::error!("read_link fail:{:#?}", e);
365             false
366         }
367     }
368 }
369 
is_file_access(path: String) -> bool370 fn is_file_access(path: String) -> bool {
371     let file = metadata(path.clone());
372     match file {
373         Ok(f) => {
374             if !f.is_symlink() {
375                 crate::debug!("file is not a link, path:{}", path);
376                 return true;
377             }
378         }
379         Err(_e) => {
380             crate::error!("metadata file is error, path:{}", path);
381             return false;
382         }
383     }
384     let ret = std::fs::read_link(path);
385     match ret {
386         Ok(p) => {
387             crate::debug!("link to file:{}", p.display().to_string());
388             p.exists()
389         }
390         Err(e) => {
391             crate::error!("read_link fail:{:#?}", e);
392             false
393         }
394     }
395 }
396 
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool397 pub async fn read_and_send_data(
398     local_path: &str,
399     session_id: u32,
400     _channel_id_: u32,
401     _file_size: u64,
402     _command_data: HdcCommand,
403     transfer_config: &TransferConfig,
404 ) -> bool {
405     const MAX_WORKER_COUNT: usize = 5;
406     let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
407     if pieces_count == 0 {
408         pieces_count = 1;
409     }
410     let workers_count = if pieces_count > MAX_WORKER_COUNT {
411         MAX_WORKER_COUNT
412     } else {
413         pieces_count
414     };
415     let mut index = 0;
416     let mut queue = VecDeque::new();
417     while index < workers_count {
418         let worker = spawn_handler(
419             _command_data,
420             index,
421             local_path.to_owned(),
422             _channel_id_,
423             transfer_config,
424         );
425         queue.push_back(worker);
426         index += 1;
427     }
428     loop {
429         if queue.is_empty() {
430             crate::debug!("read_and_send_data queue is empty");
431             break;
432         }
433         let Some(handler) = queue.pop_front() else {
434             continue;
435         };
436         let Ok((is_finish, task_message)) = handler.await else {
437             continue;
438         };
439         transfer::put(session_id, task_message).await;
440         if is_finish {
441             crate::debug!("read_and_send_data is finish return false");
442             return false;
443         }
444 
445         if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size {
446             let worker = spawn_handler(
447                 _command_data,
448                 index,
449                 local_path.to_owned(),
450                 _channel_id_,
451                 transfer_config,
452             );
453             queue.push_back(worker);
454             index += 1;
455         }
456     }
457     true
458 }
459 
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool460 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
461     let mut header = TransferPayload {
462         ..Default::default()
463     };
464     let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
465     let file_index = header.index;
466     let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
467     let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
468         Ok(compress_type) => compress_type,
469         Err(_) => CompressType::None,
470     };
471 
472     if let CompressType::Lz4 = compress_type {
473         let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
474         let decompress_size = unsafe {
475             LZ4DeompressTransfer(
476                 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
477                 buf.as_ptr() as *mut libc::c_char,
478                 header.compress_size as i32,
479                 header.uncompress_size as i32,
480             )
481         };
482         if decompress_size > 0 {
483             buffer = buf[..(decompress_size as usize)].to_vec();
484         }
485     }
486 
487     let path = tbase.local_path.clone();
488     let write_buf = buffer.clone();
489     let session_id = tbase.session_id.to_owned();
490     let channel_id = tbase.channel_id.to_owned();
491     utils::spawn(async move {
492         let open_result = OpenOptions::new()
493             .write(true)
494             .create(true)
495             .open(path.clone());
496         match open_result {
497             Ok(mut file) => {
498                 let _ = file.seek(std::io::SeekFrom::Start(file_index));
499                 let write_result = file.write_all(write_buf.as_slice());
500                 match write_result {
501                     Ok(()) => {}
502                     Err(e) => {
503                         let _ = put_last_error(e, session_id, channel_id).await;
504                     }
505                 }
506             }
507             Err(e) => {
508                 let _ = put_last_error(e, session_id, channel_id).await;
509             }
510         }
511     });
512 
513     tbase.index += buffer.len() as u64;
514     crate::debug!(
515         "transfer file [{}] index {} / {}",
516         tbase.local_path.clone(),
517         tbase.index,
518         tbase.file_size
519     );
520     if tbase.index >= tbase.file_size {
521         return true;
522     }
523     false
524 }
525 
put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool526 async fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool {
527     crate::warn!(
528         "put_last_error sesssion_id:{}, channel_id:{}, error:{}",
529         session_id,
530         channel_id,
531         error,
532     );
533     let errno = match error.raw_os_error() {
534         Some(errno) => errno as u32,
535         None => std::i32::MAX as u32
536     };
537     match FileTaskMap::get(session_id, channel_id).await {
538         Some(task) => {
539             let mut task = task.lock().await;
540             task.transfer.last_error = errno;
541         }
542         None => {
543             crate::error!(
544                 "recv_and_write_file get task is none session_id:{},channel_id:{}",
545                 session_id,
546                 channel_id,
547             );
548             return false;
549         }
550     }
551     true
552 }
553 
get_sub_files_resurively(_path: &String) -> Vec<String>554 pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
555     let mut result = Vec::new();
556     let dir_path = PathBuf::from(_path);
557     if !is_file_access(_path.clone()) {
558         crate::error!("file is invalid link, path:{}", _path);
559         return result;
560     }
561     let Ok(dir_list) = fs::read_dir(dir_path) else {
562         crate::error!("read dir fail, path:{}", _path);
563         return result;
564     };
565     for entry in dir_list {
566         let Ok(path) = entry else {
567             continue;
568         };
569         let path = path.path();
570         if is_dir_link(path.clone().display().to_string()) {
571             continue;
572         } else if path.is_file() {
573             result.push(Base::normalized_path(path).display().to_string());
574         } else {
575             let p = path.display().to_string();
576             let mut sub_list = get_sub_files_resurively(&p);
577             result.append(&mut sub_list);
578         }
579     }
580     result.sort();
581     result
582 }
583 
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)584 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
585     let local_path_ = transfer.local_path.clone();
586 
587     read_and_send_data(
588         &local_path_,
589         transfer.session_id,
590         transfer.channel_id,
591         transfer.file_size,
592         _command_data,
593         &transfer.transfer_config,
594     )
595     .await;
596 }
597 
transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool598 pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
599     recv_and_write_file(tbase, _payload)
600 }
601 
transfer_task_finish(channel_id: u32, _session_id: u32)602 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
603     let task_message = TaskMessage {
604         channel_id,
605         command: HdcCommand::KernelChannelClose,
606         payload: [1].to_vec(),
607     };
608     transfer::put(_session_id, task_message).await;
609 }
610 
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)611 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
612     let task_message = TaskMessage {
613         channel_id,
614         command: comamnd_finish,
615         payload: [1].to_vec(),
616     };
617     transfer::put(_session_id, task_message).await;
618 }
619 
close_channel(channel_id: u32)620 pub async fn close_channel(channel_id: u32) {
621     transfer::TcpMap::end(channel_id).await;
622 }
623 
echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel)624 pub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) {
625     #[cfg(feature = "host")]
626     {
627         let echo_level = match level {
628             MessageLevel::Ok => transfer::EchoLevel::OK,
629             MessageLevel::Fail => transfer::EchoLevel::FAIL,
630             MessageLevel::Info => transfer::EchoLevel::INFO,
631         };
632         let _ =
633             transfer::send_channel_msg(channel_id, echo_level, message.to_string())
634                 .await;
635     }
636     #[cfg(not(feature = "host"))]
637     {
638         let mut data = Vec::<u8>::new();
639         data.push(level as u8);
640         data.append(&mut message.as_bytes().to_vec());
641         let echo_message = TaskMessage {
642             channel_id,
643             command: HdcCommand::KernelEcho,
644             payload: data,
645         };
646         transfer::put(_session_id, echo_message).await;
647     }
648 }