1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/subprocess.h"
17
18 #include <io.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/types.h>
23 #include <windows.h>
24
25 #include <vector>
26
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/strcat.h"
29
30 #define PIPE_BUF_SIZE 4096
31
32 namespace tensorflow {
33
34 namespace {
35
IsProcessFinished(HANDLE h)36 static bool IsProcessFinished(HANDLE h) {
37 DWORD process_return_code = STILL_ACTIVE;
38 // TODO handle failure
39 GetExitCodeProcess(h, &process_return_code);
40 return process_return_code != STILL_ACTIVE;
41 }
42
43 struct ThreadData {
44 string* iobuf;
45 HANDLE iohandle;
46 };
47
InputThreadFunction(LPVOID param)48 DWORD WINAPI InputThreadFunction(LPVOID param) {
49 ThreadData* args = reinterpret_cast<ThreadData*>(param);
50 string* input = args->iobuf;
51 HANDLE in_handle = args->iohandle;
52 size_t buffer_pointer = 0;
53
54 size_t total_bytes_written = 0;
55 bool ok = true;
56 while (ok && total_bytes_written < input->size()) {
57 DWORD bytes_written_this_time;
58 ok = WriteFile(in_handle, input->data() + total_bytes_written,
59 input->size() - total_bytes_written,
60 &bytes_written_this_time, nullptr);
61 total_bytes_written += bytes_written_this_time;
62 }
63 CloseHandle(in_handle);
64
65 if (!ok) {
66 return GetLastError();
67 } else {
68 return 0;
69 }
70 }
71
OutputThreadFunction(LPVOID param)72 DWORD WINAPI OutputThreadFunction(LPVOID param) {
73 ThreadData* args = reinterpret_cast<ThreadData*>(param);
74 string* output = args->iobuf;
75 HANDLE out_handle = args->iohandle;
76
77 char buf[PIPE_BUF_SIZE];
78 DWORD bytes_read;
79
80 bool wait_result = WaitForSingleObject(out_handle, INFINITE);
81 if (wait_result != WAIT_OBJECT_0) {
82 LOG(FATAL) << "WaitForSingleObject on child process output failed. "
83 "Error code: "
84 << wait_result;
85 }
86 while (ReadFile(out_handle, buf, sizeof(buf), &bytes_read, nullptr) &&
87 bytes_read > 0) {
88 output->append(buf, bytes_read);
89 }
90 CloseHandle(out_handle);
91 return 0;
92 }
93
94 } // namespace
95
SubProcess(int nfds)96 SubProcess::SubProcess(int nfds)
97 : running_(false),
98 exec_path_(nullptr),
99 exec_argv_(nullptr),
100 win_pi_(nullptr) {
101 // The input 'nfds' parameter is currently ignored and the internal constant
102 // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
103 for (int i = 0; i < kNFds; i++) {
104 action_[i] = ACTION_CLOSE;
105 parent_pipe_[i] = nullptr;
106 }
107 }
108
~SubProcess()109 SubProcess::~SubProcess() {
110 mutex_lock procLock(proc_mu_);
111 mutex_lock dataLock(data_mu_);
112 if (win_pi_) {
113 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
114 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
115 delete win_pi_;
116 }
117 running_ = false;
118 FreeArgs();
119 ClosePipes();
120 }
121
FreeArgs()122 void SubProcess::FreeArgs() {
123 free(exec_path_);
124 exec_path_ = nullptr;
125
126 if (exec_argv_) {
127 for (int i = 0; exec_argv_[i]; i++) {
128 free(exec_argv_[i]);
129 }
130 delete[] exec_argv_;
131 exec_argv_ = nullptr;
132 }
133 }
134
ClosePipes()135 void SubProcess::ClosePipes() {
136 for (int i = 0; i < kNFds; i++) {
137 if (parent_pipe_[i] != nullptr) {
138 CloseHandle(parent_pipe_[i]);
139 parent_pipe_[i] = nullptr;
140 }
141 }
142 }
143
SetProgram(const string & file,const std::vector<string> & argv)144 void SubProcess::SetProgram(const string& file,
145 const std::vector<string>& argv) {
146 mutex_lock procLock(proc_mu_);
147 mutex_lock dataLock(data_mu_);
148 if (running_) {
149 LOG(FATAL) << "SetProgram called after the process was started.";
150 return;
151 }
152
153 FreeArgs();
154 exec_path_ = _strdup(file.c_str());
155 if (exec_path_ == nullptr) {
156 LOG(FATAL) << "SetProgram failed to allocate file string.";
157 return;
158 }
159
160 int argc = argv.size();
161 exec_argv_ = new char*[argc + 1];
162 for (int i = 0; i < argc; i++) {
163 exec_argv_[i] = _strdup(argv[i].c_str());
164 if (exec_argv_[i] == nullptr) {
165 LOG(FATAL) << "SetProgram failed to allocate command argument.";
166 return;
167 }
168 }
169 exec_argv_[argc] = nullptr;
170 }
171
SetChannelAction(Channel chan,ChannelAction action)172 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
173 mutex_lock procLock(proc_mu_);
174 mutex_lock dataLock(data_mu_);
175 if (running_) {
176 LOG(FATAL) << "SetChannelAction called after the process was started.";
177 } else if (!chan_valid(chan)) {
178 LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
179 } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
180 (action != ACTION_DUPPARENT)) {
181 LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
182 } else {
183 action_[chan] = action;
184 }
185 }
186
Start()187 bool SubProcess::Start() {
188 mutex_lock procLock(proc_mu_);
189 mutex_lock dataLock(data_mu_);
190 if (running_) {
191 LOG(ERROR) << "Start called after the process was started.";
192 return false;
193 }
194 if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
195 LOG(ERROR) << "Start called without setting a program.";
196 return false;
197 }
198
199 // SecurityAttributes to use in winapi calls below.
200 SECURITY_ATTRIBUTES attrs;
201 attrs.nLength = sizeof(SECURITY_ATTRIBUTES);
202 attrs.bInheritHandle = TRUE;
203 attrs.lpSecurityDescriptor = nullptr;
204
205 // No need to store subprocess end of the pipes, they will be closed before
206 // this function terminates.
207 HANDLE child_pipe_[kNFds] TF_GUARDED_BY(data_mu_);
208
209 // Create parent/child pipes for the specified channels and make the
210 // parent-side of the pipes non-blocking.
211 for (int i = 0; i < kNFds; i++) {
212 if (action_[i] == ACTION_PIPE) {
213 if (!CreatePipe(i == CHAN_STDIN ? child_pipe_ + i : parent_pipe_ + i,
214 i == CHAN_STDIN ? parent_pipe_ + i : child_pipe_ + i,
215 &attrs, PIPE_BUF_SIZE)) {
216 LOG(ERROR) << "Cannot create pipe. Error code: " << GetLastError();
217 ClosePipes();
218 return false;
219 }
220
221 // Parent pipes should not be inherited by the child process
222 if (!SetHandleInformation(parent_pipe_[i], HANDLE_FLAG_INHERIT, 0)) {
223 LOG(ERROR) << "Cannot set pipe handle attributes.";
224 ClosePipes();
225 return false;
226 }
227 } else if (action_[i] == ACTION_DUPPARENT) {
228 if (i == CHAN_STDIN) {
229 child_pipe_[i] = GetStdHandle(STD_INPUT_HANDLE);
230 } else if (i == CHAN_STDOUT) {
231 child_pipe_[i] = GetStdHandle(STD_OUTPUT_HANDLE);
232 } else {
233 child_pipe_[i] = GetStdHandle(STD_ERROR_HANDLE);
234 }
235 } else { // ACTION_CLOSE
236 parent_pipe_[i] = nullptr;
237 child_pipe_[i] = nullptr;
238 }
239 }
240
241 // Concatanate argv, because winapi wants it so.
242 string command_line = strings::StrCat("\"", exec_path_, "\"");
243 for (int i = 1; exec_argv_[i]; i++) {
244 command_line.append(strings::StrCat(" \"", exec_argv_[i], "\""));
245 }
246
247 // Set up the STARTUPINFO struct with information about the pipe handles.
248 STARTUPINFOA si;
249 ZeroMemory(&si, sizeof(STARTUPINFO));
250 si.cb = sizeof(STARTUPINFO);
251
252 // Prevent console window popping in case we are in GUI mode
253 si.dwFlags |= STARTF_USESHOWWINDOW;
254 si.wShowWindow = SW_HIDE;
255
256 // Handle the pipes for the child process.
257 si.dwFlags |= STARTF_USESTDHANDLES;
258 if (child_pipe_[CHAN_STDIN]) {
259 si.hStdInput = child_pipe_[CHAN_STDIN];
260 }
261 if (child_pipe_[CHAN_STDOUT]) {
262 si.hStdOutput = child_pipe_[CHAN_STDOUT];
263 }
264 if (child_pipe_[CHAN_STDERR]) {
265 si.hStdError = child_pipe_[CHAN_STDERR];
266 }
267
268 // Allocate the POROCESS_INFORMATION struct.
269 win_pi_ = new PROCESS_INFORMATION;
270
271 // Execute the child program.
272 bool bSuccess =
273 CreateProcessA(nullptr, const_cast<char*>(command_line.c_str()), nullptr,
274 nullptr, TRUE, CREATE_NO_WINDOW, nullptr, nullptr, &si,
275 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_));
276
277 if (bSuccess) {
278 for (int i = 0; i < kNFds; i++) {
279 if (child_pipe_[i] != nullptr) {
280 CloseHandle(child_pipe_[i]);
281 child_pipe_[i] = nullptr;
282 }
283 }
284 running_ = true;
285 return true;
286 } else {
287 LOG(ERROR) << "Call to CreateProcess failed. Error code: " << GetLastError()
288 << ", command: '" << command_line << "'";
289 ClosePipes();
290 return false;
291 }
292 }
293
Wait()294 bool SubProcess::Wait() {
295 int status;
296 return WaitInternal(&status);
297 }
298
WaitInternal(int * status)299 bool SubProcess::WaitInternal(int* status) {
300 // The waiter must release proc_mu_ while waiting in order for Kill() to work.
301 proc_mu_.lock();
302 bool running = running_;
303 PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
304 proc_mu_.unlock();
305
306 bool ret = false;
307 if (running && pi_.hProcess) {
308 DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
309 if (wait_status == WAIT_OBJECT_0) {
310 DWORD process_exit_code = 0;
311 if (GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
312 *status = static_cast<int>(process_exit_code);
313 } else {
314 LOG(FATAL) << "Wait failed with code: " << GetLastError();
315 }
316 } else {
317 LOG(FATAL) << "WaitForSingleObject call on the process handle failed. "
318 "Error code: "
319 << wait_status;
320 }
321 }
322
323 proc_mu_.lock();
324 if ((running_ == running) &&
325 (pi_.hProcess ==
326 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess)) {
327 running_ = false;
328 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
329 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
330 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess = nullptr;
331 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread = nullptr;
332 }
333 proc_mu_.unlock();
334 return *status == 0;
335 }
336
Kill(int unused_signal)337 bool SubProcess::Kill(int unused_signal) {
338 proc_mu_.lock();
339 bool running = running_;
340 PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
341 proc_mu_.unlock();
342
343 bool ret = false;
344 if (running && pi_.hProcess) {
345 ret = TerminateProcess(pi_.hProcess, 0);
346 }
347 return ret;
348 }
349
Communicate(const string * stdin_input,string * stdout_output,string * stderr_output)350 int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
351 string* stderr_output) {
352 proc_mu_.lock();
353 bool running = running_;
354 proc_mu_.unlock();
355 if (!running) {
356 LOG(ERROR) << "Communicate called without a running process.";
357 return 1;
358 }
359
360 HANDLE thread_handles[kNFds];
361 int thread_count = 0;
362 ThreadData thread_params[kNFds];
363
364 // Lock data_mu_ but not proc_mu_ while communicating with the child process
365 // in order for Kill() to be able to terminate the child from another thread.
366 data_mu_.lock();
367 if (!IsProcessFinished(
368 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess) ||
369 (parent_pipe_[CHAN_STDOUT] != nullptr) ||
370 (parent_pipe_[CHAN_STDERR] != nullptr)) {
371 if (parent_pipe_[CHAN_STDIN] != nullptr) {
372 if (stdin_input) {
373 thread_params[thread_count].iobuf = const_cast<string*>(stdin_input);
374 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDIN];
375 parent_pipe_[CHAN_STDIN] = nullptr;
376 thread_handles[thread_count] =
377 CreateThread(NULL, 0, InputThreadFunction,
378 thread_params + thread_count, 0, NULL);
379 thread_count++;
380 }
381 } else {
382 CloseHandle(parent_pipe_[CHAN_STDIN]);
383 parent_pipe_[CHAN_STDIN] = NULL;
384 }
385
386 if (parent_pipe_[CHAN_STDOUT] != nullptr) {
387 if (stdout_output != nullptr) {
388 thread_params[thread_count].iobuf = stdout_output;
389 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDOUT];
390 parent_pipe_[CHAN_STDOUT] = NULL;
391 thread_handles[thread_count] =
392 CreateThread(NULL, 0, OutputThreadFunction,
393 thread_params + thread_count, 0, NULL);
394 thread_count++;
395 } else {
396 CloseHandle(parent_pipe_[CHAN_STDOUT]);
397 parent_pipe_[CHAN_STDOUT] = nullptr;
398 }
399 }
400
401 if (parent_pipe_[CHAN_STDERR] != nullptr) {
402 if (stderr_output != nullptr) {
403 thread_params[thread_count].iobuf = stderr_output;
404 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDERR];
405 parent_pipe_[CHAN_STDERR] = NULL;
406 thread_handles[thread_count] =
407 CreateThread(NULL, 0, OutputThreadFunction,
408 thread_params + thread_count, 0, NULL);
409 thread_count++;
410 } else {
411 CloseHandle(parent_pipe_[CHAN_STDERR]);
412 parent_pipe_[CHAN_STDERR] = nullptr;
413 }
414 }
415 }
416
417 // Wait for all IO threads to exit.
418 if (thread_count > 0) {
419 DWORD wait_result = WaitForMultipleObjects(thread_count, thread_handles,
420 true, // wait all threads
421 INFINITE);
422 if (wait_result != WAIT_OBJECT_0) {
423 LOG(ERROR) << "Waiting on the io threads failed! result: " << wait_result
424 << std::endl;
425 return -1;
426 }
427
428 for (int i = 0; i < thread_count; i++) {
429 DWORD exit_code;
430 if (GetExitCodeThread(thread_handles[i], &exit_code)) {
431 if (exit_code) {
432 LOG(ERROR) << "One of the IO threads failed with code: " << exit_code;
433 }
434 } else {
435 LOG(ERROR) << "Error checking io thread exit statuses. Error Code: "
436 << GetLastError();
437 }
438 }
439 }
440
441 data_mu_.unlock();
442
443 // Wait for the child process to exit and return its status.
444 int status;
445 return WaitInternal(&status) ? status : -1;
446 }
447
CreateSubProcess(const std::vector<string> & argv)448 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
449 std::unique_ptr<SubProcess> proc(new SubProcess());
450 proc->SetProgram(argv[0], argv);
451 proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
452 proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
453 return proc;
454 }
455
456 } // namespace tensorflow
457