# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

try:
    import MySQLdb as driver
except ImportError:
    # This module (tko) is unconditionally imported by autoserv,
    # even in environments where MySQLdb is unavailable. Thus, we
    # need to cope with import failure here.
    # See https://bugs.chromium.org/p/chromium/issues/detail?id=860166#c17 for
    # context.
    class UtterlyFakeDb(object):
        """Lame fake of MySQLdb for import time needs of this file."""
        OperationalError = object()

    driver = UtterlyFakeDb

import math
import os
import random
import re
import sys
import time

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend import database_settings_helper

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


def _log_error(msg):
    """Log an error message.

    @param msg: Message string
    """
    print >> sys.stderr, msg
    sys.stderr.flush()  # we want these msgs to show up immediately


def _format_operational_error(e):
    """Format OperationalError.

    @param e: OperationalError instance.
    """
    return ("%s: An operational error occurred during a database "
            "operation: %s" % (time.strftime("%X %x"), str(e)))


class MySQLTooManyRows(Exception):
    """Too many records."""
    pass


def _connection_retry_callback():
    """Callback method used to increment a retry metric."""
    metrics.Counter('chromeos/autotest/tko/connection_retries').increment()


class db_sql(object):
    """Data access."""

    def __init__(self, debug=False, autocommit=True, host=None,
                 database=None, user=None, password=None):
        self.debug = debug
        self.autocommit = autocommit
        self._load_config(host, database, user, password)

        self.con = None
        self._init_db()

        # if not present, insert statuses
        self.status_idx = {}
        self.status_word = {}
        status_rows = self.select('status_idx, word', 'tko_status', None)
        for s in status_rows:
            self.status_idx[s[1]] = s[0]
            self.status_word[s[0]] = s[1]

        machine_map = os.path.join(os.path.dirname(__file__),
                                   'machines')
        if os.path.exists(machine_map):
            self.machine_map = machine_map
        else:
            self.machine_map = None
        self.machine_group = {}


    def _load_config(self, host, database, user, password):
        """Loads configuration settings required to connect to the database.

        This will try to use the settings prefixed with global_db_.
        If they do not exist, the un-prefixed settings will be used.

        If parameters are supplied, these will be taken instead of the values
        in global_config.

        The setting of 'host' can be a real host, or a unix socket if it starts
        with '/'.

        @param host: If set, this host will be used, if not, the host will be
                     retrieved from global_config.
        @param database: If set, this database will be used, if not, the
                         database will be retrieved from global_config.
        @param user: If set, this user will be used, if not, the
                         user will be retrieved from global_config.
        @param password: If set, this password will be used, if not, the
                         password will be retrieved from global_config.
        """
        database_settings = database_settings_helper.get_global_db_config()

        # grab the host, database
        self.host = host or database_settings['HOST']
        self.database = database or database_settings['NAME']

        # grab the user and password
        self.user = user or database_settings['USER']
        self.password = password or database_settings['PASSWORD']

        # grab the timeout configuration
        self.query_timeout = (
                database_settings.get('OPTIONS', {}).get('timeout', 3600))

        # Using fallback to non-global in order to work without configuration
        # overhead on non-shard instances.
        get_value = global_config.global_config.get_config_value_with_fallback
        self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay",
                                   "min_retry_delay", type=int, default=20)
        self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay",
                                   "max_retry_delay", type=int, default=60)

        # TODO(beeps): Move this to django settings once we have routers.
        # On test instances mysql connects through a different port. No point
        # piping this through our entire infrastructure when it is only really
        # used for testing; ideally we would specify this through django
        # settings and default it to the empty string so django will figure out
        # the default based on the database backend (eg: mysql, 3306), but until
        # we have database routers in place any django settings will apply to
        # both tko and afe.
        # The intended use of this port is to allow a testing shard vm to
        # update the master vm's database with test results. Specifying
        # an empty string will fall back to not even specifying the port
        # to the backend in tko/db.py. Unfortunately this means retries
        # won't work on the test cluster till we've migrated to routers.
        self.port = global_config.global_config.get_config_value(
                "AUTOTEST_WEB", "global_db_port", type=str, default='')
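
    # Illustrative only (not part of the original module): with the fallback
    # reads above, hypothetical global_config entries such as
    #
    #   [AUTOTEST_WEB]
    #   global_db_min_retry_delay: 30
    #   global_db_max_retry_delay: 90
    #
    # would yield min_delay=30 and max_delay=90, while a missing
    # global_db_* key falls back to the un-prefixed min_retry_delay /
    # max_retry_delay settings (and finally to the defaults of 20 and 60).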


    def _init_db(self):
        # make sure we clean up any existing connection
        if self.con:
            self.con.close()
            self.con = None

        # create the db connection and cursor
        self.con = self.connect(self.host, self.database,
                                self.user, self.password, self.port)
        self.cur = self.con.cursor()


    def _random_delay(self):
        delay = random.randint(self.min_delay, self.max_delay)
        time.sleep(delay)


    @retry.retry(driver.OperationalError, timeout_min=10,
                 delay_sec=5, callback=_connection_retry_callback)
    def connect(self, host, database, user, password, port):
        """Open and return a connection to mysql database."""
        connection_args = {
            'db': database,
            'user': user,
            'passwd': password,
            'connect_timeout': 20,
        }
        if port:
            connection_args['port'] = int(port)

        if host.startswith('/'):
            return driver.connect(unix_socket=host, **connection_args)

        return driver.connect(host=host, **connection_args)
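
    # Illustrative behaviour of connect() (not part of the original module;
    # the host values are hypothetical): a host beginning with '/' is used as
    # a unix socket, anything else as a TCP host.
    #
    #   db_sql(host='/var/run/mysqld/mysqld.sock')  # unix_socket connection
    #   db_sql(host='localhost')                    # TCP connection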


    def run_with_retry(self, function, *args, **dargs):
        """Call function(*args, **dargs) until either it passes
        without an operational error, or a timeout is reached.
        This will re-connect to the database, so it is NOT safe
        to use this inside of a database transaction.

        It can be safely used with transactions, but the
        transaction start & end must be completely contained
        within the call to 'function'.

        @param function: The function to run with retry.
        @param args: The arguments
        @param dargs: The named arguments.
        """
        success = False
        start_time = time.time()
        while not success:
            try:
                result = function(*args, **dargs)
            except driver.OperationalError, e:
                _log_error("%s; retrying, don't panic yet"
                           % _format_operational_error(e))
                stop_time = time.time()
                elapsed_time = stop_time - start_time
                if elapsed_time > self.query_timeout:
                    raise
                else:
                    try:
                        self._random_delay()
                        self._init_db()
                    except driver.OperationalError, e:
                        _log_error('%s; panic now'
                                   % _format_operational_error(e))
            else:
                success = True
        return result
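
    # Illustrative usage (not part of the original module; the tag value is
    # hypothetical): wrap a whole query in run_with_retry so that a transient
    # OperationalError triggers a reconnect and a retry rather than a failure.
    #
    #   db_obj = db_sql(autocommit=False)
    #   rows = db_obj.run_with_retry(
    #           db_obj.select, 'job_idx', 'tko_jobs', {'tag': 'some-job-tag'})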


    def dprint(self, value):
        """Print out debug value.

        @param value: The value to print out.
        """
        if self.debug:
            sys.stdout.write('SQL: ' + str(value) + '\n')


    def _commit(self):
        """Private method for function commit to call for retry.
        """
        return self.con.commit()


    def commit(self):
        """Commit the sql transaction."""
        if self.autocommit:
            return self.run_with_retry(self._commit)
        else:
            return self._commit()


    def rollback(self):
        """Rollback the sql transaction."""
        self.con.rollback()


    def get_last_autonumber_value(self):
        """Gets the last auto number.

        @return: The last auto number.
        """
        self.cur.execute('SELECT LAST_INSERT_ID()', [])
        return self.cur.fetchall()[0][0]


    def _quote(self, field):
        return '`%s`' % field


    def _where_clause(self, where):
        if not where:
            return '', []

        if isinstance(where, dict):
            # key/value pairs (which should be equal, or None for null)
            keys, values = [], []
            for field, value in where.iteritems():
                quoted_field = self._quote(field)
                if value is None:
                    keys.append(quoted_field + ' is null')
                else:
                    keys.append(quoted_field + '=%s')
                    values.append(value)
            where_clause = ' and '.join(keys)
        elif isinstance(where, basestring):
            # the exact string
            where_clause = where
            values = []
        elif isinstance(where, tuple):
            # preformatted where clause + values
            where_clause, values = where
            assert where_clause
        else:
            raise ValueError('Invalid "where" value: %r' % where)

        return ' WHERE ' + where_clause, values
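
    # Illustrative mapping of the three accepted 'where' forms (not part of
    # the original module; field names and values are hypothetical):
    #
    #   {'tag': 'x'}                      -> (" WHERE `tag`=%s", ['x'])
    #   {'owner': None}                   -> (" WHERE `owner` is null", [])
    #   "status = 'GOOD'"                 -> (" WHERE status = 'GOOD'", [])
    #   ('started_time > %s', [deadline]) -> (" WHERE started_time > %s", [deadline])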



    def select(self, fields, table, where, distinct=False, group_by=None,
               max_rows=None):
        """\
                This selects all the fields requested from a
                specific table with a particular where clause.
                The where clause can either be a dictionary of
                field=value pairs, a string, or a tuple of (string,
                a list of values).  The last option is what you
                should use when accepting user input as it'll
                protect you against sql injection attacks (if
                all user data is placed in the array rather than
                the raw SQL).

                For example:
                  where = ("a = %s AND b = %s", ['val', 'val'])
                is better than
                  where = "a = 'val' AND b = 'val'"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param where: The where clause (dictionary, string, or tuple of
                      (string, values)).
        @param distinct: Whether to select only distinct values.
        @param group_by: Group by clause.
        @param max_rows: If set, the maximum number of rows allowed before
                         MySQLTooManyRows is raised.
        """
        cmd = ['select']
        if distinct:
            cmd.append('distinct')
        cmd += [fields, 'from', table]

        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)

        if group_by:
            cmd.append(' GROUP BY ' + group_by)

        self.dprint('%s %s' % (' '.join(cmd), values))

        # create a re-runnable function for executing the query
        def exec_sql():
            """Executes the sql command."""
            sql = ' '.join(cmd)
            numRec = self.cur.execute(sql, values)
            if max_rows is not None and numRec > max_rows:
                msg = 'Exceeded allowed number of records'
                raise MySQLTooManyRows(msg)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(exec_sql)
        else:
            return exec_sql()
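
    # Illustrative usage (not part of the original module; the tag value is
    # hypothetical). The tuple form keeps user-supplied data out of the raw
    # SQL string:
    #
    #   rows = db_obj.select('job_idx, label', 'tko_jobs',
    #                        ('tag = %s', ['some-job-tag']))
    #   for job_idx, label in rows:
    #       print job_idx, label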


    def select_sql(self, fields, table, sql, values):
        """\
                select fields from table "sql"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param sql: The sql string.
        @param values: The sql string parameter values.
        """
        cmd = 'select %s from %s %s' % (fields, table, sql)
        self.dprint(cmd)

        # create a re-runnable function for executing the query
        def _exec_sql():
            self.cur.execute(cmd, values)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(_exec_sql)
        else:
            return _exec_sql()


    def _exec_sql_with_commit(self, sql, values, commit):
        if self.autocommit:
            # re-run the query until it succeeds
            def _exec_sql():
                self.cur.execute(sql, values)
                self.con.commit()
            self.run_with_retry(_exec_sql)
        else:
            # take one shot at running the query
            self.cur.execute(sql, values)
            if commit:
                self.con.commit()


    def insert(self, table, data, commit=None):
        """\
                'insert into table (keys) values (%s ... %s)', values

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: The insert data.
        @param commit: Whether to commit the transaction.
        """
        fields = data.keys()
        refs = ['%s' for field in fields]
        values = [data[field] for field in fields]
        cmd = ('insert into %s (%s) values (%s)' %
               (table, ','.join(self._quote(field) for field in fields),
                ','.join(refs)))
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)
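
    # Illustrative usage (not part of the original module; the attribute data
    # is hypothetical). Column names come from the dictionary keys and the
    # values are passed as query parameters:
    #
    #   db_obj.insert('tko_test_attributes',
    #                 {'test_idx': 1234, 'attribute': 'RESULT', 'value': 'ok'},
    #                 commit=True)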


    def delete(self, table, where, commit = None):
        """Delete entries.

        @param table: The name of the table.
        @param where: The where clause.
        @param commit: Whether to commit the transaction.
        """
        cmd = ['delete from', table]
        if commit is None:
            commit = self.autocommit
        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)
        sql = ' '.join(cmd)
        self.dprint('%s %s' % (sql, values))

        self._exec_sql_with_commit(sql, values, commit)


    def update(self, table, data, where, commit = None):
        """\
                'update table set data values (%s ... %s) where ...'

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: Dictionary of fields and their new values.
        @param where: The where clause.
        @param commit: Whether to commit the transaction.
        """
        if commit is None:
            commit = self.autocommit
        cmd = 'update %s ' % table
        fields = data.keys()
        data_refs = [self._quote(field) + '=%s' for field in fields]
        data_values = [data[field] for field in fields]
        cmd += ' set ' + ', '.join(data_refs)

        where_clause, where_values = self._where_clause(where)
        cmd += where_clause

        values = data_values + where_values
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)
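
    # Illustrative usage (not part of the original module; the hostname and
    # group are hypothetical). 'data' supplies the SET clause and 'where'
    # supplies the WHERE clause:
    #
    #   db_obj.update('tko_machines',
    #                 {'machine_group': 'lab-group-a'},
    #                 {'hostname': 'chromeos-host1'},
    #                 commit=True)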


    def delete_job(self, tag, commit = None):
        """Delete a tko job.

        @param tag: The job tag.
        @param commit: Whether to commit the transaction.
        """
        job_idx = self.find_job(tag)
        for test_idx in self.find_tests(job_idx):
            where = {'test_idx' : test_idx}
            self.delete('tko_iteration_result', where)
            self.delete('tko_iteration_perf_value', where)
            self.delete('tko_iteration_attributes', where)
            self.delete('tko_test_attributes', where)
            self.delete('tko_test_labels_tests', {'test_id': test_idx})
        where = {'job_idx' : job_idx}
        self.delete('tko_tests', where)
        self.delete('tko_jobs', where)


    def insert_job(self, tag, job, commit=None):
        """Insert a tko job.

        @param tag: The job tag.
        @param job: The job object.
        @param commit: Whether to commit the transaction.
        """
        data = self._get_common_job_data(tag, job)
        data.update({
                'afe_job_id': job.afe_job_id,
                'afe_parent_job_id': job.afe_parent_job_id,
        })
        if job.job_idx is not None:
            self.update(
                    'tko_jobs', data, {'job_idx': job.job_idx}, commit=commit)
        else:
            self.insert('tko_jobs', data, commit=commit)
            job.job_idx = self.get_last_autonumber_value()


    def _get_common_job_data(self, tag, job):
        """Construct a dictionary with the common data to insert in job/task."""
        return {
                'tag':tag,
                'label': job.label,
                'username': job.user,
                'machine_idx': job.machine_idx,
                'queued_time': job.queued_time,
                'started_time': job.started_time,
                'finished_time': job.finished_time,
                'build': job.build,
                'build_version': job.build_version,
                'board': job.board,
                'suite': job.suite,
        }


    def insert_or_update_task_reference(self, job, reference_type, commit=None):
        """Insert an entry in the tko_task_references table.

        The job should already have been inserted in tko_jobs.
        @param job: tko.models.job object.
        @param reference_type: The type of reference to insert.
                One of: {'afe', 'skylab'}
        @param commit: Whether to commit this transaction.
        """
        assert reference_type in {'afe', 'skylab'}
        if reference_type == 'afe':
            task_id = job.afe_job_id
            parent_task_id = job.afe_parent_job_id
        else:
            task_id = job.skylab_task_id
            parent_task_id = job.skylab_parent_task_id
        data = {
                'reference_type': reference_type,
                'tko_job_idx': job.job_idx,
                'task_id': task_id,
                'parent_task_id': parent_task_id,
        }

        task_reference_id = self._lookup_task_reference(job)
        if task_reference_id is not None:
            self.update('tko_task_references',
                        data,
                        {'id': task_reference_id},
                        commit=commit)
            job.task_reference_id = task_reference_id
        else:
            self.insert('tko_task_references', data, commit=commit)
            job.task_reference_id = self.get_last_autonumber_value()


    def update_job_keyvals(self, job, commit=None):
        """Updates the job key values.

        @param job: The job object.
        @param commit: Whether to commit the transaction.
        """
        for key, value in job.keyval_dict.iteritems():
            where = {'job_id': job.job_idx, 'key': key}
            data = dict(where, value=value)
            exists = self.select('id', 'tko_job_keyvals', where=where)

            if exists:
                self.update('tko_job_keyvals', data, where=where, commit=commit)
            else:
                self.insert('tko_job_keyvals', data, commit=commit)


    def insert_test(self, job, test, commit = None):
        """Inserts a job test.

        @param job: The job object.
        @param test: The test object.
        @param commit: Whether to commit the transaction.
        """
        kver = self.insert_kernel(test.kernel, commit=commit)
        data = {'job_idx':job.job_idx, 'test':test.testname,
                'subdir':test.subdir, 'kernel_idx':kver,
                'status':self.status_idx[test.status],
                'reason':test.reason, 'machine_idx':job.machine_idx,
                'started_time': test.started_time,
                'finished_time':test.finished_time}
        is_update = hasattr(test, "test_idx")
        if is_update:
            test_idx = test.test_idx
            self.update('tko_tests', data,
                        {'test_idx': test_idx}, commit=commit)
            where = {'test_idx': test_idx}
            self.delete('tko_iteration_result', where)
            self.delete('tko_iteration_perf_value', where)
            self.delete('tko_iteration_attributes', where)
            where['user_created'] = 0
            self.delete('tko_test_attributes', where)
        else:
            self.insert('tko_tests', data, commit=commit)
            test_idx = test.test_idx = self.get_last_autonumber_value()
        data = {'test_idx': test_idx}

        for i in test.iterations:
            data['iteration'] = i.index
            for key, value in i.attr_keyval.iteritems():
                data['attribute'] = key
                data['value'] = value
                self.insert('tko_iteration_attributes', data,
                            commit=commit)
            for key, value in i.perf_keyval.iteritems():
                data['attribute'] = key
                if math.isnan(value) or math.isinf(value):
                    data['value'] = None
                else:
                    data['value'] = value
                self.insert('tko_iteration_result', data,
                            commit=commit)

        data = {'test_idx': test_idx}

        for key, value in test.attributes.iteritems():
            data = {'test_idx': test_idx, 'attribute': key,
                    'value': value}
            try:
                self.insert('tko_test_attributes', data, commit=commit)
            except:
                _log_error('Uploading attribute %r' % (data))
                raise

        if not is_update:
            for label_index in test.labels:
                data = {'test_id': test_idx, 'testlabel_id': label_index}
                self.insert('tko_test_labels_tests', data, commit=commit)


    def read_machine_map(self):
        """Reads the machine map."""
        if self.machine_group or not self.machine_map:
            return
        for line in open(self.machine_map, 'r').readlines():
            (machine, group) = line.split()
            self.machine_group[machine] = group
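
    # Illustrative content of the optional 'machines' map file read above (not
    # part of the original module; hostnames and groups are hypothetical).
    # Each line holds a hostname and its group separated by whitespace:
    #
    #   chromeos-host1 lab-group-a
    #   chromeos-host2 lab-group-b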


    def machine_info_dict(self, job):
        """Reads the machine information of a job.

        @param job: The job object.

        @return: The machine info dictionary.
        """
        hostname = job.machine
        group = job.machine_group
        owner = job.machine_owner

        if not group:
            self.read_machine_map()
            group = self.machine_group.get(hostname, hostname)
            if group == hostname and owner:
                group = owner + '/' + hostname

        return {'hostname': hostname, 'machine_group': group, 'owner': owner}


    def insert_or_update_machine(self, job, commit=None):
        """Inserts or updates machine information for the given job.

        Also updates the job object with new machine index, if any.

        @param job: tko.models.job object.
        @param commit: Whether to commit the database transaction.
        """
        job.machine_idx = self._lookup_machine(job.machine)
        if not job.machine_idx:
            job.machine_idx = self._insert_machine(job, commit=commit)
        elif job.machine:
            # Only try to update tko_machines record if machine is set. This
            # prevents unnecessary db writes for suite jobs.
            self._update_machine_information(job, commit=commit)


    def _lookup_task_reference(self, job):
        """Find the task_reference_id for a given job. Return None if not found.

        @param job: tko.models.job object.
        """
        if job.job_idx is None:
            return None
        rows = self.select(
                'id', 'tko_task_references', {'tko_job_idx': job.job_idx})
        if not rows:
            return None
        if len(rows) > 1:
            raise MySQLTooManyRows('Got %d tko_task_references for tko_job %d'
                                   % (len(rows), job.job_idx))
        return rows[0][0]


    def _insert_machine(self, job, commit = None):
        """Inserts the job machine.

        @param job: The job object.
        @param commit: Whether to commit the transaction.
        """
        machine_info = self.machine_info_dict(job)
        self.insert('tko_machines', machine_info, commit=commit)
        return self.get_last_autonumber_value()


    def _update_machine_information(self, job, commit = None):
        """Updates the job machine information.

        @param job: The job object.
        @param commit: Whether to commit the transaction.
        """
        machine_info = self.machine_info_dict(job)
        self.update('tko_machines', machine_info,
                    where={'hostname': machine_info['hostname']},
                    commit=commit)


    def _lookup_machine(self, hostname):
        """Look up the machine information.

        @param hostname: The hostname as string.
        """
        where = { 'hostname' : hostname }
        rows = self.select('machine_idx', 'tko_machines', where)
        if rows:
            return rows[0][0]
        else:
            return None


    def lookup_kernel(self, kernel):
        """Look up the kernel.

        @param kernel: The kernel object.
        """
        rows = self.select('kernel_idx', 'tko_kernels',
                                {'kernel_hash':kernel.kernel_hash})
        if rows:
            return rows[0][0]
        else:
            return None


    def insert_kernel(self, kernel, commit = None):
        """Insert a kernel.

        @param kernel: The kernel object.
        @param commit: Whether to commit the transaction.
        """
        kver = self.lookup_kernel(kernel)
        if kver:
            return kver

        # If this kernel has any significant patches, append their hash
        # as differentiator.
        printable = kernel.base
        patch_count = 0
        for patch in kernel.patches:
            match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$',
                                                    patch.reference)
            if not match:
                patch_count += 1

        self.insert('tko_kernels',
                    {'base':kernel.base,
                     'kernel_hash':kernel.kernel_hash,
                     'printable':printable},
                    commit=commit)
        kver = self.get_last_autonumber_value()

        if patch_count > 0:
            printable += ' p%d' % (kver)
            self.update('tko_kernels',
                    {'printable':printable},
                    {'kernel_idx':kver})

        for patch in kernel.patches:
            self.insert_patch(kver, patch, commit=commit)
        return kver


    def insert_patch(self, kver, patch, commit = None):
        """Insert a kernel patch.

        @param kver: The kernel version.
        @param patch: The kernel patch object.
        @param commit: Whether to commit the transaction.
        """
        print patch.reference
        name = os.path.basename(patch.reference)[:80]
        self.insert('tko_patches',
                    {'kernel_idx': kver,
                     'name':name,
                     'url':patch.reference,
                     'hash':patch.hash},
                    commit=commit)


    def find_test(self, job_idx, testname, subdir):
        """Find a test by name.

        @param job_idx: The job index.
        @param testname: The test name.
        @param subdir: The test sub directory under the job directory.
        """
        where = {'job_idx': job_idx , 'test': testname, 'subdir': subdir}
        rows = self.select('test_idx', 'tko_tests', where)
        if rows:
            return rows[0][0]
        else:
            return None


    def find_tests(self, job_idx):
        """Find all tests by job index.

        @param job_idx: The job index.
        @return: A list of tests.
        """
        where = { 'job_idx':job_idx }
        rows = self.select('test_idx', 'tko_tests', where)
        if rows:
            return [row[0] for row in rows]
        else:
            return []


    def find_job(self, tag):
        """Find a job by tag.

        @param tag: The job tag name.
        @return: The job index or None.
        """
        rows = self.select('job_idx', 'tko_jobs', {'tag': tag})
        if rows:
            return rows[0][0]
        else:
            return None


    def get_child_tests_by_parent_task_id(self, parent_task_id):
        """Get the child tests by a parent task id.

        @param parent_task_id: A string parent task id in tko_task_references.
        @return: A list of view dicts, which has key 'test_name' and 'status'.
        """
        rows = self.select('tko_job_idx', 'tko_task_references',
                           {'parent_task_id': parent_task_id})
        tko_job_ids = [str(r[0]) for r in rows]
        fields = ['test_name', 'status']
        where = 'job_idx in (%s)' % ', '.join(tko_job_ids)
        rows = self.select(', '.join(fields), 'tko_test_view_2', where)
        return [{'test_name': r[0], 'status': r[1]} for r in rows]


def db(*args, **dargs):
    """Creates an instance of the database class with the arguments
    provided in args and dargs, using the database type specified by
    the global configuration (defaulting to mysql).

    @param args: The db_type arguments.
    @param dargs: The db_type named arguments.

    @return: A db object.
    """
    return db_sql(*args, **dargs)
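
# Illustrative usage of this module (not part of the original file; the job
# tag is hypothetical and a reachable tko MySQL database is assumed):
#
#   from autotest_lib.tko import db as tko_db
#
#   db_obj = tko_db.db(debug=True)
#   job_idx = db_obj.find_job('123-some-user/some-host')
#   for test_idx in db_obj.find_tests(job_idx):
#       print test_idx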