• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Queue wrapper object to be shared across all tests using the bigquery_buffer module."""
17
18from multiprocessing import Queue
19
DEFAULT_SIZE = 30000


class BigqueryLoggerQueue:
    """Organizes and stores all BigQuery table row updates sent to it.

    Rows are held in a bounded ``multiprocessing.Queue`` as
    ``((dataset_id, table_id), row)`` tuples until they are drained with
    ``get_insert_iterator``.
    """

    # Upper bound, in seconds, on how long get_insert_iterator waits for a
    # single row that qsize() reported but that is not yet readable.
    _GET_TIMEOUT_SECONDS = 1

    def __init__(self, size=DEFAULT_SIZE):
        """Create the underlying queue.

        Arguments:
          size: maximum number of rows the queue may hold at once.
        """
        self.queue = Queue(maxsize=size)
        self.flush_period = 1

    def add_row(self, dataset_id, table_id, row):
        """Store row to be added with all other rows to be added to passed table.

        Arguments:
          dataset_id: the dataset in which table_id resides.
          table_id: the id of the table to update.
          row: a dictionary of field: value pairs representing the row to add.
        """
        self.queue.put(((dataset_id, table_id), row))

    def get_insert_iterator(self):
        """Organize queue into iterable of ((dataset_id, table_id), rows_list) tuples.

        Takes state of queue upon invocation, ignoring items put in queue after.

        Returns:
          insert_iterator: an iterable view of pairs of dataset/table ids and
          the lists of rows to insert into those tables.

        Raises:
          NotImplementedError: on platforms where multiprocessing.Queue.qsize()
            is not implemented.
        """
        # Local import keeps this block self-contained; multiprocessing queues
        # signal a timed-out get() with queue.Empty.
        from queue import Empty

        insert_dict = {}
        # Snapshot the size so rows added while draining wait for the next call.
        num_entries_to_insert = self.queue.qsize()

        for _ in range(num_entries_to_insert):
            try:
                # A row counted by qsize() may not be readable yet because a
                # background feeder thread moves it into the pipe, so wait
                # briefly instead of trusting empty(), which could skip it.
                dataset_table_tuple, row_dict = self.queue.get(
                    timeout=self._GET_TIMEOUT_SECONDS)
            except Empty:
                # Another consumer drained the remaining rows; stop early.
                break
            insert_dict.setdefault(dataset_table_tuple, []).append(row_dict)

        return insert_dict.items()

    def put(self, row_tuple):
        """Put an already-built ((dataset_id, table_id), row) tuple on the queue."""
        self.queue.put(row_tuple)

    def get(self):
        """Remove and return the next item, blocking until one is available."""
        return self.queue.get()

    def empty(self):
        """Return the underlying queue's empty() result."""
        return self.queue.empty()

    def get_flush_period(self):
        """Return the currently configured flush period."""
        return self.flush_period

    def set_flush_period(self, period):
        """Set the flush period, coercing the given value to int."""
        self.flush_period = int(period)
76