• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import division
7from __future__ import print_function
8
try:
    import MySQLdb as driver
except ImportError:
    # This module (tko) is unconditionally imported by autoserv,
    # even in environments where MySQLdb is unavailable. Thus, we
    # need to cope with import failure here.
    # See https://bugs.chromium.org/p/chromium/issues/detail?id=860166#c17 for
    # context.
    class UtterlyFakeDb(object):
        """Lame fake of MySQLdb for import time needs of this file."""

        class OperationalError(Exception):
            """Stand-in for MySQLdb.OperationalError.

            This has to be a real exception class: this module names
            driver.OperationalError in `except` clauses, and Python 3 raises
            TypeError when such a clause is matched against something that
            does not derive from BaseException.
            """

    driver = UtterlyFakeDb
22
23import math
24import os
25import random
26import re
27import sys
28import time
29
30import common
31from autotest_lib.client.common_lib import global_config
32from autotest_lib.client.common_lib import utils
33from autotest_lib.client.common_lib.cros import retry
34from autotest_lib.frontend import database_settings_helper
35import six
36
37try:
38    from autotest_lib.utils.frozen_chromite.lib import metrics
39except ImportError:
40    metrics = utils.metrics_mock
41
42
43def _log_error(msg):
44    """Log an error message.
45
46    @param msg: Message string
47    """
48    print(msg, file=sys.stderr)
49    sys.stderr.flush()  # we want these msgs to show up immediately
50
51
52def _format_operational_error(e):
53    """Format OperationalError.
54
55    @param e: OperationalError instance.
56    """
57    return ("%s: An operational error occurred during a database "
58            "operation: %s" % (time.strftime("%X %x"), str(e)))
59
60
class MySQLTooManyRows(Exception):
    """Raised when a query yields more rows than the caller allows."""
64
65
def _connection_retry_callback():
    """Bump the counter metric that tracks database connection retries."""
    retry_counter = metrics.Counter('chromeos/autotest/tko/connection_retries')
    retry_counter.increment()
69
70
71class db_sql(object):
72    """Data access."""
73
74    def __init__(self, debug=False, autocommit=True, host=None,
75                 database=None, user=None, password=None):
76        self.debug = debug
77        self.autocommit = autocommit
78        self._load_config(host, database, user, password)
79
80        self.con = None
81        self._init_db()
82
83        # if not present, insert statuses
84        self.status_idx = {}
85        self.status_word = {}
86        status_rows = self.select('status_idx, word', 'tko_status', None)
87        for s in status_rows:
88            self.status_idx[s[1]] = s[0]
89            self.status_word[s[0]] = s[1]
90
91        machine_map = os.path.join(os.path.dirname(__file__),
92                                   'machines')
93        if os.path.exists(machine_map):
94            self.machine_map = machine_map
95        else:
96            self.machine_map = None
97        self.machine_group = {}
98
99
100    def _load_config(self, host, database, user, password):
101        """Loads configuration settings required to connect to the database.
102
103        This will try to connect to use the settings prefixed with global_db_.
104        If they do not exist, they un-prefixed settings will be used.
105
106        If parameters are supplied, these will be taken instead of the values
107        in global_config.
108
109        The setting of 'host' can be a real host, or a unix socket if it starts
110        with '/'.
111
112        @param host: If set, this host will be used, if not, the host will be
113                     retrieved from global_config.
114        @param database: If set, this database will be used, if not, the
115                         database will be retrieved from global_config.
116        @param user: If set, this user will be used, if not, the
117                         user will be retrieved from global_config.
118        @param password: If set, this password will be used, if not, the
119                         password will be retrieved from global_config.
120        """
121        database_settings = database_settings_helper.get_global_db_config()
122
123        # grab the host, database
124        self.host = host or database_settings['HOST']
125        self.database = database or database_settings['NAME']
126
127        # grab the user and password
128        self.user = user or database_settings['USER']
129        self.password = password or database_settings['PASSWORD']
130
131        # grab the timeout configuration
132        self.query_timeout =(
133                database_settings.get('OPTIONS', {}).get('timeout', 3600))
134
135        # Using fallback to non-global in order to work without configuration
136        # overhead on non-shard instances.
137        get_value = global_config.global_config.get_config_value_with_fallback
138        self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay",
139                                   "min_retry_delay", type=int, default=20)
140        self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay",
141                                   "max_retry_delay", type=int, default=60)
142
143        # TODO(beeps): Move this to django settings once we have routers.
144        # On test instances mysql connects through a different port. No point
145        # piping this through our entire infrastructure when it is only really
146        # used for testing; Ideally we would specify this through django
147        # settings and default it to the empty string so django will figure out
148        # the default based on the database backend (eg: mysql, 3306), but until
149        # we have database routers in place any django settings will apply to
150        # both tko and afe.
151        # The intended use of this port is to allow a testing shard vm to
152        # update the vm's database with test results. Specifying
153        # and empty string will fallback to not even specifying the port
154        # to the backend in tko/db.py. Unfortunately this means retries
155        # won't work on the test cluster till we've migrated to routers.
156        self.port = global_config.global_config.get_config_value(
157                "AUTOTEST_WEB", "global_db_port", type=str, default='')
158
159
160    def _init_db(self):
161        # make sure we clean up any existing connection
162        if self.con:
163            self.con.close()
164            self.con = None
165
166        # create the db connection and cursor
167        self.con = self.connect(self.host, self.database,
168                                self.user, self.password, self.port)
169        self.cur = self.con.cursor()
170
171
172    def _random_delay(self):
173        delay = random.randint(self.min_delay, self.max_delay)
174        time.sleep(delay)
175
176
177    @retry.retry(driver.OperationalError, timeout_min=10,
178                 delay_sec=5, callback=_connection_retry_callback)
179    def connect(self, host, database, user, password, port):
180        """Open and return a connection to mysql database."""
181        connection_args = {
182            'db': database,
183            'user': user,
184            'passwd': password,
185            'connect_timeout': 20,
186        }
187        if port:
188            connection_args['port'] = int(port)
189
190        if host.startswith('/'):
191            return driver.connect(unix_socket=host, **connection_args)
192
193        return driver.connect(host=host, **connection_args)
194
195
196    def run_with_retry(self, function, *args, **dargs):
197        """Call function(*args, **dargs) until either it passes
198        without an operational error, or a timeout is reached.
199        This will re-connect to the database, so it is NOT safe
200        to use this inside of a database transaction.
201
202        It can be safely used with transactions, but the
203        transaction start & end must be completely contained
204        within the call to 'function'.
205
206        @param function: The function to run with retry.
207        @param args: The arguments
208        @param dargs: The named arguments.
209        """
210        success = False
211        start_time = time.time()
212        while not success:
213            try:
214                result = function(*args, **dargs)
215            except driver.OperationalError as e:
216                _log_error("%s; retrying, don't panic yet"
217                           % _format_operational_error(e))
218                stop_time = time.time()
219                elapsed_time = stop_time - start_time
220                if elapsed_time > self.query_timeout:
221                    raise
222                else:
223                    try:
224                        self._random_delay()
225                        self._init_db()
226                    except driver.OperationalError as e:
227                        _log_error('%s; panic now'
228                                   % _format_operational_error(e))
229            else:
230                success = True
231        return result
232
233
234    def dprint(self, value):
235        """Print out debug value.
236
237        @param value: The value to print out.
238        """
239        if self.debug:
240            sys.stdout.write('SQL: ' + str(value) + '\n')
241
242
243    def _commit(self):
244        """Private method for function commit to call for retry.
245        """
246        return self.con.commit()
247
248
249    def commit(self):
250        """Commit the sql transaction."""
251        if self.autocommit:
252            return self.run_with_retry(self._commit)
253        else:
254            return self._commit()
255
256
257    def rollback(self):
258        """Rollback the sql transaction."""
259        self.con.rollback()
260
261
262    def get_last_autonumber_value(self):
263        """Gets the last auto number.
264
265        @return: The last auto number.
266        """
267        self.cur.execute('SELECT LAST_INSERT_ID()', [])
268        return self.cur.fetchall()[0][0]
269
270
271    def _quote(self, field):
272        return '`%s`' % field
273
274
275    def _where_clause(self, where):
276        if not where:
277            return '', []
278
279        if isinstance(where, dict):
280            # key/value pairs (which should be equal, or None for null)
281            keys, values = [], []
282            for field, value in six.iteritems(where):
283                quoted_field = self._quote(field)
284                if value is None:
285                    keys.append(quoted_field + ' is null')
286                else:
287                    keys.append(quoted_field + '=%s')
288                    values.append(value)
289            where_clause = ' and '.join(keys)
290        elif isinstance(where, six.string_types):
291            # the exact string
292            where_clause = where
293            values = []
294        elif isinstance(where, tuple):
295            # preformatted where clause + values
296            where_clause, values = where
297            assert where_clause
298        else:
299            raise ValueError('Invalid "where" value: %r' % where)
300
301        return ' WHERE ' + where_clause, values
302
303
304
305    def select(self, fields, table, where, distinct=False, group_by=None,
306               max_rows=None):
307        """\
308                This selects all the fields requested from a
309                specific table with a particular where clause.
310                The where clause can either be a dictionary of
311                field=value pairs, a string, or a tuple of (string,
312                a list of values).  The last option is what you
313                should use when accepting user input as it'll
314                protect you against sql injection attacks (if
315                all user data is placed in the array rather than
316                the raw SQL).
317
318                For example:
319                  where = ("a = %s AND b = %s", ['val', 'val'])
320                is better than
321                  where = "a = 'val' AND b = 'val'"
322
323        @param fields: The list of selected fields string.
324        @param table: The name of the database table.
325        @param where: The where clause string.
326        @param distinct: If select distinct values.
327        @param group_by: Group by clause.
328        @param max_rows: unused.
329        """
330        cmd = ['select']
331        if distinct:
332            cmd.append('distinct')
333        cmd += [fields, 'from', table]
334
335        where_clause, values = self._where_clause(where)
336        cmd.append(where_clause)
337
338        if group_by:
339            cmd.append(' GROUP BY ' + group_by)
340
341        self.dprint('%s %s' % (' '.join(cmd), values))
342
343        # create a re-runable function for executing the query
344        def exec_sql():
345            """Exeuctes an the sql command."""
346            sql = ' '.join(cmd)
347            numRec = self.cur.execute(sql, values)
348            if max_rows is not None and numRec > max_rows:
349                msg = 'Exceeded allowed number of records'
350                raise MySQLTooManyRows(msg)
351            return self.cur.fetchall()
352
353        # run the query, re-trying after operational errors
354        if self.autocommit:
355            return self.run_with_retry(exec_sql)
356        else:
357            return exec_sql()
358
359
360    def select_sql(self, fields, table, sql, values):
361        """\
362                select fields from table "sql"
363
364        @param fields: The list of selected fields string.
365        @param table: The name of the database table.
366        @param sql: The sql string.
367        @param values: The sql string parameter values.
368        """
369        cmd = 'select %s from %s %s' % (fields, table, sql)
370        self.dprint(cmd)
371
372        # create a -re-runable function for executing the query
373        def _exec_sql():
374            self.cur.execute(cmd, values)
375            return self.cur.fetchall()
376
377        # run the query, re-trying after operational errors
378        if self.autocommit:
379            return self.run_with_retry(_exec_sql)
380        else:
381            return _exec_sql()
382
383
384    def _exec_sql_with_commit(self, sql, values, commit):
385        if self.autocommit:
386            # re-run the query until it succeeds
387            def _exec_sql():
388                self.cur.execute(sql, values)
389                self.con.commit()
390            self.run_with_retry(_exec_sql)
391        else:
392            # take one shot at running the query
393            self.cur.execute(sql, values)
394            if commit:
395                self.con.commit()
396
397
398    def insert(self, table, data, commit=None):
399        """\
400                'insert into table (keys) values (%s ... %s)', values
401
402                data:
403                        dictionary of fields and data
404
405        @param table: The name of the table.
406        @param data: The insert data.
407        @param commit: If commit the transaction .
408        """
409        fields = list(data.keys())
410        refs = ['%s' for field in fields]
411        values = [data[field] for field in fields]
412        cmd = ('insert into %s (%s) values (%s)' %
413               (table, ','.join(self._quote(field) for field in fields),
414                ','.join(refs)))
415        self.dprint('%s %s' % (cmd, values))
416
417        self._exec_sql_with_commit(cmd, values, commit)
418
419
420    def delete(self, table, where, commit = None):
421        """Delete entries.
422
423        @param table: The name of the table.
424        @param where: The where clause.
425        @param commit: If commit the transaction .
426        """
427        cmd = ['delete from', table]
428        if commit is None:
429            commit = self.autocommit
430        where_clause, values = self._where_clause(where)
431        cmd.append(where_clause)
432        sql = ' '.join(cmd)
433        self.dprint('%s %s' % (sql, values))
434
435        self._exec_sql_with_commit(sql, values, commit)
436
437
438    def update(self, table, data, where, commit = None):
439        """\
440                'update table set data values (%s ... %s) where ...'
441
442                data:
443                        dictionary of fields and data
444
445        @param table: The name of the table.
446        @param data: The sql parameter values.
447        @param where: The where clause.
448        @param commit: If commit the transaction .
449        """
450        if commit is None:
451            commit = self.autocommit
452        cmd = 'update %s ' % table
453        fields = list(data.keys())
454        data_refs = [self._quote(field) + '=%s' for field in fields]
455        data_values = [data[field] for field in fields]
456        cmd += ' set ' + ', '.join(data_refs)
457
458        where_clause, where_values = self._where_clause(where)
459        cmd += where_clause
460
461        values = data_values + where_values
462        self.dprint('%s %s' % (cmd, values))
463
464        self._exec_sql_with_commit(cmd, values, commit)
465
466
467    def delete_job(self, tag, commit = None):
468        """Delete a tko job.
469
470        @param tag: The job tag.
471        @param commit: If commit the transaction .
472        """
473        job_idx = self.find_job(tag)
474        for test_idx in self.find_tests(job_idx):
475            where = {'test_idx' : test_idx}
476            self.delete('tko_iteration_result', where)
477            self.delete('tko_iteration_perf_value', where)
478            self.delete('tko_iteration_attributes', where)
479            self.delete('tko_test_attributes', where)
480            self.delete('tko_test_labels_tests', {'test_id': test_idx})
481        where = {'job_idx' : job_idx}
482        self.delete('tko_tests', where)
483        self.delete('tko_jobs', where)
484
485
486    def insert_job(self, tag, job, commit=None):
487        """Insert a tko job.
488
489        @param tag: The job tag.
490        @param job: The job object.
491        @param commit: If commit the transaction .
492        """
493        data = self._get_common_job_data(tag, job)
494        data.update({
495                'afe_job_id': job.afe_job_id,
496                'afe_parent_job_id': job.afe_parent_job_id,
497        })
498        if job.job_idx is not None:
499            self.update(
500                    'tko_jobs', data, {'job_idx': job.job_idx}, commit=commit)
501        else:
502            self.insert('tko_jobs', data, commit=commit)
503            job.job_idx = self.get_last_autonumber_value()
504
505
506    def _get_common_job_data(self, tag, job):
507        """Construct a dictionary with the common data to insert in job/task."""
508        return {
509                'tag':tag,
510                'label': job.label,
511                'username': job.user,
512                'machine_idx': job.machine_idx,
513                'queued_time': job.queued_time,
514                'started_time': job.started_time,
515                'finished_time': job.finished_time,
516                'build': job.build,
517                'build_version': job.build_version,
518                'board': job.board,
519                'suite': job.suite,
520        }
521
522
523    def insert_or_update_task_reference(self, job, reference_type, commit=None):
524        """Insert an entry in the tko_task_references table.
525
526        The job should already have been inserted in tko_jobs.
527        @param job: tko.models.job object.
528        @param reference_type: The type of reference to insert.
529                One of: {'afe', 'skylab'}
530        @param commit: Whether to commit this transaction.
531        """
532        assert reference_type in {'afe', 'skylab'}
533        if reference_type == 'afe':
534            task_id = job.afe_job_id
535            parent_task_id = job.afe_parent_job_id
536        else:
537            task_id = job.skylab_task_id
538            parent_task_id = job.skylab_parent_task_id
539        data = {
540                'reference_type': reference_type,
541                'tko_job_idx': job.job_idx,
542                'task_id': task_id,
543                'parent_task_id': parent_task_id,
544        }
545
546        task_reference_id = self._lookup_task_reference(job)
547        if task_reference_id is not None:
548            self.update('tko_task_references',
549                        data,
550                        {'id': task_reference_id},
551                        commit=commit)
552            job.task_reference_id = task_reference_id
553        else:
554            self.insert('tko_task_references', data, commit=commit)
555            job.task_reference_id = self.get_last_autonumber_value()
556
557
558    def update_job_keyvals(self, job, commit=None):
559        """Updates the job key values.
560
561        @param job: The job object.
562        @param commit: If commit the transaction .
563        """
564        for key, value in six.iteritems(job.keyval_dict):
565            where = {'job_id': job.job_idx, 'key': key}
566            data = dict(where, value=value)
567            exists = self.select('id', 'tko_job_keyvals', where=where)
568
569            if exists:
570                self.update('tko_job_keyvals', data, where=where, commit=commit)
571            else:
572                self.insert('tko_job_keyvals', data, commit=commit)
573
574
575    def insert_test(self, job, test, commit = None):
576        """Inserts a job test.
577
578        @param job: The job object.
579        @param test: The test object.
580        @param commit: If commit the transaction .
581        """
582        kver = self.insert_kernel(test.kernel, commit=commit)
583        data = {'job_idx':job.job_idx, 'test':test.testname,
584                'subdir':test.subdir, 'kernel_idx':kver,
585                'status':self.status_idx[test.status],
586                'reason':test.reason, 'machine_idx':job.machine_idx,
587                'started_time': test.started_time,
588                'finished_time':test.finished_time}
589        is_update = hasattr(test, "test_idx")
590        if is_update:
591            test_idx = test.test_idx
592            self.update('tko_tests', data,
593                        {'test_idx': test_idx}, commit=commit)
594            where = {'test_idx': test_idx}
595            self.delete('tko_iteration_result', where)
596            self.delete('tko_iteration_perf_value', where)
597            self.delete('tko_iteration_attributes', where)
598            where['user_created'] = 0
599            self.delete('tko_test_attributes', where)
600        else:
601            self.insert('tko_tests', data, commit=commit)
602            test_idx = test.test_idx = self.get_last_autonumber_value()
603        data = {'test_idx': test_idx}
604
605        for i in test.iterations:
606            data['iteration'] = i.index
607            for key, value in six.iteritems(i.attr_keyval):
608                data['attribute'] = key
609                data['value'] = value
610                self.insert('tko_iteration_attributes', data,
611                            commit=commit)
612            for key, value in six.iteritems(i.perf_keyval):
613                data['attribute'] = key
614                if math.isnan(value) or math.isinf(value):
615                    data['value'] = None
616                else:
617                    data['value'] = value
618                self.insert('tko_iteration_result', data,
619                            commit=commit)
620
621        data = {'test_idx': test_idx}
622
623        for key, value in six.iteritems(test.attributes):
624            data = {'test_idx': test_idx, 'attribute': key,
625                    'value': value}
626            try:
627                self.insert('tko_test_attributes', data, commit=commit)
628            except:
629                _log_error('Uploading attribute %r' % (data))
630                raise
631
632        if not is_update:
633            for label_index in test.labels:
634                data = {'test_id': test_idx, 'testlabel_id': label_index}
635                self.insert('tko_test_labels_tests', data, commit=commit)
636
637
638    def read_machine_map(self):
639        """Reads the machine map."""
640        if self.machine_group or not self.machine_map:
641            return
642        for line in open(self.machine_map, 'r').readlines():
643            (machine, group) = line.split()
644            self.machine_group[machine] = group
645
646
647    def machine_info_dict(self, job):
648        """Reads the machine information of a job.
649
650        @param job: The job object.
651
652        @return: The machine info dictionary.
653        """
654        hostname = job.machine
655        group = job.machine_group
656        owner = job.machine_owner
657
658        if not group:
659            self.read_machine_map()
660            group = self.machine_group.get(hostname, hostname)
661            if group == hostname and owner:
662                group = owner + '/' + hostname
663
664        return {'hostname': hostname, 'machine_group': group, 'owner': owner}
665
666
667    def insert_or_update_machine(self, job, commit=None):
668        """Insert or updates machine information for the given job.
669
670        Also updates the job object with new machine index, if any.
671
672        @param job: tko.models.job object.
673        @param commit: Whether to commit the database transaction.
674        """
675        job.machine_idx = self._lookup_machine(job.machine)
676        if not job.machine_idx:
677            job.machine_idx = self._insert_machine(job, commit=commit)
678        elif job.machine:
679            # Only try to update tko_machines record if machine is set. This
680            # prevents unnecessary db writes for suite jobs.
681            self._update_machine_information(job, commit=commit)
682
683
684    def _lookup_task_reference(self, job):
685        """Find the task_reference_id for a given job. Return None if not found.
686
687        @param job: tko.models.job object.
688        """
689        if job.job_idx is None:
690            return None
691        rows = self.select(
692                'id', 'tko_task_references', {'tko_job_idx': job.job_idx})
693        if not rows:
694            return None
695        if len(rows) > 1:
696            raise MySQLTooManyRows('Got %d tko_task_references for tko_job %d'
697                                   % (len(rows), job.job_idx))
698        return rows[0][0]
699
700
701    def _insert_machine(self, job, commit = None):
702        """Inserts the job machine.
703
704        @param job: The job object.
705        @param commit: If commit the transaction .
706        """
707        machine_info = self.machine_info_dict(job)
708        self.insert('tko_machines', machine_info, commit=commit)
709        return self.get_last_autonumber_value()
710
711
712    def _update_machine_information(self, job, commit = None):
713        """Updates the job machine information.
714
715        @param job: The job object.
716        @param commit: If commit the transaction .
717        """
718        machine_info = self.machine_info_dict(job)
719        self.update('tko_machines', machine_info,
720                    where={'hostname': machine_info['hostname']},
721                    commit=commit)
722
723
724    def _lookup_machine(self, hostname):
725        """Look up the machine information.
726
727        @param hostname: The hostname as string.
728        """
729        where = { 'hostname' : hostname }
730        rows = self.select('machine_idx', 'tko_machines', where)
731        if rows:
732            return rows[0][0]
733        else:
734            return None
735
736
737    def lookup_kernel(self, kernel):
738        """Look up the kernel.
739
740        @param kernel: The kernel object.
741        """
742        rows = self.select('kernel_idx', 'tko_kernels',
743                                {'kernel_hash':kernel.kernel_hash})
744        if rows:
745            return rows[0][0]
746        else:
747            return None
748
749
750    def insert_kernel(self, kernel, commit = None):
751        """Insert a kernel.
752
753        @param kernel: The kernel object.
754        @param commit: If commit the transaction .
755        """
756        kver = self.lookup_kernel(kernel)
757        if kver:
758            return kver
759
760        # If this kernel has any significant patches, append their hash
761        # as diferentiator.
762        printable = kernel.base
763        patch_count = 0
764        for patch in kernel.patches:
765            match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$',
766                                                    patch.reference)
767            if not match:
768                patch_count += 1
769
770        self.insert('tko_kernels',
771                    {'base':kernel.base,
772                     'kernel_hash':kernel.kernel_hash,
773                     'printable':printable},
774                    commit=commit)
775        kver = self.get_last_autonumber_value()
776
777        if patch_count > 0:
778            printable += ' p%d' % (kver)
779            self.update('tko_kernels',
780                    {'printable':printable},
781                    {'kernel_idx':kver})
782
783        for patch in kernel.patches:
784            self.insert_patch(kver, patch, commit=commit)
785        return kver
786
787
788    def insert_patch(self, kver, patch, commit = None):
789        """Insert a kernel patch.
790
791        @param kver: The kernel version.
792        @param patch: The kernel patch object.
793        @param commit: If commit the transaction .
794        """
795        print(patch.reference)
796        name = os.path.basename(patch.reference)[:80]
797        self.insert('tko_patches',
798                    {'kernel_idx': kver,
799                     'name':name,
800                     'url':patch.reference,
801                     'hash':patch.hash},
802                    commit=commit)
803
804
805    def find_tests(self, job_idx):
806        """Find all tests by job index.
807
808        @param job_idx: The job index.
809        @return: A list of tests.
810        """
811        where = { 'job_idx':job_idx }
812        rows = self.select('test_idx', 'tko_tests', where)
813        if rows:
814            return [row[0] for row in rows]
815        else:
816            return []
817
818
819    def find_job(self, tag):
820        """Find a job by tag.
821
822        @param tag: The job tag name.
823        @return: The job object or None.
824        """
825        rows = self.select('job_idx', 'tko_jobs', {'tag': tag})
826        if rows:
827            return rows[0][0]
828        else:
829            return None
830
831
def db(*args, **dargs):
    """Creates a db_sql instance from the given arguments.

    @param args: The positional arguments for db_sql.
    @param dargs: The named arguments for db_sql.

    @return: A db_sql object.
    """
    connection = db_sql(*args, **dargs)
    return connection
843