1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! hdctransfer
16 #![allow(missing_docs)]
17 use std::collections::VecDeque;
18 use std::fs::{self, File, OpenOptions, metadata};
19 use std::io::{Read, Seek, Write, Error};
20 use std::path::PathBuf;
21 #[cfg(not(target_os = "windows"))]
22 use std::path::Path;
23 #[cfg(not(target_os = "windows"))]
24 use std::os::unix::fs::PermissionsExt;
25 use std::sync::Arc;
26
27 use crate::common::base::Base;
28 use crate::common::hdcfile::FileTaskMap;
29 use crate::config::HdcCommand;
30 use crate::config::TaskMessage;
31 use crate::{config::*, utils};
32 use crate::serializer::native_struct::TransferConfig;
33 use crate::serializer::native_struct::TransferPayload;
34 use crate::serializer::serialize::Serialization;
35 use crate::transfer;
36 #[cfg(not(feature = "host"))]
37 use crate::utils::hdc_log::*;
38 #[cfg(feature = "host")]
39 extern crate ylong_runtime_static as ylong_runtime;
40 use ylong_runtime::sync::Mutex;
41 use ylong_runtime::task::JoinHandle;
42
// Foreign LZ4 helpers used by the transfer code in this file.
// NOTE(review): the native implementation is not visible here; the contracts described
// below are inferred from the parameter names and from how the callers in this file use
// the return value (a result greater than zero is treated as the produced byte count).
extern "C" {
    /// Compresses `data_size` bytes starting at `data` into `dataCompress`.
    ///
    /// `compressCapacity` is presumably the number of bytes that may be written to
    /// `dataCompress` — TODO confirm against the C side. Callers here treat a return
    /// value `<= 0` as "compression failed" and fall back to sending raw data.
    fn LZ4CompressTransfer(
        data: *const libc::c_char,
        dataCompress: *mut libc::c_char,
        data_size: i32,
        compressCapacity: i32,
    ) -> i32;
    /// Decompresses `data_size` bytes starting at `data` into `dataDecompress`.
    ///
    /// `decompressCapacity` is presumably the number of bytes that may be written to
    /// `dataDecompress` — TODO confirm against the C side. Callers here only use the
    /// output when the return value is greater than zero.
    ///
    /// The spelling `Deompress` is the linked symbol name and must not be corrected here.
    fn LZ4DeompressTransfer(
        data: *const libc::c_char,
        dataDecompress: *mut libc::c_char,
        data_size: i32,
        decompressCapacity: i32,
    ) -> i32;
}
57
/// State shared by the file-transfer helpers in this file for one transfer task.
///
/// Fields without a specific note are only initialised in this file; their readers
/// and writers live elsewhere, so their meaning is not documented here.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HdcTransferBase {
    pub need_close_notify: bool,
    pub io_index: u64,
    /// Error code recorded by `put_last_error` (through `task.transfer.last_error`) when
    /// an asynchronous file write fails: the raw OS error, or `i32::MAX` when the I/O
    /// error carries none.
    pub last_error: u32,
    pub is_io_finish: bool,
    pub is_master: bool,
    pub remote_path: String,
    pub base_local_path: String,
    /// Local filesystem path of the transfer. `check_local_path` normalises it and
    /// creates the directory/file, `recv_and_write_file` writes received data to it and
    /// `transfer_begin` reads the data to send from it.
    pub local_path: String,
    pub server_or_daemon: bool,
    pub task_queue: Vec<String>,
    pub local_name: String,
    pub local_tar_raw_path: String,
    /// Set by `check_local_path` from the metadata of the checked path.
    pub is_dir: bool,
    /// Total size in bytes; `recv_and_write_file` reports completion once `index`
    /// reaches it, and `transfer_begin` passes it on as the amount to send.
    pub file_size: u64,
    pub dir_size: u64,
    /// Session the task belongs to; used to route outgoing messages and error reports.
    pub session_id: u32,
    /// Channel the task belongs to; used to route outgoing messages and error reports.
    pub channel_id: u32,
    /// Number of payload bytes accepted so far by `recv_and_write_file`.
    pub index: u64,
    pub file_cnt: u32,
    pub is_file_mode_sync: bool,
    pub file_begin_time: u64,
    pub dir_begin_time: u64,
    /// `None` until `check_local_path` first runs; afterwards records whether the
    /// metadata lookup of the checked path succeeded on that first run.
    /// (`exsit` is a misspelling of "exist" kept for API compatibility.)
    pub is_local_dir_exsit: Option<bool>,
    pub empty_dirs: String,
    pub stop_run: bool,
    pub command_str: String,

    /// Negotiated transfer parameters; this file reads `compress_type` and `file_size`.
    pub transfer_config: TransferConfig,
}
89
90 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self91 pub fn new(_session_id: u32, _channel_id: u32) -> Self {
92 Self {
93 need_close_notify: false,
94 io_index: 0,
95 last_error: 0,
96 is_io_finish: false,
97 is_master: false,
98 remote_path: String::new(),
99 base_local_path: String::new(),
100 local_path: String::new(),
101 local_tar_raw_path: String::new(),
102 server_or_daemon: false,
103 task_queue: Vec::<String>::new(),
104 local_name: String::new(),
105 is_dir: false,
106 file_size: 0,
107 dir_size: 0,
108 session_id: _session_id,
109 channel_id: _channel_id,
110 index: 0,
111 file_cnt: 0,
112 is_file_mode_sync: false,
113 file_begin_time: 0,
114 dir_begin_time: 0,
115 is_local_dir_exsit: None,
116 empty_dirs: String::new(),
117 stop_run: false,
118 command_str: String::new(),
119 transfer_config: TransferConfig::default(),
120 }
121 }
122 }
123
/// Applies the Unix permission bits `mode` to the file system entry at `path`.
///
/// # Errors
/// Returns the error reported by `fs::set_permissions`, e.g. when `path` does not exist.
#[cfg(not(target_os = "windows"))]
fn set_file_permission(path: String, mode: u32) -> std::io::Result<()> {
    let target = std::path::Path::new(path.as_str());
    let permissions = std::fs::Permissions::from_mode(mode);
    fs::set_permissions(target, permissions)
}
129
/// Applies the Unix permission bits `mode` to `dir` and to every directory below it.
///
/// Only real directories are visited. Symbolic links are neither chmod-ed nor followed,
/// so a link pointing outside the tree (or back into it) cannot redirect the permission
/// change or cause endless recursion. Regular files are left untouched.
///
/// # Errors
/// Stops at, and returns, the first error from changing permissions or reading a
/// directory.
#[cfg(not(target_os = "windows"))]
fn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> {
    fs::set_permissions(dir, std::fs::Permissions::from_mode(mode))?;

    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // `DirEntry::file_type` reports the entry itself and does not traverse symlinks,
        // unlike `Path::is_dir`, which reports the link target.
        if entry.file_type()?.is_dir() {
            set_dir_permissions_recursive(&entry.path(), mode)?;
        }
    }
    Ok(())
}
144
145 #[allow(unused)]
create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()>146 fn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> {
147 let mut dir_path = std::path::Path::new(&path);
148 while let Some(p) = dir_path.parent() {
149 if p.exists() {
150 break;
151 }
152 dir_path = p;
153 }
154 #[cfg(not(target_os = "windows"))]
155 let exsit = dir_path.exists();
156 std::fs::create_dir_all(path.clone())?;
157 #[cfg(not(target_os = "windows"))]
158 if !exsit {
159 set_dir_permissions_recursive(dir_path, mode)
160 } else {
161 Ok(())
162 }
163 #[cfg(target_os = "windows")]
164 Ok(())
165 }
166
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, ) -> Result<bool, Error>167 pub fn check_local_path(
168 transfer: &mut HdcTransferBase,
169 _local_path: &str,
170 _optional_name: &str,
171 ) -> Result<bool, Error> {
172 crate::info!(
173 "check_local_path, local_path:{}, optional_name:{}",
174 _local_path,
175 _optional_name
176 );
177 let file = metadata(_local_path);
178 if let Ok(f) = file {
179 if transfer.is_local_dir_exsit.is_none() {
180 transfer.is_local_dir_exsit = Some(true);
181 }
182 transfer.is_dir = f.is_dir();
183 if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
184 transfer
185 .local_path
186 .push_str(Base::get_path_sep().to_string().as_str());
187 }
188 } else if transfer.is_local_dir_exsit.is_none() {
189 transfer.is_local_dir_exsit = Some(false);
190 }
191 let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
192 op = op.replace('/', Base::get_path_sep().to_string().as_str());
193
194 if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
195 transfer
196 .local_path
197 .push_str(Base::get_path_sep().to_string().as_str());
198 }
199
200 if transfer.local_path.ends_with(Base::get_path_sep()) {
201 let local_dir = transfer
202 .local_path
203 .clone()
204 .replace('/', Base::get_path_sep().to_string().as_str());
205
206 if let Some(false) = transfer.is_local_dir_exsit {
207 if op.contains(Base::get_path_sep()) {
208 let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default();
209 op = op.as_str()[first_sep_index..].to_string();
210 crate::debug!(
211 "check_local_path, combine 2 local_dir:{}, op:{}",
212 local_dir,
213 op
214 );
215 }
216 }
217
218 transfer.local_path = Base::combine(local_dir, op);
219 }
220 crate::debug!(
221 "check_local_path, final transfer.local_path:{}",
222 transfer.local_path
223 );
224 if transfer.local_path.ends_with(Base::get_path_sep()) {
225 match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) {
226 Ok(_) => Ok(true),
227 Err(error) => {
228 crate::error!("dir create failed, error:{}", &error);
229 Err(error)
230 },
231 }
232 } else {
233 let last = transfer.local_path.rfind(Base::get_path_sep());
234 match last {
235 Some(index) => {
236 match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) {
237 Ok(_) => {
238 match File::create(transfer.local_path.clone()) {
239 Ok(_) => {
240 #[cfg(not(target_os = "windows"))]
241 set_file_permission(transfer.local_path.clone(), 0o644)?;
242 Ok(true)
243 },
244 Err(error) => {
245 crate::error!("file create failed, error:{}", &error);
246 Err(error)
247 },
248 }
249 }
250 Err(error) => {
251 crate::error!("dir create failed, error:{}", &error);
252 Err(error)
253 },
254 }
255 }
256 None => {
257 match File::create(transfer.local_path.clone()) {
258 Ok(_) => {
259 #[cfg(not(target_os = "windows"))]
260 set_file_permission(transfer.local_path.clone(), 0o644)?;
261 Ok(true)
262 },
263 Err(error) => {
264 crate::error!("file create failed, error:{}", &error);
265 Err(error)
266 },
267 }
268 }
269 }
270 }
271 }
272
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>273 fn spawn_handler(
274 _command_data: HdcCommand,
275 index: usize,
276 local_path: String,
277 _channel_id_: u32,
278 transfer_config: &TransferConfig,
279 ) -> JoinHandle<(bool, TaskMessage)> {
280 let thread_path_ref = Arc::new(Mutex::new(local_path));
281 let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64);
282 let compress_type = transfer_config.compress_type;
283 let file_size = transfer_config.file_size as u64;
284 ylong_runtime::spawn(async move {
285 let path = thread_path_ref.lock().await;
286 let Ok(mut file) = File::open(&*path) else {
287 crate::debug!("open file failed, path:{}", *path);
288 let _data_message = TaskMessage {
289 channel_id: _channel_id_,
290 command: _command_data,
291 payload: Vec::new(),
292 };
293 return (false, _data_message);
294 };
295 let _ = file.seek(std::io::SeekFrom::Start(pos));
296 let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
297 let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
298 let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
299 let mut read_len = 0usize;
300 let mut package_read_len = (file_size - pos) as usize;
301 if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE {
302 package_read_len = FILE_PACKAGE_PAYLOAD_SIZE;
303 }
304 while read_len < package_read_len {
305 let Ok(single_len) = file.read(&mut buf[read_len..]) else {
306 crate::debug!("file read failed, path:{}", *path);
307 break;
308 };
309 read_len += single_len;
310 if single_len == 0 && read_len < package_read_len {
311 break;
312 }
313 }
314 let transfer_compress_type = match CompressType::try_from(compress_type) {
315 Ok(compress_type) => compress_type,
316 Err(_) => CompressType::None,
317 };
318
319 let mut header: TransferPayload = TransferPayload {
320 index: pos,
321 compress_type,
322 compress_size: 0,
323 uncompress_size: 0,
324 };
325 header.uncompress_size = read_len as u32;
326 let capacity = read_len as i32;
327
328 match transfer_compress_type {
329 CompressType::Lz4 => {
330 let compress_size: i32;
331 header.compress_type = CompressType::Lz4 as u8;
332 unsafe {
333 compress_size = LZ4CompressTransfer(
334 buf.as_ptr() as *const libc::c_char,
335 data_buf.as_ptr() as *mut libc::c_char,
336 capacity,
337 capacity,
338 );
339 }
340 if compress_size > 0 {
341 header.compress_size = compress_size as u32;
342 } else {
343 header.compress_type = CompressType::None as u8;
344 header.compress_size = read_len as u32;
345 data_buf = buf;
346 }
347 }
348 _ => {
349 header.compress_type = CompressType::None as u8;
350 header.compress_size = read_len as u32;
351 data_buf = buf;
352 }
353 }
354
355 let head_buffer = header.serialize();
356 total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
357 let data_len = header.compress_size as usize;
358 total.append(&mut data_buf[..data_len].to_vec());
359 let _data_message = TaskMessage {
360 channel_id: _channel_id_,
361 command: _command_data,
362 payload: total,
363 };
364 (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
365 })
366 }
367
is_dir_link(path: String) -> bool368 fn is_dir_link(path: String) -> bool {
369 let ret = std::fs::read_link(path);
370 match ret {
371 Ok(p) => {
372 crate::debug!("link to file:{}", p.display().to_string());
373 p.exists() && p.is_dir()
374 }
375 Err(e) => {
376 crate::error!("read_link fail:{:#?}", e);
377 false
378 }
379 }
380 }
381
is_file_access(path: String) -> bool382 fn is_file_access(path: String) -> bool {
383 let file = metadata(path.clone());
384 match file {
385 Ok(f) => {
386 if !f.is_symlink() {
387 crate::debug!("file is not a link, path:{}", path);
388 return true;
389 }
390 }
391 Err(_e) => {
392 crate::error!("metadata file is error, path:{}", path);
393 return false;
394 }
395 }
396 let ret = std::fs::read_link(path);
397 match ret {
398 Ok(p) => {
399 crate::debug!("link to file:{}", p.display().to_string());
400 p.exists()
401 }
402 Err(e) => {
403 crate::error!("read_link fail:{:#?}", e);
404 false
405 }
406 }
407 }
408
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool409 pub async fn read_and_send_data(
410 local_path: &str,
411 session_id: u32,
412 _channel_id_: u32,
413 _file_size: u64,
414 _command_data: HdcCommand,
415 transfer_config: &TransferConfig,
416 ) -> bool {
417 const MAX_WORKER_COUNT: usize = 5;
418 let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
419 if pieces_count == 0 {
420 pieces_count = 1;
421 }
422 let workers_count = if pieces_count > MAX_WORKER_COUNT {
423 MAX_WORKER_COUNT
424 } else {
425 pieces_count
426 };
427 let mut index = 0;
428 let mut queue = VecDeque::new();
429 while index < workers_count {
430 let worker = spawn_handler(
431 _command_data,
432 index,
433 local_path.to_owned(),
434 _channel_id_,
435 transfer_config,
436 );
437 queue.push_back(worker);
438 index += 1;
439 }
440 loop {
441 if queue.is_empty() {
442 crate::debug!("read_and_send_data queue is empty");
443 break;
444 }
445 let Some(handler) = queue.pop_front() else {
446 continue;
447 };
448 let Ok((is_finish, task_message)) = handler.await else {
449 continue;
450 };
451 transfer::put(session_id, task_message).await;
452 if is_finish {
453 crate::debug!("read_and_send_data is finish return false");
454 return false;
455 }
456
457 if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size {
458 let worker = spawn_handler(
459 _command_data,
460 index,
461 local_path.to_owned(),
462 _channel_id_,
463 transfer_config,
464 );
465 queue.push_back(worker);
466 index += 1;
467 }
468 }
469 true
470 }
471
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool472 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
473 let mut header = TransferPayload {
474 ..Default::default()
475 };
476 let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
477 let file_index = header.index;
478 let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
479 let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
480 Ok(compress_type) => compress_type,
481 Err(_) => CompressType::None,
482 };
483
484 if let CompressType::Lz4 = compress_type {
485 let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
486 let decompress_size = unsafe {
487 LZ4DeompressTransfer(
488 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
489 buf.as_ptr() as *mut libc::c_char,
490 header.compress_size as i32,
491 header.uncompress_size as i32,
492 )
493 };
494 if decompress_size > 0 {
495 buffer = buf[..(decompress_size as usize)].to_vec();
496 }
497 }
498
499 let path = tbase.local_path.clone();
500 let write_buf = buffer.clone();
501 let session_id = tbase.session_id.to_owned();
502 let channel_id = tbase.channel_id.to_owned();
503 utils::spawn(async move {
504 let open_result = OpenOptions::new()
505 .write(true)
506 .create(true)
507 .open(path.clone());
508 match open_result {
509 Ok(mut file) => {
510 let _ = file.seek(std::io::SeekFrom::Start(file_index));
511 let write_result = file.write_all(write_buf.as_slice());
512 match write_result {
513 Ok(()) => {}
514 Err(e) => {
515 let _ = put_last_error(e, session_id, channel_id).await;
516 }
517 }
518 }
519 Err(e) => {
520 let _ = put_last_error(e, session_id, channel_id).await;
521 }
522 }
523 });
524
525 tbase.index += buffer.len() as u64;
526 crate::debug!(
527 "transfer file [{}] index {} / {}",
528 tbase.local_path.clone(),
529 tbase.index,
530 tbase.file_size
531 );
532 if tbase.index >= tbase.file_size {
533 return true;
534 }
535 false
536 }
537
put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool538 async fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool {
539 crate::warn!(
540 "put_last_error sesssion_id:{}, channel_id:{}, error:{}",
541 session_id,
542 channel_id,
543 error,
544 );
545 let errno = match error.raw_os_error() {
546 Some(errno) => errno as u32,
547 None => std::i32::MAX as u32
548 };
549 match FileTaskMap::get(session_id, channel_id).await {
550 Some(task) => {
551 let mut task = task.lock().await;
552 task.transfer.last_error = errno;
553 }
554 None => {
555 crate::error!(
556 "recv_and_write_file get task is none session_id:{},channel_id:{}",
557 session_id,
558 channel_id,
559 );
560 return false;
561 }
562 }
563 true
564 }
565
get_sub_files_resurively(_path: &String) -> Vec<String>566 pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
567 let mut result = Vec::new();
568 let dir_path = PathBuf::from(_path);
569 if !is_file_access(_path.clone()) {
570 crate::error!("file is invalid link, path:{}", _path);
571 return result;
572 }
573 let Ok(dir_list) = fs::read_dir(dir_path) else {
574 crate::error!("read dir fail, path:{}", _path);
575 return result;
576 };
577 for entry in dir_list {
578 let Ok(path) = entry else {
579 continue;
580 };
581 let path = path.path();
582 if is_dir_link(path.clone().display().to_string()) {
583 continue;
584 } else if path.is_file() {
585 result.push(Base::normalized_path(path).display().to_string());
586 } else {
587 let p = path.display().to_string();
588 let mut sub_list = get_sub_files_resurively(&p);
589 result.append(&mut sub_list);
590 }
591 }
592 result.sort();
593 result
594 }
595
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)596 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
597 let local_path_ = transfer.local_path.clone();
598
599 read_and_send_data(
600 &local_path_,
601 transfer.session_id,
602 transfer.channel_id,
603 transfer.file_size,
604 _command_data,
605 &transfer.transfer_config,
606 )
607 .await;
608 }
609
/// Handles one received data package for `tbase`.
///
/// Thin public entry point that forwards to `recv_and_write_file` and returns its
/// result unchanged.
pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
    recv_and_write_file(tbase, _payload)
}
613
transfer_task_finish(channel_id: u32, _session_id: u32)614 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
615 let task_message = TaskMessage {
616 channel_id,
617 command: HdcCommand::KernelChannelClose,
618 payload: [1].to_vec(),
619 };
620 transfer::put(_session_id, task_message).await;
621 }
622
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)623 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
624 let task_message = TaskMessage {
625 channel_id,
626 command: comamnd_finish,
627 payload: [1].to_vec(),
628 };
629 transfer::put(_session_id, task_message).await;
630 }
631
/// Ends the channel `channel_id` by calling `transfer::TcpMap::end` and awaiting it.
pub async fn close_channel(channel_id: u32) {
    transfer::TcpMap::end(channel_id).await;
}
635
echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel)636 pub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) {
637 #[cfg(feature = "host")]
638 {
639 let echo_level = match level {
640 MessageLevel::Ok => transfer::EchoLevel::OK,
641 MessageLevel::Fail => transfer::EchoLevel::FAIL,
642 MessageLevel::Info => transfer::EchoLevel::INFO,
643 };
644 let _ =
645 transfer::send_channel_msg(channel_id, echo_level, message.to_string())
646 .await;
647 }
648 #[cfg(not(feature = "host"))]
649 {
650 let mut data = Vec::<u8>::new();
651 data.push(level as u8);
652 data.append(&mut message.as_bytes().to_vec());
653 let echo_message = TaskMessage {
654 channel_id,
655 command: HdcCommand::KernelEcho,
656 payload: data,
657 };
658 transfer::put(_session_id, echo_message).await;
659 }
660 }