• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <assert.h>
18 #include <errno.h>
19 #include <stdio.h>
20 
21 #include <map>
22 
23 #include <glog/logging.h>
24 
25 #include "common/libs/fs/shared_select.h"
26 #include "host/commands/launch/process_monitor.h"
27 
28 namespace cvd {
29 
30 namespace {
31 
NotifyThread(SharedFD fd)32 void NotifyThread(SharedFD fd) {
33   // The restarter thread is (likely) blocked on a call to select, to make it
34   // wake up and do some work we write something (anything, the content is not
35   // important) into the main side of the socket pair so that the call to select
36   // returns and the notification fd (restarter side of the socket pair) is
37   // marked as ready to read.
38   char buffer = 'a';
39   fd->Write(&buffer, sizeof(buffer));
40 }
41 
ConsumeNotifications(SharedFD fd)42 void ConsumeNotifications(SharedFD fd) {
43   // Once the starter thread is waken up due to a notification, the calls to
44   // select will continue to return immediately unless we read what was written
45   // on the main side of the socket pair. More than one notification can
46   // accumulate before the restarter thread consumes them, so we attempt to read
47   // more than it's written to consume them all at once. In the unlikely case of
48   // more than 8 notifications acummulating we simply read the first 8 and have
49   // another iteration on the restarter thread loop.
50   char buffer[8];
51   fd->Read(buffer, sizeof(buffer));
52 }
53 
54 }  // namespace
55 
ProcessMonitor()56 ProcessMonitor::ProcessMonitor() {
57   if (!SharedFD::SocketPair(AF_LOCAL, SOCK_STREAM, 0, &thread_comm_main_,
58                             &thread_comm_monitor_)) {
59     LOG(ERROR) << "Unable to create restarter communication socket pair: "
60                << strerror(errno);
61     return;
62   }
63   monitor_thread_ = std::thread([this]() { MonitorRoutine(); });
64 }
65 
StartSubprocess(Command cmd,OnSocketReadyCb callback)66 void ProcessMonitor::StartSubprocess(Command cmd, OnSocketReadyCb callback) {
67   auto proc = cmd.Start(true);
68   if (!proc.Started()) {
69     LOG(ERROR) << "Failed to start process";
70     return;
71   }
72   MonitorExistingSubprocess(std::move(cmd), std::move(proc), callback);
73 }
74 
MonitorExistingSubprocess(Command cmd,Subprocess proc,OnSocketReadyCb callback)75 void ProcessMonitor::MonitorExistingSubprocess(Command cmd, Subprocess proc,
76                                                OnSocketReadyCb callback) {
77   {
78     std::lock_guard<std::mutex> lock(processes_mutex_);
79     monitored_processes_.push_back(MonitorEntry());
80     auto& entry = monitored_processes_.back();
81     entry.cmd.reset(new Command(std::move(cmd)));
82     entry.proc.reset(new Subprocess(std::move(proc)));
83     entry.on_control_socket_ready_cb = callback;
84   }
85   // Wake the restarter thread up so that it starts monitoring this subprocess
86   // Do this after releasing the lock so that the restarter thread is free to
87   // begin work as soon as select returns.
88   NotifyThread(thread_comm_main_);
89 }
90 
RestartOnExitCb(MonitorEntry * entry)91 bool ProcessMonitor::RestartOnExitCb(MonitorEntry* entry) {
92   // Make sure the process actually exited
93   char buffer[16];
94   auto bytes_read = entry->proc->control_socket()->Read(buffer, sizeof(buffer));
95   if (bytes_read > 0) {
96     LOG(WARNING) << "Subprocess " << entry->cmd->GetShortName() << " wrote "
97                  << bytes_read
98                  << " bytes on the control socket, this is unexpected";
99     // The process may not have exited, continue monitoring without restarting
100     return true;
101   }
102 
103   LOG(INFO) << "Detected exit of monitored subprocess";
104   // Make sure the subprocess isn't left in a zombie state, and that the
105   // pid is logged
106   int wstatus;
107   auto wait_ret = TEMP_FAILURE_RETRY(entry->proc->Wait(&wstatus, 0));
108   // None of the error conditions specified on waitpid(2) apply
109   assert(wait_ret > 0);
110   if (WIFEXITED(wstatus)) {
111     LOG(INFO) << "Subprocess " << entry->cmd->GetShortName() << " ("
112               << wait_ret << ") has exited with exit code "
113               << WEXITSTATUS(wstatus);
114   } else if (WIFSIGNALED(wstatus)) {
115     LOG(ERROR) << "Subprocess " << entry->cmd->GetShortName() << " ("
116                << wait_ret << ") was interrupted by a signal: "
117                << WTERMSIG(wstatus);
118   } else {
119     LOG(INFO) << "subprocess " << entry->cmd->GetShortName() << " ("
120                << wait_ret << ") has exited for unknown reasons";
121   }
122   entry->proc.reset(new Subprocess(entry->cmd->Start(true)));
123   return true;
124 }
125 
DoNotMonitorCb(MonitorEntry *)126 bool ProcessMonitor::DoNotMonitorCb(MonitorEntry*) {
127   return false;
128 }
129 
MonitorRoutine()130 void ProcessMonitor::MonitorRoutine() {
131   LOG(INFO) << "Started monitoring subprocesses";
132   do {
133     SharedFDSet read_set;
134     read_set.Set(thread_comm_monitor_);
135     {
136       std::lock_guard<std::mutex> lock(processes_mutex_);
137       for (auto& monitored_process: monitored_processes_) {
138         auto control_socket = monitored_process.proc->control_socket();
139         if (!control_socket->IsOpen())  {
140           LOG(ERROR) << "The control socket for "
141                      << monitored_process.cmd->GetShortName()
142                      << " is closed, it's effectively NOT being monitored";
143         }
144         read_set.Set(control_socket);
145       }
146     }
147     // We can't call select while holding the lock as it would lead to a
148     // deadlock (restarter thread waiting for notifications from main thread,
149     // main thread waiting for the lock)
150     int num_fds = cvd::Select(&read_set, nullptr, nullptr, nullptr);
151     if (num_fds < 0) {
152       LOG(ERROR) << "Select call returned error on restarter thread: "
153                  << strerror(errno);
154     }
155     if (num_fds > 0) {
156       // Try the communication fd, it's the most likely to be set
157       if (read_set.IsSet(thread_comm_monitor_)) {
158         --num_fds;
159         ConsumeNotifications(thread_comm_monitor_);
160       }
161     }
162     {
163       std::lock_guard<std::mutex> lock(processes_mutex_);
164       // Keep track of the number of file descriptors ready for read, chances
165       // are we don't need to go over the entire list of subprocesses
166       auto it = monitored_processes_.begin();
167       while (it != monitored_processes_.end()) {
168         auto control_socket = it->proc->control_socket();
169         bool keep_monitoring = true;
170         if (read_set.IsSet(control_socket)) {
171           --num_fds;
172           keep_monitoring = it->on_control_socket_ready_cb(&(*it));
173         }
174         if (keep_monitoring) {
175           ++it;
176         } else {
177           it = monitored_processes_.erase(it);
178         }
179       }
180     }
181     assert(num_fds == 0);
182   } while (true);
183 }
184 
185 }  // namespace cvd
186