1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18from datetime import datetime 19import inspect 20import os 21import socket 22import string 23import subprocess 24import time 25import yaml 26from multiprocessing.managers import BaseManager 27 28from google.api_core.exceptions import NotFound 29from google.cloud import bigquery 30 31_TIMESTAMP_STR_FORMAT = '%Y-%m-%d %H:%M:%S' 32_AUTOMATIC_LOGGER_SCRIPT = 'bigquery_scheduled_automatic_client.py' 33_SERVER_SCRIPT = 'bigquery_logger_server.py' 34 35 36def load_config(config_file_path): 37 with open(config_file_path, 'r') as f: 38 config = yaml.load(f) 39 return config 40 41 42class BigQueryLoggerUtilsError(Exception): 43 """Exception class for bigquery logger utils module""" 44 45 46################################# 47# Data transformation and preparation methods 48################################# 49 50 51def make_storeable(value): 52 """Casts non primitive data types to string. 53 54 Certain data types such as list can cause unexpected behavior with BigQuery. 55 56 Arguments: 57 value: an object to store in a BigQuery table. 58 Returns: 59 value or str(value): string version of passed value, if necessary. 60 """ 61 if (isinstance(value, int) or isinstance(value, float) 62 or isinstance(value, str) or isinstance(value, bool)): 63 return value 64 elif isinstance(value, datetime): 65 return value.strftime(_TIMESTAMP_STR_FORMAT) 66 return str(value) 67 68 69def get_field_name(dirty_string): 70 """Converts field name to a BigQuery acceptable field name. 71 72 Arguments: 73 dirty_string: the string to convert to a standardized field name. 74 Returns: 75 field_name: the field name as a string. 76 """ 77 valid_chars = '_ %s%s' % (string.ascii_letters, string.digits) 78 field_name = ''.join(c for c in dirty_string.upper() if c in valid_chars) 79 field_name = field_name.strip().replace(' ', '_') 80 if not field_name: 81 field_name = 'FIELD' 82 elif field_name[0] not in string.ascii_letters + '_': 83 field_name = 'FIELD_' + field_name 84 return field_name 85 86 87def get_bigquery_type(value): 88 """Returns BigQuery recognizable datatype string from value. 89 90 Arguments: 91 value: the item you want to store in BigQuery 92 Returns: 93 field_type: the BigQuery data type for the field to store your value. 94 """ 95 # Dict for converting Python types to BigQuery recognizable schema fields 96 field_name = { 97 'STR': 'STRING', 98 'INT': 'INTEGER', 99 'FLOAT': 'FLOAT', 100 'BOOL': 'BOOLEAN' 101 } 102 103 # Default field type is STRING 104 field_type = 'STRING' 105 if isinstance(value, str): 106 try: 107 # Try to infer whether datatype is a timestamp by converting it to 108 # a timestamp object using the string format 109 time.strptime(value, _TIMESTAMP_STR_FORMAT) 110 field_type = 'TIMESTAMP' 111 except ValueError: 112 pass 113 else: 114 type_string = type(value).__name__ 115 try: 116 field_type = field_name[type_string.upper()] 117 except KeyError: 118 logging.error('Datatype %s not recognized. Reverting to STRING.', 119 type_string) 120 return field_type 121 122 123def add_row(dictionary, row_list_to_update): 124 # Convert dictionary key names to BigQuery field names 125 to_add = { 126 get_field_name(key): make_storeable(value) 127 for key, value in dictionary.items() 128 } 129 130 row_list_to_update.append(to_add) 131 132 133def change_field_name(old_name, new_name, row_list_to_update): 134 """Changes field name in row_list_to_update in place. 135 136 Arguments: 137 old_name: the old field name, to be replaced. 138 new_name: the new name to replace the old one. 139 row_list_to_update: the list of row dictionaries to update the field name for 140 Returns: 141 num_replacements: how many rows were affected by this change. 142 """ 143 old_name = get_field_name(old_name) 144 new_name = get_field_name(new_name) 145 num_replacements = 0 146 for row in row_list_to_update: 147 if old_name in row.keys(): 148 # Update all items in the rows with the new field name 149 row[new_name] = row[old_name] 150 del row[old_name] 151 num_replacements += 1 152 return num_replacements 153 154 155def get_tuple_from_schema(schema): 156 """Returns a tuple of all field names in the passed schema""" 157 return tuple(field.name for field in schema) 158 159 160def get_dict_from_schema(schema): 161 """Turns a BigQuery schema array into a more flexible dictionary. 162 163 Arguments: 164 schema: the schema array to be converted. 165 Returns: 166 dictionary: a dictionary from the schema. Maps field names to field types. 167 """ 168 dictionary = { 169 schema_field.name: schema_field.field_type 170 for schema_field in schema 171 } 172 return dictionary 173 174 175def reconcile_schema_differences(schema_to_change_dict, 176 schema_to_preserve_dict): 177 """Returns a schema dict combining two schema dicts. 178 179 If there are conflicts between the schemas, for example if they share a 180 field name but those field names don't share the same type value, that field 181 name in one of the schema dicts will have to change to be added to the 182 combined schema. 183 Arguments: 184 schema_to_change_dict: a dict representing the schema that will be changed 185 if a conflict arises. 186 schema_to_preserve_dict: a dict representing the schema whose fields will 187 remain unchanged. 188 Returns: 189 new_schema_dict: a dict representing the combined schemas 190 changed_fields: a dict mapping old field names to their new field names, 191 if they were changed, in schema_to_change_dict. 192 """ 193 new_schema_dict = schema_to_preserve_dict.copy() 194 changed_fields = {} 195 for field_name, field_type in schema_to_change_dict.items(): 196 if field_name in schema_to_preserve_dict.keys(): 197 198 # Field name already exists in remote table, but it might not accept the 199 # same value type the user is passing this time around 200 if schema_to_preserve_dict[field_name] == field_type: 201 202 # Same data type for fields, no need to do anything 203 continue 204 else: 205 206 # We need to create a new field with a unique name to store this 207 # different data type. Automatically makes new name: 208 # FIELD_NAME_FIELD_TYPE, ex. 'RESULT_BOOLEAN' 209 new_field_name = '%s_%s' % (field_name, field_type) 210 211 # On the off chance that this new field name is also already taken, we 212 # start appending numbers to it to make it unique. This should be an 213 # extreme edge case, hence the inelegance. 214 count = 1 215 merged_schemas = schema_to_preserve_dict.copy() 216 merged_schemas.update(schema_to_change_dict) 217 if new_field_name in merged_schemas.keys( 218 ) and merged_schemas[new_field_name] != field_type: 219 new_field_name += str(count) 220 while new_field_name in merged_schemas.keys( 221 ) and merged_schemas[new_field_name] != field_type: 222 count += 1 223 new_field_name = new_field_name[:-1] + str(count) 224 225 # Update the actual rows in our logger as well as self.schema_dict to 226 # reflect the new field name. 227 changed_fields[field_name] = new_field_name 228 229 new_schema_dict[new_field_name] = field_type 230 231 else: 232 new_schema_dict[field_name] = field_type 233 234 return new_schema_dict, changed_fields 235 236 237################################# 238# BigQuery request data preparation methods 239################################# 240 241 242def get_schema_from_dict(dictionary): 243 """Turns dictionary into a schema formatted for BigQuery requests. 244 245 Arguments: 246 dictionary: the dictionary to convert into a schema array. 247 Returns: 248 schema: an array of SchemaField objects specifying name and type, listed alphabetically. 249 """ 250 schema = [] 251 for key in sorted(dictionary): 252 schema.append( 253 bigquery.SchemaField(key, dictionary[key], mode='nullable')) 254 return schema 255 256 257def get_schema_from_rows_list(rows_list): 258 """Deduces the BigQuery table schema represented by a list of row dictionaries. 259 260 Arguments: 261 rows_list: the list of row dictionaries to create a schema from. 262 Returns: 263 schema: a formatted BigQuery table schema with the fields in alphabetical order.""" 264 schema = {} 265 for row in rows_list: 266 # Create new field names and corresponding types in self.schema_dict in case 267 # the schema of the remote table needs to be updated. 268 for key, value in row.items(): 269 value_type = get_bigquery_type(value) 270 if key in schema.keys(): 271 # We have another row with the same field name. Most of the time their 272 # types should match and we can just skip adding it to the fields to 273 # update 274 275 if value_type != schema[key]: 276 # Their types don't match. Merge the fields and change the type to 277 # string 278 schema[key] = 'STRING' 279 280 row[key] = str(row[key]) 281 else: 282 schema[key] = value_type 283 284 return get_schema_from_dict(schema) 285 286 287def get_formatted_rows(rows_list, schema): 288 """Returns an InsertEntry object for adding to BQ insert request. 289 290 Arguments: 291 rows_list: a list of row dictionaries to turn into tuples of values corresponding to the schema fields. 292 schema: a tuple representing the column names in the table. 293 Returns: 294 rows: an array of tuples with the elements ordered corresponding to the order of the column names in schema. 295 """ 296 rows = [] 297 schema_tuple = get_tuple_from_schema(schema) 298 for row in rows_list: 299 row_tuple = tuple( 300 row[key] if key in row.keys() else None for key in schema_tuple) 301 rows.append(row_tuple) 302 return rows 303 304 305################################# 306# BigQuery client class 307################################# 308 309 310class BigqueryLoggerClient: 311 """Client class for interacting with and preparing data for BigQuery""" 312 313 def __init__(self, project_id, google_application_credentials_path): 314 os.environ[ 315 'GOOGLE_APPLICATION_CREDENTIALS'] = google_application_credentials_path 316 self.client = bigquery.Client(project_id) 317 318 ################################# 319 # BigQuery request methods 320 ################################# 321 322 def create_dataset(self, dataset_id): 323 """Creates a new dataset if it doesn't exist. 324 325 Arguments: 326 dataset_id: the name of the dataset you want to create. 327 Returns: 328 dataset: the resulting dataset object. 329 """ 330 dataset_ref = self.client.dataset(dataset_id) 331 dataset = bigquery.Dataset(dataset_ref) 332 try: 333 dataset = self.client.get_dataset(dataset_ref) 334 except Exception as err: 335 self.client.create_dataset(dataset) 336 return dataset 337 338 def create_table(self, dataset_id, table_id, schema): 339 """Creates a new table if it doesn't exist. 340 341 Arguments: 342 dataset_id: the name of the dataset that will contain the table you want 343 to create. 344 table_id: the name of the table you want to create. 345 schema: a schema array for the table to be created. 346 Returns: 347 table: the resulting table object 348 """ 349 dataset = self.create_dataset(dataset_id) 350 table_ref = dataset.table(table_id) 351 table = bigquery.Table(table_ref, schema=schema) 352 try: 353 table = self.client.get_table(table_ref) 354 except NotFound: 355 self.client.create_table(table) 356 return table 357 358 def update_table_schema(self, dataset_id, table_id, new_schema): 359 """Updates the schema for the given remote table. 360 361 Uses fields specified in self.schema_dict. This method will never remove 362 fields, to avoid loss of data. 363 364 Arguments: 365 dataset_id: the dataset containing the table to modify. 366 table_id: the table to modify. 367 new_schema: a new schema to update the remote table's schema with. 368 Returns: 369 table: the updated table object. 370 changed_fields: a dictionary mapping any changed field names to their new name strings. 371 """ 372 table = self.create_table(dataset_id, table_id, new_schema) 373 remote_schema = table.schema 374 remote_schema_dict = get_dict_from_schema(remote_schema) 375 new_schema_dict = get_dict_from_schema(new_schema) 376 377 updated_schema_dict, changed_fields = reconcile_schema_differences( 378 new_schema_dict, remote_schema_dict) 379 380 if updated_schema_dict.items() != remote_schema_dict.items(): 381 table.schema = get_schema_from_dict(updated_schema_dict) 382 table = self.client.update_table( 383 table=table, properties=['schema']) 384 385 return table, changed_fields 386 387 def delete(self, dataset_id, table_id=None): 388 """Deletes specified table in specified dataset. 389 390 Arguments: 391 dataset_id: the name of the dataset to be deleted or the dataset that 392 contains the table to be deleted. 393 table_id: the name of the table to be deleted. 394 """ 395 dataset_ref = self.client.dataset(dataset_id) 396 dataset = bigquery.Dataset(dataset_ref) 397 try: 398 if table_id: 399 table_ref = dataset.table(table_id) 400 table = bigquery.Table(table_ref) 401 self.client.delete_table(table) 402 else: 403 self.client.delete_dataset(dataset) 404 except NotFound: 405 pass 406 407 def flush(self, rows_list, dataset_id, table_id, retries=5): 408 """Inserts key value store of data into the specified table. 409 410 Arguments: 411 rows_list: a list of row dictionaries to send to BigQuery 412 dataset_id: dataset name to store table in. 413 table_id: table name to store info in. 414 retries: how many times to retry insert upon failure 415 Returns: 416 erros: any errors resulting from the insert operation. 417 Raises: 418 DataNotStoredError: if data is not stored because of insertErrors in 419 query response or timeout. 420 """ 421 correctly_formatted_rows_list = [] 422 423 for row in rows_list: 424 add_row(row, correctly_formatted_rows_list) 425 426 local_schema = get_schema_from_rows_list(correctly_formatted_rows_list) 427 table, changed_fields = self.update_table_schema( 428 dataset_id, table_id, local_schema) 429 430 if changed_fields: 431 print('Changed Fields: ' + str(changed_fields)) 432 for old_name, new_name in changed_fields.items(): 433 change_field_name(old_name, new_name, 434 correctly_formatted_rows_list) 435 436 schema = table.schema 437 438 values = get_formatted_rows(correctly_formatted_rows_list, schema) 439 errors = self.client.create_rows(table, values) 440 if errors: 441 for retry in range(retries): 442 print('Retry ' + str(retry + 1)) 443 time.sleep(30) 444 errors = self.client.create_rows(table, values) 445 if not errors: 446 break 447 448 if errors: 449 print(errors) 450 return errors 451 452 453#################### 454# Subprocess and helper methods to help with automated logger 455#################### 456 457 458def start_queue_server(queue_size, ip_address, port, authkey): 459 """Starts a subprocess bigquery_logger_server.py. 460 Subprocess creates a server to handle the shared job queue. 461 462 Arguments: 463 queue_size: maximum number of items this queue can hold 464 ip_address: ip address of the machine on which to start queue management server 465 port: port on which to reach queue management server 466 authkey: password to be used by clients trying to access server 467 Returns: 468 process: the result of Popen on the subprocess. 469 """ 470 471 # If ip_address is empty string (signifying local machine) we need to have '' in the command so it is counted 472 # as an actual argument to bigquery_logger_server 473 ip_address = ip_address or '\'\'' 474 command = ' '.join([ 475 _SERVER_SCRIPT, 476 str(queue_size), 477 str(ip_address), 478 str(port), 479 str(authkey) 480 ]) 481 # Create error log file for user to check 482 error_log_name = os.path.join( 483 os.path.dirname(__file__), 'queue_server_err.log') 484 error_log = open(error_log_name, 'w+') 485 process = subprocess.Popen( 486 command, 487 shell=True, 488 stderr=error_log, 489 stdin=subprocess.PIPE, 490 stdout=subprocess.PIPE) 491 return process 492 493 494def start_scheduled_automatic_logger(ip_address, port, authkey, project_id, 495 credentials_path): 496 """Starts a subprocess bigquery_scheduled_automatic_logger. 497 Subprocess accesses the queue managed by the server at ip_address:port 498 and periodically sends items in queue to the BigQuery project identified by project_id. 499 500 Arguments: 501 ip_address: ip_address of the machine on which the server managing the shared queue to pull from is located 502 port: port on which the server managing the shared queue to pull from can be reached 503 authkey: password needed to access server 504 project_id: name of BigQuery project to send data to 505 credentials_path: path to directory where Google Service Account credentials for this BigQuery 506 project are stored 507 Returns: 508 process: the result of Popen on the subprocess. 509 """ 510 511 # If ip_address is empty string (signifying local machine) we need to have '' in the command so it is counted 512 # as an actual argument to bigquery_scheduled_automatic_logger 513 ip_address = ip_address or '\'\'' 514 print('starting scheduled automatic logger...') 515 command = ' '.join([ 516 _AUTOMATIC_LOGGER_SCRIPT, 517 str(ip_address), 518 str(port), 519 str(authkey), 520 str(project_id), 521 str(credentials_path) 522 ]) 523 # Create error log file for user to check 524 error_log_name = os.path.join( 525 os.path.dirname(__file__), 'scheduled_automatic_logger_err.log') 526 error_log = open(error_log_name, 'w+') 527 process = subprocess.Popen( 528 command, 529 shell=True, 530 stderr=error_log, 531 stdin=subprocess.PIPE, 532 stdout=subprocess.PIPE) 533 return process 534 535 536def get_queue(ip_address, port, authkey): 537 """Returns a proxy object for shared queue. 538 Shared queue is created and managed in start_server(). 539 540 Arguments: 541 ip_address: ip_address of the machine on which the server managing the shared queue to proxy is located 542 port: port on which the server managing the shared queue to proxy can be reached 543 authkey: password needed to access server 544 Returns: 545 queue: the BigqueryLoggerQueue object that organizers and holds all BigQuery 546 inserts sent to server.""" 547 BaseManager.register('get_queue') 548 m = BaseManager(address=(ip_address, int(port)), authkey=authkey) 549 try: 550 m.connect() 551 return m.get_queue() 552 except socket.error: 553 raise BigQueryLoggerUtilsError('Cannot connect to data storage queue.') 554 555 556def get_current_scheduled_automatic_logger(): 557 """Returns process id and args of running scheduled automatic logger""" 558 559 processes = get_processes(_AUTOMATIC_LOGGER_SCRIPT) 560 561 pid = 0 562 args = {} 563 if processes: 564 process = processes[0] 565 pid = process[0] 566 process_argspec = inspect.getargspec(start_scheduled_automatic_logger) 567 process_arg_names = process_argspec.args 568 process_argv = process[-1 * len(process_arg_names):] 569 args = dict(zip(process_arg_names, process_argv)) 570 571 return pid, args 572 573 574def get_current_logger_server(): 575 """Returns process id and args of running logger servers""" 576 577 processes = get_processes(_SERVER_SCRIPT) 578 579 pid = 0 580 args = {} 581 if processes: 582 process = processes[0] 583 pid = process[0] 584 process_argspec = inspect.getargspec(start_queue_server) 585 process_arg_names = process_argspec.args 586 process_argv = process[-1 * len(process_arg_names):] 587 args = dict(zip(process_arg_names, process_argv)) 588 589 return pid, args 590 591 592def get_current_queue_and_server_pid(): 593 """Kills the current running queue server process. 594 595 Returns: 596 queue: the queue that the server used to serve. 597 """ 598 599 pid, args = get_current_logger_server() 600 get_queue_args = inspect.getargspec(get_queue).args 601 if pid: 602 try: 603 kwargs = {arg_name: args[arg_name] for arg_name in get_queue_args} 604 except KeyError: 605 raise BigQueryLoggerUtilsError( 606 'Param names in get_queue %s must be subset of param names for start_queue_server %s' 607 % (get_queue_args, args.keys())) 608 else: 609 # Retrieve reference to current 610 queue = get_queue(**kwargs) 611 return pid, queue 612 613 614def kill_current_scheduled_automatic_logger(): 615 pid, _ = get_current_scheduled_automatic_logger() 616 if pid: 617 kill_pid(pid) 618 619 620def get_scheduled_automatic_logger_pid(ip_address, port, authkey, project_id, 621 credentials_path): 622 """Returns the process id of a bigquery_scheduled_automatic_logger instance for a given set of configs. 623 624 Arguments: 625 ip_address: ip_address of the machine on which the server managing the shared queue to pull from is located 626 port: port on which the server managing the shared queue to pull from can be reached 627 authkey: password needed to access server 628 project_id: name of BigQuery project to send data to 629 credentials_path: path to directory where Google Service Account credentials for this BigQuery 630 project are stored 631 Returns: 632 pid: process id of process if found. Else 0 633 """ 634 635 pids = get_pids(_AUTOMATIC_LOGGER_SCRIPT, ip_address, port, authkey, 636 project_id, os.path.expanduser(credentials_path)) 637 638 pid = 0 639 if pids: 640 pid = pids[0] 641 return pid 642 643 644def get_logger_server_pid(queue_size, ip_address, port, authkey): 645 """Returns the process id of a bigquery_logger_service instance for a given set of configs. 646 647 Arguments: 648 queue_size: the size of the shared data queue 649 ip_address: ip_address of the machine on which the server managing the shared queue to pull from is located 650 port: port on which the server managing the shared queue to pull from can be reached 651 authkey: password needed to access server 652 Returns: 653 pid: process id of process if found. Else 0 654 """ 655 656 pids = get_pids(_SERVER_SCRIPT, queue_size, ip_address, port, authkey) 657 pid = 0 658 if pids: 659 pid = pids[0] 660 return pid 661 662 663def get_pids(*argv): 664 """Gets process ids based on arguments to concatenate and grep 665 666 Arguments: 667 *argv: any number of arguments to be joined and grepped 668 Returns: 669 pids: process ids of process if found. 670 """ 671 processes = get_processes(*argv) 672 pids = [process[0] for process in processes] 673 674 return pids 675 676 677def get_processes(*argv): 678 """Returns process grepped by a set of arguments. 679 680 Arguments: 681 *argv: any number of arguments to be joined and grepped 682 Returns: 683 processes: processes returned by grep, as a list of lists. 684 """ 685 expression = ' '.join([str(arg) for arg in argv]) 686 processes = [] 687 try: 688 results = subprocess.check_output( 689 'pgrep -af \"%s\"' % expression, shell=True) 690 for result in results.split('\n'): 691 items = result.split(' ') 692 if 'pgrep' not in items: 693 processes.append(items) 694 except subprocess.CalledProcessError: 695 pass 696 697 return processes 698 699 700def kill_pid(pid): 701 """To only be used on _SERVER_SCRIPT or _AUTOMATIC_LOGGER_SCRIPT""" 702 703 result = subprocess.check_output('kill -9 %s' % str(pid), shell=True) 704 return result 705