# Lint as: python2, python3
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import division
from __future__ import print_function

try:
    import MySQLdb as driver
except ImportError:
    # This module (tko) is unconditionally imported by autoserv,
    # even in environments where MySQLdb is unavailable. Thus, we
    # need to cope with import failure here.
    # See https://bugs.chromium.org/p/chromium/issues/detail?id=860166#c17 for
    # context.
    class UtterlyFakeDb(object):
        """Lame fake of MySQLdb for import time needs of this file."""

        class OperationalError(Exception):
            """Placeholder exception type.

            It must be a real exception class: on Python 3 an `except`
            clause naming a non-exception object raises TypeError as soon as
            any exception reaches it, which would mask the original error.
            """

    driver = UtterlyFakeDb

import math
import os
import random
import re
import sys
import time

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend import database_settings_helper
import six

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


def _log_error(msg):
    """Log an error message to stderr.

    @param msg: Message string
    """
    print(msg, file=sys.stderr)
    sys.stderr.flush()  # we want these msgs to show up immediately


def _format_operational_error(e):
    """Format an OperationalError as a timestamped message.

    @param e: OperationalError instance.

    @return: The formatted message string.
    """
    return ("%s: An operational error occurred during a database "
            "operation: %s" % (time.strftime("%X %x"), str(e)))


class MySQLTooManyRows(Exception):
    """Too many records."""
    pass


def _connection_retry_callback():
    """Callback method used to increment a retry metric."""
    metrics.Counter('chromeos/autotest/tko/connection_retries').increment()


class db_sql(object):
    """Data access."""

    def __init__(self, debug=False, autocommit=True, host=None,
                 database=None, user=None, password=None):
        """Load the configuration, connect, and cache the status table.

        @param debug: If True, dprint() writes SQL statements to stdout.
        @param autocommit: If True, statements are committed as they are
                executed and retried (with a reconnect) on operational
                errors.
        @param host: Database host override, see _load_config().
        @param database: Database name override, see _load_config().
        @param user: Database user override, see _load_config().
        @param password: Database password override, see _load_config().
        """
        self.debug = debug
        self.autocommit = autocommit
        self._load_config(host, database, user, password)

        self.con = None
        self._init_db()

        # Cache the status table in both directions (word -> index and
        # index -> word). Statuses are only read here, never inserted.
        self.status_idx = {}
        self.status_word = {}
        status_rows = self.select('status_idx, word', 'tko_status', None)
        for s in status_rows:
            self.status_idx[s[1]] = s[0]
            self.status_word[s[0]] = s[1]

        # Optional 'machines' file living next to this module; it is parsed
        # lazily by read_machine_map().
        machine_map = os.path.join(os.path.dirname(__file__),
                                   'machines')
        if os.path.exists(machine_map):
            self.machine_map = machine_map
        else:
            self.machine_map = None
        self.machine_group = {}


    def _load_config(self, host, database, user, password):
        """Loads configuration settings required to connect to the database.

        This will try to connect to use the settings prefixed with global_db_.
        If they do not exist, the un-prefixed settings will be used.

        If parameters are supplied, these will be taken instead of the values
        in global_config.

        The setting of 'host' can be a real host, or a unix socket if it starts
        with '/'.

        @param host: If set, this host will be used, if not, the host will be
                     retrieved from global_config.
        @param database: If set, this database will be used, if not, the
                     database will be retrieved from global_config.
        @param user: If set, this user will be used, if not, the
                     user will be retrieved from global_config.
        @param password: If set, this password will be used, if not, the
                     password will be retrieved from global_config.
        """
        database_settings = database_settings_helper.get_global_db_config()

        # grab the host, database
        self.host = host or database_settings['HOST']
        self.database = database or database_settings['NAME']

        # grab the user and password
        self.user = user or database_settings['USER']
        self.password = password or database_settings['PASSWORD']

        # grab the timeout configuration; run_with_retry() compares it
        # against elapsed seconds.
        self.query_timeout = (
                database_settings.get('OPTIONS', {}).get('timeout', 3600))

        # Using fallback to non-global in order to work without configuration
        # overhead on non-shard instances.
        get_value = global_config.global_config.get_config_value_with_fallback
        self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay",
                                   "min_retry_delay", type=int, default=20)
        self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay",
                                   "max_retry_delay", type=int, default=60)

        # TODO(beeps): Move this to django settings once we have routers.
        # On test instances mysql connects through a different port. No point
        # piping this through our entire infrastructure when it is only really
        # used for testing; Ideally we would specify this through django
        # settings and default it to the empty string so django will figure out
        # the default based on the database backend (eg: mysql, 3306), but until
        # we have database routers in place any django settings will apply to
        # both tko and afe.
        # The intended use of this port is to allow a testing shard vm to
        # update the vm's database with test results. Specifying
        # an empty string will fallback to not even specifying the port
        # to the backend in tko/db.py. Unfortunately this means retries
        # won't work on the test cluster till we've migrated to routers.
        self.port = global_config.global_config.get_config_value(
                "AUTOTEST_WEB", "global_db_port", type=str, default='')


    def _init_db(self):
        """(Re)create the database connection and its cursor."""
        # make sure we clean up any existing connection
        if self.con:
            self.con.close()
            self.con = None

        # create the db connection and cursor
        self.con = self.connect(self.host, self.database,
                                self.user, self.password, self.port)
        self.cur = self.con.cursor()


    def _random_delay(self):
        """Sleep for a random whole number of seconds.

        The duration is drawn from [self.min_delay, self.max_delay].
        """
        delay = random.randint(self.min_delay, self.max_delay)
        time.sleep(delay)


    @retry.retry(driver.OperationalError, timeout_min=10,
                 delay_sec=5, callback=_connection_retry_callback)
    def connect(self, host, database, user, password, port):
        """Open and return a connection to mysql database.

        @param host: Host name, or a unix socket path if it starts with '/'.
        @param database: The database name.
        @param user: The database user.
        @param password: The database password.
        @param port: The port to connect to; omitted from the connection
                arguments when empty.

        @return: The connection object returned by the driver.
        """
        connection_args = {
                'db': database,
                'user': user,
                'passwd': password,
                'connect_timeout': 20,
        }
        if port:
            connection_args['port'] = int(port)

        if host.startswith('/'):
            return driver.connect(unix_socket=host, **connection_args)

        return driver.connect(host=host, **connection_args)


    def run_with_retry(self, function, *args, **dargs):
        """Call function(*args, **dargs) until either it passes
        without an operational error, or a timeout is reached.

        This will re-connect to the database between attempts, so it is
        NOT safe to call this from inside an already open database
        transaction. It can only be used with transactions when the
        transaction start & end are completely contained within the call
        to 'function'.

        @param function: The function to run with retry.
        @param args: The arguments
        @param dargs: The named arguments.

        @return: Whatever function(*args, **dargs) returns.
        @raises driver.OperationalError: if the function is still failing
                after self.query_timeout seconds have elapsed.
        """
        start_time = time.time()
        while True:
            try:
                return function(*args, **dargs)
            except driver.OperationalError as e:
                _log_error("%s; retrying, don't panic yet"
                           % _format_operational_error(e))
                elapsed_time = time.time() - start_time
                if elapsed_time > self.query_timeout:
                    raise
                try:
                    self._random_delay()
                    self._init_db()
                except driver.OperationalError as e:
                    # A failed reconnect is only logged; the next loop
                    # iteration calls the function again and re-checks the
                    # timeout if that fails too.
                    _log_error('%s; panic now'
                               % _format_operational_error(e))


    def dprint(self, value):
        """Print out debug value.

        @param value: The value to print out.
        """
        if self.debug:
            sys.stdout.write('SQL: ' + str(value) + '\n')


    def _commit(self):
        """Private method for function commit to call for retry."""
        return self.con.commit()


    def commit(self):
        """Commit the sql transaction."""
        if self.autocommit:
            return self.run_with_retry(self._commit)
        else:
            return self._commit()


    def rollback(self):
        """Rollback the sql transaction."""
        self.con.rollback()


    def get_last_autonumber_value(self):
        """Gets the last auto number.

        @return: The last auto number.
        """
        self.cur.execute('SELECT LAST_INSERT_ID()', [])
        return self.cur.fetchall()[0][0]


    def _quote(self, field):
        """Wrap a field name in backticks.

        @param field: The field name.
        """
        return '`%s`' % field


    def _where_clause(self, where):
        """Build a WHERE clause and its parameter values.

        @param where: A false value (no clause), a dict of field/value
                pairs (None meaning 'is null'), an exact clause string, or
                a tuple of (clause string, list of values).

        @return: A tuple (clause, values). clause is '' when 'where' is
                false, otherwise it starts with ' WHERE '.
        @raises ValueError: if 'where' is of an unsupported type.
        """
        if not where:
            return '', []

        if isinstance(where, dict):
            # key/value pairs (which should be equal, or None for null)
            keys, values = [], []
            for field, value in six.iteritems(where):
                quoted_field = self._quote(field)
                if value is None:
                    keys.append(quoted_field + ' is null')
                else:
                    keys.append(quoted_field + '=%s')
                    values.append(value)
            where_clause = ' and '.join(keys)
        elif isinstance(where, six.string_types):
            # the exact string
            where_clause = where
            values = []
        elif isinstance(where, tuple):
            # preformatted where clause + values
            where_clause, values = where
            assert where_clause
        else:
            raise ValueError('Invalid "where" value: %r' % where)

        return ' WHERE ' + where_clause, values


    def select(self, fields, table, where, distinct=False, group_by=None,
               max_rows=None):
        """\
                This selects all the fields requested from a
                specific table with a particular where clause.
                The where clause can either be a dictionary of
                field=value pairs, a string, or a tuple of (string,
                a list of values).  The last option is what you
                should use when accepting user input as it'll
                protect you against sql injection attacks (if
                all user data is placed in the array rather than
                the raw SQL).

                For example:
                  where = ("a = %s AND b = %s", ['val', 'val'])
                is better than
                  where = "a = 'val' AND b = 'val'"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param where: The where clause (dict, string, or tuple as above).
        @param distinct: If select distinct values.
        @param group_by: Group by clause.
        @param max_rows: If not None, the largest row count the cursor's
                execute() may report before MySQLTooManyRows is raised.

        @return: The rows returned by the cursor's fetchall().
        @raises MySQLTooManyRows: if max_rows is set and exceeded.
        """
        cmd = ['select']
        if distinct:
            cmd.append('distinct')
        cmd += [fields, 'from', table]

        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)

        if group_by:
            cmd.append(' GROUP BY ' + group_by)

        self.dprint('%s %s' % (' '.join(cmd), values))

        # create a re-runable function for executing the query
        def exec_sql():
            """Executes the sql command."""
            sql = ' '.join(cmd)
            numRec = self.cur.execute(sql, values)
            if max_rows is not None and numRec > max_rows:
                msg = 'Exceeded allowed number of records'
                raise MySQLTooManyRows(msg)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(exec_sql)
        else:
            return exec_sql()


    def select_sql(self, fields, table, sql, values):
        """\
                select fields from table "sql"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param sql: The sql string.
        @param values: The sql string parameter values.

        @return: The rows returned by the cursor's fetchall().
        """
        cmd = 'select %s from %s %s' % (fields, table, sql)
        self.dprint(cmd)

        # create a re-runable function for executing the query
        def _exec_sql():
            self.cur.execute(cmd, values)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(_exec_sql)
        else:
            return _exec_sql()


    def _exec_sql_with_commit(self, sql, values, commit):
        """Execute a statement, committing according to the mode.

        In autocommit mode the statement is executed and committed with
        retries, and 'commit' is ignored. Otherwise the statement is
        executed once and committed only if 'commit' is true.

        @param sql: The sql string.
        @param values: The sql string parameter values.
        @param commit: If commit the transaction (non-autocommit mode only).
        """
        if self.autocommit:
            # re-run the query until it succeeds
            def _exec_sql():
                self.cur.execute(sql, values)
                self.con.commit()
            self.run_with_retry(_exec_sql)
        else:
            # take one shot at running the query
            self.cur.execute(sql, values)
            if commit:
                self.con.commit()


    def insert(self, table, data, commit=None):
        """\
                'insert into table (keys) values (%s ... %s)', values

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: The insert data.
        @param commit: If commit the transaction; defaults to
                self.autocommit.
        """
        if commit is None:
            commit = self.autocommit
        fields = list(data.keys())
        refs = ['%s'] * len(fields)
        values = [data[field] for field in fields]
        cmd = ('insert into %s (%s) values (%s)' %
               (table, ','.join(self._quote(field) for field in fields),
                ','.join(refs)))
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)


    def delete(self, table, where, commit=None):
        """Delete entries.

        @param table: The name of the table.
        @param where: The where clause.
        @param commit: If commit the transaction; defaults to
                self.autocommit.
        """
        cmd = ['delete from', table]
        if commit is None:
            commit = self.autocommit
        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)
        sql = ' '.join(cmd)
        self.dprint('%s %s' % (sql, values))

        self._exec_sql_with_commit(sql, values, commit)


    def update(self, table, data, where, commit=None):
        """\
                'update table set data values (%s ... %s) where ...'

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: The sql parameter values.
        @param where: The where clause.
        @param commit: If commit the transaction; defaults to
                self.autocommit.
        """
        if commit is None:
            commit = self.autocommit
        cmd = 'update %s ' % table
        fields = list(data.keys())
        data_refs = [self._quote(field) + '=%s' for field in fields]
        data_values = [data[field] for field in fields]
        cmd += ' set ' + ', '.join(data_refs)

        where_clause, where_values = self._where_clause(where)
        cmd += where_clause

        values = data_values + where_values
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)


    def delete_job(self, tag, commit=None):
        """Delete a tko job together with its tests and their child rows.

        Does nothing if no job with the given tag exists.

        @param tag: The job tag.
        @param commit: If commit the transaction .
        """
        job_idx = self.find_job(tag)
        if job_idx is None:
            # Without this guard the where clauses below would become
            # "job_idx is null" and match rows unrelated to this tag.
            return
        for test_idx in self.find_tests(job_idx):
            where = {'test_idx': test_idx}
            self.delete('tko_iteration_result', where, commit=commit)
            self.delete('tko_iteration_perf_value', where, commit=commit)
            self.delete('tko_iteration_attributes', where, commit=commit)
            self.delete('tko_test_attributes', where, commit=commit)
            self.delete('tko_test_labels_tests', {'test_id': test_idx},
                        commit=commit)
        where = {'job_idx': job_idx}
        self.delete('tko_tests', where, commit=commit)
        self.delete('tko_jobs', where, commit=commit)


    def insert_job(self, tag, job, commit=None):
        """Insert a tko job, or update it if job.job_idx is already set.

        On insert, job.job_idx is set to the new row's auto number.

        @param tag: The job tag.
        @param job: The job object.
        @param commit: If commit the transaction .
        """
        data = self._get_common_job_data(tag, job)
        data.update({
                'afe_job_id': job.afe_job_id,
                'afe_parent_job_id': job.afe_parent_job_id,
        })
        if job.job_idx is not None:
            self.update(
                    'tko_jobs', data, {'job_idx': job.job_idx}, commit=commit)
        else:
            self.insert('tko_jobs', data, commit=commit)
            job.job_idx = self.get_last_autonumber_value()


    def _get_common_job_data(self, tag, job):
        """Construct a dictionary with the common data to insert in job/task.

        @param tag: The job tag.
        @param job: The job object.
        """
        return {
                'tag': tag,
                'label': job.label,
                'username': job.user,
                'machine_idx': job.machine_idx,
                'queued_time': job.queued_time,
                'started_time': job.started_time,
                'finished_time': job.finished_time,
                'build': job.build,
                'build_version': job.build_version,
                'board': job.board,
                'suite': job.suite,
        }


    def insert_or_update_task_reference(self, job, reference_type, commit=None):
        """Insert an entry in the tko_task_references table.

        The job should already have been inserted in tko_jobs.
        Sets job.task_reference_id to the id of the updated or inserted row.

        @param job: tko.models.job object.
        @param reference_type: The type of reference to insert.
                One of: {'afe', 'skylab'}
        @param commit: Whether to commit this transaction.
        """
        assert reference_type in {'afe', 'skylab'}
        if reference_type == 'afe':
            task_id = job.afe_job_id
            parent_task_id = job.afe_parent_job_id
        else:
            task_id = job.skylab_task_id
            parent_task_id = job.skylab_parent_task_id
        data = {
                'reference_type': reference_type,
                'tko_job_idx': job.job_idx,
                'task_id': task_id,
                'parent_task_id': parent_task_id,
        }

        task_reference_id = self._lookup_task_reference(job)
        if task_reference_id is not None:
            self.update('tko_task_references',
                        data,
                        {'id': task_reference_id},
                        commit=commit)
            job.task_reference_id = task_reference_id
        else:
            self.insert('tko_task_references', data, commit=commit)
            job.task_reference_id = self.get_last_autonumber_value()


    def update_job_keyvals(self, job, commit=None):
        """Updates the job key values.

        Each key in job.keyval_dict is updated if a row for it already
        exists and inserted otherwise.

        @param job: The job object.
        @param commit: If commit the transaction .
        """
        for key, value in six.iteritems(job.keyval_dict):
            where = {'job_id': job.job_idx, 'key': key}
            data = dict(where, value=value)
            exists = self.select('id', 'tko_job_keyvals', where=where)

            if exists:
                self.update('tko_job_keyvals', data, where=where, commit=commit)
            else:
                self.insert('tko_job_keyvals', data, commit=commit)


    def insert_test(self, job, test, commit=None):
        """Inserts a job test, or updates it if test.test_idx exists.

        On update, the test's iteration rows and its attributes with
        user_created = 0 are deleted and then re-inserted from the test
        object. On insert, test.test_idx is set to the new row's auto
        number and the test's labels are inserted as well.

        @param job: The job object.
        @param test: The test object.
        @param commit: If commit the transaction .
        """
        kver = self.insert_kernel(test.kernel, commit=commit)
        data = {'job_idx': job.job_idx, 'test': test.testname,
                'subdir': test.subdir, 'kernel_idx': kver,
                'status': self.status_idx[test.status],
                'reason': test.reason, 'machine_idx': job.machine_idx,
                'started_time': test.started_time,
                'finished_time': test.finished_time}
        is_update = hasattr(test, "test_idx")
        if is_update:
            test_idx = test.test_idx
            self.update('tko_tests', data,
                        {'test_idx': test_idx}, commit=commit)
            where = {'test_idx': test_idx}
            self.delete('tko_iteration_result', where, commit=commit)
            self.delete('tko_iteration_perf_value', where, commit=commit)
            self.delete('tko_iteration_attributes', where, commit=commit)
            where['user_created'] = 0
            self.delete('tko_test_attributes', where, commit=commit)
        else:
            self.insert('tko_tests', data, commit=commit)
            test_idx = test.test_idx = self.get_last_autonumber_value()

        # One dict is reused for all iteration rows; every insert below
        # overwrites 'iteration', 'attribute' and 'value' first.
        data = {'test_idx': test_idx}
        for i in test.iterations:
            data['iteration'] = i.index
            for key, value in six.iteritems(i.attr_keyval):
                data['attribute'] = key
                data['value'] = value
                self.insert('tko_iteration_attributes', data,
                            commit=commit)
            for key, value in six.iteritems(i.perf_keyval):
                data['attribute'] = key
                # NaN and infinite perf values are stored as NULL.
                if math.isnan(value) or math.isinf(value):
                    data['value'] = None
                else:
                    data['value'] = value
                self.insert('tko_iteration_result', data,
                            commit=commit)

        for key, value in six.iteritems(test.attributes):
            data = {'test_idx': test_idx, 'attribute': key,
                    'value': value}
            try:
                self.insert('tko_test_attributes', data, commit=commit)
            except Exception:
                # Log which attribute failed, then let the error propagate.
                _log_error('Uploading attribute %r' % (data))
                raise

        if not is_update:
            for label_index in test.labels:
                data = {'test_id': test_idx, 'testlabel_id': label_index}
                self.insert('tko_test_labels_tests', data, commit=commit)


    def read_machine_map(self):
        """Reads the machine map.

        Fills self.machine_group from the map file, one whitespace
        separated "machine group" pair per line; blank lines are skipped.
        Does nothing if the map was already loaded or no map file exists.
        """
        if self.machine_group or not self.machine_map:
            return
        # 'with' guarantees the file is closed even if a line is malformed.
        with open(self.machine_map, 'r') as map_file:
            for line in map_file:
                if not line.strip():
                    continue
                (machine, group) = line.split()
                self.machine_group[machine] = group


    def machine_info_dict(self, job):
        """Reads the machine information of a job.

        If the job has no machine group, the group is looked up in the
        machine map, falling back to the hostname (prefixed with
        "<owner>/" when an owner is set).

        @param job: The job object.

        @return: The machine info dictionary.
        """
        hostname = job.machine
        group = job.machine_group
        owner = job.machine_owner

        if not group:
            self.read_machine_map()
            group = self.machine_group.get(hostname, hostname)
            if group == hostname and owner:
                group = owner + '/' + hostname

        return {'hostname': hostname, 'machine_group': group, 'owner': owner}


    def insert_or_update_machine(self, job, commit=None):
        """Insert or updates machine information for the given job.

        Also updates the job object with new machine index, if any.

        @param job: tko.models.job object.
        @param commit: Whether to commit the database transaction.
        """
        job.machine_idx = self._lookup_machine(job.machine)
        if not job.machine_idx:
            job.machine_idx = self._insert_machine(job, commit=commit)
        elif job.machine:
            # Only try to update tko_machines record if machine is set. This
            # prevents unnecessary db writes for suite jobs.
            self._update_machine_information(job, commit=commit)


    def _lookup_task_reference(self, job):
        """Find the task_reference_id for a given job. Return None if not found.

        @param job: tko.models.job object.

        @raises MySQLTooManyRows: if more than one reference row exists for
                the job.
        """
        if job.job_idx is None:
            return None
        rows = self.select(
                'id', 'tko_task_references', {'tko_job_idx': job.job_idx})
        if not rows:
            return None
        if len(rows) > 1:
            raise MySQLTooManyRows('Got %d tko_task_references for tko_job %d'
                                   % (len(rows), job.job_idx))
        return rows[0][0]


    def _insert_machine(self, job, commit=None):
        """Inserts the job machine.

        @param job: The job object.
        @param commit: If commit the transaction .

        @return: The auto number of the inserted tko_machines row.
        """
        machine_info = self.machine_info_dict(job)
        self.insert('tko_machines', machine_info, commit=commit)
        return self.get_last_autonumber_value()


    def _update_machine_information(self, job, commit=None):
        """Updates the job machine information.

        @param job: The job object.
        @param commit: If commit the transaction .
        """
        machine_info = self.machine_info_dict(job)
        self.update('tko_machines', machine_info,
                    where={'hostname': machine_info['hostname']},
                    commit=commit)


    def _lookup_machine(self, hostname):
        """Look up the machine information.

        @param hostname: The hostname as string.

        @return: The machine_idx of the first matching row, or None.
        """
        where = {'hostname': hostname}
        rows = self.select('machine_idx', 'tko_machines', where)
        if rows:
            return rows[0][0]
        else:
            return None


    def lookup_kernel(self, kernel):
        """Look up the kernel.

        @param kernel: The kernel object.

        @return: The kernel_idx of the first row matching the kernel hash,
                or None.
        """
        rows = self.select('kernel_idx', 'tko_kernels',
                           {'kernel_hash': kernel.kernel_hash})
        if rows:
            return rows[0][0]
        else:
            return None


    def insert_kernel(self, kernel, commit=None):
        """Insert a kernel, unless one with the same hash already exists.

        @param kernel: The kernel object.
        @param commit: If commit the transaction .

        @return: The kernel_idx of the existing or newly inserted kernel.
        """
        kver = self.lookup_kernel(kernel)
        if kver:
            return kver

        # If this kernel has any significant patches, append their hash
        # as differentiator.
        printable = kernel.base
        patch_count = 0
        for patch in kernel.patches:
            match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$',
                             patch.reference)
            if not match:
                patch_count += 1

        self.insert('tko_kernels',
                    {'base': kernel.base,
                     'kernel_hash': kernel.kernel_hash,
                     'printable': printable},
                    commit=commit)
        kver = self.get_last_autonumber_value()

        if patch_count > 0:
            printable += ' p%d' % (kver)
            self.update('tko_kernels',
                        {'printable': printable},
                        {'kernel_idx': kver},
                        commit=commit)

        for patch in kernel.patches:
            self.insert_patch(kver, patch, commit=commit)
        return kver


    def insert_patch(self, kver, patch, commit=None):
        """Insert a kernel patch.

        @param kver: The kernel version.
        @param patch: The kernel patch object.
        @param commit: If commit the transaction .
        """
        print(patch.reference)
        # Only the first 80 characters of the base name are stored.
        name = os.path.basename(patch.reference)[:80]
        self.insert('tko_patches',
                    {'kernel_idx': kver,
                     'name': name,
                     'url': patch.reference,
                     'hash': patch.hash},
                    commit=commit)


    def find_tests(self, job_idx):
        """Find all tests by job index.

        @param job_idx: The job index.
        @return: A list of test indices (empty if there are none).
        """
        where = {'job_idx': job_idx}
        rows = self.select('test_idx', 'tko_tests', where)
        if rows:
            return [row[0] for row in rows]
        else:
            return []


    def find_job(self, tag):
        """Find a job by tag.

        @param tag: The job tag name.
        @return: The job index (job_idx) or None.
        """
        rows = self.select('job_idx', 'tko_jobs', {'tag': tag})
        if rows:
            return rows[0][0]
        else:
            return None


def db(*args, **dargs):
    """Creates an instance of the database class (db_sql) with the
    arguments provided in args and dargs.

    @param args: The db_sql positional arguments.
    @param dargs: The db_sql named arguments.

    @return: A db_sql object.
    """
    return db_sql(*args, **dargs)