#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import logging
from datetime import datetime
import inspect
import os
import socket
import string
import subprocess
import time
import yaml
from multiprocessing.managers import BaseManager

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

_TIMESTAMP_STR_FORMAT = '%Y-%m-%d %H:%M:%S'
_AUTOMATIC_LOGGER_SCRIPT = 'bigquery_scheduled_automatic_client.py'
_SERVER_SCRIPT = 'bigquery_logger_server.py'


def load_config(config_file_path):
    """Loads a yaml config file into a dictionary."""
    with open(config_file_path, 'r') as f:
        # yaml.load without an explicit Loader is deprecated and unsafe for
        # untrusted input, so use safe_load here.
        config = yaml.safe_load(f)
    return config


class BigQueryLoggerUtilsError(Exception):
    """Exception class for bigquery logger utils module"""


#################################
# Data transformation and preparation methods
#################################


def make_storeable(value):
    """Casts non primitive data types to string.

  Certain data types such as list can cause unexpected behavior with BigQuery.

  Arguments:
    value: an object to store in a BigQuery table.
  Returns:
    value or str(value): string version of passed value, if necessary.
  """
    if (isinstance(value, int) or isinstance(value, float)
            or isinstance(value, str) or isinstance(value, bool)):
        return value
    elif isinstance(value, datetime):
        return value.strftime(_TIMESTAMP_STR_FORMAT)
    return str(value)
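
# Illustrative examples (not part of the original module): primitives pass
# through unchanged, datetimes are formatted, everything else becomes str().
#
#   make_storeable(3.14)                            -> 3.14
#   make_storeable(datetime(2018, 1, 1, 12, 0, 0))  -> '2018-01-01 12:00:00'
#   make_storeable([1, 2, 3])                       -> '[1, 2, 3]'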


def get_field_name(dirty_string):
    """Converts field name to a BigQuery acceptable field name.

  Arguments:
    dirty_string: the string to convert to a standardized field name.
  Returns:
    field_name: the field name as a string.
  """
    valid_chars = '_ %s%s' % (string.ascii_letters, string.digits)
    field_name = ''.join(c for c in dirty_string.upper() if c in valid_chars)
    field_name = field_name.strip().replace(' ', '_')
    if not field_name:
        field_name = 'FIELD'
    elif field_name[0] not in string.ascii_letters + '_':
        field_name = 'FIELD_' + field_name
    return field_name
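
# Illustrative examples (not part of the original module): names are upcased,
# invalid characters are dropped, and a 'FIELD' prefix is added when needed.
#
#   get_field_name('signal strength (dBm)')  -> 'SIGNAL_STRENGTH_DBM'
#   get_field_name('3rd attempt')            -> 'FIELD_3RD_ATTEMPT'
#   get_field_name('!!!')                    -> 'FIELD'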


def get_bigquery_type(value):
    """Returns BigQuery recognizable datatype string from value.

  Arguments:
    value: the item you want to store in BigQuery
  Returns:
    field_type: the BigQuery data type for the field to store your value.
  """
    # Dict for converting Python type names to BigQuery recognizable schema
    # field types
    type_map = {
        'STR': 'STRING',
        'INT': 'INTEGER',
        'FLOAT': 'FLOAT',
        'BOOL': 'BOOLEAN'
    }

    # Default field type is STRING
    field_type = 'STRING'
    if isinstance(value, str):
        try:
            # Try to infer whether datatype is a timestamp by converting it to
            # a timestamp object using the string format
            time.strptime(value, _TIMESTAMP_STR_FORMAT)
            field_type = 'TIMESTAMP'
        except ValueError:
            pass
    else:
        type_string = type(value).__name__
        try:
            field_type = type_map[type_string.upper()]
        except KeyError:
            logging.error('Datatype %s not recognized. Reverting to STRING.',
                          type_string)
    return field_type
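
# Illustrative examples (not part of the original module):
#
#   get_bigquery_type(7)                      -> 'INTEGER'
#   get_bigquery_type(True)                   -> 'BOOLEAN'
#   get_bigquery_type('2018-06-01 12:30:00')  -> 'TIMESTAMP'
#   get_bigquery_type('hello')                -> 'STRING'
#   get_bigquery_type([1, 2])                 -> 'STRING'  (unrecognized type)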


def add_row(dictionary, row_list_to_update):
    """Appends a BigQuery-ready row to row_list_to_update.

  Converts the dictionary's keys to valid field names and its values to
  storeable types before appending.
  """
    # Convert dictionary key names to BigQuery field names
    to_add = {
        get_field_name(key): make_storeable(value)
        for key, value in dictionary.items()
    }

    row_list_to_update.append(to_add)


def change_field_name(old_name, new_name, row_list_to_update):
    """Changes field name in row_list_to_update in place.

  Arguments:
    old_name: the old field name, to be replaced.
    new_name: the new name to replace the old one.
    row_list_to_update: the list of row dictionaries to update the field name
      for.
  Returns:
    num_replacements: how many rows were affected by this change.
  """
    old_name = get_field_name(old_name)
    new_name = get_field_name(new_name)
    num_replacements = 0
    for row in row_list_to_update:
        if old_name in row.keys():
            # Update all items in the rows with the new field name
            row[new_name] = row[old_name]
            del row[old_name]
            num_replacements += 1
    return num_replacements


def get_tuple_from_schema(schema):
    """Returns a tuple of all field names in the passed schema."""
    return tuple(field.name for field in schema)


def get_dict_from_schema(schema):
    """Turns a BigQuery schema array into a more flexible dictionary.

  Arguments:
    schema: the schema array to be converted.
  Returns:
    dictionary: a dictionary from the schema. Maps field names to field types.
  """
    dictionary = {
        schema_field.name: schema_field.field_type
        for schema_field in schema
    }
    return dictionary


def reconcile_schema_differences(schema_to_change_dict,
                                 schema_to_preserve_dict):
    """Returns a schema dict combining two schema dicts.

  If there are conflicts between the schemas, for example if they share a
  field name but those field names don't share the same type value, that field
  name in one of the schema dicts will have to change to be added to the
  combined schema.
  Arguments:
    schema_to_change_dict: a dict representing the schema that will be changed
    if a conflict arises.
    schema_to_preserve_dict: a dict representing the schema whose fields will
    remain unchanged.
  Returns:
    new_schema_dict: a dict representing the combined schemas
    changed_fields: a dict mapping old field names to their new field names,
    if they were changed, in schema_to_change_dict.
  """
    new_schema_dict = schema_to_preserve_dict.copy()
    changed_fields = {}
    for field_name, field_type in schema_to_change_dict.items():
        if field_name in schema_to_preserve_dict.keys():

            # Field name already exists in remote table, but it might not
            # accept the same value type the user is passing this time around
            if schema_to_preserve_dict[field_name] == field_type:

                # Same data type for fields, no need to do anything
                continue
            else:

                # We need to create a new field with a unique name to store
                # this different data type. Automatically makes new name:
                # FIELD_NAME_FIELD_TYPE, ex. 'RESULT_BOOLEAN'
                new_field_name = '%s_%s' % (field_name, field_type)

                # On the off chance that this new field name is also already
                # taken, we start appending numbers to it to make it unique.
                # This should be an extreme edge case, hence the inelegance.
                count = 1
                merged_schemas = schema_to_preserve_dict.copy()
                merged_schemas.update(schema_to_change_dict)
                if (new_field_name in merged_schemas
                        and merged_schemas[new_field_name] != field_type):
                    new_field_name += str(count)
                while (new_field_name in merged_schemas
                       and merged_schemas[new_field_name] != field_type):
                    count += 1
                    new_field_name = new_field_name[:-1] + str(count)

                # Record the rename so the caller can update its rows and
                # schema to use the new field name.
                changed_fields[field_name] = new_field_name

                new_schema_dict[new_field_name] = field_type

        else:
            new_schema_dict[field_name] = field_type

    return new_schema_dict, changed_fields
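
# Illustrative example (not part of the original module). A conflicting
# 'RESULT' field is renamed to 'RESULT_BOOLEAN' in the schema to change:
#
#   reconcile_schema_differences(
#       {'RESULT': 'BOOLEAN', 'RUN_ID': 'STRING'},   # schema_to_change_dict
#       {'RESULT': 'STRING'})                        # schema_to_preserve_dict
#   -> ({'RESULT': 'STRING', 'RESULT_BOOLEAN': 'BOOLEAN', 'RUN_ID': 'STRING'},
#       {'RESULT': 'RESULT_BOOLEAN'})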


#################################
# BigQuery request data preparation methods
#################################


def get_schema_from_dict(dictionary):
    """Turns dictionary into a schema formatted for BigQuery requests.

  Arguments:
    dictionary: the dictionary to convert into a schema array.
  Returns:
    schema: an array of SchemaField objects specifying name and type, listed
      alphabetically.
  """
    schema = []
    for key in sorted(dictionary):
        schema.append(
            bigquery.SchemaField(key, dictionary[key], mode='nullable'))
    return schema


def get_schema_from_rows_list(rows_list):
    """Deduces the BigQuery table schema represented by a list of row dictionaries.

  Arguments:
    rows_list: the list of row dictionaries to create a schema from.
  Returns:
    schema: a formatted BigQuery table schema with the fields in alphabetical
      order.
  """
    schema = {}
    for row in rows_list:
        # Collect field names and corresponding types in case the schema of
        # the remote table needs to be updated.
        for key, value in row.items():
            value_type = get_bigquery_type(value)
            if key in schema.keys():
                # We have another row with the same field name. Most of the
                # time their types should match and we can just skip adding it
                # to the fields to update

                if value_type != schema[key]:
                    # Their types don't match. Merge the fields and change the
                    # type to string
                    schema[key] = 'STRING'

                    row[key] = str(row[key])
            else:
                schema[key] = value_type

    return get_schema_from_dict(schema)


def get_formatted_rows(rows_list, schema):
    """Formats rows for adding to a BigQuery insert request.

  Arguments:
    rows_list: a list of row dictionaries to turn into tuples of values
      corresponding to the schema fields.
    schema: the BigQuery table schema, whose field order determines the order
      of values in each tuple.
  Returns:
    rows: an array of tuples with the elements ordered corresponding to the
      order of the column names in schema.
  """
    rows = []
    schema_tuple = get_tuple_from_schema(schema)
    for row in rows_list:
        row_tuple = tuple(
            row[key] if key in row.keys() else None for key in schema_tuple)
        rows.append(row_tuple)
    return rows
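
# Illustrative example (not part of the original module). With a schema whose
# fields are RESULT and RUN_ID, in that order:
#
#   get_formatted_rows([{'RESULT': 'PASS'}, {'RESULT': 'FAIL', 'RUN_ID': 7}],
#                      schema)
#   -> [('PASS', None), ('FAIL', 7)]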


#################################
#  BigQuery client class
#################################


class BigqueryLoggerClient:
    """Client class for interacting with and preparing data for BigQuery"""

    def __init__(self, project_id, google_application_credentials_path):
        os.environ[
            'GOOGLE_APPLICATION_CREDENTIALS'] = google_application_credentials_path
        self.client = bigquery.Client(project_id)

    #################################
    # BigQuery request methods
    #################################

    def create_dataset(self, dataset_id):
        """Creates a new dataset if it doesn't exist.

    Arguments:
      dataset_id: the name of the dataset you want to create.
    Returns:
      dataset: the resulting dataset object.
    """
        dataset_ref = self.client.dataset(dataset_id)
        dataset = bigquery.Dataset(dataset_ref)
        try:
            dataset = self.client.get_dataset(dataset_ref)
        except NotFound:
            # Dataset does not exist yet, so create it.
            self.client.create_dataset(dataset)
        return dataset

    def create_table(self, dataset_id, table_id, schema):
        """Creates a new table if it doesn't exist.

    Arguments:
      dataset_id: the name of the dataset that will contain the table you want
      to create.
      table_id: the name of the table you want to create.
      schema: a schema array for the table to be created.
    Returns:
      table: the resulting table object
    """
        dataset = self.create_dataset(dataset_id)
        table_ref = dataset.table(table_id)
        table = bigquery.Table(table_ref, schema=schema)
        try:
            table = self.client.get_table(table_ref)
        except NotFound:
            self.client.create_table(table)
        return table

    def update_table_schema(self, dataset_id, table_id, new_schema):
        """Updates the schema for the given remote table.

    Fields already present in the remote table are never removed, to avoid
    loss of data; conflicting fields in new_schema are renamed instead.

    Arguments:
      dataset_id: the dataset containing the table to modify.
      table_id: the table to modify.
      new_schema: a new schema to update the remote table's schema with.
    Returns:
      table: the updated table object.
      changed_fields: a dictionary mapping any changed field names to their
        new name strings.
    """
        table = self.create_table(dataset_id, table_id, new_schema)
        remote_schema = table.schema
        remote_schema_dict = get_dict_from_schema(remote_schema)
        new_schema_dict = get_dict_from_schema(new_schema)

        updated_schema_dict, changed_fields = reconcile_schema_differences(
            new_schema_dict, remote_schema_dict)

        if updated_schema_dict.items() != remote_schema_dict.items():
            table.schema = get_schema_from_dict(updated_schema_dict)
            table = self.client.update_table(
                table=table, properties=['schema'])

        return table, changed_fields

    def delete(self, dataset_id, table_id=None):
        """Deletes specified table in specified dataset.

    If no table_id is given, the entire dataset is deleted.

    Arguments:
      dataset_id: the name of the dataset to be deleted or the dataset that
      contains the table to be deleted.
      table_id: the name of the table to be deleted.
    """
        dataset_ref = self.client.dataset(dataset_id)
        dataset = bigquery.Dataset(dataset_ref)
        try:
            if table_id:
                table_ref = dataset.table(table_id)
                table = bigquery.Table(table_ref)
                self.client.delete_table(table)
            else:
                self.client.delete_dataset(dataset)
        except NotFound:
            pass

    def flush(self, rows_list, dataset_id, table_id, retries=5):
        """Inserts key value store of data into the specified table.

    Arguments:
      rows_list: a list of row dictionaries to send to BigQuery
      dataset_id: dataset name to store table in.
      table_id: table name to store info in.
      retries: how many times to retry insert upon failure
    Returns:
      errors: any errors resulting from the insert operation.
    """
        correctly_formatted_rows_list = []

        for row in rows_list:
            add_row(row, correctly_formatted_rows_list)

        local_schema = get_schema_from_rows_list(correctly_formatted_rows_list)
        table, changed_fields = self.update_table_schema(
            dataset_id, table_id, local_schema)

        if changed_fields:
            print('Changed Fields: ' + str(changed_fields))
            for old_name, new_name in changed_fields.items():
                change_field_name(old_name, new_name,
                                  correctly_formatted_rows_list)

        schema = table.schema

        values = get_formatted_rows(correctly_formatted_rows_list, schema)
        errors = self.client.create_rows(table, values)
        if errors:
            for retry in range(retries):
                print('Retry ' + str(retry + 1))
                time.sleep(30)
                errors = self.client.create_rows(table, values)
                if not errors:
                    break

        if errors:
            print(errors)
        return errors

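# Illustrative usage of BigqueryLoggerClient (not part of the original module;
# the project id, credentials path, dataset and table names are hypothetical):
#
#   client = BigqueryLoggerClient('my-gcp-project',
#                                 '/path/to/service_account.json')
#   errors = client.flush([{'test name': 'wifi_ping', 'result': True}],
#                         dataset_id='connectivity_metrics',
#                         table_id='ping_results')
#
# flush() normalizes field names and values, reconciles the remote table
# schema, then inserts the rows, retrying up to `retries` times on failure.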

####################
# Subprocess and helper methods to help with automated logger
####################


def start_queue_server(queue_size, ip_address, port, authkey):
    """Starts a subprocess bigquery_logger_server.py.

  Subprocess creates a server to handle the shared job queue.

  Arguments:
    queue_size: maximum number of items this queue can hold
    ip_address: ip address of the machine on which to start queue management
      server
    port: port on which to reach queue management server
    authkey: password to be used by clients trying to access server
  Returns:
    process: the result of Popen on the subprocess.
  """

    # If ip_address is empty string (signifying local machine) we need to have
    # '' in the command so it is counted as an actual argument to
    # bigquery_logger_server
    ip_address = ip_address or '\'\''
    command = ' '.join([
        _SERVER_SCRIPT,
        str(queue_size),
        str(ip_address),
        str(port),
        str(authkey)
    ])
    # Create error log file for user to check
    error_log_name = os.path.join(
        os.path.dirname(__file__), 'queue_server_err.log')
    error_log = open(error_log_name, 'w+')
    process = subprocess.Popen(
        command,
        shell=True,
        stderr=error_log,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    return process


def start_scheduled_automatic_logger(ip_address, port, authkey, project_id,
                                     credentials_path):
    """Starts a subprocess bigquery_scheduled_automatic_logger.

  Subprocess accesses the queue managed by the server at ip_address:port and
  periodically sends items in the queue to the BigQuery project identified by
  project_id.

  Arguments:
    ip_address: ip_address of the machine on which the server managing the
      shared queue to pull from is located
    port: port on which the server managing the shared queue to pull from can
      be reached
    authkey: password needed to access server
    project_id: name of BigQuery project to send data to
    credentials_path: path to directory where Google Service Account
      credentials for this BigQuery project are stored
  Returns:
    process: the result of Popen on the subprocess.
  """

    # If ip_address is empty string (signifying local machine) we need to have
    # '' in the command so it is counted as an actual argument to
    # bigquery_scheduled_automatic_logger
    ip_address = ip_address or '\'\''
    print('starting scheduled automatic logger...')
    command = ' '.join([
        _AUTOMATIC_LOGGER_SCRIPT,
        str(ip_address),
        str(port),
        str(authkey),
        str(project_id),
        str(credentials_path)
    ])
    # Create error log file for user to check
    error_log_name = os.path.join(
        os.path.dirname(__file__), 'scheduled_automatic_logger_err.log')
    error_log = open(error_log_name, 'w+')
    process = subprocess.Popen(
        command,
        shell=True,
        stderr=error_log,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    return process


def get_queue(ip_address, port, authkey):
    """Returns a proxy object for shared queue.

  Shared queue is created and managed by the server launched via
  start_queue_server().

  Arguments:
    ip_address: ip_address of the machine on which the server managing the
      shared queue to proxy is located
    port: port on which the server managing the shared queue to proxy can be
      reached
    authkey: password needed to access server
  Returns:
    queue: the BigqueryLoggerQueue object that organizes and holds all
      BigQuery inserts sent to server.
  """
    BaseManager.register('get_queue')
    if isinstance(authkey, str):
        # multiprocessing managers require a bytes authkey in Python 3.
        authkey = authkey.encode()
    m = BaseManager(address=(ip_address, int(port)), authkey=authkey)
    try:
        m.connect()
        return m.get_queue()
    except socket.error:
        raise BigQueryLoggerUtilsError('Cannot connect to data storage queue.')
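
# Illustrative usage (not part of the original module; the address and authkey
# are hypothetical, and the methods available on the returned proxy are
# defined by the BigqueryLoggerQueue class registered by the server script):
#
#   queue = get_queue('192.168.1.10', 5000, 'secret')
#   # The scheduled automatic logger drains this queue and forwards its
#   # contents to BigQuery.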


def get_current_scheduled_automatic_logger():
    """Returns process id and args of running scheduled automatic logger"""

    processes = get_processes(_AUTOMATIC_LOGGER_SCRIPT)

    pid = 0
    args = {}
    if processes:
        process = processes[0]
        pid = process[0]
        # getfullargspec replaces the deprecated getargspec (removed in
        # Python 3.11); only the .args attribute is used here.
        process_argspec = inspect.getfullargspec(
            start_scheduled_automatic_logger)
        process_arg_names = process_argspec.args
        process_argv = process[-1 * len(process_arg_names):]
        args = dict(zip(process_arg_names, process_argv))

    return pid, args


def get_current_logger_server():
    """Returns process id and args of running logger servers"""

    processes = get_processes(_SERVER_SCRIPT)

    pid = 0
    args = {}
    if processes:
        process = processes[0]
        pid = process[0]
        process_argspec = inspect.getfullargspec(start_queue_server)
        process_arg_names = process_argspec.args
        process_argv = process[-1 * len(process_arg_names):]
        args = dict(zip(process_arg_names, process_argv))

    return pid, args


def get_current_queue_and_server_pid():
    """Finds the currently running queue server process and its queue.

  Returns:
    pid: process id of the running queue server.
    queue: a proxy for the queue that the server serves.
  """

    pid, args = get_current_logger_server()
    get_queue_args = inspect.getfullargspec(get_queue).args
    if pid:
        try:
            kwargs = {arg_name: args[arg_name] for arg_name in get_queue_args}
        except KeyError:
            raise BigQueryLoggerUtilsError(
                'Param names in get_queue %s must be subset of param names for start_queue_server %s'
                % (get_queue_args, args.keys()))
        else:
            # Retrieve a reference to the current shared queue
            queue = get_queue(**kwargs)
            return pid, queue


def kill_current_scheduled_automatic_logger():
    pid, _ = get_current_scheduled_automatic_logger()
    if pid:
        kill_pid(pid)


def get_scheduled_automatic_logger_pid(ip_address, port, authkey, project_id,
                                       credentials_path):
    """Returns the process id of a bigquery_scheduled_automatic_logger instance for a given set of configs.

  Arguments:
    ip_address: ip_address of the machine on which the server managing the
      shared queue to pull from is located
    port: port on which the server managing the shared queue to pull from can
      be reached
    authkey: password needed to access server
    project_id: name of BigQuery project to send data to
    credentials_path: path to directory where Google Service Account
      credentials for this BigQuery project are stored
  Returns:
    pid: process id of process if found. Else 0
  """

    pids = get_pids(_AUTOMATIC_LOGGER_SCRIPT, ip_address, port, authkey,
                    project_id, os.path.expanduser(credentials_path))

    pid = 0
    if pids:
        pid = pids[0]
    return pid


def get_logger_server_pid(queue_size, ip_address, port, authkey):
    """Returns the process id of a bigquery_logger_server instance for a given set of configs.

  Arguments:
    queue_size: the size of the shared data queue
    ip_address: ip_address of the machine on which the server managing the
      shared queue to pull from is located
    port: port on which the server managing the shared queue to pull from can
      be reached
    authkey: password needed to access server
  Returns:
    pid: process id of process if found. Else 0
  """

    pids = get_pids(_SERVER_SCRIPT, queue_size, ip_address, port, authkey)
    pid = 0
    if pids:
        pid = pids[0]
    return pid


def get_pids(*argv):
    """Gets process ids based on arguments to concatenate and grep.

  Arguments:
    *argv: any number of arguments to be joined and grepped
  Returns:
    pids: process ids of process if found.
  """
    processes = get_processes(*argv)
    pids = [process[0] for process in processes]

    return pids


def get_processes(*argv):
    """Returns processes grepped by a set of arguments.

  Arguments:
    *argv: any number of arguments to be joined and grepped
  Returns:
    processes: processes returned by grep, as a list of lists.
  """
    expression = ' '.join([str(arg) for arg in argv])
    processes = []
    try:
        # check_output returns bytes in Python 3, so decode before splitting.
        results = subprocess.check_output(
            'pgrep -af \"%s\"' % expression, shell=True).decode()
        # splitlines() drops the trailing empty line from the command output.
        for result in results.splitlines():
            items = result.split(' ')
            if 'pgrep' not in items:
                processes.append(items)
    except subprocess.CalledProcessError:
        pass

    return processes
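
# Illustrative example (not part of the original module; the pid and command
# line shown are hypothetical):
#
#   get_processes('bigquery_logger_server.py', 5000)
#   # runs: pgrep -af "bigquery_logger_server.py 5000"
#   # and may return e.g.
#   # [['12345', 'python', 'bigquery_logger_server.py', '20', '', '5000', ...]]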


def kill_pid(pid):
    """To only be used on _SERVER_SCRIPT or _AUTOMATIC_LOGGER_SCRIPT"""

    result = subprocess.check_output('kill -9 %s' % str(pid), shell=True)
    return result
