• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_
18 #define MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_
19 
20 #include <unistd.h>
21 #include <signal.h>
22 #include <string>
23 #include <memory>
24 #include <initializer_list>
25 #include <functional>
26 
27 #include "utils/log_adapter.h"
// Logging shorthands for this header's component: each macro expands to an
// MS_LOG stream at the named level that already has the "[DuplexPipe] " tag
// written to it, so call sites only append the message, e.g. `DP_INFO << "text";`.
// NOTE(review): the expansions are not parenthesized; use them only as the
// left-most operand of a `<<` chain.
28 #define DP_DEBUG MS_LOG(DEBUG) << "[DuplexPipe] "
29 #define DP_INFO MS_LOG(INFO) << "[DuplexPipe] "
30 #define DP_ERROR MS_LOG(ERROR) << "[DuplexPipe] "
31 #define DP_EXCEPTION MS_LOG(EXCEPTION) << "[DuplexPipe] "
32 
33 namespace mindspore {
34 // A tool to run a command as child process and build a duplex pipe between them.
35 // Similar to 'popen()', but use duplex not simplex pipe, more like 'socketpair'.
36 class DuplexPipe : public std::enable_shared_from_this<mindspore::DuplexPipe> {
37  public:
38   constexpr inline static int kBufferSize = 4096;
39   constexpr inline static unsigned int kTimeOutSeconds = 5;
40 
41   DuplexPipe() = default;
42   ~DuplexPipe();
43 
44   // Create a subprocess and open a duplex pipe between local and remote
45   int Open(const std::initializer_list<std::string> &arg_list, bool append_fds = false);
46   void Close();
SetTimeOutSeconds(unsigned int secs)47   void SetTimeOutSeconds(unsigned int secs) { time_out_secs_ = secs; }
SetTimeOutCallback(const std::shared_ptr<std::function<void ()>> cb)48   void SetTimeOutCallback(const std::shared_ptr<std::function<void()>> cb) { time_out_callback_ = cb; }
SetFinalizeCallback(const std::shared_ptr<std::function<void ()>> cb)49   void SetFinalizeCallback(const std::shared_ptr<std::function<void()>> cb) { finalize_callback_ = cb; }
50 
51   // Write the 'buf' to remote stdin
52   void Write(const std::string &buf, bool flush = true) const;
53   // Read from remote stdout/stderr into 'c_buf_'
54   std::string Read();
55 
56   void WriteWithStdout(const std::string &buf, bool flush);
57   std::string ReadWithStdin();
58 
59   DuplexPipe &operator<<(const std::string &buf);
60   DuplexPipe &operator>>(std::string &buf);
61 
62  private:
SetTimeOut()63   void SetTimeOut() {
64     if (time_out_callback_ != nullptr && signal_handler_ != nullptr) {
65       signal_handler_->SetAlarm(time_out_secs_);
66     }
67   }
CancelTimeOut()68   void CancelTimeOut() {
69     if (time_out_callback_ != nullptr && signal_handler_ != nullptr) {
70       signal_handler_->CancelAlarm();
71     }
72   }
NotifyTimeOut()73   void NotifyTimeOut() {
74     if (time_out_callback_ != nullptr) {
75       (*time_out_callback_)();
76     }
77     Close();
78     DP_EXCEPTION << "Time out when read from pipe";
79   }
80 
NotifyFinalize()81   void NotifyFinalize() {
82     if (finalize_callback_ != nullptr) {
83       (*finalize_callback_)();
84     }
85   }
86 
87   // Pipe: { Local:fd1_[1] --> Remote:fd1_[0] }
88   // Remote:fd1_[0] would be redirected by subprocess's stdin.
89   // Local:fd1_[1] would be used by 'Write()' as output.
90   int fd1_[2];
91 
92   // Pipe: { Remote:fd2_[1] --> Local:fd2_[0] }
93   // Remote:fd2_[1] would be redirected by subprocess's stdout.
94   // Local:fd2_[0] would be used by 'Read()' as input.
95   int fd2_[2];
96 
97   // // Used and returned by 'Read()'.
98   // std::string buf_;
99   char c_buf_[kBufferSize];
100 
101   int local_stdin_;
102   int local_stdout_;
103   int remote_stdin_;
104   int remote_stdout_;
105 
106   class SignalHandler {
107    public:
108     SignalHandler(const std::weak_ptr<DuplexPipe> &dp, pid_t *pid);
109     ~SignalHandler();
110 
111     void SetAlarm(unsigned int interval_secs) const;
112     void CancelAlarm() const;
113 
114    private:
115     static void SigAlarmHandler(int sig);
116     static void SigPipeHandler(int sig);
117     static void SigChildHandler(int sig);
118 
119     inline static std::weak_ptr<DuplexPipe> dp_;
120     inline static pid_t *child_pid_;
121   };
122 
123   unsigned int time_out_secs_ = kTimeOutSeconds;
124   std::shared_ptr<std::function<void()>> time_out_callback_;
125   std::shared_ptr<std::function<void()>> finalize_callback_;
126   // signal_handler_ has a pid_ pointer, so it must be ahead of pid_
127   std::shared_ptr<SignalHandler> signal_handler_;
128 
129   // Subprocess id in parent process,
130   // otherwise zero in child process.
131   pid_t pid_;
132 };
133 }  // namespace mindspore
134 #endif  // MINDSPORE_CCSRC_COMMON_DUPLEX_PIPE_H_
135