1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16"""Queue wrapper object to be shared across all tests using the bigquery_buffer module.""" 17 18from multiprocessing import Queue 19 20DEFAULT_SIZE = 30000 21 22 23class BigqueryLoggerQueue: 24 """Organizes and stores all BigQuery table row updates sent to it.""" 25 26 def __init__(self, size=DEFAULT_SIZE): 27 self.queue = Queue(maxsize=size) 28 self.flush_period = 1 29 30 def add_row(self, dataset_id, table_id, row): 31 """Store row to be added with all other rows to be added to passed table. 32 33 Arguments: 34 dataset_id: the dataset in which table_id resides. 35 table_id: the id of the table to update. 36 row: a dictionary of field: value pairs representing the row to add. 37 """ 38 39 self.queue.put(((dataset_id, table_id), row)) 40 41 def get_insert_iterator(self): 42 """Organize queue into iterator of ((dataset_id, table_id), rows_list) tuples. 43 Takes state of queue upon invocation, ignoring items put in queue after. 44 45 Returns: 46 insert_iterator: an iterator of pairs dataset/table ids and the lists 47 of rows to insert into those tables. 48 """ 49 50 insert_dict = {} 51 num_entries_to_insert = self.queue.qsize() 52 53 for i in xrange(num_entries_to_insert): 54 if not self.queue.empty(): 55 dataset_table_tuple, row_dict = self.queue.get() 56 if dataset_table_tuple not in insert_dict.keys(): 57 insert_dict[dataset_table_tuple] = [] 58 insert_dict[dataset_table_tuple].append(row_dict) 59 60 return insert_dict.items() 61 62 def put(self, row_tuple): 63 self.queue.put(row_tuple) 64 65 def get(self): 66 return self.queue.get() 67 68 def empty(self): 69 return self.queue.empty() 70 71 def get_flush_period(self): 72 return self.flush_period 73 74 def set_flush_period(self, period): 75 self.flush_period = int(period) 76