1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 //! hdctransfer
16 #![allow(missing_docs)]
17 use std::collections::VecDeque;
18 use std::fs::{self, File, OpenOptions, metadata};
19 use std::io::{Read, Seek, Write, Error};
20 use std::path::PathBuf;
21 use std::path::Path;
22 use std::os::unix::fs::PermissionsExt;
23 use std::sync::Arc;
24
25 use crate::common::base::Base;
26 use crate::common::hdcfile::FileTaskMap;
27 use crate::config::HdcCommand;
28 use crate::config::TaskMessage;
29 use crate::{config::*, utils};
30 use crate::serializer::native_struct::TransferConfig;
31 use crate::serializer::native_struct::TransferPayload;
32 use crate::serializer::serialize::Serialization;
33 use crate::transfer;
34 #[cfg(not(feature = "host"))]
35 use crate::utils::hdc_log::*;
36
37 use ylong_runtime::sync::Mutex;
38 use ylong_runtime::task::JoinHandle;
39
40 extern "C" {
LZ4CompressTransfer( data: *const libc::c_char, dataCompress: *mut libc::c_char, data_size: i32, compressCapacity: i32, ) -> i3241 fn LZ4CompressTransfer(
42 data: *const libc::c_char,
43 dataCompress: *mut libc::c_char,
44 data_size: i32,
45 compressCapacity: i32,
46 ) -> i32;
LZ4DeompressTransfer( data: *const libc::c_char, dataDecompress: *mut libc::c_char, data_size: i32, decompressCapacity: i32, ) -> i3247 fn LZ4DeompressTransfer(
48 data: *const libc::c_char,
49 dataDecompress: *mut libc::c_char,
50 data_size: i32,
51 decompressCapacity: i32,
52 ) -> i32;
53 }
54
55 #[derive(Debug, Default, Clone, PartialEq, Eq)]
56 pub struct HdcTransferBase {
57 pub need_close_notify: bool,
58 pub io_index: u64,
59 pub last_error: u32,
60 pub is_io_finish: bool,
61 pub is_master: bool,
62 pub remote_path: String,
63 pub base_local_path: String,
64 pub local_path: String,
65 pub server_or_daemon: bool,
66 pub task_queue: Vec<String>,
67 pub local_name: String,
68 pub is_dir: bool,
69 pub file_size: u64,
70 pub dir_size: u64,
71 pub session_id: u32,
72 pub channel_id: u32,
73 pub index: u64,
74 pub file_cnt: u32,
75 pub is_file_mode_sync: bool,
76 pub file_begin_time: u64,
77 pub dir_begin_time: u64,
78 pub is_local_dir_exsit: Option<bool>,
79 pub empty_dirs: String,
80 pub stop_run: bool,
81 pub command_str: String,
82
83 pub transfer_config: TransferConfig,
84 }
85
86 impl HdcTransferBase {
new(_session_id: u32, _channel_id: u32) -> Self87 pub fn new(_session_id: u32, _channel_id: u32) -> Self {
88 Self {
89 need_close_notify: false,
90 io_index: 0,
91 last_error: 0,
92 is_io_finish: false,
93 is_master: false,
94 remote_path: String::new(),
95 base_local_path: String::new(),
96 local_path: String::new(),
97 server_or_daemon: false,
98 task_queue: Vec::<String>::new(),
99 local_name: String::new(),
100 is_dir: false,
101 file_size: 0,
102 dir_size: 0,
103 session_id: _session_id,
104 channel_id: _channel_id,
105 index: 0,
106 file_cnt: 0,
107 is_file_mode_sync: false,
108 file_begin_time: 0,
109 dir_begin_time: 0,
110 is_local_dir_exsit: None,
111 empty_dirs: String::new(),
112 stop_run: false,
113 command_str: String::new(),
114 transfer_config: TransferConfig::default(),
115 }
116 }
117 }
118
set_file_permission(path: String, mode: u32) -> std::io::Result<()>119 fn set_file_permission(path: String, mode: u32) -> std::io::Result<()> {
120 let perms = std::fs::Permissions::from_mode(mode);
121 fs::set_permissions(std::path::Path::new(&path), perms)
122 }
123
set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()>124 fn set_dir_permissions_recursive(dir: &Path, mode: u32) -> std::io::Result<()> {
125 let perms = std::fs::Permissions::from_mode(mode);
126 fs::set_permissions(dir, perms)?;
127
128 for entry in fs::read_dir(dir)? {
129 let entry = entry?;
130 let entry_path = dir.join(entry.file_name());
131 if entry_path.is_dir() {
132 set_dir_permissions_recursive(&entry_path, mode)?;
133 }
134 }
135 Ok(())
136 }
137
create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()>138 fn create_dir_all_with_permission(path: String, mode: u32) -> std::io::Result<()> {
139 let mut dir_path = std::path::Path::new(&path);
140 while let Some(p) = dir_path.parent() {
141 if p.exists() {
142 break;
143 }
144 dir_path = p;
145 }
146 let exsit = dir_path.exists();
147 std::fs::create_dir_all(path.clone())?;
148 if !exsit {
149 set_dir_permissions_recursive(dir_path, mode)
150 } else {
151 Ok(())
152 }
153 }
154
check_local_path( transfer: &mut HdcTransferBase, _local_path: &str, _optional_name: &str, ) -> Result<bool, Error>155 pub fn check_local_path(
156 transfer: &mut HdcTransferBase,
157 _local_path: &str,
158 _optional_name: &str,
159 ) -> Result<bool, Error> {
160 crate::info!(
161 "check_local_path, local_path:{}, optional_name:{}",
162 _local_path,
163 _optional_name
164 );
165 let file = metadata(_local_path);
166 if let Ok(f) = file {
167 if transfer.is_local_dir_exsit.is_none() {
168 transfer.is_local_dir_exsit = Some(true);
169 }
170 transfer.is_dir = f.is_dir();
171 if f.is_dir() && !transfer.local_path.ends_with(Base::get_path_sep()) {
172 transfer
173 .local_path
174 .push_str(Base::get_path_sep().to_string().as_str());
175 }
176 } else if transfer.is_local_dir_exsit.is_none() {
177 transfer.is_local_dir_exsit = Some(false);
178 }
179 let mut op = _optional_name.replace('\\', Base::get_path_sep().to_string().as_str());
180 op = op.replace('/', Base::get_path_sep().to_string().as_str());
181
182 if op.contains(Base::get_path_sep()) && !transfer.local_path.ends_with(Base::get_path_sep()) {
183 transfer
184 .local_path
185 .push_str(Base::get_path_sep().to_string().as_str());
186 }
187
188 if transfer.local_path.ends_with(Base::get_path_sep()) {
189 let local_dir = transfer
190 .local_path
191 .clone()
192 .replace('/', Base::get_path_sep().to_string().as_str());
193
194 if let Some(false) = transfer.is_local_dir_exsit {
195 if op.contains(Base::get_path_sep()) {
196 let first_sep_index = op.find(Base::get_path_sep()).unwrap_or_default();
197 op = op.as_str()[first_sep_index..].to_string();
198 crate::debug!(
199 "check_local_path, combine 2 local_dir:{}, op:{}",
200 local_dir,
201 op
202 );
203 }
204 }
205
206 transfer.local_path = Base::combine(local_dir, op);
207 }
208 crate::debug!(
209 "check_local_path, final transfer.local_path:{}",
210 transfer.local_path
211 );
212 if transfer.local_path.ends_with(Base::get_path_sep()) {
213 match create_dir_all_with_permission(transfer.local_path.clone(), 0o750) {
214 Ok(_) => Ok(true),
215 Err(error) => {
216 crate::error!("dir create failed, error:{}", &error);
217 Err(error)
218 },
219 }
220 } else {
221 let last = transfer.local_path.rfind(Base::get_path_sep());
222 match last {
223 Some(index) => {
224 match create_dir_all_with_permission((transfer.local_path[0..index]).to_string(), 0o750) {
225 Ok(_) => {
226 match File::create(transfer.local_path.clone()) {
227 Ok(_) => {
228 #[cfg(not(target_os = "windows"))]
229 set_file_permission(transfer.local_path.clone(), 0o644)?;
230 Ok(true)
231 },
232 Err(error) => {
233 crate::error!("file create failed, error:{}", &error);
234 Err(error)
235 },
236 }
237 }
238 Err(error) => {
239 crate::error!("dir create failed, error:{}", &error);
240 Err(error)
241 },
242 }
243 }
244 None => {
245 match File::create(transfer.local_path.clone()) {
246 Ok(_) => {
247 #[cfg(not(target_os = "windows"))]
248 set_file_permission(transfer.local_path.clone(), 0o644)?;
249 Ok(true)
250 },
251 Err(error) => {
252 crate::error!("file create failed, error:{}", &error);
253 Err(error)
254 },
255 }
256 }
257 }
258 }
259 }
260
spawn_handler( _command_data: HdcCommand, index: usize, local_path: String, _channel_id_: u32, transfer_config: &TransferConfig, ) -> JoinHandle<(bool, TaskMessage)>261 fn spawn_handler(
262 _command_data: HdcCommand,
263 index: usize,
264 local_path: String,
265 _channel_id_: u32,
266 transfer_config: &TransferConfig,
267 ) -> JoinHandle<(bool, TaskMessage)> {
268 let thread_path_ref = Arc::new(Mutex::new(local_path));
269 let pos = (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64);
270 let compress_type = transfer_config.compress_type;
271 let file_size = transfer_config.file_size as u64;
272 ylong_runtime::spawn(async move {
273 let path = thread_path_ref.lock().await;
274 let Ok(mut file) = File::open(&*path) else {
275 crate::debug!("open file failed, path:{}", *path);
276 let _data_message = TaskMessage {
277 channel_id: _channel_id_,
278 command: _command_data,
279 payload: Vec::new(),
280 };
281 return (false, _data_message);
282 };
283 let _ = file.seek(std::io::SeekFrom::Start(pos));
284 let mut total = Vec::from([0; FILE_PACKAGE_HEAD]);
285 let mut buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
286 let mut data_buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
287 let mut read_len = 0usize;
288 let mut package_read_len = (file_size - pos) as usize;
289 if package_read_len > FILE_PACKAGE_PAYLOAD_SIZE {
290 package_read_len = FILE_PACKAGE_PAYLOAD_SIZE;
291 }
292 while read_len < package_read_len {
293 let Ok(single_len) = file.read(&mut buf[read_len..]) else {
294 crate::debug!("file read failed, path:{}", *path);
295 break;
296 };
297 read_len += single_len;
298 if single_len == 0 && read_len < package_read_len {
299 break;
300 }
301 }
302 let transfer_compress_type = match CompressType::try_from(compress_type) {
303 Ok(compress_type) => compress_type,
304 Err(_) => CompressType::None,
305 };
306
307 let mut header: TransferPayload = TransferPayload {
308 index: pos,
309 compress_type,
310 compress_size: 0,
311 uncompress_size: 0,
312 };
313 header.uncompress_size = read_len as u32;
314 let capacity = read_len as i32;
315
316 match transfer_compress_type {
317 CompressType::Lz4 => {
318 let compress_size: i32;
319 header.compress_type = CompressType::Lz4 as u8;
320 unsafe {
321 compress_size = LZ4CompressTransfer(
322 buf.as_ptr() as *const libc::c_char,
323 data_buf.as_ptr() as *mut libc::c_char,
324 capacity,
325 capacity,
326 );
327 }
328 if compress_size > 0 {
329 header.compress_size = compress_size as u32;
330 } else {
331 header.compress_type = CompressType::None as u8;
332 header.compress_size = read_len as u32;
333 data_buf = buf;
334 }
335 }
336 _ => {
337 header.compress_type = CompressType::None as u8;
338 header.compress_size = read_len as u32;
339 data_buf = buf;
340 }
341 }
342
343 let head_buffer = header.serialize();
344 total[..head_buffer.len()].copy_from_slice(&head_buffer[..]);
345 let data_len = header.compress_size as usize;
346 total.append(&mut data_buf[..data_len].to_vec());
347 let _data_message = TaskMessage {
348 channel_id: _channel_id_,
349 command: _command_data,
350 payload: total,
351 };
352 (read_len != FILE_PACKAGE_PAYLOAD_SIZE, _data_message)
353 })
354 }
355
is_dir_link(path: String) -> bool356 fn is_dir_link(path: String) -> bool {
357 let ret = std::fs::read_link(path);
358 match ret {
359 Ok(p) => {
360 crate::debug!("link to file:{}", p.display().to_string());
361 p.exists() && p.is_dir()
362 }
363 Err(e) => {
364 crate::error!("read_link fail:{:#?}", e);
365 false
366 }
367 }
368 }
369
is_file_access(path: String) -> bool370 fn is_file_access(path: String) -> bool {
371 let file = metadata(path.clone());
372 match file {
373 Ok(f) => {
374 if !f.is_symlink() {
375 crate::debug!("file is not a link, path:{}", path);
376 return true;
377 }
378 }
379 Err(_e) => {
380 crate::error!("metadata file is error, path:{}", path);
381 return false;
382 }
383 }
384 let ret = std::fs::read_link(path);
385 match ret {
386 Ok(p) => {
387 crate::debug!("link to file:{}", p.display().to_string());
388 p.exists()
389 }
390 Err(e) => {
391 crate::error!("read_link fail:{:#?}", e);
392 false
393 }
394 }
395 }
396
read_and_send_data( local_path: &str, session_id: u32, _channel_id_: u32, _file_size: u64, _command_data: HdcCommand, transfer_config: &TransferConfig, ) -> bool397 pub async fn read_and_send_data(
398 local_path: &str,
399 session_id: u32,
400 _channel_id_: u32,
401 _file_size: u64,
402 _command_data: HdcCommand,
403 transfer_config: &TransferConfig,
404 ) -> bool {
405 const MAX_WORKER_COUNT: usize = 5;
406 let mut pieces_count = (_file_size / FILE_PACKAGE_PAYLOAD_SIZE as u64) as usize;
407 if pieces_count == 0 {
408 pieces_count = 1;
409 }
410 let workers_count = if pieces_count > MAX_WORKER_COUNT {
411 MAX_WORKER_COUNT
412 } else {
413 pieces_count
414 };
415 let mut index = 0;
416 let mut queue = VecDeque::new();
417 while index < workers_count {
418 let worker = spawn_handler(
419 _command_data,
420 index,
421 local_path.to_owned(),
422 _channel_id_,
423 transfer_config,
424 );
425 queue.push_back(worker);
426 index += 1;
427 }
428 loop {
429 if queue.is_empty() {
430 crate::debug!("read_and_send_data queue is empty");
431 break;
432 }
433 let Some(handler) = queue.pop_front() else {
434 continue;
435 };
436 let Ok((is_finish, task_message)) = handler.await else {
437 continue;
438 };
439 transfer::put(session_id, task_message).await;
440 if is_finish {
441 crate::debug!("read_and_send_data is finish return false");
442 return false;
443 }
444
445 if (index as u64) * (FILE_PACKAGE_PAYLOAD_SIZE as u64) < _file_size {
446 let worker = spawn_handler(
447 _command_data,
448 index,
449 local_path.to_owned(),
450 _channel_id_,
451 transfer_config,
452 );
453 queue.push_back(worker);
454 index += 1;
455 }
456 }
457 true
458 }
459
recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool460 pub fn recv_and_write_file(tbase: &mut HdcTransferBase, _data: &[u8]) -> bool {
461 let mut header = TransferPayload {
462 ..Default::default()
463 };
464 let _ = header.parse(_data[..FILE_PACKAGE_HEAD].to_vec());
465 let file_index = header.index;
466 let mut buffer = _data[FILE_PACKAGE_HEAD..].to_vec();
467 let compress_type = match CompressType::try_from(tbase.transfer_config.compress_type) {
468 Ok(compress_type) => compress_type,
469 Err(_) => CompressType::None,
470 };
471
472 if let CompressType::Lz4 = compress_type {
473 let buf: [u8; FILE_PACKAGE_PAYLOAD_SIZE] = [0; FILE_PACKAGE_PAYLOAD_SIZE];
474 let decompress_size = unsafe {
475 LZ4DeompressTransfer(
476 _data[FILE_PACKAGE_HEAD..].as_ptr() as *const libc::c_char,
477 buf.as_ptr() as *mut libc::c_char,
478 header.compress_size as i32,
479 header.uncompress_size as i32,
480 )
481 };
482 if decompress_size > 0 {
483 buffer = buf[..(decompress_size as usize)].to_vec();
484 }
485 }
486
487 let path = tbase.local_path.clone();
488 let write_buf = buffer.clone();
489 let session_id = tbase.session_id.to_owned();
490 let channel_id = tbase.channel_id.to_owned();
491 utils::spawn(async move {
492 let open_result = OpenOptions::new()
493 .write(true)
494 .create(true)
495 .open(path.clone());
496 match open_result {
497 Ok(mut file) => {
498 let _ = file.seek(std::io::SeekFrom::Start(file_index));
499 let write_result = file.write_all(write_buf.as_slice());
500 match write_result {
501 Ok(()) => {}
502 Err(e) => {
503 let _ = put_last_error(e, session_id, channel_id).await;
504 }
505 }
506 }
507 Err(e) => {
508 let _ = put_last_error(e, session_id, channel_id).await;
509 }
510 }
511 });
512
513 tbase.index += buffer.len() as u64;
514 crate::debug!(
515 "transfer file [{}] index {} / {}",
516 tbase.local_path.clone(),
517 tbase.index,
518 tbase.file_size
519 );
520 if tbase.index >= tbase.file_size {
521 return true;
522 }
523 false
524 }
525
put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool526 async fn put_last_error(error: std::io::Error, session_id: u32, channel_id: u32) ->bool {
527 crate::warn!(
528 "put_last_error sesssion_id:{}, channel_id:{}, error:{}",
529 session_id,
530 channel_id,
531 error,
532 );
533 let errno = match error.raw_os_error() {
534 Some(errno) => errno as u32,
535 None => std::i32::MAX as u32
536 };
537 match FileTaskMap::get(session_id, channel_id).await {
538 Some(task) => {
539 let mut task = task.lock().await;
540 task.transfer.last_error = errno;
541 }
542 None => {
543 crate::error!(
544 "recv_and_write_file get task is none session_id:{},channel_id:{}",
545 session_id,
546 channel_id,
547 );
548 return false;
549 }
550 }
551 true
552 }
553
get_sub_files_resurively(_path: &String) -> Vec<String>554 pub fn get_sub_files_resurively(_path: &String) -> Vec<String> {
555 let mut result = Vec::new();
556 let dir_path = PathBuf::from(_path);
557 if !is_file_access(_path.clone()) {
558 crate::error!("file is invalid link, path:{}", _path);
559 return result;
560 }
561 let Ok(dir_list) = fs::read_dir(dir_path) else {
562 crate::error!("read dir fail, path:{}", _path);
563 return result;
564 };
565 for entry in dir_list {
566 let Ok(path) = entry else {
567 continue;
568 };
569 let path = path.path();
570 if is_dir_link(path.clone().display().to_string()) {
571 continue;
572 } else if path.is_file() {
573 result.push(Base::normalized_path(path).display().to_string());
574 } else {
575 let p = path.display().to_string();
576 let mut sub_list = get_sub_files_resurively(&p);
577 result.append(&mut sub_list);
578 }
579 }
580 result.sort();
581 result
582 }
583
transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand)584 pub async fn transfer_begin(transfer: &HdcTransferBase, _command_data: HdcCommand) {
585 let local_path_ = transfer.local_path.clone();
586
587 read_and_send_data(
588 &local_path_,
589 transfer.session_id,
590 transfer.channel_id,
591 transfer.file_size,
592 _command_data,
593 &transfer.transfer_config,
594 )
595 .await;
596 }
597
transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool598 pub fn transfer_data(tbase: &mut HdcTransferBase, _payload: &[u8]) -> bool {
599 recv_and_write_file(tbase, _payload)
600 }
601
transfer_task_finish(channel_id: u32, _session_id: u32)602 pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) {
603 let task_message = TaskMessage {
604 channel_id,
605 command: HdcCommand::KernelChannelClose,
606 payload: [1].to_vec(),
607 };
608 transfer::put(_session_id, task_message).await;
609 }
610
transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand)611 pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) {
612 let task_message = TaskMessage {
613 channel_id,
614 command: comamnd_finish,
615 payload: [1].to_vec(),
616 };
617 transfer::put(_session_id, task_message).await;
618 }
619
close_channel(channel_id: u32)620 pub async fn close_channel(channel_id: u32) {
621 transfer::TcpMap::end(channel_id).await;
622 }
623
echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel)624 pub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level: MessageLevel) {
625 #[cfg(feature = "host")]
626 {
627 let echo_level = match level {
628 MessageLevel::Ok => transfer::EchoLevel::OK,
629 MessageLevel::Fail => transfer::EchoLevel::FAIL,
630 MessageLevel::Info => transfer::EchoLevel::INFO,
631 };
632 let _ =
633 transfer::send_channel_msg(channel_id, echo_level, message.to_string())
634 .await;
635 }
636 #[cfg(not(feature = "host"))]
637 {
638 let mut data = Vec::<u8>::new();
639 data.push(level as u8);
640 data.append(&mut message.as_bytes().to_vec());
641 let echo_message = TaskMessage {
642 channel_id,
643 command: HdcCommand::KernelEcho,
644 payload: data,
645 };
646 transfer::put(_session_id, echo_message).await;
647 }
648 }