1 /** 2 * Copyright 2020-2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_ 18 #define MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_ 19 20 #include <csignal> 21 #include <string> 22 #include <memory> 23 #include <initializer_list> 24 #include <functional> 25 26 #include "utils/log_adapter.h" 27 #include "include/common/visible.h" 28 29 #define DP_INFO MS_LOG(INFO) << "[DuplexPipe] " 30 #define DP_WARNING MS_LOG(WARNING) << "[DuplexPipe] " 31 #define DP_ERROR MS_LOG(ERROR) << "[DuplexPipe] " 32 #define DP_EXCEPTION MS_LOG(EXCEPTION) << "[DuplexPipe] " 33 34 namespace mindspore { 35 // A tool to run a command as child process and build a duplex pipe between them. 36 // Similar to 'popen()', but use duplex not simplex pipe, more like 'socketpair'. 37 class COMMON_EXPORT DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> { 38 public: 39 constexpr inline static int kBufferSize = 4096; 40 constexpr inline static unsigned int kTimeOutSeconds = 5; 41 42 DuplexPipe() = default; 43 virtual ~DuplexPipe(); 44 45 // Create a subprocess and open a duplex pipe between local and remote 46 int Open(const std::initializer_list<std::string> &arg_list, bool append_fds = false); 47 void Close() noexcept; SetTimeOutSeconds(unsigned int secs)48 void SetTimeOutSeconds(unsigned int secs) { time_out_secs_ = secs; } SetTimeOutCallback(const std::shared_ptr<std::function<void ()>> & cb)49 void SetTimeOutCallback(const std::shared_ptr<std::function<void()>> &cb) { time_out_callback_ = cb; } SetFinalizeCallback(const std::shared_ptr<std::function<void ()>> & cb)50 void SetFinalizeCallback(const std::shared_ptr<std::function<void()>> &cb) { finalize_callback_ = cb; } 51 52 // Write the 'buf' to remote stdin 53 void Write(const std::string &buf, bool flush = true) const; 54 // Read from remote stdout/stderr into 'c_buf_' 55 std::string Read(); 56 57 void WriteWithStdout(const std::string &buf, bool flush); 58 std::string ReadWithStdin(); 59 60 const DuplexPipe &operator<<(const std::string &buf) const; 61 DuplexPipe &operator>>(std::string &buf); 62 63 private: SetTimeOut()64 void SetTimeOut() { 65 if (time_out_callback_ != nullptr && signal_handler_ != nullptr) { 66 signal_handler_->SetAlarm(time_out_secs_); 67 } 68 } CancelTimeOut()69 void CancelTimeOut() { 70 if (time_out_callback_ != nullptr && signal_handler_ != nullptr) { 71 signal_handler_->CancelAlarm(); 72 } 73 } NotifyTimeOut()74 void NotifyTimeOut() { 75 if (time_out_callback_ != nullptr) { 76 (*time_out_callback_)(); 77 } 78 Close(); 79 DP_EXCEPTION << "Time out when read from pipe"; 80 } 81 NotifyFinalize()82 void NotifyFinalize() { 83 if (finalize_callback_ != nullptr) { 84 (*finalize_callback_)(); 85 } 86 } 87 88 // Pipe: { Local:fd1_[1] --> Remote:fd1_[0] } 89 // Remote:fd1_[0] would be redirected by subprocess's stdin. 90 // Local:fd1_[1] would be used by 'Write()' as output. 91 int fd1_[2]{}; 92 93 // Pipe: { Remote:fd2_[1] --> Local:fd2_[0] } 94 // Remote:fd2_[1] would be redirected by subprocess's stdout. 95 // Local:fd2_[0] would be used by 'Read()' as input. 96 int fd2_[2]{}; 97 98 // // Used and returned by 'Read()'. 99 // std::string buf_; 100 char c_buf_[kBufferSize]{}; 101 102 int local_stdin_{}; 103 int local_stdout_{}; 104 int remote_stdin_{}; 105 int remote_stdout_{}; 106 107 class COMMON_EXPORT SignalHandler { 108 public: 109 SignalHandler(const std::weak_ptr<DuplexPipe> &dp, pid_t *pid); 110 ~SignalHandler(); 111 112 void SetAlarm(unsigned int interval_secs) const; 113 void CancelAlarm() const; 114 115 private: 116 static void SigAlarmHandler(int sig); 117 static void SigPipeHandler(int sig); 118 static void SigChildHandler(int /* sig */); 119 120 inline static std::weak_ptr<DuplexPipe> dp_; 121 inline static pid_t *child_pid_; 122 }; 123 124 unsigned int time_out_secs_ = kTimeOutSeconds; 125 std::shared_ptr<std::function<void()>> time_out_callback_; 126 std::shared_ptr<std::function<void()>> finalize_callback_; 127 // signal_handler_ has a pid_ pointer, so it must be ahead of pid_ 128 std::shared_ptr<SignalHandler> signal_handler_; 129 130 // Subprocess id in parent process, 131 // otherwise zero in child process. 132 pid_t pid_{}; 133 }; 134 } // namespace mindspore 135 #endif // MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_ 136