• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import math
6import os
7import random
8import re
9import sys
10import time
11
12import common
13from autotest_lib.client.common_lib import global_config
14from autotest_lib.frontend import database_settings_helper
15from autotest_lib.tko import utils
16
17
18def _log_error(msg):
19    """Log an error message.
20
21    @param msg: Message string
22    """
23    print >> sys.stderr, msg
24    sys.stderr.flush()  # we want these msgs to show up immediately
25
26
27def _format_operational_error(e):
28    """Format OperationalError.
29
30    @param e: OperationalError instance.
31    """
32    return ("%s: An operational error occurred during a database "
33            "operation: %s" % (time.strftime("%X %x"), str(e)))
34
35
class MySQLTooManyRows(Exception):
    """Error signalling that a query produced too many records."""
39
40
41class db_sql(object):
42    """Data access."""
43
44    def __init__(self, debug=False, autocommit=True, host=None,
45                 database=None, user=None, password=None):
46        self.debug = debug
47        self.autocommit = autocommit
48        self._load_config(host, database, user, password)
49
50        self.con = None
51        self._init_db()
52
53        # if not present, insert statuses
54        self.status_idx = {}
55        self.status_word = {}
56        status_rows = self.select('status_idx, word', 'tko_status', None)
57        for s in status_rows:
58            self.status_idx[s[1]] = s[0]
59            self.status_word[s[0]] = s[1]
60
61        machine_map = os.path.join(os.path.dirname(__file__),
62                                   'machines')
63        if os.path.exists(machine_map):
64            self.machine_map = machine_map
65        else:
66            self.machine_map = None
67        self.machine_group = {}
68
69
70    def _load_config(self, host, database, user, password):
71        """Loads configuration settings required to connect to the database.
72
73        This will try to connect to use the settings prefixed with global_db_.
74        If they do not exist, they un-prefixed settings will be used.
75
76        If parameters are supplied, these will be taken instead of the values
77        in global_config.
78
79        @param host: If set, this host will be used, if not, the host will be
80                     retrieved from global_config.
81        @param database: If set, this database will be used, if not, the
82                         database will be retrieved from global_config.
83        @param user: If set, this user will be used, if not, the
84                         user will be retrieved from global_config.
85        @param password: If set, this password will be used, if not, the
86                         password will be retrieved from global_config.
87        """
88        database_settings = database_settings_helper.get_global_db_config()
89
90        # grab the host, database
91        self.host = host or database_settings['HOST']
92        self.database = database or database_settings['NAME']
93
94        # grab the user and password
95        self.user = user or database_settings['USER']
96        self.password = password or database_settings['PASSWORD']
97
98        # grab the timeout configuration
99        self.query_timeout =(
100                database_settings.get('OPTIONS', {}).get('timeout', 3600))
101
102        # Using fallback to non-global in order to work without configuration
103        # overhead on non-shard instances.
104        get_value = global_config.global_config.get_config_value_with_fallback
105        self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay",
106                                   "min_retry_delay", type=int, default=20)
107        self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay",
108                                   "max_retry_delay", type=int, default=60)
109
110        # TODO(beeps): Move this to django settings once we have routers.
111        # On test instances mysql connects through a different port. No point
112        # piping this through our entire infrastructure when it is only really
113        # used for testing; Ideally we would specify this through django
114        # settings and default it to the empty string so django will figure out
115        # the default based on the database backend (eg: mysql, 3306), but until
116        # we have database routers in place any django settings will apply to
117        # both tko and afe.
118        # The intended use of this port is to allow a testing shard vm to
119        # update the master vm's database with test results. Specifying
120        # and empty string will fallback to not even specifying the port
121        # to the backend in tko/db.py. Unfortunately this means retries
122        # won't work on the test cluster till we've migrated to routers.
123        self.port = global_config.global_config.get_config_value(
124                "AUTOTEST_WEB", "global_db_port", type=str, default='')
125
126
127    def _init_db(self):
128        # make sure we clean up any existing connection
129        if self.con:
130            self.con.close()
131            self.con = None
132
133        # create the db connection and cursor
134        self.con = self.connect(self.host, self.database,
135                                self.user, self.password, self.port)
136        self.cur = self.con.cursor()
137
138
139    def _random_delay(self):
140        delay = random.randint(self.min_delay, self.max_delay)
141        time.sleep(delay)
142
143
144    def run_with_retry(self, function, *args, **dargs):
145        """Call function(*args, **dargs) until either it passes
146        without an operational error, or a timeout is reached.
147        This will re-connect to the database, so it is NOT safe
148        to use this inside of a database transaction.
149
150        It can be safely used with transactions, but the
151        transaction start & end must be completely contained
152        within the call to 'function'.
153
154        @param function: The function to run with retry.
155        @param args: The arguments
156        @param dargs: The named arguments.
157        """
158        OperationalError = _get_error_class("OperationalError")
159
160        success = False
161        start_time = time.time()
162        while not success:
163            try:
164                result = function(*args, **dargs)
165            except OperationalError, e:
166                _log_error("%s; retrying, don't panic yet"
167                           % _format_operational_error(e))
168                stop_time = time.time()
169                elapsed_time = stop_time - start_time
170                if elapsed_time > self.query_timeout:
171                    raise
172                else:
173                    try:
174                        self._random_delay()
175                        self._init_db()
176                    except OperationalError, e:
177                        _log_error('%s; panic now'
178                                   % _format_operational_error(e))
179            else:
180                success = True
181        return result
182
183
184    def dprint(self, value):
185        """Print out debug value.
186
187        @param value: The value to print out.
188        """
189        if self.debug:
190            sys.stdout.write('SQL: ' + str(value) + '\n')
191
192
193    def _commit(self):
194        """Private method for function commit to call for retry.
195        """
196        return self.con.commit()
197
198
199    def commit(self):
200        """Commit the sql transaction."""
201        if self.autocommit:
202            return self.run_with_retry(self._commit)
203        else:
204            return self._commit()
205
206
207    def rollback(self):
208        """Rollback the sql transaction."""
209        self.con.rollback()
210
211
212    def get_last_autonumber_value(self):
213        """Gets the last auto number.
214
215        @return: The last auto number.
216        """
217        self.cur.execute('SELECT LAST_INSERT_ID()', [])
218        return self.cur.fetchall()[0][0]
219
220
221    def _quote(self, field):
222        return '`%s`' % field
223
224
225    def _where_clause(self, where):
226        if not where:
227            return '', []
228
229        if isinstance(where, dict):
230            # key/value pairs (which should be equal, or None for null)
231            keys, values = [], []
232            for field, value in where.iteritems():
233                quoted_field = self._quote(field)
234                if value is None:
235                    keys.append(quoted_field + ' is null')
236                else:
237                    keys.append(quoted_field + '=%s')
238                    values.append(value)
239            where_clause = ' and '.join(keys)
240        elif isinstance(where, basestring):
241            # the exact string
242            where_clause = where
243            values = []
244        elif isinstance(where, tuple):
245            # preformatted where clause + values
246            where_clause, values = where
247            assert where_clause
248        else:
249            raise ValueError('Invalid "where" value: %r' % where)
250
251        return ' WHERE ' + where_clause, values
252
253
254
255    def select(self, fields, table, where, distinct=False, group_by=None,
256               max_rows=None):
257        """\
258                This selects all the fields requested from a
259                specific table with a particular where clause.
260                The where clause can either be a dictionary of
261                field=value pairs, a string, or a tuple of (string,
262                a list of values).  The last option is what you
263                should use when accepting user input as it'll
264                protect you against sql injection attacks (if
265                all user data is placed in the array rather than
266                the raw SQL).
267
268                For example:
269                  where = ("a = %s AND b = %s", ['val', 'val'])
270                is better than
271                  where = "a = 'val' AND b = 'val'"
272
273        @param fields: The list of selected fields string.
274        @param table: The name of the database table.
275        @param where: The where clause string.
276        @param distinct: If select distinct values.
277        @param group_by: Group by clause.
278        @param max_rows: unused.
279        """
280        cmd = ['select']
281        if distinct:
282            cmd.append('distinct')
283        cmd += [fields, 'from', table]
284
285        where_clause, values = self._where_clause(where)
286        cmd.append(where_clause)
287
288        if group_by:
289            cmd.append(' GROUP BY ' + group_by)
290
291        self.dprint('%s %s' % (' '.join(cmd), values))
292
293        # create a re-runable function for executing the query
294        def exec_sql():
295            """Exeuctes an the sql command."""
296            sql = ' '.join(cmd)
297            numRec = self.cur.execute(sql, values)
298            if max_rows is not None and numRec > max_rows:
299                msg = 'Exceeded allowed number of records'
300                raise MySQLTooManyRows(msg)
301            return self.cur.fetchall()
302
303        # run the query, re-trying after operational errors
304        if self.autocommit:
305            return self.run_with_retry(exec_sql)
306        else:
307            return exec_sql()
308
309
310    def select_sql(self, fields, table, sql, values):
311        """\
312                select fields from table "sql"
313
314        @param fields: The list of selected fields string.
315        @param table: The name of the database table.
316        @param sql: The sql string.
317        @param values: The sql string parameter values.
318        """
319        cmd = 'select %s from %s %s' % (fields, table, sql)
320        self.dprint(cmd)
321
322        # create a -re-runable function for executing the query
323        def _exec_sql():
324            self.cur.execute(cmd, values)
325            return self.cur.fetchall()
326
327        # run the query, re-trying after operational errors
328        if self.autocommit:
329            return self.run_with_retry(_exec_sql)
330        else:
331            return _exec_sql()
332
333
334    def _exec_sql_with_commit(self, sql, values, commit):
335        if self.autocommit:
336            # re-run the query until it succeeds
337            def _exec_sql():
338                self.cur.execute(sql, values)
339                self.con.commit()
340            self.run_with_retry(_exec_sql)
341        else:
342            # take one shot at running the query
343            self.cur.execute(sql, values)
344            if commit:
345                self.con.commit()
346
347
348    def insert(self, table, data, commit=None):
349        """\
350                'insert into table (keys) values (%s ... %s)', values
351
352                data:
353                        dictionary of fields and data
354
355        @param table: The name of the table.
356        @param data: The insert data.
357        @param commit: If commit the transaction .
358        """
359        fields = data.keys()
360        refs = ['%s' for field in fields]
361        values = [data[field] for field in fields]
362        cmd = ('insert into %s (%s) values (%s)' %
363               (table, ','.join(self._quote(field) for field in fields),
364                ','.join(refs)))
365        self.dprint('%s %s' % (cmd, values))
366
367        self._exec_sql_with_commit(cmd, values, commit)
368
369
370    def delete(self, table, where, commit = None):
371        """Delete entries.
372
373        @param table: The name of the table.
374        @param where: The where clause.
375        @param commit: If commit the transaction .
376        """
377        cmd = ['delete from', table]
378        if commit is None:
379            commit = self.autocommit
380        where_clause, values = self._where_clause(where)
381        cmd.append(where_clause)
382        sql = ' '.join(cmd)
383        self.dprint('%s %s' % (sql, values))
384
385        self._exec_sql_with_commit(sql, values, commit)
386
387
388    def update(self, table, data, where, commit = None):
389        """\
390                'update table set data values (%s ... %s) where ...'
391
392                data:
393                        dictionary of fields and data
394
395        @param table: The name of the table.
396        @param data: The sql parameter values.
397        @param where: The where clause.
398        @param commit: If commit the transaction .
399        """
400        if commit is None:
401            commit = self.autocommit
402        cmd = 'update %s ' % table
403        fields = data.keys()
404        data_refs = [self._quote(field) + '=%s' for field in fields]
405        data_values = [data[field] for field in fields]
406        cmd += ' set ' + ', '.join(data_refs)
407
408        where_clause, where_values = self._where_clause(where)
409        cmd += where_clause
410
411        values = data_values + where_values
412        self.dprint('%s %s' % (cmd, values))
413
414        self._exec_sql_with_commit(cmd, values, commit)
415
416
417    def delete_job(self, tag, commit = None):
418        """Delete a tko job.
419
420        @param tag: The job tag.
421        @param commit: If commit the transaction .
422        """
423        job_idx = self.find_job(tag)
424        for test_idx in self.find_tests(job_idx):
425            where = {'test_idx' : test_idx}
426            self.delete('tko_iteration_result', where)
427            self.delete('tko_iteration_perf_value', where)
428            self.delete('tko_iteration_attributes', where)
429            self.delete('tko_test_attributes', where)
430            self.delete('tko_test_labels_tests', {'test_id': test_idx})
431        where = {'job_idx' : job_idx}
432        self.delete('tko_tests', where)
433        self.delete('tko_jobs', where)
434
435
436    def insert_job(self, tag, job, parent_job_id=None, commit=None):
437        """Insert a tko job.
438
439        @param tag: The job tag.
440        @param job: The job object.
441        @param parent_job_id: The parent job id.
442        @param commit: If commit the transaction .
443
444        @return The dict of data inserted into the tko_jobs table.
445        """
446        job.machine_idx = self.lookup_machine(job.machine)
447        if not job.machine_idx:
448            job.machine_idx = self.insert_machine(job, commit=commit)
449        elif job.machine:
450            # Only try to update tko_machines record if machine is set. This
451            # prevents unnecessary db writes for suite jobs.
452            self.update_machine_information(job, commit=commit)
453
454        afe_job_id = utils.get_afe_job_id(tag)
455
456        data = {'tag':tag,
457                'label': job.label,
458                'username': job.user,
459                'machine_idx': job.machine_idx,
460                'queued_time': job.queued_time,
461                'started_time': job.started_time,
462                'finished_time': job.finished_time,
463                'afe_job_id': afe_job_id,
464                'afe_parent_job_id': parent_job_id,
465                'build': job.build,
466                'build_version': job.build_version,
467                'board': job.board,
468                'suite': job.suite}
469        job.afe_job_id = afe_job_id
470        if parent_job_id:
471            job.afe_parent_job_id = str(parent_job_id)
472
473        # TODO(ntang): check job.index directly.
474        is_update = hasattr(job, 'index')
475        if is_update:
476            self.update('tko_jobs', data, {'job_idx': job.index}, commit=commit)
477        else:
478            self.insert('tko_jobs', data, commit=commit)
479            job.index = self.get_last_autonumber_value()
480        self.update_job_keyvals(job, commit=commit)
481        for test in job.tests:
482            self.insert_test(job, test, commit=commit)
483
484        data['job_idx'] = job.index
485        return data
486
487
488    def update_job_keyvals(self, job, commit=None):
489        """Updates the job key values.
490
491        @param job: The job object.
492        @param commit: If commit the transaction .
493        """
494        for key, value in job.keyval_dict.iteritems():
495            where = {'job_id': job.index, 'key': key}
496            data = dict(where, value=value)
497            exists = self.select('id', 'tko_job_keyvals', where=where)
498
499            if exists:
500                self.update('tko_job_keyvals', data, where=where, commit=commit)
501            else:
502                self.insert('tko_job_keyvals', data, commit=commit)
503
504
505    def insert_test(self, job, test, commit = None):
506        """Inserts a job test.
507
508        @param job: The job object.
509        @param test: The test object.
510        @param commit: If commit the transaction .
511        """
512        kver = self.insert_kernel(test.kernel, commit=commit)
513        data = {'job_idx':job.index, 'test':test.testname,
514                'subdir':test.subdir, 'kernel_idx':kver,
515                'status':self.status_idx[test.status],
516                'reason':test.reason, 'machine_idx':job.machine_idx,
517                'started_time': test.started_time,
518                'finished_time':test.finished_time}
519        is_update = hasattr(test, "test_idx")
520        if is_update:
521            test_idx = test.test_idx
522            self.update('tko_tests', data,
523                        {'test_idx': test_idx}, commit=commit)
524            where = {'test_idx': test_idx}
525            self.delete('tko_iteration_result', where)
526            self.delete('tko_iteration_perf_value', where)
527            self.delete('tko_iteration_attributes', where)
528            where['user_created'] = 0
529            self.delete('tko_test_attributes', where)
530        else:
531            self.insert('tko_tests', data, commit=commit)
532            test_idx = test.test_idx = self.get_last_autonumber_value()
533        data = {'test_idx': test_idx}
534
535        for i in test.iterations:
536            data['iteration'] = i.index
537            for key, value in i.attr_keyval.iteritems():
538                data['attribute'] = key
539                data['value'] = value
540                self.insert('tko_iteration_attributes', data,
541                            commit=commit)
542            for key, value in i.perf_keyval.iteritems():
543                data['attribute'] = key
544                if math.isnan(value) or math.isinf(value):
545                    data['value'] = None
546                else:
547                    data['value'] = value
548                self.insert('tko_iteration_result', data,
549                            commit=commit)
550
551        data = {'test_idx': test_idx}
552
553        for key, value in test.attributes.iteritems():
554            data = {'test_idx': test_idx, 'attribute': key,
555                    'value': value}
556            self.insert('tko_test_attributes', data, commit=commit)
557
558        if not is_update:
559            for label_index in test.labels:
560                data = {'test_id': test_idx, 'testlabel_id': label_index}
561                self.insert('tko_test_labels_tests', data, commit=commit)
562
563
564    def read_machine_map(self):
565        """Reads the machine map."""
566        if self.machine_group or not self.machine_map:
567            return
568        for line in open(self.machine_map, 'r').readlines():
569            (machine, group) = line.split()
570            self.machine_group[machine] = group
571
572
573    def machine_info_dict(self, job):
574        """Reads the machine information of a job.
575
576        @param job: The job object.
577
578        @return: The machine info dictionary.
579        """
580        hostname = job.machine
581        group = job.machine_group
582        owner = job.machine_owner
583
584        if not group:
585            self.read_machine_map()
586            group = self.machine_group.get(hostname, hostname)
587            if group == hostname and owner:
588                group = owner + '/' + hostname
589
590        return {'hostname': hostname, 'machine_group': group, 'owner': owner}
591
592
593    def insert_machine(self, job, commit = None):
594        """Inserts the job machine.
595
596        @param job: The job object.
597        @param commit: If commit the transaction .
598        """
599        machine_info = self.machine_info_dict(job)
600        self.insert('tko_machines', machine_info, commit=commit)
601        return self.get_last_autonumber_value()
602
603
604    def update_machine_information(self, job, commit = None):
605        """Updates the job machine information.
606
607        @param job: The job object.
608        @param commit: If commit the transaction .
609        """
610        machine_info = self.machine_info_dict(job)
611        self.update('tko_machines', machine_info,
612                    where={'hostname': machine_info['hostname']},
613                    commit=commit)
614
615
616    def lookup_machine(self, hostname):
617        """Look up the machine information.
618
619        @param hostname: The hostname as string.
620        """
621        where = { 'hostname' : hostname }
622        rows = self.select('machine_idx', 'tko_machines', where)
623        if rows:
624            return rows[0][0]
625        else:
626            return None
627
628
629    def lookup_kernel(self, kernel):
630        """Look up the kernel.
631
632        @param kernel: The kernel object.
633        """
634        rows = self.select('kernel_idx', 'tko_kernels',
635                                {'kernel_hash':kernel.kernel_hash})
636        if rows:
637            return rows[0][0]
638        else:
639            return None
640
641
642    def insert_kernel(self, kernel, commit = None):
643        """Insert a kernel.
644
645        @param kernel: The kernel object.
646        @param commit: If commit the transaction .
647        """
648        kver = self.lookup_kernel(kernel)
649        if kver:
650            return kver
651
652        # If this kernel has any significant patches, append their hash
653        # as diferentiator.
654        printable = kernel.base
655        patch_count = 0
656        for patch in kernel.patches:
657            match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$',
658                                                    patch.reference)
659            if not match:
660                patch_count += 1
661
662        self.insert('tko_kernels',
663                    {'base':kernel.base,
664                     'kernel_hash':kernel.kernel_hash,
665                     'printable':printable},
666                    commit=commit)
667        kver = self.get_last_autonumber_value()
668
669        if patch_count > 0:
670            printable += ' p%d' % (kver)
671            self.update('tko_kernels',
672                    {'printable':printable},
673                    {'kernel_idx':kver})
674
675        for patch in kernel.patches:
676            self.insert_patch(kver, patch, commit=commit)
677        return kver
678
679
680    def insert_patch(self, kver, patch, commit = None):
681        """Insert a kernel patch.
682
683        @param kver: The kernel version.
684        @param patch: The kernel patch object.
685        @param commit: If commit the transaction .
686        """
687        print patch.reference
688        name = os.path.basename(patch.reference)[:80]
689        self.insert('tko_patches',
690                    {'kernel_idx': kver,
691                     'name':name,
692                     'url':patch.reference,
693                     'hash':patch.hash},
694                    commit=commit)
695
696
697    def find_test(self, job_idx, testname, subdir):
698        """Find a test by name.
699
700        @param job_idx: The job index.
701        @param testname: The test name.
702        @param subdir: The test sub directory under the job directory.
703        """
704        where = {'job_idx': job_idx , 'test': testname, 'subdir': subdir}
705        rows = self.select('test_idx', 'tko_tests', where)
706        if rows:
707            return rows[0][0]
708        else:
709            return None
710
711
712    def find_tests(self, job_idx):
713        """Find all tests by job index.
714
715        @param job_idx: The job index.
716        @return: A list of tests.
717        """
718        where = { 'job_idx':job_idx }
719        rows = self.select('test_idx', 'tko_tests', where)
720        if rows:
721            return [row[0] for row in rows]
722        else:
723            return []
724
725
726    def find_job(self, tag):
727        """Find a job by tag.
728
729        @param tag: The job tag name.
730        @return: The job object or None.
731        """
732        rows = self.select('job_idx', 'tko_jobs', {'tag': tag})
733        if rows:
734            return rows[0][0]
735        else:
736            return None
737
738
def _get_db_type():
    """Get the database type name to use from the global config.

    @return: "db_" followed by the configured type; the type is read from
             global_db_type with db_type as fallback and defaults to "mysql".
    """
    config = global_config.global_config
    configured = config.get_config_value_with_fallback(
            "AUTOTEST_WEB", "global_db_type", "db_type", default="mysql")
    return "db_" + configured
744
745
def _get_error_class(class_name):
    """Retrieve an error class by name from the database driver module.

    @param class_name: Name of the attribute to fetch from the "driver"
                       attribute of the configured autotest_lib.tko module.

    @return: The attribute found under that name.
    """
    module_name = "autotest_lib.tko." + _get_db_type()
    db_module = __import__(module_name, globals(), locals(), ["driver"])
    driver = db_module.driver
    return getattr(driver, class_name)
752
753
def db(*args, **dargs):
    """Create an instance of the configured database class.

    The class is looked up in the autotest_lib.tko module named after the
    database type from the global configuration (defaulting to mysql) and is
    called with the given arguments.

    @param args: The db_type arguments.
    @param dargs: The db_type named arguments.

    @return: A db object.
    """
    db_type = _get_db_type()
    db_module = __import__("autotest_lib.tko." + db_type, globals(),
                           locals(), [db_type])
    db_class = getattr(db_module, db_type)
    return db_class(*args, **dargs)
769