1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/subprocess.h"
17
18 #include <io.h>
19 #include <signal.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <sys/types.h>
23 #include <windows.h>
24
25 #include <vector>
26
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/strcat.h"
29
// Size, in bytes, passed to CreatePipe() as the pipe buffer size in
// SubProcess::Start() and used for the stack read buffer in
// OutputThreadFunction().
#define PIPE_BUF_SIZE 4096
31
32 namespace tensorflow {
33
34 namespace {
35
IsProcessFinished(HANDLE h)36 static bool IsProcessFinished(HANDLE h) {
37 DWORD process_return_code = STILL_ACTIVE;
38 // TODO handle failure
39 GetExitCodeProcess(h, &process_return_code);
40 return process_return_code != STILL_ACTIVE;
41 }
42
// Arguments handed to an I/O worker thread (InputThreadFunction or
// OutputThreadFunction) through the LPVOID thread parameter.
struct ThreadData {
  // Buffer the worker reads from (input thread) or appends to (output thread).
  string* iobuf;
  // Pipe handle the worker writes to or reads from. The worker closes this
  // handle before it returns.
  HANDLE iohandle;
};
47
InputThreadFunction(LPVOID param)48 DWORD WINAPI InputThreadFunction(LPVOID param) {
49 ThreadData* args = reinterpret_cast<ThreadData*>(param);
50 string* input = args->iobuf;
51 HANDLE in_handle = args->iohandle;
52 size_t buffer_pointer = 0;
53
54 size_t total_bytes_written = 0;
55 bool ok = true;
56 while (ok && total_bytes_written < input->size()) {
57 DWORD bytes_written_this_time;
58 ok = WriteFile(in_handle, input->data() + total_bytes_written,
59 input->size() - total_bytes_written,
60 &bytes_written_this_time, nullptr);
61 total_bytes_written += bytes_written_this_time;
62 }
63 CloseHandle(in_handle);
64
65 if (!ok) {
66 return GetLastError();
67 } else {
68 return 0;
69 }
70 }
71
OutputThreadFunction(LPVOID param)72 DWORD WINAPI OutputThreadFunction(LPVOID param) {
73 ThreadData* args = reinterpret_cast<ThreadData*>(param);
74 string* output = args->iobuf;
75 HANDLE out_handle = args->iohandle;
76
77 char buf[PIPE_BUF_SIZE];
78 DWORD bytes_read;
79
80 bool wait_result = WaitForSingleObject(out_handle, INFINITE);
81 if (wait_result != WAIT_OBJECT_0) {
82 LOG(FATAL) << "WaitForSingleObject on child process output failed. "
83 "Error code: "
84 << wait_result;
85 }
86 while (ReadFile(out_handle, buf, sizeof(buf), &bytes_read, nullptr) &&
87 bytes_read > 0) {
88 output->append(buf, bytes_read);
89 }
90 CloseHandle(out_handle);
91 return 0;
92 }
93
94 } // namespace
95
SubProcess(int nfds)96 SubProcess::SubProcess(int nfds)
97 : running_(false),
98 exec_path_(nullptr),
99 exec_argv_(nullptr),
100 win_pi_(nullptr) {
101 // The input 'nfds' parameter is currently ignored and the internal constant
102 // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
103 for (int i = 0; i < kNFds; i++) {
104 action_[i] = ACTION_CLOSE;
105 parent_pipe_[i] = nullptr;
106 }
107 }
108
~SubProcess()109 SubProcess::~SubProcess() {
110 mutex_lock procLock(proc_mu_);
111 mutex_lock dataLock(data_mu_);
112 if (win_pi_) {
113 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
114 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
115 delete win_pi_;
116 }
117 running_ = false;
118 FreeArgs();
119 ClosePipes();
120 }
121
FreeArgs()122 void SubProcess::FreeArgs() {
123 free(exec_path_);
124 exec_path_ = nullptr;
125
126 if (exec_argv_) {
127 for (int i = 0; exec_argv_[i]; i++) {
128 free(exec_argv_[i]);
129 }
130 delete[] exec_argv_;
131 exec_argv_ = nullptr;
132 }
133 }
134
ClosePipes()135 void SubProcess::ClosePipes() {
136 for (int i = 0; i < kNFds; i++) {
137 if (parent_pipe_[i] != nullptr) {
138 CloseHandle(parent_pipe_[i]);
139 parent_pipe_[i] = nullptr;
140 }
141 }
142 }
143
SetProgram(const string & file,const std::vector<string> & argv)144 void SubProcess::SetProgram(const string& file,
145 const std::vector<string>& argv) {
146 mutex_lock procLock(proc_mu_);
147 mutex_lock dataLock(data_mu_);
148 if (running_) {
149 LOG(FATAL) << "SetProgram called after the process was started.";
150 return;
151 }
152
153 FreeArgs();
154 exec_path_ = _strdup(file.c_str());
155 if (exec_path_ == nullptr) {
156 LOG(FATAL) << "SetProgram failed to allocate file string.";
157 return;
158 }
159
160 int argc = argv.size();
161 exec_argv_ = new char*[argc + 1];
162 for (int i = 0; i < argc; i++) {
163 exec_argv_[i] = _strdup(argv[i].c_str());
164 if (exec_argv_[i] == nullptr) {
165 LOG(FATAL) << "SetProgram failed to allocate command argument.";
166 return;
167 }
168 }
169 exec_argv_[argc] = nullptr;
170 }
171
SetChannelAction(Channel chan,ChannelAction action)172 void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
173 mutex_lock procLock(proc_mu_);
174 mutex_lock dataLock(data_mu_);
175 if (running_) {
176 LOG(FATAL) << "SetChannelAction called after the process was started.";
177 } else if (!chan_valid(chan)) {
178 LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
179 } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
180 (action != ACTION_DUPPARENT)) {
181 LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
182 } else {
183 action_[chan] = action;
184 }
185 }
186
Start()187 bool SubProcess::Start() {
188 mutex_lock procLock(proc_mu_);
189 mutex_lock dataLock(data_mu_);
190 if (running_) {
191 LOG(ERROR) << "Start called after the process was started.";
192 return false;
193 }
194 if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
195 LOG(ERROR) << "Start called without setting a program.";
196 return false;
197 }
198
199 // SecurityAttributes to use in winapi calls below.
200 SECURITY_ATTRIBUTES attrs;
201 attrs.nLength = sizeof(SECURITY_ATTRIBUTES);
202 attrs.bInheritHandle = TRUE;
203 attrs.lpSecurityDescriptor = nullptr;
204
205 // No need to store subprocess end of the pipes, they will be closed before
206 // this function terminates.
207 HANDLE child_pipe_[kNFds] TF_GUARDED_BY(data_mu_);
208
209 // Create parent/child pipes for the specified channels and make the
210 // parent-side of the pipes non-blocking.
211 for (int i = 0; i < kNFds; i++) {
212 if (action_[i] == ACTION_PIPE) {
213 if (!CreatePipe(i == CHAN_STDIN ? child_pipe_ + i : parent_pipe_ + i,
214 i == CHAN_STDIN ? parent_pipe_ + i : child_pipe_ + i,
215 &attrs, PIPE_BUF_SIZE)) {
216 LOG(ERROR) << "Cannot create pipe. Error code: " << GetLastError();
217 ClosePipes();
218 return false;
219 }
220
221 // Parent pipes should not be inherited by the child process
222 if (!SetHandleInformation(parent_pipe_[i], HANDLE_FLAG_INHERIT, 0)) {
223 LOG(ERROR) << "Cannot set pipe handle attributes.";
224 ClosePipes();
225 return false;
226 }
227 } else if (action_[i] == ACTION_DUPPARENT) {
228 if (i == CHAN_STDIN) {
229 child_pipe_[i] = GetStdHandle(STD_INPUT_HANDLE);
230 } else if (i == CHAN_STDOUT) {
231 child_pipe_[i] = GetStdHandle(STD_OUTPUT_HANDLE);
232 } else {
233 child_pipe_[i] = GetStdHandle(STD_ERROR_HANDLE);
234 }
235 } else { // ACTION_CLOSE
236 parent_pipe_[i] = nullptr;
237 child_pipe_[i] = nullptr;
238 }
239 }
240
241 // Concatanate argv, because winapi wants it so.
242 string command_line = strings::StrCat("\"", exec_path_, "\"");
243 for (int i = 1; exec_argv_[i]; i++) {
244 command_line.append(strings::StrCat(" \"", exec_argv_[i], "\""));
245 }
246
247 // Set up the STARTUPINFO struct with information about the pipe handles.
248 STARTUPINFOA si;
249 ZeroMemory(&si, sizeof(STARTUPINFO));
250 si.cb = sizeof(STARTUPINFO);
251 si.dwFlags |= STARTF_USESTDHANDLES;
252
253 // Handle the pipes for the child process.
254 if (child_pipe_[CHAN_STDIN]) {
255 si.hStdInput = child_pipe_[CHAN_STDIN];
256 }
257 if (child_pipe_[CHAN_STDOUT]) {
258 si.hStdOutput = child_pipe_[CHAN_STDOUT];
259 }
260 if (child_pipe_[CHAN_STDERR]) {
261 si.hStdError = child_pipe_[CHAN_STDERR];
262 }
263
264 // Allocate the POROCESS_INFORMATION struct.
265 win_pi_ = new PROCESS_INFORMATION;
266
267 // Execute the child program.
268 bool bSuccess =
269 CreateProcessA(nullptr, const_cast<char*>(command_line.c_str()), nullptr,
270 nullptr, TRUE, 0, nullptr, nullptr, &si,
271 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_));
272
273 if (bSuccess) {
274 for (int i = 0; i < kNFds; i++) {
275 if (child_pipe_[i] != nullptr) {
276 CloseHandle(child_pipe_[i]);
277 child_pipe_[i] = nullptr;
278 }
279 }
280 running_ = true;
281 return true;
282 } else {
283 LOG(ERROR) << "Call to CreateProcess failed. Error code: "
284 << GetLastError();
285 ClosePipes();
286 return false;
287 }
288 }
289
Wait()290 bool SubProcess::Wait() {
291 int status;
292 return WaitInternal(&status);
293 }
294
WaitInternal(int * status)295 bool SubProcess::WaitInternal(int* status) {
296 // The waiter must release proc_mu_ while waiting in order for Kill() to work.
297 proc_mu_.lock();
298 bool running = running_;
299 PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
300 proc_mu_.unlock();
301
302 bool ret = false;
303 if (running && pi_.hProcess) {
304 DWORD wait_status = WaitForSingleObject(pi_.hProcess, INFINITE);
305 if (wait_status == WAIT_OBJECT_0) {
306 DWORD process_exit_code = 0;
307 if (GetExitCodeProcess(pi_.hProcess, &process_exit_code)) {
308 *status = static_cast<int>(process_exit_code);
309 } else {
310 LOG(FATAL) << "Wait failed with code: " << GetLastError();
311 }
312 } else {
313 LOG(FATAL) << "WaitForSingleObject call on the process handle failed. "
314 "Error code: "
315 << wait_status;
316 }
317 }
318
319 proc_mu_.lock();
320 if ((running_ == running) &&
321 (pi_.hProcess ==
322 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess)) {
323 running_ = false;
324 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess);
325 CloseHandle(reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread);
326 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess = nullptr;
327 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hThread = nullptr;
328 }
329 proc_mu_.unlock();
330 return *status == 0;
331 }
332
Kill(int unused_signal)333 bool SubProcess::Kill(int unused_signal) {
334 proc_mu_.lock();
335 bool running = running_;
336 PROCESS_INFORMATION pi_ = *reinterpret_cast<PROCESS_INFORMATION*>(win_pi_);
337 proc_mu_.unlock();
338
339 bool ret = false;
340 if (running && pi_.hProcess) {
341 ret = TerminateProcess(pi_.hProcess, 0);
342 }
343 return ret;
344 }
345
Communicate(const string * stdin_input,string * stdout_output,string * stderr_output)346 int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
347 string* stderr_output) {
348 proc_mu_.lock();
349 bool running = running_;
350 proc_mu_.unlock();
351 if (!running) {
352 LOG(ERROR) << "Communicate called without a running process.";
353 return 1;
354 }
355
356 HANDLE thread_handles[kNFds];
357 int thread_count = 0;
358 ThreadData thread_params[kNFds];
359
360 // Lock data_mu_ but not proc_mu_ while communicating with the child process
361 // in order for Kill() to be able to terminate the child from another thread.
362 data_mu_.lock();
363 if (!IsProcessFinished(
364 reinterpret_cast<PROCESS_INFORMATION*>(win_pi_)->hProcess) ||
365 (parent_pipe_[CHAN_STDOUT] != nullptr) ||
366 (parent_pipe_[CHAN_STDERR] != nullptr)) {
367 if (parent_pipe_[CHAN_STDIN] != nullptr) {
368 if (stdin_input) {
369 thread_params[thread_count].iobuf = const_cast<string*>(stdin_input);
370 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDIN];
371 parent_pipe_[CHAN_STDIN] = nullptr;
372 thread_handles[thread_count] =
373 CreateThread(NULL, 0, InputThreadFunction,
374 thread_params + thread_count, 0, NULL);
375 thread_count++;
376 }
377 } else {
378 CloseHandle(parent_pipe_[CHAN_STDIN]);
379 parent_pipe_[CHAN_STDIN] = NULL;
380 }
381
382 if (parent_pipe_[CHAN_STDOUT] != nullptr) {
383 if (stdout_output != nullptr) {
384 thread_params[thread_count].iobuf = stdout_output;
385 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDOUT];
386 parent_pipe_[CHAN_STDOUT] = NULL;
387 thread_handles[thread_count] =
388 CreateThread(NULL, 0, OutputThreadFunction,
389 thread_params + thread_count, 0, NULL);
390 thread_count++;
391 } else {
392 CloseHandle(parent_pipe_[CHAN_STDOUT]);
393 parent_pipe_[CHAN_STDOUT] = nullptr;
394 }
395 }
396
397 if (parent_pipe_[CHAN_STDERR] != nullptr) {
398 if (stderr_output != nullptr) {
399 thread_params[thread_count].iobuf = stderr_output;
400 thread_params[thread_count].iohandle = parent_pipe_[CHAN_STDERR];
401 parent_pipe_[CHAN_STDERR] = NULL;
402 thread_handles[thread_count] =
403 CreateThread(NULL, 0, OutputThreadFunction,
404 thread_params + thread_count, 0, NULL);
405 thread_count++;
406 } else {
407 CloseHandle(parent_pipe_[CHAN_STDERR]);
408 parent_pipe_[CHAN_STDERR] = nullptr;
409 }
410 }
411 }
412
413 // Wait for all IO threads to exit.
414 if (thread_count > 0) {
415 DWORD wait_result = WaitForMultipleObjects(thread_count, thread_handles,
416 true, // wait all threads
417 INFINITE);
418 if (wait_result != WAIT_OBJECT_0) {
419 LOG(ERROR) << "Waiting on the io threads failed! result: " << wait_result
420 << std::endl;
421 return -1;
422 }
423
424 for (int i = 0; i < thread_count; i++) {
425 DWORD exit_code;
426 if (GetExitCodeThread(thread_handles[i], &exit_code)) {
427 if (exit_code) {
428 LOG(ERROR) << "One of the IO threads failed with code: " << exit_code;
429 }
430 } else {
431 LOG(ERROR) << "Error checking io thread exit statuses. Error Code: "
432 << GetLastError();
433 }
434 }
435 }
436
437 data_mu_.unlock();
438
439 // Wait for the child process to exit and return its status.
440 int status;
441 return WaitInternal(&status) ? status : -1;
442 }
443
CreateSubProcess(const std::vector<string> & argv)444 std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
445 std::unique_ptr<SubProcess> proc(new SubProcess());
446 proc->SetProgram(argv[0], argv);
447 proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
448 proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
449 return proc;
450 }
451
452 } // namespace tensorflow
453