• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Standalone service to monitor AFE servers and report to ts_mon"""
7
8import argparse
9import sys
10import time
11import logging
12import multiprocessing
13from six.moves import urllib
14
15import common
16from autotest_lib.client.common_lib import global_config
17from autotest_lib.frontend.afe.json_rpc import proxy
18from autotest_lib.server import frontend
19# import needed to setup host_attributes
20# pylint: disable=unused-import
21from autotest_lib.server import site_host_attributes
22from autotest_lib.utils.frozen_chromite.lib import metrics
23from autotest_lib.utils.frozen_chromite.lib import ts_mon_config
24
# Common prefix for every ts_mon metric emitted by this service.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Timer metric recording the duration of each individual rpc call.
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
# Counter bumped once per completed polling cycle (heartbeat).
METRIC_TICK = METRIC_ROOT + '/tick'
# Counter bumped when a monitor run against a host raises an exception.
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

# Maps exception types to the failure_reason string reported to ts_mon.
# Exception types listed here are considered expected rpc failures and are
# swallowed by AfeMonitor.run_cmd; any other type is re-raised.
FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
        }
33
def afe_rpc_call(hostname):
    """Perform one rpc call set on server

    Any exception raised by the monitor run is counted in ts_mon and
    logged rather than propagated, so one misbehaving server cannot kill
    the pool worker that polls it.

    @param hostname: server's hostname to poll
    """
    afe_monitor = AfeMonitor(hostname)
    try:
        afe_monitor.run()
    except Exception:
        # Deliberate broad catch: record the failure and keep polling the
        # remaining hosts. (The bound exception variable was unused; the
        # traceback is captured by logging.exception.)
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
46
47
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Periodically reconcile the shared list of shards.

    Each cycle computes the difference between the current contents of
    *shards* and the freshly discovered set, then removes departed shards
    and appends new ones under the lock.

    @param shards: shared list of shard hostnames to keep up to date
    @param shards_lock: lock guarding access to the shards list
    @param period: seconds between refresh cycles
    @param stop_event: Event that, once set, ends the polling loop
    """
    while stop_event is None or not stop_event.is_set():
        cycle_start = time.time()

        logging.debug('Updating Shards')

        # server_manager_utils.get_shards() is deprecated, so there is no
        # longer a source of new shards; discovery yields the empty set.
        discovered = set()

        with shards_lock:
            known = set(shards)
            departed = known - discovered
            arrived = discovered - known

            for hostname in departed:
                shards.remove(hostname)

            if arrived:
                shards.extend(arrived)

        if departed:
            logging.info('Servers left production: %s', str(departed))

        if arrived:
            logging.info('Servers entered production: %s',
                    str(arrived))

        remaining = (cycle_start + period) - time.time()
        if remaining > 0:
            time.sleep(remaining)
86
87
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll; None is treated as no shards
    @param period: time between polls
    @param stop_event: Event that can be set to stop polling
    """
    # Over-subscribe relative to cpu count: the workers spend their time
    # waiting on network rpc calls, not on cpu.
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # Bug fix: with the default shards=None this used to raise
                # TypeError from set.union(None); treat None as empty.
                all_servers = set(servers).union(shards or [])

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            # Sleep only the remainder of the period so the cadence stays
            # close to one cycle per `period` seconds.
            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                time.sleep(wait_time)
    finally:
        # Bug fix: release the worker processes when polling stops instead
        # of leaking the pool.
        pool.close()
        pool.join()
115
116
class RpcFlightRecorder(object):
    """Drives background processes that poll a set of AFE servers."""

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        # Manager-backed containers so the child processes share state.
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()

        # Populated by start(); None until then so join() can skip them.
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Spawn the shard-updater (optional) and the rpc-poller processes."""
        if self._with_shards:
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=[self._shards, self._servers_lock],
                    kwargs={'stop_event': self._stop_event})
            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=[self._servers, self._servers_lock],
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})
        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process, so could be >timeout"""
        for proc in (self._poll_rpc_server_ps, self._update_shards_ps):
            if proc:
                proc.join(timeout)
178
179def _failed(fields, msg_str, reason, err=None):
180    """Mark current run failed
181
182    @param fields, ts_mon fields to mark as failed
183    @param msg_str, message string to be filled
184    @param reason: why it failed
185    @param err: optional error to log more debug info
186    """
187    fields['success'] = False
188    fields['failure_reason'] = reason
189    logging.warning("%s failed - %s", msg_str, reason)
190    if err:
191        logging.debug("%s fail_err - %s", msg_str, str(err))
192
class AfeMonitor(object):
    """Issues canary rpc calls against a single afe frontend."""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=hostname)
        self._metric_fields = {'target_hostname': hostname}

    def run_cmd(self, cmd, expected=None):
        """Run a single rpc command, timing it and recording the outcome.

        @param cmd: string of rpc command to send
        @param expected: if not None, the result must equal this value or
                the call is recorded as failed
        """
        fields = dict(self._metric_fields,
                      command=cmd, success=True, failure_reason='')

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                fields=dict(fields), scale=0.001) as f:

            msg_str = "%s:%s" % (self._hostname, cmd)

            try:
                result = self._afe.run(cmd)
                logging.debug("%s result = %s", msg_str, result)
                if expected is not None and expected != result:
                    _failed(f, msg_str, 'IncorrectResponse')

            except urllib.error.HTTPError as e:
                # HTTP errors get a reason carrying the status code.
                _failed(f, msg_str, 'HTTPError:%d' % e.code)

            except Exception as e:
                _failed(f, msg_str, FAILURE_REASONS.get(type(e), 'Unknown'),
                        err=e)

                # Only exception types explicitly listed in FAILURE_REASONS
                # are swallowed; anything unexpected propagates.
                if type(e) not in FAILURE_REASONS:
                    raise

            if f['success']:
                logging.info("%s success", msg_str)

    def run(self):
        """Tests server and returns the result"""
        self.run_cmd('get_server_time')
        self.run_cmd('ping_db', [True])
246
247
def get_parser():
    """Build and return the argparse parser for this service's options."""
    arg_parser = argparse.ArgumentParser(description=__doc__)
    arg_parser.add_argument('-a', '--afe', action='append', default=[],
                            help='Autotest FrontEnd server to monitor')
    arg_parser.add_argument('-p', '--poll-period', type=int, default=60,
                            help='Frequency to poll AFE servers')
    arg_parser.add_argument('--no-shards', action='store_false',
                            dest='with_shards',
                            help='Disable shard updating')
    return arg_parser
262
263
def main(argv):
    """Entry point: parse arguments and run the recorder until it exits.

    @param argv: commandline arguments passed
    """
    options = get_parser().parse_args(argv[1:])

    # Fall back to the globally-configured AFE when none was given.
    if not options.afe:
        default_afe = global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')
        options.afe = [default_afe]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        recorder = RpcFlightRecorder(options.afe,
                                     with_shards=options.with_shards,
                                     poll_period=options.poll_period)
        recorder.start()
        recorder.join()
285
286
# Script entry point: pass the full argv so main() can strip the program name.
if __name__ == '__main__':
    main(sys.argv)
289