• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/subprocess.h"
17 
18 #include <io.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/types.h>
23 #include <windows.h>
24 
25 #include <vector>
26 
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/strcat.h"
29 
30 #define PIPE_BUF_SIZE 4096
31 
32 namespace tensorflow {
33 
34 namespace {
35 
IsProcessFinished(HANDLE h)36 static bool IsProcessFinished(HANDLE h) {
37   DWORD process_return_code = STILL_ACTIVE;
38   // TODO handle failure
39   GetExitCodeProcess(h, &process_return_code);
40   return process_return_code != STILL_ACTIVE;
41 }
42 
43 struct ThreadData {
44   string* iobuf;
45   HANDLE iohandle;
46 };
47 
InputThreadFunction(LPVOID param)48 DWORD WINAPI InputThreadFunction(LPVOID param) {
49   ThreadData* args = reinterpret_cast<ThreadData*>(param);
50   string* input = args->iobuf;
51   HANDLE in_handle = args->iohandle;
52   size_t buffer_pointer = 0;
53 
54   size_t total_bytes_written = 0;
55   bool ok = true;
56   while (ok && total_bytes_written < input->size()) {
57     DWORD bytes_written_this_time;
58     ok = WriteFile(in_handle, input->data() + total_bytes_written,
59                    input->size() - total_bytes_written,
60                    &bytes_written_this_time, nullptr);
61     total_bytes_written += bytes_written_this_time;
62   }
63   CloseHandle(in_handle);
64 
65   if (!ok) {
66     return GetLastError();
67   } else {
68     return 0;
69   }
70 }
71 
OutputThreadFunction(LPVOID param)72 DWORD WINAPI OutputThreadFunction(LPVOID param) {
73   ThreadData* args = reinterpret_cast<ThreadData*>(param);
74   string* output = args->iobuf;
75   HANDLE out_handle = args->iohandle;
76 
77   char buf[PIPE_BUF_SIZE];
78   DWORD bytes_read;
79 
80   bool wait_result = WaitForSingleObject(out_handle, INFINITE);
81   if (wait_result != WAIT_OBJECT_0) {
82     LOG(FATAL) << "WaitForSingleObject on child process output failed. "
83                   "Error code: "
84                << wait_result;
85   }
86   while (ReadFile(out_handle, buf, sizeof(buf), &bytes_read, nullptr) &&
87          bytes_read > 0) {
88     output->append(buf, bytes_read);
89   }
90   CloseHandle(out_handle);
91   return 0;
92 }
93 
94 }  // namespace
95 
SubProcess(int nfds)96 SubProcess::SubProcess(int nfds)
97     : running_(false),
98       exec_path_(nullptr),
99       exec_argv_(nullptr),
100       win_pi_(nullptr) {
101   // The input 'nfds' parameter is currently ignored and the internal constant
102   // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
103   for (int i = 0; i < kNFds; i++) {
104     action_[i] = ACTION_CLOSE;
105     parent_pipe_[i] = nullptr;
106   }
107 }
108 
~SubProcess()109 SubProcess::~SubProcess() {
110   mutex_lock procLock(proc_mu_);
111   mutex_lock dataLock(data_mu_);
112   if (win_pi_) {
113     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
114     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
115     delete win_pi_;
116   }
117   running_ = false;
118   FreeArgs();
119   ClosePipes();
120 }
121 
FreeArgs()122 void SubProcess::FreeArgs() {
123   free(exec_path_);
124   exec_path_ = nullptr;
125 
126   if (exec_argv_) {
127     for (int i = 0; exec_argv_[i]; i++) {
128       free(exec_argv_[i]);
129     }
130     delete[] exec_argv_;
131     exec_argv_ = nullptr;
132   }
133 }
134 
ClosePipes()135 void SubProcess::ClosePipes() {
136   for (int i = 0; i < kNFds; i++) {
137     if (parent_pipe_[i] != nullptr) {
138       CloseHandle(parent_pipe_[i]);
139       parent_pipe_[i] = nullptr;
140     }
141   }
142 }
143 
SetProgram(const string & file,const std::vector<string> & argv)144 void SubProcess::SetProgram(const string& file,
145                             const std::vector<string>& argv) {
146   mutex_lock procLock(proc_mu_);
147   mutex_lock dataLock(data_mu_);
148   if (running_) {
149     LOG(FATAL) << "SetProgram called after the process was started.";
150     return;
151   }
152 
153   FreeArgs();
154   exec_path_ = _strdup(file.c_str());
155   if (exec_path_ == nullptr) {
156     LOG(FATAL) << "SetProgram failed to allocate file string.";
157     return;
158   }
159 
160   int argc = argv.size();
161   exec_argv_ = new char*[argc + 1];
162   for (int i = 0; i < argc; i++) {
163     exec_argv_[i] = _strdup(argv[i].c_str());
164     if (exec_argv_[i] == nullptr) {
165       LOG(FATAL) << "SetProgram failed to allocate command argument.";
166       return;
167     }
168   }
169   exec_argv_[argc] = nullptr;
170 }
171 
SetChannelAction(Channel chan,ChannelAction action)172 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
173   mutex_lock procLock(proc_mu_);
174   mutex_lock dataLock(data_mu_);
175   if (running_) {
176     LOG(FATAL) << "SetChannelAction called after the process was started.";
177   } else if (!chan_valid(chan)) {
178     LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
179   } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
180              (action != ACTION_DUPPARENT)) {
181     LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
182   } else {
183     action_[chan] = action;
184   }
185 }
186 
Start()187 bool SubProcess::Start() {
188   mutex_lock procLock(proc_mu_);
189   mutex_lock dataLock(data_mu_);
190   if (running_) {
191     LOG(ERROR) << "Start called after the process was started.";
192     return false;
193   }
194   if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
195     LOG(ERROR) << "Start called without setting a program.";
196     return false;
197   }
198 
199   // SecurityAttributes to use in winapi calls below.
200   SECURITY_ATTRIBUTES attrs;
201   attrs.nLength = sizeof(SECURITY_ATTRIBUTES);
202   attrs.bInheritHandle = TRUE;
203   attrs.lpSecurityDescriptor = nullptr;
204 
205   // No need to store subprocess end of the pipes, they will be closed before
206   // this function terminates.
207   HANDLE child_pipe_[kNFds] TF_GUARDED_BY(data_mu_);
208 
209   // Create parent/child pipes for the specified channels and make the
210   // parent-side of the pipes non-blocking.
211   for (int i = 0; i < kNFds; i++) {
212     if (action_[i] == ACTION_PIPE) {
213       if (!CreatePipe(i == CHAN_STDIN ? child_pipe_ + i : parent_pipe_ + i,
214                       i == CHAN_STDIN ? parent_pipe_ + i : child_pipe_ + i,
215                       &attrs, PIPE_BUF_SIZE)) {
216         LOG(ERROR) << "Cannot create pipe. Error code: " << GetLastError();
217         ClosePipes();
218         return false;
219       }
220 
221       // Parent pipes should not be inherited by the child process
222       if (!SetHandleInformation(parent_pipe_[i], HANDLE_FLAG_INHERIT, 0)) {
223         LOG(ERROR) << "Cannot set pipe handle attributes.";
224         ClosePipes();
225         return false;
226       }
227     } else if (action_[i] == ACTION_DUPPARENT) {
228       if (i == CHAN_STDIN) {
229         child_pipe_[i] = GetStdHandle(STD_INPUT_HANDLE);
230       } else if (i == CHAN_STDOUT) {
231         child_pipe_[i] = GetStdHandle(STD_OUTPUT_HANDLE);
232       } else {
233         child_pipe_[i] = GetStdHandle(STD_ERROR_HANDLE);
234       }
235     } else {  // ACTION_CLOSE
236       parent_pipe_[i] = nullptr;
237       child_pipe_[i] = nullptr;
238     }
239   }
240 
241   // Concatanate argv, because winapi wants it so.
242   string command_line = strings::StrCat("\"", exec_path_, "\"");
243   for (int i = 1; exec_argv_[i]; i++) {
244     command_line.append(strings::StrCat(" \"", exec_argv_[i], "\""));
245   }
246 
247   // Set up the STARTUPINFO struct with information about the pipe handles.
248   STARTUPINFOA si;
249   ZeroMemory(&si, sizeof(STARTUPINFO));
250   si.cb = sizeof(STARTUPINFO);
251 
252   // Prevent console window popping in case we are in GUI mode
253   si.dwFlags |= STARTF_USESHOWWINDOW;
254   si.wShowWindow = SW_HIDE;
255 
256   // Handle the pipes for the child process.
257   si.dwFlags |= STARTF_USESTDHANDLES;
258   if (child_pipe_[CHAN_STDIN]) {
259     si.hStdInput = child_pipe_[CHAN_STDIN];
260   }
261   if (child_pipe_[CHAN_STDOUT]) {
262     si.hStdOutput = child_pipe_[CHAN_STDOUT];
263   }
264   if (child_pipe_[CHAN_STDERR]) {
265     si.hStdError = child_pipe_[CHAN_STDERR];
266   }
267 
268   // Allocate the POROCESS_INFORMATION struct.
269   win_pi_ = new PROCESS_INFORMATION;
270 
271   // Execute the child program.
272   bool bSuccess =
273       CreateProcessA(nullptr, const_cast<char*>(command_line.c_str()), nullptr,
274                      nullptr, TRUE, CREATE_NO_WINDOW, nullptr, nullptr, &si,
275                      reinterpret_cast<PROCESS_INFORMATION*>(win_pi_));
276 
277   if (bSuccess) {
278     for (int i = 0; i < kNFds; i++) {
279       if (child_pipe_[i] != nullptr) {
280         CloseHandle(child_pipe_[i]);
281         child_pipe_[i] = nullptr;
282       }
283     }
284     running_ = true;
285     return true;
286   } else {
287     LOG(ERROR) << "Call to CreateProcess failed. Error code: " << GetLastError()
288                << ", command: '" << command_line << "'";
289     ClosePipes();
290     return false;
291   }
292 }
293 
Wait()294 bool SubProcess::Wait() {
295   int status;
296   return WaitInternal(&status);
297 }
298 
WaitInternal(int * status)299 bool SubProcess::WaitInternal(int* status) {
300   // The waiter must release proc_mu_ while waiting in order for Kill() to work.
301   proc_mu_.lock();
302   bool running = running_;
303   PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
304   proc_mu_.unlock();
305 
306   bool ret = false;
307   if (running && pi_.hProcess) {
308     DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
309     if (wait_status == WAIT_OBJECT_0) {
310       DWORD process_exit_code = 0;
311       if (GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
312         *status = static_cast<int>(process_exit_code);
313       } else {
314         LOG(FATAL) << "Wait failed with code: " << GetLastError();
315       }
316     } else {
317       LOG(FATAL) << "WaitForSingleObject call on the process handle failed. "
318                     "Error code: "
319                  << wait_status;
320     }
321   }
322 
323   proc_mu_.lock();
324   if ((running_ == running) &&
325       (pi_.hProcess ==
326        reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess)) {
327     running_ = false;
328     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
329     CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
330     reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess = nullptr;
331     reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread = nullptr;
332   }
333   proc_mu_.unlock();
334   return *status == 0;
335 }
336 
Kill(int unused_signal)337 bool SubProcess::Kill(int unused_signal) {
338   proc_mu_.lock();
339   bool running = running_;
340   PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
341   proc_mu_.unlock();
342 
343   bool ret = false;
344   if (running && pi_.hProcess) {
345     ret = TerminateProcess(pi_.hProcess, 0);
346   }
347   return ret;
348 }
349 
Communicate(const string * stdin_input,string * stdout_output,string * stderr_output)350 int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
351                             string* stderr_output) {
352   proc_mu_.lock();
353   bool running = running_;
354   proc_mu_.unlock();
355   if (!running) {
356     LOG(ERROR) << "Communicate called without a running process.";
357     return 1;
358   }
359 
360   HANDLE thread_handles[kNFds];
361   int thread_count = 0;
362   ThreadData thread_params[kNFds];
363 
364   // Lock data_mu_ but not proc_mu_ while communicating with the child process
365   // in order for Kill() to be able to terminate the child from another thread.
366   data_mu_.lock();
367   if (!IsProcessFinished(
368           reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess) ||
369       (parent_pipe_[CHAN_STDOUT] != nullptr) ||
370       (parent_pipe_[CHAN_STDERR] != nullptr)) {
371     if (parent_pipe_[CHAN_STDIN] != nullptr) {
372       if (stdin_input) {
373         thread_params[thread_count].iobuf = const_cast<string*>(stdin_input);
374         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDIN];
375         parent_pipe_[CHAN_STDIN] = nullptr;
376         thread_handles[thread_count] =
377             CreateThread(NULL, 0, InputThreadFunction,
378                          thread_params + thread_count, 0, NULL);
379         thread_count++;
380       }
381     } else {
382       CloseHandle(parent_pipe_[CHAN_STDIN]);
383       parent_pipe_[CHAN_STDIN] = NULL;
384     }
385 
386     if (parent_pipe_[CHAN_STDOUT] != nullptr) {
387       if (stdout_output != nullptr) {
388         thread_params[thread_count].iobuf = stdout_output;
389         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDOUT];
390         parent_pipe_[CHAN_STDOUT] = NULL;
391         thread_handles[thread_count] =
392             CreateThread(NULL, 0, OutputThreadFunction,
393                          thread_params + thread_count, 0, NULL);
394         thread_count++;
395       } else {
396         CloseHandle(parent_pipe_[CHAN_STDOUT]);
397         parent_pipe_[CHAN_STDOUT] = nullptr;
398       }
399     }
400 
401     if (parent_pipe_[CHAN_STDERR] != nullptr) {
402       if (stderr_output != nullptr) {
403         thread_params[thread_count].iobuf = stderr_output;
404         thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDERR];
405         parent_pipe_[CHAN_STDERR] = NULL;
406         thread_handles[thread_count] =
407             CreateThread(NULL, 0, OutputThreadFunction,
408                          thread_params + thread_count, 0, NULL);
409         thread_count++;
410       } else {
411         CloseHandle(parent_pipe_[CHAN_STDERR]);
412         parent_pipe_[CHAN_STDERR] = nullptr;
413       }
414     }
415   }
416 
417   // Wait for all IO threads to exit.
418   if (thread_count > 0) {
419     DWORD wait_result = WaitForMultipleObjects(thread_count, thread_handles,
420                                                true,  // wait all threads
421                                                INFINITE);
422     if (wait_result != WAIT_OBJECT_0) {
423       LOG(ERROR) << "Waiting on the io threads failed! result: " << wait_result
424                  << std::endl;
425       return -1;
426     }
427 
428     for (int i = 0; i < thread_count; i++) {
429       DWORD exit_code;
430       if (GetExitCodeThread(thread_handles[i], &exit_code)) {
431         if (exit_code) {
432           LOG(ERROR) << "One of the IO threads failed with code: " << exit_code;
433         }
434       } else {
435         LOG(ERROR) << "Error checking io thread exit statuses. Error Code: "
436                    << GetLastError();
437       }
438     }
439   }
440 
441   data_mu_.unlock();
442 
443   // Wait for the child process to exit and return its status.
444   int status;
445   return WaitInternal(&status) ? status : -1;
446 }
447 
CreateSubProcess(const std::vector<string> & argv)448 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
449   std::unique_ptr<SubProcess> proc(new SubProcess());
450   proc->SetProgram(argv[0], argv);
451   proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
452   proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
453   return proc;
454 }
455 
456 }  // namespace tensorflow
457