1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16"""Simple buffer interface that sends rows to specified tables in wearables-qa project in BigQuery.""" 17import acts.controllers.buds_lib.data_storage.bigquery.bigquery_logger_utils as bq_utils 18import os 19import time 20import yaml 21 22CONFIG = 'config.yml' 23PATH_TO_CONFIG = os.path.join(os.path.dirname(__file__), CONFIG) 24 25queue = None 26 27 28class BigqueryBufferError(Exception): 29 """To be thrown if data storage queue malfunctions or cannot be reached""" 30 31 32class BigQueryProcessManager: 33 def __init__(self, config_path): 34 self.config_path = config_path 35 self.ip_address = None 36 self.port = None 37 self.load_config() 38 39 def load_config(self): 40 config = yaml.load(open(self.config_path, 'r')) 41 new_ip_address = config['ip_address'] 42 new_port = config['port'] 43 new_queue_size = config['queue_size'] 44 new_authkey = config['authkey'] 45 if new_ip_address == self.ip_address and new_port == self.port: 46 if new_authkey != self.authkey or new_queue_size != self.queue_size: 47 raise BigqueryBufferError( 48 'To change queue size or server authkey, choose an unused port for a new server.' 49 ) 50 self.project_id = config['project_id'] 51 self.credentials_path = config['credentials_path'] 52 self.queue_size = config['queue_size'] 53 self.ip_address = config['ip_address'] 54 self.port = config['port'] 55 self.authkey = config['authkey'] 56 self.flush_period = config['flush_period'] 57 58 def start_subprocesses(self): 59 old_server_pid, old_queue = None, None 60 61 if not self.server_pid(): 62 try: 63 # check if a BigqueryLoggerQueue currently exists but with different args 64 old_server_pid, old_queue = bq_utils.get_current_queue_and_server_pid( 65 ) 66 except TypeError: 67 pass 68 69 # Start server to initialize new shared BigqueryLoggerQueue 70 bq_utils.start_queue_server( 71 queue_size=self.queue_size, 72 ip_address=self.ip_address, 73 port=self.port, 74 authkey=self.authkey) 75 time.sleep(5) 76 77 # Retrieve proxy object for new shared BigqueryLoggerQueue 78 global queue 79 queue = bq_utils.get_queue( 80 ip_address=self.ip_address, port=self.port, authkey=self.authkey) 81 82 if queue: 83 84 if old_queue and old_server_pid: # If and older queue exists, transfer its items to new one 85 while not old_queue.empty(): 86 queue.put(old_queue.get()) 87 bq_utils.kill_pid(old_server_pid) 88 89 # noinspection PyUnresolvedReferences 90 queue.set_flush_period(self.flush_period) 91 92 # noinspection PyUnresolvedReferences 93 if not self.automatic_logger_pid(): 94 bq_utils.kill_current_scheduled_automatic_logger() 95 96 bq_utils.start_scheduled_automatic_logger( 97 ip_address=self.ip_address, 98 port=self.port, 99 authkey=self.authkey, 100 project_id=self.project_id, 101 credentials_path=self.credentials_path) 102 103 if self.server_pid() and self.automatic_logger_pid(): 104 return True 105 106 return False 107 108 def automatic_logger_pid(self): 109 return bq_utils.get_scheduled_automatic_logger_pid( 110 ip_address=self.ip_address, 111 port=self.port, 112 authkey=self.authkey, 113 project_id=self.project_id, 114 credentials_path=self.credentials_path) 115 116 def server_pid(self): 117 return bq_utils.get_logger_server_pid( 118 queue_size=self.queue_size, 119 ip_address=self.ip_address, 120 port=self.port, 121 authkey=self.authkey) 122 123 124process_manager = BigQueryProcessManager(PATH_TO_CONFIG) 125 126 127def log(dataset_id, table_id, row_dict): 128 """Sends a row dict to be flushed to a table in BigQuery. 129 130 Arguments: 131 dataset_id: dataset in which table resides. 132 table_id: table to update with row. 133 row_dict: dictionary for field: value pairs to send to table. 134 """ 135 global queue 136 137 try: 138 process_manager.load_config() 139 except BigqueryBufferError as e: 140 print(e.message) 141 subprocesses_started = True 142 else: 143 subprocesses_started = process_manager.start_subprocesses() 144 145 if not subprocesses_started: 146 raise BigqueryBufferError('Could not start subprocesses') 147 if queue: 148 try: 149 # noinspection PyUnresolvedReferences 150 queue.add_row(dataset_id, table_id, row_dict) 151 except EOFError: 152 raise BigqueryBufferError( 153 'Could not push data to storage queue (EOFError)') 154 else: 155 raise BigqueryBufferError('No data queue exists to push data to...') 156