• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_
18 #define MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_
19 
20 #include <csignal>
21 #include <string>
22 #include <memory>
23 #include <initializer_list>
24 #include <functional>
25 
26 #include "utils/log_adapter.h"
27 #include "include/common/visible.h"
28 
29 #define DP_INFO MS_LOG(INFO) << "[DuplexPipe] "
30 #define DP_WARNING MS_LOG(WARNING) << "[DuplexPipe] "
31 #define DP_ERROR MS_LOG(ERROR) << "[DuplexPipe] "
32 #define DP_EXCEPTION MS_LOG(EXCEPTION) << "[DuplexPipe] "
33 
34 namespace mindspore {
35 // A tool to run a command as child process and build a duplex pipe between them.
36 // Similar to 'popen()', but use duplex not simplex pipe, more like 'socketpair'.
37 class COMMON_EXPORT DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
38  public:
39   constexpr inline static int kBufferSize = 4096;
40   constexpr inline static unsigned int kTimeOutSeconds = 5;
41 
42   DuplexPipe() = default;
43   virtual ~DuplexPipe();
44 
45   // Create a subprocess and open a duplex pipe between local and remote
46   int Open(const std::initializer_list<std::string> &arg_list, bool append_fds = false);
47   void Close() noexcept;
SetTimeOutSeconds(unsigned int secs)48   void SetTimeOutSeconds(unsigned int secs) { time_out_secs_ = secs; }
SetTimeOutCallback(const std::shared_ptr<std::function<void ()>> & cb)49   void SetTimeOutCallback(const std::shared_ptr<std::function<void()>> &cb) { time_out_callback_ = cb; }
SetFinalizeCallback(const std::shared_ptr<std::function<void ()>> & cb)50   void SetFinalizeCallback(const std::shared_ptr<std::function<void()>> &cb) { finalize_callback_ = cb; }
51 
52   // Write the 'buf' to remote stdin
53   void Write(const std::string &buf, bool flush = true) const;
54   // Read from remote stdout/stderr into 'c_buf_'
55   std::string Read();
56 
57   void WriteWithStdout(const std::string &buf, bool flush);
58   std::string ReadWithStdin();
59 
60   const DuplexPipe &operator<<(const std::string &buf) const;
61   DuplexPipe &operator>>(std::string &buf);
62 
63  private:
SetTimeOut()64   void SetTimeOut() {
65     if (time_out_callback_ != nullptr && signal_handler_ != nullptr) {
66       signal_handler_->SetAlarm(time_out_secs_);
67     }
68   }
CancelTimeOut()69   void CancelTimeOut() {
70     if (time_out_callback_ != nullptr && signal_handler_ != nullptr) {
71       signal_handler_->CancelAlarm();
72     }
73   }
NotifyTimeOut()74   void NotifyTimeOut() {
75     if (time_out_callback_ != nullptr) {
76       (*time_out_callback_)();
77     }
78     Close();
79     DP_EXCEPTION << "Time out when read from pipe";
80   }
81 
NotifyFinalize()82   void NotifyFinalize() {
83     if (finalize_callback_ != nullptr) {
84       (*finalize_callback_)();
85     }
86   }
87 
88   // Pipe: { Local:fd1_[1] --> Remote:fd1_[0] }
89   // Remote:fd1_[0] would be redirected by subprocess's stdin.
90   // Local:fd1_[1] would be used by 'Write()' as output.
91   int fd1_[2]{};
92 
93   // Pipe: { Remote:fd2_[1] --> Local:fd2_[0] }
94   // Remote:fd2_[1] would be redirected by subprocess's stdout.
95   // Local:fd2_[0] would be used by 'Read()' as input.
96   int fd2_[2]{};
97 
98   // // Used and returned by 'Read()'.
99   // std::string buf_;
100   char c_buf_[kBufferSize]{};
101 
102   int local_stdin_{};
103   int local_stdout_{};
104   int remote_stdin_{};
105   int remote_stdout_{};
106 
107   class COMMON_EXPORT SignalHandler {
108    public:
109     SignalHandler(const std::weak_ptr<DuplexPipe> &dp, pid_t *pid);
110     ~SignalHandler();
111 
112     void SetAlarm(unsigned int interval_secs) const;
113     void CancelAlarm() const;
114 
115    private:
116     static void SigAlarmHandler(int sig);
117     static void SigPipeHandler(int sig);
118     static void SigChildHandler(int /* sig */);
119 
120     inline static std::weak_ptr<DuplexPipe> dp_;
121     inline static pid_t *child_pid_;
122   };
123 
124   unsigned int time_out_secs_ = kTimeOutSeconds;
125   std::shared_ptr<std::function<void()>> time_out_callback_;
126   std::shared_ptr<std::function<void()>> finalize_callback_;
127   // signal_handler_ has a pid_ pointer, so it must be ahead of pid_
128   std::shared_ptr<SignalHandler> signal_handler_;
129 
130   // Subprocess id in parent process,
131   // otherwise zero in child process.
132   pid_t pid_{};
133 };
134 }  // namespace mindspore
135 #endif  // MINDSPORE_CCSRC_INCLUDE_COMMON_DUPLEX_PIPE_H_
136