#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Simple buffer interface that sends rows to specified tables in wearables-qa project in BigQuery."""
import acts.controllers.buds_lib.data_storage.bigquery.bigquery_logger_utils as bq_utils
import os
import time
import yaml
# Name of the YAML configuration file expected to live next to this module.
CONFIG = 'config.yml'
# Absolute path to the config file, resolved relative to this module's directory.
PATH_TO_CONFIG = os.path.join(os.path.dirname(__file__), CONFIG)

# Module-level proxy for the shared BigqueryLoggerQueue. Populated by
# BigQueryProcessManager.start_subprocesses() and consumed by log().
queue = None
class BigqueryBufferError(Exception):
    """Raised when the data storage queue malfunctions or cannot be reached."""

class BigQueryProcessManager:
    """Manages the shared queue server and scheduled automatic logger.

    Reads connection settings from a YAML config file and (re)starts the
    subprocess that hosts the shared BigqueryLoggerQueue plus the scheduled
    automatic logger that flushes queued rows to BigQuery.
    """

    def __init__(self, config_path):
        """Initializes the manager and loads settings from config_path.

        Arguments:
            config_path: path to the YAML configuration file.
        """
        self.config_path = config_path
        self.ip_address = None
        self.port = None
        # Initialized up front so load_config() can safely compare against
        # previous values even on its very first invocation (the original
        # code raised AttributeError here if the configured ip/port matched
        # the initial None values).
        self.queue_size = None
        self.authkey = None
        self.load_config()

    def load_config(self):
        """(Re)reads the YAML config file and updates instance settings.

        Raises:
            BigqueryBufferError: if queue_size or authkey changed while the
                ip_address/port pair stayed the same; a running server cannot
                be reconfigured in place, so a new port must be chosen.
        """
        # safe_load avoids arbitrary Python object construction from the YAML
        # file, and the context manager closes the file handle promptly
        # (the original yaml.load(open(...)) leaked the handle and is
        # rejected without a Loader by PyYAML >= 6).
        with open(self.config_path, 'r') as config_file:
            config = yaml.safe_load(config_file)
        new_ip_address = config['ip_address']
        new_port = config['port']
        new_queue_size = config['queue_size']
        new_authkey = config['authkey']
        if new_ip_address == self.ip_address and new_port == self.port:
            if new_authkey != self.authkey or new_queue_size != self.queue_size:
                raise BigqueryBufferError(
                    'To change queue size or server authkey, choose an unused port for a new server.'
                )
        self.project_id = config['project_id']
        self.credentials_path = config['credentials_path']
        self.queue_size = config['queue_size']
        self.ip_address = config['ip_address']
        self.port = config['port']
        self.authkey = config['authkey']
        self.flush_period = config['flush_period']

    def start_subprocesses(self):
        """Ensures the queue server and automatic logger are running.

        Starts a queue server if none is running, migrates items from any
        pre-existing queue with different parameters, and (re)starts the
        scheduled automatic logger.

        Returns:
            True if both the server and the automatic logger are running
            afterwards, False otherwise.
        """
        old_server_pid, old_queue = None, None

        if not self.server_pid():
            try:
                # Check if a BigqueryLoggerQueue currently exists but with
                # different args; its items are migrated below.
                old_server_pid, old_queue = bq_utils.get_current_queue_and_server_pid(
                )
            except TypeError:
                # No existing queue/server to migrate from.
                pass

            # Start server to initialize new shared BigqueryLoggerQueue.
            bq_utils.start_queue_server(
                queue_size=self.queue_size,
                ip_address=self.ip_address,
                port=self.port,
                authkey=self.authkey)
            # Give the server subprocess time to come up before connecting.
            time.sleep(5)

        # Retrieve proxy object for new shared BigqueryLoggerQueue.
        global queue
        queue = bq_utils.get_queue(
            ip_address=self.ip_address, port=self.port, authkey=self.authkey)

        if queue:

            if old_queue and old_server_pid:  # If an older queue exists, transfer its items to new one
                while not old_queue.empty():
                    queue.put(old_queue.get())
                bq_utils.kill_pid(old_server_pid)

            # noinspection PyUnresolvedReferences
            queue.set_flush_period(self.flush_period)

            # noinspection PyUnresolvedReferences
            if not self.automatic_logger_pid():
                # Kill any stale scheduled logger (e.g. one bound to old
                # connection parameters) before starting a fresh one.
                bq_utils.kill_current_scheduled_automatic_logger()

                bq_utils.start_scheduled_automatic_logger(
                    ip_address=self.ip_address,
                    port=self.port,
                    authkey=self.authkey,
                    project_id=self.project_id,
                    credentials_path=self.credentials_path)

        return bool(self.server_pid() and self.automatic_logger_pid())

    def automatic_logger_pid(self):
        """Returns the PID of the scheduled automatic logger, if running."""
        return bq_utils.get_scheduled_automatic_logger_pid(
            ip_address=self.ip_address,
            port=self.port,
            authkey=self.authkey,
            project_id=self.project_id,
            credentials_path=self.credentials_path)

    def server_pid(self):
        """Returns the PID of the queue server process, if running."""
        return bq_utils.get_logger_server_pid(
            queue_size=self.queue_size,
            ip_address=self.ip_address,
            port=self.port,
            authkey=self.authkey)

# Module-level singleton shared by all callers of log(); constructing it
# reads the YAML config file at import time.
process_manager = BigQueryProcessManager(PATH_TO_CONFIG)

def log(dataset_id, table_id, row_dict):
    """Sends a row dict to be flushed to a table in BigQuery.

    Arguments:
        dataset_id: dataset in which table resides.
        table_id: table to update with row.
        row_dict: dictionary for field: value pairs to send to table.

    Raises:
        BigqueryBufferError: if subprocesses could not be started or the row
            could not be pushed onto the shared queue.
    """
    global queue

    try:
        process_manager.load_config()
    except BigqueryBufferError as e:
        # Python 3 exceptions have no .message attribute (the original
        # print(e.message) raised AttributeError); str(e) via print(e) is
        # the correct way to show the error text.
        print(e)
        # Config change was rejected, so the existing subprocesses (and
        # queue) are presumed to still be running.
        subprocesses_started = True
    else:
        subprocesses_started = process_manager.start_subprocesses()

    if not subprocesses_started:
        raise BigqueryBufferError('Could not start subprocesses')
    if queue:
        try:
            # noinspection PyUnresolvedReferences
            queue.add_row(dataset_id, table_id, row_dict)
        except EOFError:
            # The connection to the queue server proxy was lost.
            raise BigqueryBufferError(
                'Could not push data to storage queue (EOFError)')
    else:
        raise BigqueryBufferError('No data queue exists to push data to...')