1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import math 6import os 7import random 8import re 9import sys 10import time 11 12import common 13from autotest_lib.client.common_lib import global_config 14from autotest_lib.frontend import database_settings_helper 15from autotest_lib.tko import utils 16 17 18def _log_error(msg): 19 """Log an error message. 20 21 @param msg: Message string 22 """ 23 print >> sys.stderr, msg 24 sys.stderr.flush() # we want these msgs to show up immediately 25 26 27def _format_operational_error(e): 28 """Format OperationalError. 29 30 @param e: OperationalError instance. 31 """ 32 return ("%s: An operational error occurred during a database " 33 "operation: %s" % (time.strftime("%X %x"), str(e))) 34 35 36class MySQLTooManyRows(Exception): 37 """Too many records.""" 38 pass 39 40 41class db_sql(object): 42 """Data access.""" 43 44 def __init__(self, debug=False, autocommit=True, host=None, 45 database=None, user=None, password=None): 46 self.debug = debug 47 self.autocommit = autocommit 48 self._load_config(host, database, user, password) 49 50 self.con = None 51 self._init_db() 52 53 # if not present, insert statuses 54 self.status_idx = {} 55 self.status_word = {} 56 status_rows = self.select('status_idx, word', 'tko_status', None) 57 for s in status_rows: 58 self.status_idx[s[1]] = s[0] 59 self.status_word[s[0]] = s[1] 60 61 machine_map = os.path.join(os.path.dirname(__file__), 62 'machines') 63 if os.path.exists(machine_map): 64 self.machine_map = machine_map 65 else: 66 self.machine_map = None 67 self.machine_group = {} 68 69 70 def _load_config(self, host, database, user, password): 71 """Loads configuration settings required to connect to the database. 72 73 This will try to connect to use the settings prefixed with global_db_. 74 If they do not exist, they un-prefixed settings will be used. 75 76 If parameters are supplied, these will be taken instead of the values 77 in global_config. 78 79 @param host: If set, this host will be used, if not, the host will be 80 retrieved from global_config. 81 @param database: If set, this database will be used, if not, the 82 database will be retrieved from global_config. 83 @param user: If set, this user will be used, if not, the 84 user will be retrieved from global_config. 85 @param password: If set, this password will be used, if not, the 86 password will be retrieved from global_config. 87 """ 88 database_settings = database_settings_helper.get_global_db_config() 89 90 # grab the host, database 91 self.host = host or database_settings['HOST'] 92 self.database = database or database_settings['NAME'] 93 94 # grab the user and password 95 self.user = user or database_settings['USER'] 96 self.password = password or database_settings['PASSWORD'] 97 98 # grab the timeout configuration 99 self.query_timeout =( 100 database_settings.get('OPTIONS', {}).get('timeout', 3600)) 101 102 # Using fallback to non-global in order to work without configuration 103 # overhead on non-shard instances. 104 get_value = global_config.global_config.get_config_value_with_fallback 105 self.min_delay = get_value("AUTOTEST_WEB", "global_db_min_retry_delay", 106 "min_retry_delay", type=int, default=20) 107 self.max_delay = get_value("AUTOTEST_WEB", "global_db_max_retry_delay", 108 "max_retry_delay", type=int, default=60) 109 110 # TODO(beeps): Move this to django settings once we have routers. 111 # On test instances mysql connects through a different port. No point 112 # piping this through our entire infrastructure when it is only really 113 # used for testing; Ideally we would specify this through django 114 # settings and default it to the empty string so django will figure out 115 # the default based on the database backend (eg: mysql, 3306), but until 116 # we have database routers in place any django settings will apply to 117 # both tko and afe. 118 # The intended use of this port is to allow a testing shard vm to 119 # update the master vm's database with test results. Specifying 120 # and empty string will fallback to not even specifying the port 121 # to the backend in tko/db.py. Unfortunately this means retries 122 # won't work on the test cluster till we've migrated to routers. 123 self.port = global_config.global_config.get_config_value( 124 "AUTOTEST_WEB", "global_db_port", type=str, default='') 125 126 127 def _init_db(self): 128 # make sure we clean up any existing connection 129 if self.con: 130 self.con.close() 131 self.con = None 132 133 # create the db connection and cursor 134 self.con = self.connect(self.host, self.database, 135 self.user, self.password, self.port) 136 self.cur = self.con.cursor() 137 138 139 def _random_delay(self): 140 delay = random.randint(self.min_delay, self.max_delay) 141 time.sleep(delay) 142 143 144 def run_with_retry(self, function, *args, **dargs): 145 """Call function(*args, **dargs) until either it passes 146 without an operational error, or a timeout is reached. 147 This will re-connect to the database, so it is NOT safe 148 to use this inside of a database transaction. 149 150 It can be safely used with transactions, but the 151 transaction start & end must be completely contained 152 within the call to 'function'. 153 154 @param function: The function to run with retry. 155 @param args: The arguments 156 @param dargs: The named arguments. 157 """ 158 OperationalError = _get_error_class("OperationalError") 159 160 success = False 161 start_time = time.time() 162 while not success: 163 try: 164 result = function(*args, **dargs) 165 except OperationalError, e: 166 _log_error("%s; retrying, don't panic yet" 167 % _format_operational_error(e)) 168 stop_time = time.time() 169 elapsed_time = stop_time - start_time 170 if elapsed_time > self.query_timeout: 171 raise 172 else: 173 try: 174 self._random_delay() 175 self._init_db() 176 except OperationalError, e: 177 _log_error('%s; panic now' 178 % _format_operational_error(e)) 179 else: 180 success = True 181 return result 182 183 184 def dprint(self, value): 185 """Print out debug value. 186 187 @param value: The value to print out. 188 """ 189 if self.debug: 190 sys.stdout.write('SQL: ' + str(value) + '\n') 191 192 193 def _commit(self): 194 """Private method for function commit to call for retry. 195 """ 196 return self.con.commit() 197 198 199 def commit(self): 200 """Commit the sql transaction.""" 201 if self.autocommit: 202 return self.run_with_retry(self._commit) 203 else: 204 return self._commit() 205 206 207 def rollback(self): 208 """Rollback the sql transaction.""" 209 self.con.rollback() 210 211 212 def get_last_autonumber_value(self): 213 """Gets the last auto number. 214 215 @return: The last auto number. 216 """ 217 self.cur.execute('SELECT LAST_INSERT_ID()', []) 218 return self.cur.fetchall()[0][0] 219 220 221 def _quote(self, field): 222 return '`%s`' % field 223 224 225 def _where_clause(self, where): 226 if not where: 227 return '', [] 228 229 if isinstance(where, dict): 230 # key/value pairs (which should be equal, or None for null) 231 keys, values = [], [] 232 for field, value in where.iteritems(): 233 quoted_field = self._quote(field) 234 if value is None: 235 keys.append(quoted_field + ' is null') 236 else: 237 keys.append(quoted_field + '=%s') 238 values.append(value) 239 where_clause = ' and '.join(keys) 240 elif isinstance(where, basestring): 241 # the exact string 242 where_clause = where 243 values = [] 244 elif isinstance(where, tuple): 245 # preformatted where clause + values 246 where_clause, values = where 247 assert where_clause 248 else: 249 raise ValueError('Invalid "where" value: %r' % where) 250 251 return ' WHERE ' + where_clause, values 252 253 254 255 def select(self, fields, table, where, distinct=False, group_by=None, 256 max_rows=None): 257 """\ 258 This selects all the fields requested from a 259 specific table with a particular where clause. 260 The where clause can either be a dictionary of 261 field=value pairs, a string, or a tuple of (string, 262 a list of values). The last option is what you 263 should use when accepting user input as it'll 264 protect you against sql injection attacks (if 265 all user data is placed in the array rather than 266 the raw SQL). 267 268 For example: 269 where = ("a = %s AND b = %s", ['val', 'val']) 270 is better than 271 where = "a = 'val' AND b = 'val'" 272 273 @param fields: The list of selected fields string. 274 @param table: The name of the database table. 275 @param where: The where clause string. 276 @param distinct: If select distinct values. 277 @param group_by: Group by clause. 278 @param max_rows: unused. 279 """ 280 cmd = ['select'] 281 if distinct: 282 cmd.append('distinct') 283 cmd += [fields, 'from', table] 284 285 where_clause, values = self._where_clause(where) 286 cmd.append(where_clause) 287 288 if group_by: 289 cmd.append(' GROUP BY ' + group_by) 290 291 self.dprint('%s %s' % (' '.join(cmd), values)) 292 293 # create a re-runable function for executing the query 294 def exec_sql(): 295 """Exeuctes an the sql command.""" 296 sql = ' '.join(cmd) 297 numRec = self.cur.execute(sql, values) 298 if max_rows is not None and numRec > max_rows: 299 msg = 'Exceeded allowed number of records' 300 raise MySQLTooManyRows(msg) 301 return self.cur.fetchall() 302 303 # run the query, re-trying after operational errors 304 if self.autocommit: 305 return self.run_with_retry(exec_sql) 306 else: 307 return exec_sql() 308 309 310 def select_sql(self, fields, table, sql, values): 311 """\ 312 select fields from table "sql" 313 314 @param fields: The list of selected fields string. 315 @param table: The name of the database table. 316 @param sql: The sql string. 317 @param values: The sql string parameter values. 318 """ 319 cmd = 'select %s from %s %s' % (fields, table, sql) 320 self.dprint(cmd) 321 322 # create a -re-runable function for executing the query 323 def _exec_sql(): 324 self.cur.execute(cmd, values) 325 return self.cur.fetchall() 326 327 # run the query, re-trying after operational errors 328 if self.autocommit: 329 return self.run_with_retry(_exec_sql) 330 else: 331 return _exec_sql() 332 333 334 def _exec_sql_with_commit(self, sql, values, commit): 335 if self.autocommit: 336 # re-run the query until it succeeds 337 def _exec_sql(): 338 self.cur.execute(sql, values) 339 self.con.commit() 340 self.run_with_retry(_exec_sql) 341 else: 342 # take one shot at running the query 343 self.cur.execute(sql, values) 344 if commit: 345 self.con.commit() 346 347 348 def insert(self, table, data, commit=None): 349 """\ 350 'insert into table (keys) values (%s ... %s)', values 351 352 data: 353 dictionary of fields and data 354 355 @param table: The name of the table. 356 @param data: The insert data. 357 @param commit: If commit the transaction . 358 """ 359 fields = data.keys() 360 refs = ['%s' for field in fields] 361 values = [data[field] for field in fields] 362 cmd = ('insert into %s (%s) values (%s)' % 363 (table, ','.join(self._quote(field) for field in fields), 364 ','.join(refs))) 365 self.dprint('%s %s' % (cmd, values)) 366 367 self._exec_sql_with_commit(cmd, values, commit) 368 369 370 def delete(self, table, where, commit = None): 371 """Delete entries. 372 373 @param table: The name of the table. 374 @param where: The where clause. 375 @param commit: If commit the transaction . 376 """ 377 cmd = ['delete from', table] 378 if commit is None: 379 commit = self.autocommit 380 where_clause, values = self._where_clause(where) 381 cmd.append(where_clause) 382 sql = ' '.join(cmd) 383 self.dprint('%s %s' % (sql, values)) 384 385 self._exec_sql_with_commit(sql, values, commit) 386 387 388 def update(self, table, data, where, commit = None): 389 """\ 390 'update table set data values (%s ... %s) where ...' 391 392 data: 393 dictionary of fields and data 394 395 @param table: The name of the table. 396 @param data: The sql parameter values. 397 @param where: The where clause. 398 @param commit: If commit the transaction . 399 """ 400 if commit is None: 401 commit = self.autocommit 402 cmd = 'update %s ' % table 403 fields = data.keys() 404 data_refs = [self._quote(field) + '=%s' for field in fields] 405 data_values = [data[field] for field in fields] 406 cmd += ' set ' + ', '.join(data_refs) 407 408 where_clause, where_values = self._where_clause(where) 409 cmd += where_clause 410 411 values = data_values + where_values 412 self.dprint('%s %s' % (cmd, values)) 413 414 self._exec_sql_with_commit(cmd, values, commit) 415 416 417 def delete_job(self, tag, commit = None): 418 """Delete a tko job. 419 420 @param tag: The job tag. 421 @param commit: If commit the transaction . 422 """ 423 job_idx = self.find_job(tag) 424 for test_idx in self.find_tests(job_idx): 425 where = {'test_idx' : test_idx} 426 self.delete('tko_iteration_result', where) 427 self.delete('tko_iteration_perf_value', where) 428 self.delete('tko_iteration_attributes', where) 429 self.delete('tko_test_attributes', where) 430 self.delete('tko_test_labels_tests', {'test_id': test_idx}) 431 where = {'job_idx' : job_idx} 432 self.delete('tko_tests', where) 433 self.delete('tko_jobs', where) 434 435 436 def insert_job(self, tag, job, parent_job_id=None, commit=None): 437 """Insert a tko job. 438 439 @param tag: The job tag. 440 @param job: The job object. 441 @param parent_job_id: The parent job id. 442 @param commit: If commit the transaction . 443 444 @return The dict of data inserted into the tko_jobs table. 445 """ 446 job.machine_idx = self.lookup_machine(job.machine) 447 if not job.machine_idx: 448 job.machine_idx = self.insert_machine(job, commit=commit) 449 elif job.machine: 450 # Only try to update tko_machines record if machine is set. This 451 # prevents unnecessary db writes for suite jobs. 452 self.update_machine_information(job, commit=commit) 453 454 afe_job_id = utils.get_afe_job_id(tag) 455 456 data = {'tag':tag, 457 'label': job.label, 458 'username': job.user, 459 'machine_idx': job.machine_idx, 460 'queued_time': job.queued_time, 461 'started_time': job.started_time, 462 'finished_time': job.finished_time, 463 'afe_job_id': afe_job_id, 464 'afe_parent_job_id': parent_job_id, 465 'build': job.build, 466 'build_version': job.build_version, 467 'board': job.board, 468 'suite': job.suite} 469 job.afe_job_id = afe_job_id 470 if parent_job_id: 471 job.afe_parent_job_id = str(parent_job_id) 472 473 # TODO(ntang): check job.index directly. 474 is_update = hasattr(job, 'index') 475 if is_update: 476 self.update('tko_jobs', data, {'job_idx': job.index}, commit=commit) 477 else: 478 self.insert('tko_jobs', data, commit=commit) 479 job.index = self.get_last_autonumber_value() 480 self.update_job_keyvals(job, commit=commit) 481 for test in job.tests: 482 self.insert_test(job, test, commit=commit) 483 484 data['job_idx'] = job.index 485 return data 486 487 488 def update_job_keyvals(self, job, commit=None): 489 """Updates the job key values. 490 491 @param job: The job object. 492 @param commit: If commit the transaction . 493 """ 494 for key, value in job.keyval_dict.iteritems(): 495 where = {'job_id': job.index, 'key': key} 496 data = dict(where, value=value) 497 exists = self.select('id', 'tko_job_keyvals', where=where) 498 499 if exists: 500 self.update('tko_job_keyvals', data, where=where, commit=commit) 501 else: 502 self.insert('tko_job_keyvals', data, commit=commit) 503 504 505 def insert_test(self, job, test, commit = None): 506 """Inserts a job test. 507 508 @param job: The job object. 509 @param test: The test object. 510 @param commit: If commit the transaction . 511 """ 512 kver = self.insert_kernel(test.kernel, commit=commit) 513 data = {'job_idx':job.index, 'test':test.testname, 514 'subdir':test.subdir, 'kernel_idx':kver, 515 'status':self.status_idx[test.status], 516 'reason':test.reason, 'machine_idx':job.machine_idx, 517 'started_time': test.started_time, 518 'finished_time':test.finished_time} 519 is_update = hasattr(test, "test_idx") 520 if is_update: 521 test_idx = test.test_idx 522 self.update('tko_tests', data, 523 {'test_idx': test_idx}, commit=commit) 524 where = {'test_idx': test_idx} 525 self.delete('tko_iteration_result', where) 526 self.delete('tko_iteration_perf_value', where) 527 self.delete('tko_iteration_attributes', where) 528 where['user_created'] = 0 529 self.delete('tko_test_attributes', where) 530 else: 531 self.insert('tko_tests', data, commit=commit) 532 test_idx = test.test_idx = self.get_last_autonumber_value() 533 data = {'test_idx': test_idx} 534 535 for i in test.iterations: 536 data['iteration'] = i.index 537 for key, value in i.attr_keyval.iteritems(): 538 data['attribute'] = key 539 data['value'] = value 540 self.insert('tko_iteration_attributes', data, 541 commit=commit) 542 for key, value in i.perf_keyval.iteritems(): 543 data['attribute'] = key 544 if math.isnan(value) or math.isinf(value): 545 data['value'] = None 546 else: 547 data['value'] = value 548 self.insert('tko_iteration_result', data, 549 commit=commit) 550 551 data = {'test_idx': test_idx} 552 553 for key, value in test.attributes.iteritems(): 554 data = {'test_idx': test_idx, 'attribute': key, 555 'value': value} 556 self.insert('tko_test_attributes', data, commit=commit) 557 558 if not is_update: 559 for label_index in test.labels: 560 data = {'test_id': test_idx, 'testlabel_id': label_index} 561 self.insert('tko_test_labels_tests', data, commit=commit) 562 563 564 def read_machine_map(self): 565 """Reads the machine map.""" 566 if self.machine_group or not self.machine_map: 567 return 568 for line in open(self.machine_map, 'r').readlines(): 569 (machine, group) = line.split() 570 self.machine_group[machine] = group 571 572 573 def machine_info_dict(self, job): 574 """Reads the machine information of a job. 575 576 @param job: The job object. 577 578 @return: The machine info dictionary. 579 """ 580 hostname = job.machine 581 group = job.machine_group 582 owner = job.machine_owner 583 584 if not group: 585 self.read_machine_map() 586 group = self.machine_group.get(hostname, hostname) 587 if group == hostname and owner: 588 group = owner + '/' + hostname 589 590 return {'hostname': hostname, 'machine_group': group, 'owner': owner} 591 592 593 def insert_machine(self, job, commit = None): 594 """Inserts the job machine. 595 596 @param job: The job object. 597 @param commit: If commit the transaction . 598 """ 599 machine_info = self.machine_info_dict(job) 600 self.insert('tko_machines', machine_info, commit=commit) 601 return self.get_last_autonumber_value() 602 603 604 def update_machine_information(self, job, commit = None): 605 """Updates the job machine information. 606 607 @param job: The job object. 608 @param commit: If commit the transaction . 609 """ 610 machine_info = self.machine_info_dict(job) 611 self.update('tko_machines', machine_info, 612 where={'hostname': machine_info['hostname']}, 613 commit=commit) 614 615 616 def lookup_machine(self, hostname): 617 """Look up the machine information. 618 619 @param hostname: The hostname as string. 620 """ 621 where = { 'hostname' : hostname } 622 rows = self.select('machine_idx', 'tko_machines', where) 623 if rows: 624 return rows[0][0] 625 else: 626 return None 627 628 629 def lookup_kernel(self, kernel): 630 """Look up the kernel. 631 632 @param kernel: The kernel object. 633 """ 634 rows = self.select('kernel_idx', 'tko_kernels', 635 {'kernel_hash':kernel.kernel_hash}) 636 if rows: 637 return rows[0][0] 638 else: 639 return None 640 641 642 def insert_kernel(self, kernel, commit = None): 643 """Insert a kernel. 644 645 @param kernel: The kernel object. 646 @param commit: If commit the transaction . 647 """ 648 kver = self.lookup_kernel(kernel) 649 if kver: 650 return kver 651 652 # If this kernel has any significant patches, append their hash 653 # as diferentiator. 654 printable = kernel.base 655 patch_count = 0 656 for patch in kernel.patches: 657 match = re.match(r'.*(-mm[0-9]+|-git[0-9]+)\.(bz2|gz)$', 658 patch.reference) 659 if not match: 660 patch_count += 1 661 662 self.insert('tko_kernels', 663 {'base':kernel.base, 664 'kernel_hash':kernel.kernel_hash, 665 'printable':printable}, 666 commit=commit) 667 kver = self.get_last_autonumber_value() 668 669 if patch_count > 0: 670 printable += ' p%d' % (kver) 671 self.update('tko_kernels', 672 {'printable':printable}, 673 {'kernel_idx':kver}) 674 675 for patch in kernel.patches: 676 self.insert_patch(kver, patch, commit=commit) 677 return kver 678 679 680 def insert_patch(self, kver, patch, commit = None): 681 """Insert a kernel patch. 682 683 @param kver: The kernel version. 684 @param patch: The kernel patch object. 685 @param commit: If commit the transaction . 686 """ 687 print patch.reference 688 name = os.path.basename(patch.reference)[:80] 689 self.insert('tko_patches', 690 {'kernel_idx': kver, 691 'name':name, 692 'url':patch.reference, 693 'hash':patch.hash}, 694 commit=commit) 695 696 697 def find_test(self, job_idx, testname, subdir): 698 """Find a test by name. 699 700 @param job_idx: The job index. 701 @param testname: The test name. 702 @param subdir: The test sub directory under the job directory. 703 """ 704 where = {'job_idx': job_idx , 'test': testname, 'subdir': subdir} 705 rows = self.select('test_idx', 'tko_tests', where) 706 if rows: 707 return rows[0][0] 708 else: 709 return None 710 711 712 def find_tests(self, job_idx): 713 """Find all tests by job index. 714 715 @param job_idx: The job index. 716 @return: A list of tests. 717 """ 718 where = { 'job_idx':job_idx } 719 rows = self.select('test_idx', 'tko_tests', where) 720 if rows: 721 return [row[0] for row in rows] 722 else: 723 return [] 724 725 726 def find_job(self, tag): 727 """Find a job by tag. 728 729 @param tag: The job tag name. 730 @return: The job object or None. 731 """ 732 rows = self.select('job_idx', 'tko_jobs', {'tag': tag}) 733 if rows: 734 return rows[0][0] 735 else: 736 return None 737 738 739def _get_db_type(): 740 """Get the database type name to use from the global config.""" 741 get_value = global_config.global_config.get_config_value_with_fallback 742 return "db_" + get_value("AUTOTEST_WEB", "global_db_type", "db_type", 743 default="mysql") 744 745 746def _get_error_class(class_name): 747 """Retrieves the appropriate error class by name from the database 748 module.""" 749 db_module = __import__("autotest_lib.tko." + _get_db_type(), 750 globals(), locals(), ["driver"]) 751 return getattr(db_module.driver, class_name) 752 753 754def db(*args, **dargs): 755 """Creates an instance of the database class with the arguments 756 provided in args and dargs, using the database type specified by 757 the global configuration (defaulting to mysql). 758 759 @param args: The db_type arguments. 760 @param dargs: The db_type named arguments. 761 762 @return: An db object. 763 """ 764 db_type = _get_db_type() 765 db_module = __import__("autotest_lib.tko." + db_type, globals(), 766 locals(), [db_type]) 767 db = getattr(db_module, db_type)(*args, **dargs) 768 return db 769