# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

try:
    import MySQLdb as driver
except ImportError:
    # This module (tko) is unconditionally imported by autoserv,
    # even in environments where MySQLdb is unavailable. Thus, we
    # need to cope with import failure here.
    # See https://bugs.chromium.org/p/chromium/issues/detail?id=860166#c17 for
    # context.
    class UtterlyFakeDb(object):
        """Lame fake of MySQLdb for import time needs of this file."""

        class OperationalError(Exception):
            """Stand-in for MySQLdb.OperationalError; never raised.

            It has to be a real exception class because it is used in
            'except' clauses and handed to retry.retry at import time.
            """

    driver = UtterlyFakeDb

import math
import os
import random
import re
import sys
import time

import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import retry
from autotest_lib.frontend import database_settings_helper

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock

try:
    # Python 2: matches both str and unicode.
    _STRING_TYPES = basestring
except NameError:
    # Python 3: basestring is gone, str is the only text type.
    _STRING_TYPES = str


def _log_error(msg):
    """Log an error message to stderr.

    @param msg: Message string
    """
    sys.stderr.write('%s\n' % (msg,))
    sys.stderr.flush() # we want these msgs to show up immediately


def _format_operational_error(e):
    """Format OperationalError.

    @param e: OperationalError instance.

    @return: A string holding the current local time followed by the
             string form of the error.
    """
    return ("%s: An operational error occurred during a database "
            "operation: %s" % (time.strftime("%X %x"), str(e)))


class MySQLTooManyRows(Exception):
    """Too many records."""
    pass


def _connection_retry_callback():
    """Callback method used to increment a retry metric."""
    metrics.Counter('chromeos/autotest/tko/connection_retries').increment()


class db_sql(object):
    """Data access."""

    def __init__(self, debug=False, autocommit=True, host=None,
                 database=None, user=None, password=None):
        """Load the configuration, connect, and cache the status table.

        @param debug: If True, dprint() writes the SQL it is given to stdout.
        @param autocommit: If True, statements are committed right away and
                           re-run through run_with_retry on operational
                           errors.
        @param host: Optional override for the configured database host.
        @param database: Optional override for the configured database name.
        @param user: Optional override for the configured database user.
        @param password: Optional override for the configured password.
        """
        self.debug = debug
        self.autocommit = autocommit
        self._load_config(host, database, user, password)

        self.con = None
        self._init_db()

        # Cache the tko_status table as word -> index and index -> word maps.
        self.status_idx = {}
        self.status_word = {}
        status_rows = self.select('status_idx, word', 'tko_status', None)
        for idx, word in ((row[0], row[1]) for row in status_rows):
            self.status_idx[word] = idx
            self.status_word[idx] = word

        # An optional 'machines' file next to this module maps hostnames to
        # machine groups; it is parsed lazily by read_machine_map().
        machine_map = os.path.join(os.path.dirname(__file__),
                                   'machines')
        if os.path.exists(machine_map):
            self.machine_map = machine_map
        else:
            self.machine_map = None
        self.machine_group = {}


    def _load_config(self, host, database, user, password):
        """Loads configuration settings required to connect to the database.

        This will try to connect to use the settings prefixed with global_db_.
        If they do not exist, the un-prefixed settings will be used.

        If parameters are supplied, these will be taken instead of the values
        in global_config.

        The setting of 'host' can be a real host, or a unix socket if it starts
        with '/'.

        @param host: If set, this host will be used, if not, the host will be
                     retrieved from global_config.
        @param database: If set, this database will be used, if not, the
                         database will be retrieved from global_config.
        @param user: If set, this user will be used, if not, the
                         user will be retrieved from global_config.
        @param password: If set, this password will be used, if not, the
                         password will be retrieved from global_config.
        """
        database_settings = database_settings_helper.get_global_db_config()

        # grab the host, database
        self.host = host or database_settings['HOST']
        self.database = database or database_settings['NAME']

        # grab the user and password
        self.user = user or database_settings['USER']
        self.password = password or database_settings['PASSWORD']

        # grab the timeout configuration
        self.query_timeout = (
                database_settings.get('OPTIONS', {}).get('timeout', 3600))

        # Using fallback to non-global in order to work without configuration
        # overhead on non-shard instances.
        get_value = global_config.global_config.get_config_value_with_fallback
        self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay",
                                   "min_retry_delay", type=int, default=20)
        self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay",
                                   "max_retry_delay", type=int, default=60)

        # TODO(beeps): Move this to django settings once we have routers.
        # On test instances mysql connects through a different port. No point
        # piping this through our entire infrastructure when it is only really
        # used for testing; Ideally we would specify this through django
        # settings and default it to the empty string so django will figure out
        # the default based on the database backend (eg: mysql, 3306), but until
        # we have database routers in place any django settings will apply to
        # both tko and afe.
        # The intended use of this port is to allow a testing shard vm to
        # update the master vm's database with test results. Specifying
        # an empty string will fallback to not even specifying the port
        # to the backend in tko/db.py. Unfortunately this means retries
        # won't work on the test cluster till we've migrated to routers.
        self.port = global_config.global_config.get_config_value(
                "AUTOTEST_WEB", "global_db_port", type=str, default='')


    def _init_db(self):
        """(Re)create the database connection and its cursor."""
        # make sure we clean up any existing connection
        if self.con:
            self.con.close()
            self.con = None

        # create the db connection and cursor
        self.con = self.connect(self.host, self.database,
                                self.user, self.password, self.port)
        self.cur = self.con.cursor()


    def _random_delay(self):
        """Sleep a random whole number of seconds in [min_delay, max_delay]."""
        delay = random.randint(self.min_delay, self.max_delay)
        time.sleep(delay)


    @retry.retry(driver.OperationalError, timeout_min=10,
                 delay_sec=5, callback=_connection_retry_callback)
    def connect(self, host, database, user, password, port):
        """Open and return a connection to mysql database.

        @param host: Host name, or a unix socket path if it starts with '/'.
        @param database: The database name.
        @param user: The database user.
        @param password: The password of that user.
        @param port: The port; a false value leaves it out of the arguments.

        @return: The connection object returned by the driver.
        """
        connection_args = {
            'db': database,
            'user': user,
            'passwd': password,
            'connect_timeout': 20,
        }
        if port:
            connection_args['port'] = int(port)

        if host.startswith('/'):
            return driver.connect(unix_socket=host, **connection_args)

        return driver.connect(host=host, **connection_args)


    def run_with_retry(self, function, *args, **dargs):
        """Call function(*args, **dargs) until either it passes
        without an operational error, or a timeout is reached.
        This will re-connect to the database, so it is NOT safe
        to use this inside of a database transaction.

        It can be safely used with transactions, but the
        transaction start & end must be completely contained
        within the call to 'function'.

        @param function: The function to run with retry.
        @param args: The arguments
        @param dargs: The named arguments.

        @return: Whatever 'function' returns on its first successful call.
        @raises driver.OperationalError: the last error, once more than
                self.query_timeout seconds have passed since the first try.
        """
        start_time = time.time()
        while True:
            try:
                return function(*args, **dargs)
            except driver.OperationalError as e:
                _log_error("%s; retrying, don't panic yet"
                           % _format_operational_error(e))
                if time.time() - start_time > self.query_timeout:
                    raise
                try:
                    self._random_delay()
                    self._init_db()
                except driver.OperationalError as reconnect_error:
                    # A failed reconnect is only logged; the next loop
                    # iteration calls 'function' again regardless.
                    _log_error('%s; panic now'
                               % _format_operational_error(reconnect_error))


    def dprint(self, value):
        """Print out debug value.

        @param value: The value to print out.
        """
        if self.debug:
            sys.stdout.write('SQL: ' + str(value) + '\n')


    def _commit(self):
        """Private method for function commit to call for retry.
        """
        return self.con.commit()


    def commit(self):
        """Commit the sql transaction."""
        if self.autocommit:
            return self.run_with_retry(self._commit)
        else:
            return self._commit()


    def rollback(self):
        """Rollback the sql transaction."""
        self.con.rollback()


    def get_last_autonumber_value(self):
        """Gets the last auto number.

        @return: The last auto number.
        """
        self.cur.execute('SELECT LAST_INSERT_ID()', [])
        return self.cur.fetchall()[0][0]


    def _quote(self, field):
        """Wrap a field name in backticks.

        @param field: The field name.
        """
        return '`%s`' % field


    def _where_clause(self, where):
        """Build a WHERE clause and its parameter list.

        @param where: A false value (no clause), a dict of field -> value
                      (None meaning 'is null'), a raw SQL string, or a
                      (sql string, values) tuple.

        @return: A (clause, values) tuple; the clause is '' when 'where' is
                 false, otherwise it starts with ' WHERE '.
        @raises ValueError: if 'where' is of any other type.
        """
        if not where:
            return '', []

        if isinstance(where, dict):
            # key/value pairs (which should be equal, or None for null)
            keys, values = [], []
            for field, value in where.items():
                quoted_field = self._quote(field)
                if value is None:
                    keys.append(quoted_field + ' is null')
                else:
                    keys.append(quoted_field + '=%s')
                    values.append(value)
            where_clause = ' and '.join(keys)
        elif isinstance(where, _STRING_TYPES):
            # the exact string
            where_clause = where
            values = []
        elif isinstance(where, tuple):
            # preformatted where clause + values
            where_clause, values = where
            assert where_clause
        else:
            raise ValueError('Invalid "where" value: %r' % where)

        return ' WHERE ' + where_clause, values



    def select(self, fields, table, where, distinct=False, group_by=None,
               max_rows=None):
        """\
                This selects all the fields requested from a
                specific table with a particular where clause.
                The where clause can either be a dictionary of
                field=value pairs, a string, or a tuple of (string,
                a list of values).  The last option is what you
                should use when accepting user input as it'll
                protect you against sql injection attacks (if
                all user data is placed in the array rather than
                the raw SQL).

                For example:
                  where = ("a = %s AND b = %s", ['val', 'val'])
                is better than
                  where = "a = 'val' AND b = 'val'"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param where: The where clause string.
        @param distinct: If select distinct values.
        @param group_by: Group by clause.
        @param max_rows: If not None, the largest row count (as reported by
                         cursor.execute) that is accepted.

        @return: All rows fetched from the cursor.
        @raises MySQLTooManyRows: if max_rows is set and exceeded.
        """
        cmd = ['select']
        if distinct:
            cmd.append('distinct')
        cmd += [fields, 'from', table]

        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)

        if group_by:
            cmd.append(' GROUP BY ' + group_by)

        sql = ' '.join(cmd)
        self.dprint('%s %s' % (sql, values))

        # create a re-runable function for executing the query
        def exec_sql():
            """Executes the sql command."""
            num_rec = self.cur.execute(sql, values)
            if max_rows is not None and num_rec > max_rows:
                msg = 'Exceeded allowed number of records'
                raise MySQLTooManyRows(msg)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(exec_sql)
        else:
            return exec_sql()


    def select_sql(self, fields, table, sql, values):
        """\
                select fields from table "sql"

        @param fields: The list of selected fields string.
        @param table: The name of the database table.
        @param sql: The sql string.
        @param values: The sql string parameter values.

        @return: All rows fetched from the cursor.
        """
        cmd = 'select %s from %s %s' % (fields, table, sql)
        self.dprint(cmd)

        # create a re-runable function for executing the query
        def _exec_sql():
            self.cur.execute(cmd, values)
            return self.cur.fetchall()

        # run the query, re-trying after operational errors
        if self.autocommit:
            return self.run_with_retry(_exec_sql)
        else:
            return _exec_sql()


    def _exec_sql_with_commit(self, sql, values, commit):
        """Execute a statement, committing according to the current mode.

        In autocommit mode the statement and its commit are run together
        through run_with_retry and 'commit' is not consulted. Otherwise the
        statement runs once and is committed only if 'commit' is true.

        @param sql: The sql string.
        @param values: The sql string parameter values.
        @param commit: If commit the transaction (non-autocommit mode only).
        """
        if self.autocommit:
            # re-run the query until it succeeds
            def _exec_sql():
                self.cur.execute(sql, values)
                self.con.commit()
            self.run_with_retry(_exec_sql)
        else:
            # take one shot at running the query
            self.cur.execute(sql, values)
            if commit:
                self.con.commit()


    def insert(self, table, data, commit=None):
        """\
                'insert into table (keys) values (%s ... %s)', values

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: The insert data.
        @param commit: If commit the transaction .
        """
        if commit is None:
            commit = self.autocommit
        fields = list(data)
        refs = ['%s'] * len(fields)
        values = [data[field] for field in fields]
        cmd = ('insert into %s (%s) values (%s)' %
               (table, ','.join(self._quote(field) for field in fields),
                ','.join(refs)))
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)


    def delete(self, table, where, commit=None):
        """Delete entries.

        @param table: The name of the table.
        @param where: The where clause.
        @param commit: If commit the transaction .
        """
        cmd = ['delete from', table]
        if commit is None:
            commit = self.autocommit
        where_clause, values = self._where_clause(where)
        cmd.append(where_clause)
        sql = ' '.join(cmd)
        self.dprint('%s %s' % (sql, values))

        self._exec_sql_with_commit(sql, values, commit)


    def update(self, table, data, where, commit=None):
        """\
                'update table set data values (%s ... %s) where ...'

                data:
                        dictionary of fields and data

        @param table: The name of the table.
        @param data: The sql parameter values.
        @param where: The where clause.
        @param commit: If commit the transaction .
        """
        if commit is None:
            commit = self.autocommit
        cmd = 'update %s ' % table
        fields = list(data)
        data_refs = [self._quote(field) + '=%s' for field in fields]
        data_values = [data[field] for field in fields]
        cmd += ' set ' + ', '.join(data_refs)

        where_clause, where_values = self._where_clause(where)
        cmd += where_clause

        values = data_values + where_values
        self.dprint('%s %s' % (cmd, values))

        self._exec_sql_with_commit(cmd, values, commit)


    def delete_job(self, tag, commit=None):
        """Delete a tko job.

        Nothing is deleted when no job with the given tag exists.

        @param tag: The job tag.
        @param commit: If commit the transaction .
        """
        job_idx = self.find_job(tag)
        if job_idx is None:
            # Without this guard the where clauses below would turn into
            # '... is null' and could match rows unrelated to this tag.
            return
        for test_idx in self.find_tests(job_idx):
            where = {'test_idx' : test_idx}
            self.delete('tko_iteration_result', where, commit=commit)
            self.delete('tko_iteration_perf_value', where, commit=commit)
            self.delete('tko_iteration_attributes', where, commit=commit)
            self.delete('tko_test_attributes', where, commit=commit)
            self.delete('tko_test_labels_tests', {'test_id': test_idx},
                        commit=commit)
        where = {'job_idx' : job_idx}
        self.delete('tko_tests', where, commit=commit)
        self.delete('tko_jobs', where, commit=commit)


    def insert_job(self, tag, job, commit=None):
        """Insert a tko job.

        Updates the existing row when job.job_idx is already set; otherwise
        inserts a new row and stores its auto number in job.job_idx.

        @param tag: The job tag.
        @param job: The job object.
        @param commit: If commit the transaction .
        """
        data = self._get_common_job_data(tag, job)
        data.update({
                'afe_job_id': job.afe_job_id,
                'afe_parent_job_id': job.afe_parent_job_id,
        })
        if job.job_idx is not None:
            self.update(
                    'tko_jobs', data, {'job_idx': job.job_idx}, commit=commit)
        else:
            self.insert('tko_jobs', data, commit=commit)
            job.job_idx = self.get_last_autonumber_value()


    def _get_common_job_data(self, tag, job):
        """Construct a dictionary with the common data to insert in job/task."""
        return {
                'tag':tag,
                'label': job.label,
                'username': job.user,
                'machine_idx': job.machine_idx,
                'queued_time': job.queued_time,
                'started_time': job.started_time,
                'finished_time': job.finished_time,
                'build': job.build,
                'build_version': job.build_version,
                'board': job.board,
                'suite': job.suite,
        }


    def insert_or_update_task_reference(self, job, reference_type, commit=None):
        """Insert an entry in the tko_task_references table.

        The job should already have been inserted in tko_jobs.
        @param job: tko.models.job object.
        @param reference_type: The type of reference to insert.
                One of: {'afe', 'skylab'}
        @param commit: Whether to commit this transaction.
        """
        assert reference_type in {'afe', 'skylab'}
        if reference_type == 'afe':
            task_id = job.afe_job_id
            parent_task_id = job.afe_parent_job_id
        else:
            task_id = job.skylab_task_id
            parent_task_id = job.skylab_parent_task_id
        data = {
                'reference_type': reference_type,
                'tko_job_idx': job.job_idx,
                'task_id': task_id,
                'parent_task_id': parent_task_id,
        }

        task_reference_id = self._lookup_task_reference(job)
        if task_reference_id is not None:
            self.update('tko_task_references',
                        data,
                        {'id': task_reference_id},
                        commit=commit)
            job.task_reference_id = task_reference_id
        else:
            self.insert('tko_task_references', data, commit=commit)
            job.task_reference_id = self.get_last_autonumber_value()


    def update_job_keyvals(self, job, commit=None):
        """Updates the job key values.

        Each key in job.keyval_dict is updated if a row for it already
        exists for this job, and inserted otherwise.

        @param job: The job object.
        @param commit: If commit the transaction .
        """
        for key, value in job.keyval_dict.items():
            where = {'job_id': job.job_idx, 'key': key}
            data = dict(where, value=value)
            exists = self.select('id', 'tko_job_keyvals', where=where)

            if exists:
                self.update('tko_job_keyvals', data, where=where, commit=commit)
            else:
                self.insert('tko_job_keyvals', data, commit=commit)


    def insert_test(self, job, test, commit=None):
        """Inserts a job test.

        If 'test' already carries a test_idx attribute its row is updated and
        its iteration rows and non-user-created attributes are replaced;
        otherwise a new row is inserted and test.test_idx is set.

        @param job: The job object.
        @param test: The test object.
        @param commit: If commit the transaction .
        """
        kver = self.insert_kernel(test.kernel, commit=commit)
        data = {'job_idx':job.job_idx, 'test':test.testname,
                'subdir':test.subdir, 'kernel_idx':kver,
                'status':self.status_idx[test.status],
                'reason':test.reason, 'machine_idx':job.machine_idx,
                'started_time': test.started_time,
                'finished_time':test.finished_time}
        is_update = hasattr(test, "test_idx")
        if is_update:
            test_idx = test.test_idx
            self.update('tko_tests', data,
                        {'test_idx': test_idx}, commit=commit)
            where = {'test_idx': test_idx}
            self.delete('tko_iteration_result', where, commit=commit)
            self.delete('tko_iteration_perf_value', where, commit=commit)
            self.delete('tko_iteration_attributes', where, commit=commit)
            # Attributes created by users are kept across re-parses.
            where['user_created'] = 0
            self.delete('tko_test_attributes', where, commit=commit)
        else:
            self.insert('tko_tests', data, commit=commit)
            test_idx = test.test_idx = self.get_last_autonumber_value()
        data = {'test_idx': test_idx}

        for i in test.iterations:
            data['iteration'] = i.index
            for key, value in i.attr_keyval.items():
                data['attribute'] = key
                data['value'] = value
                self.insert('tko_iteration_attributes', data,
                            commit=commit)
            for key, value in i.perf_keyval.items():
                data['attribute'] = key
                # NaN and infinities are stored as NULL.
                if math.isnan(value) or math.isinf(value):
                    data['value'] = None
                else:
                    data['value'] = value
                self.insert('tko_iteration_result', data,
                            commit=commit)

        for key, value in test.attributes.items():
            data = {'test_idx': test_idx, 'attribute': key,
                    'value': value}
            try:
                self.insert('tko_test_attributes', data, commit=commit)
            except Exception:
                _log_error('Uploading attribute %r' % (data))
                raise

        if not is_update:
            for label_index in test.labels:
                data = {'test_id': test_idx, 'testlabel_id': label_index}
                self.insert('tko_test_labels_tests', data, commit=commit)


    def read_machine_map(self):
        """Reads the machine map.

        Does nothing if the map was already loaded or there is no map file.
        Every non-blank line must hold exactly two whitespace-separated
        fields: the machine name and its group.
        """
        if self.machine_group or not self.machine_map:
            return
        with open(self.machine_map, 'r') as map_file:
            for line in map_file:
                fields = line.split()
                if not fields:
                    # Tolerate blank lines instead of failing the unpack.
                    continue
                (machine, group) = fields
                self.machine_group[machine] = group


    def machine_info_dict(self, job):
        """Reads the machine information of a job.

        @param job: The job object.

        @return: The machine info dictionary.
        """
        hostname = job.machine
        group = job.machine_group
        owner = job.machine_owner

        if not group:
            self.read_machine_map()
            group = self.machine_group.get(hostname, hostname)
            if group == hostname and owner:
                group = owner + '/' + hostname

        return {'hostname': hostname, 'machine_group': group, 'owner': owner}


    def insert_or_update_machine(self, job, commit=None):
        """Insert or updates machine information for the given job.

        Also updates the job object with new machine index, if any.

        @param job: tko.models.job object.
        @param commit: Whether to commit the database transaction.
        """
        job.machine_idx = self._lookup_machine(job.machine)
        if not job.machine_idx:
            job.machine_idx = self._insert_machine(job, commit=commit)
        elif job.machine:
            # Only try to update tko_machines record if machine is set. This
            # prevents unnecessary db writes for suite jobs.
            self._update_machine_information(job, commit=commit)


    def _lookup_task_reference(self, job):
        """Find the task_reference_id for a given job. Return None if not found.

        @param job: tko.models.job object.

        @raises MySQLTooManyRows: if more than one reference row matches.
        """
        if job.job_idx is None:
            return None
        rows = self.select(
                'id', 'tko_task_references', {'tko_job_idx': job.job_idx})
        if not rows:
            return None
        if len(rows) > 1:
            raise MySQLTooManyRows('Got %d tko_task_references for tko_job %d'
                                   % (len(rows), job.job_idx))
        return rows[0][0]


    def _insert_machine(self, job, commit=None):
        """Inserts the job machine.

        @param job: The job object.
        @param commit: If commit the transaction .

        @return: The auto number of the inserted row.
        """
        machine_info = self.machine_info_dict(job)
        self.insert('tko_machines', machine_info, commit=commit)
        return self.get_last_autonumber_value()


    def _update_machine_information(self, job, commit=None):
        """Updates the job machine information.

        @param job: The job object.
        @param commit: If commit the transaction .
        """
        machine_info = self.machine_info_dict(job)
        self.update('tko_machines', machine_info,
                    where={'hostname': machine_info['hostname']},
                    commit=commit)


    def _lookup_machine(self, hostname):
        """Look up the machine information.

        @param hostname: The hostname as string.

        @return: The machine_idx of the first matching row, or None.
        """
        where = { 'hostname' : hostname }
        rows = self.select('machine_idx', 'tko_machines', where)
        if rows:
            return rows[0][0]
        else:
            return None


    def lookup_kernel(self, kernel):
        """Look up the kernel.

        @param kernel: The kernel object.

        @return: The kernel_idx of the first row with the same kernel_hash,
                 or None.
        """
        rows = self.select('kernel_idx', 'tko_kernels',
                                {'kernel_hash':kernel.kernel_hash})
        if rows:
            return rows[0][0]
        else:
            return None


    def insert_kernel(self, kernel, commit=None):
        """Insert a kernel.

        Returns the existing index if a kernel with the same hash is already
        present; otherwise inserts the kernel and all of its patches.

        @param kernel: The kernel object.
        @param commit: If commit the transaction .

        @return: The kernel_idx of the existing or newly inserted kernel.
        """
        kver = self.lookup_kernel(kernel)
        if kver:
            return kver

        # If this kernel has any significant patches, append their hash
        # as differentiator.
        printable = kernel.base
        patch_count = 0
        for patch in kernel.patches:
            match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$',
                                                    patch.reference)
            if not match:
                patch_count += 1

        self.insert('tko_kernels',
                    {'base':kernel.base,
                     'kernel_hash':kernel.kernel_hash,
                     'printable':printable},
                    commit=commit)
        kver = self.get_last_autonumber_value()

        if patch_count > 0:
            printable += ' p%d' % (kver)
            self.update('tko_kernels',
                    {'printable':printable},
                    {'kernel_idx':kver},
                    commit=commit)

        for patch in kernel.patches:
            self.insert_patch(kver, patch, commit=commit)
        return kver


    def insert_patch(self, kver, patch, commit=None):
        """Insert a kernel patch.

        @param kver: The kernel version.
        @param patch: The kernel patch object.
        @param commit: If commit the transaction .
        """
        print(patch.reference)
        # The stored name is the reference's basename cut to 80 characters.
        name = os.path.basename(patch.reference)[:80]
        self.insert('tko_patches',
                    {'kernel_idx': kver,
                     'name':name,
                     'url':patch.reference,
                     'hash':patch.hash},
                    commit=commit)


    def find_test(self, job_idx, testname, subdir):
        """Find a test by name.

        @param job_idx: The job index.
        @param testname: The test name.
        @param subdir: The test sub directory under the job directory.

        @return: The test_idx of the first matching row, or None.
        """
        where = {'job_idx': job_idx , 'test': testname, 'subdir': subdir}
        rows = self.select('test_idx', 'tko_tests', where)
        if rows:
            return rows[0][0]
        else:
            return None


    def find_tests(self, job_idx):
        """Find all tests by job index.

        @param job_idx: The job index.
        @return: A list of test indexes; empty if the job has no tests.
        """
        where = { 'job_idx':job_idx }
        rows = self.select('test_idx', 'tko_tests', where)
        if rows:
            return [row[0] for row in rows]
        else:
            return []


    def find_job(self, tag):
        """Find a job by tag.

        @param tag: The job tag name.
        @return: The job_idx of the first matching row, or None.
        """
        rows = self.select('job_idx', 'tko_jobs', {'tag': tag})
        if rows:
            return rows[0][0]
        else:
            return None


    def get_child_tests_by_parent_task_id(self, parent_task_id):
        """Get the child tests by a parent task id.

        @param parent_task_id: A string parent task id in tko_task_references.
        @return: A list of view dicts, which has key 'test_name' and 'status'.
        @raises ValueError: if parent_task_id is empty or None.
        """
        if not parent_task_id:
            raise ValueError('no parent_task_id supplied')
        rows = self.select('tko_job_idx', 'tko_task_references',
                           {'parent_task_id': parent_task_id})
        tko_job_ids = [str(r[0]) for r in rows]
        if not tko_job_ids:
            return []
        fields = ['test_name', 'status']
        # The ids spliced into the SQL come from the query above, not from
        # the caller.
        where = 'job_idx in (%s)' % ', '.join(tko_job_ids)
        rows = self.select(', '.join(fields), 'tko_test_view_2', where)
        return [{'test_name': r[0], 'status': r[1]} for r in rows]


def db(*args, **dargs):
    """Creates a db_sql instance with the arguments provided in args
    and dargs.

    @param args: The db_sql positional arguments.
    @param dargs: The db_sql named arguments.

    @return: A db_sql object.
    """
    return db_sql(*args, **dargs)