• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17from google.api_core.exceptions import NotFound
18from google.cloud import bigquery
19from mock import patch, Mock
20
21import acts.controllers.buds_lib.data_storage.bigquery.bigquery_logger_utils as utils
22
# strftime/strptime-style pattern for 'YYYY-MM-DD HH:MM:SS' timestamp strings.
# NOTE(review): not referenced by any test visible in this file -- confirm
# whether it is still needed.
_TIMESTAMP_STR_FORMAT = '%Y-%m-%d %H:%M:%S'
24
25
def test_make_storable():
    """Scalars should come back equal to themselves; a list is stringified."""
    raw_values = ['one', 1, 1.0, True, [1]]
    expected = ['one', 1, 1.0, True, str([1])]
    stored = list(map(utils.make_storeable, raw_values))
    assert stored == expected
30
31
def test_get_field_name():
    """Check the field name produced for a set of awkward input names."""
    # Each pair is (raw name, field name expected from get_field_name).
    cases = (
        ('all_lowercase', 'ALL_LOWERCASE'),
        ('b@d<h4r^c7=r$', 'BDH4RC7R'),
        ('5tarts_with_digit', 'FIELD_5TARTS_WITH_DIGIT'),
        ('_underscore', '_UNDERSCORE'),
        ('', 'FIELD'),
        ('hyphen-name', 'HYPHENNAME'),
    )
    expected = [wanted for _, wanted in cases]
    produced = [utils.get_field_name(raw) for raw, _ in cases]
    assert produced == expected
42
43
def test_get_bigquery_type():
    """Check the BigQuery type name reported for sample Python values."""
    # The last sample is the module object itself, i.e. a value of no
    # specially handled type; the test expects it to be reported as STRING.
    samples = ['one', '2017-11-03 12:30:00', 1, 1.0, True, utils]
    expected_types = [
        'STRING', 'TIMESTAMP', 'INTEGER', 'FLOAT', 'BOOLEAN', 'STRING'
    ]
    detected = list(map(utils.get_bigquery_type, samples))
    assert detected == expected_types
48
49
def test_add_row():
    """Rows appended by add_row should carry the expected keys and values."""

    def item_set(mapping):
        # Order-insensitive view of a row's key/value pairs.
        return set(mapping.items())

    row_list = []

    utils.add_row({'int': 500, 'list': [1, 2, 3], 'float': 5.0}, row_list)
    first_expected = {'INT': 500, 'LIST': '[1, 2, 3]', 'FLOAT': 5.0}
    assert item_set(row_list[0]) == item_set(first_expected)

    utils.add_row({'int': 12, 'time': '2011-12-13 10:00:00'}, row_list)
    second_expected = {'INT': 12, 'TIME': '2011-12-13 10:00:00'}
    assert item_set(row_list[1]) == item_set(second_expected)

    utils.add_row({'1string': '1'}, row_list)
    third_expected = {'FIELD_1STRING': '1'}
    assert item_set(row_list[2]) == item_set(third_expected)
65
66
def test_change_field_name():
    """Rename fields step by step and check counts and resulting rows."""
    row_list = [
        {'FIELD1': None, 'FIELD2': 300, 'FIELD3': True},
        {'FIELD1': 'a string', 'FIELD2': 300, 'FIELD4': False},
        {'FIELD1': 'another string', 'FIELD3': True, 'FIELD4': False},
    ]

    rows_after_first = [
        {'NEW_NAME': None, 'FIELD2': 300, 'FIELD3': True},
        {'NEW_NAME': 'a string', 'FIELD2': 300, 'FIELD4': False},
        {'NEW_NAME': 'another string', 'FIELD3': True, 'FIELD4': False},
    ]
    rows_after_second = [
        {'NEW_NAME': None, 'NEW_NAME2': 300, 'FIELD3': True},
        {'NEW_NAME': 'a string', 'NEW_NAME2': 300, 'FIELD4': False},
        {'NEW_NAME': 'another string', 'FIELD3': True, 'FIELD4': False},
    ]

    # (old name, new name, expected replacement count, expected rows).
    # The last step renames a field that no row has, so the rows must be
    # exactly what the second step left behind.
    steps = [
        ('field1', 'new_name', 3, rows_after_first),
        ('field2', 'new_name2', 2, rows_after_second),
        ('field5', 'new_name3', 0, rows_after_second),
    ]

    for old_name, new_name, expected_count, expected_rows in steps:
        num_replacements = utils.change_field_name(old_name, new_name,
                                                   row_list)
        assert num_replacements == expected_count
        for index, expected in enumerate(expected_rows):
            assert set(row_list[index].items()) == set(expected.items())
132
133
def test_get_schema_from_dict():
    """A name->type mapping should become a list of nullable SchemaFields.

    The expected list is ordered alphabetically by field name, which differs
    from the insertion order of the input mapping.
    """
    # Named `type_by_field` rather than `dict` so the builtin is not shadowed.
    type_by_field = {'FIELD': 'STRING', 'IELD': 'BOOLEAN', 'ELD': 'TIMESTAMP'}
    target = [
        bigquery.SchemaField('ELD', 'TIMESTAMP', mode='nullable'),
        bigquery.SchemaField('FIELD', 'STRING', mode='nullable'),
        bigquery.SchemaField('IELD', 'BOOLEAN', mode='nullable')
    ]
    assert utils.get_schema_from_dict(type_by_field) == target
142
143
def test_get_dict_from_schema():
    """A schema list should convert to the matching name->type mapping."""
    field_types = [
        ('A_FLOAT', 'FLOAT'),
        ('AN_INT', 'INTEGER'),
        ('A_STRING', 'STRING'),
        ('A_TIMESTAMP', 'TIMESTAMP'),
        ('A_BOOLEAN', 'BOOLEAN'),
        ('UNKNOWN', 'STRING'),
    ]
    schema = [bigquery.SchemaField(name, kind) for name, kind in field_types]
    dictionary = dict(field_types)

    converted = utils.get_dict_from_schema(schema)
    assert dictionary.items() == converted.items()
164
165
def test_reconcile_schema_differences():
    """Conflicting field types should be kept under suffixed field names.

    The expected result keeps every entry of the preserved schema and adds
    the conflicting entries of the changed schema as NAME_TYPE (with a
    numeric suffix when that name is itself already taken).
    """
    schema_to_change = dict(
        FIELD1='TIMESTAMP',
        FIELD2='INTEGER',
        FIELD3='FLOAT',
        FIELD4='STRING',
        FIELD5='BOOLEAN',
        FIELD6='STRING',
    )
    schema_to_preserve = dict(
        FIELD1='TIMESTAMP',
        FIELD2='FLOAT',
        FIELD3_FLOAT='TIMESTAMP',
        FIELD3='BOOLEAN',
        FIELD5='TIMESTAMP',
        FIELD7='TIMESTAMP',
    )
    target_schema = dict(
        FIELD1='TIMESTAMP',
        FIELD2='FLOAT',
        FIELD2_INTEGER='INTEGER',
        FIELD3='BOOLEAN',
        FIELD3_FLOAT='TIMESTAMP',
        FIELD3_FLOAT1='FLOAT',
        FIELD4='STRING',
        FIELD5='TIMESTAMP',
        FIELD5_BOOLEAN='BOOLEAN',
        FIELD6='STRING',
        FIELD7='TIMESTAMP',
    )

    reconciled = utils.reconcile_schema_differences(schema_to_change,
                                                    schema_to_preserve)
    # Only the first element of the result (the merged schema) is checked.
    merged_schema = reconciled[0]
    assert merged_schema.items() == target_schema.items()
199
200
def test_get_tuple_from_schema():
    """The field names of a schema should come back as a tuple, in order."""
    field_types = [
        ('FIELD1', 'BOOLEAN'),
        ('FIELD2', 'INTEGER'),
        ('FIELD3', 'STRING'),
        ('FIELD4', 'TIMESTAMP'),
        ('FIELD5', 'FLOAT'),
    ]
    schema = [
        bigquery.SchemaField(name, kind, mode='nullable')
        for name, kind in field_types
    ]
    target = ('FIELD1', 'FIELD2', 'FIELD3', 'FIELD4', 'FIELD5')

    names = utils.get_tuple_from_schema(schema)
    assert names == target
211
212
def test_get_schema_from_rows_list():
    """A schema should be inferred from the union of fields across rows.

    FIELD2 holds an int in one row and a float in another; the expected
    schema types it as STRING.
    """
    row_list = [
        {'FIELD1': None, 'FIELD2': 300, 'FIELD3': True},
        {'FIELD1': 'a string', 'FIELD2': 300.0, 'FIELD4': False},
        {'FIELD1': 'another string', 'FIELD3': True, 'FIELD4': False},
    ]
    expected_types = [
        ('FIELD1', 'STRING'),
        ('FIELD2', 'STRING'),
        ('FIELD3', 'BOOLEAN'),
        ('FIELD4', 'BOOLEAN'),
    ]
    schema = [
        bigquery.SchemaField(name, kind, mode='nullable')
        for name, kind in expected_types
    ]

    inferred = utils.get_schema_from_rows_list(row_list)
    assert inferred == schema
234
235
def test_get_formatted_rows():
    """Rows should be flattened to tuples following the schema's field order.

    Fields a row does not contain (including schema fields no row has) are
    expected to appear as None.
    """
    row_list = [
        {'FIELD1': None, 'FIELD2': 300, 'FIELD3': True},
        {'FIELD1': 'a string', 'FIELD2': 300.0, 'FIELD4': False},
        {'FIELD1': 'another string', 'FIELD3': True, 'FIELD4': False},
    ]
    column_types = [
        ('FIELD5', 'TIMESTAMP'),
        ('FIELD4', 'BOOLEAN'),
        ('FIELD3.5', 'INTEGER'),
        ('FIELD3', 'BOOLEAN'),
        ('FIELD2', 'STRING'),
        ('FIELD1', 'STRING'),
    ]
    schema = tuple(
        bigquery.SchemaField(name, kind, mode='nullable')
        for name, kind in column_types)

    # Column order: FIELD5, FIELD4, FIELD3.5, FIELD3, FIELD2, FIELD1.
    target = [
        (None, None, None, True, 300, None),
        (None, False, None, None, 300.0, 'a string'),
        (None, False, None, True, None, 'another string'),
    ]

    formatted = utils.get_formatted_rows(row_list, schema)
    assert formatted == target
260
261
class Client:
    """Hand-written fake of the BigQuery client used by the tests here.

    Only the dataset 'existing_dataset' and the table 'existing_table' are
    treated as present; any other lookup raises NotFound.
    """

    def get_dataset(self, name):
        """Return a Dataset for 'existing_dataset'; raise NotFound otherwise."""
        if name == 'existing_dataset':
            return Dataset(name)
        raise NotFound('')

    def create_dataset(self, dataset):
        """Pretend to create the dataset by handing it straight back."""
        return dataset

    def dataset(self, name):
        """Return the name itself in place of a dataset reference."""
        return name

    def delete_dataset(self, dataset):
        """Return a message naming the dataset that was 'deleted'."""
        dataset_name = dataset.name
        return 'deleted dataset ' + dataset_name

    def get_table(self, name):
        """Return an empty-schema Table for 'existing_table'; else NotFound."""
        if name == 'existing_table':
            return Table(name, [])
        raise NotFound('')

    def create_table(self, table):
        """Pretend to create the table by handing it straight back."""
        return table

    def update_table(self, table, properties):
        """Return a new Table whose name has '_changed' appended."""
        renamed = table.name + '_changed'
        return Table(renamed, table.schema)

    def delete_table(self, table):
        """Return a message naming the table that was 'deleted'."""
        table_name = table.name
        return 'deleted table ' + table_name

    def create_rows(self, table, rows):
        """Return ['errors'] for the table named 'bad_table', else []."""
        return ['errors'] if table.name == 'bad_table' else []
297
298
class Dataset:
    """Minimal stand-in for a BigQuery dataset, identified by its name."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        """Compare by name; defer to the other operand if it has no name."""
        # Returning NotImplemented (instead of letting AttributeError escape)
        # lets `dataset == <unrelated object>` evaluate to False, so a failed
        # comparison surfaces as an assertion failure rather than a crash.
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name == other.name

    # Instances are mutable and compared by value, so they stay unhashable
    # (this is what defining __eq__ alone already implies in Python 3).
    __hash__ = None

    def __repr__(self):
        return 'Dataset(%r)' % (self.name,)

    def table(self, name):
        """Return the name itself in place of a table reference."""
        return name
308
309
class Table:
    """Minimal stand-in for a BigQuery table: a name plus a schema."""

    def __init__(self, name, schema):
        self.name = name
        self.schema = schema

    def __eq__(self, other):
        """Compare by name and by schema contents, ignoring schema order."""
        # Returning NotImplemented (instead of letting AttributeError escape)
        # lets `table == <unrelated object>` evaluate to False, so a failed
        # comparison surfaces as an assertion failure rather than a crash.
        if not (hasattr(other, 'name') and hasattr(other, 'schema')):
            return NotImplemented
        return self.name == other.name and set(self.schema) == set(
            other.schema)

    # Instances are mutable and compared by value, so they stay unhashable
    # (this is what defining __eq__ alone already implies in Python 3).
    __hash__ = None

    def __repr__(self):
        return 'Table(%r, %r)' % (self.name, self.schema)

    def __str__(self):
        return 'NAME: %s\nSCHEMA: %s' % (self.name, str(self.schema))
321
322
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_dataset_already_exists(mock_client, mock_dataset):
    """create_dataset should return the dataset the fake client already has."""
    mock_client.return_value = Client()
    client = utils.BigqueryLoggerClient('', '')
    dataset = client.create_dataset('existing_dataset')
    assert dataset == Dataset('existing_dataset')
330
331
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_dataset_does_not_exist(mock_client, mock_dataset):
    """create_dataset should return a new dataset when the lookup fails."""
    mock_client.return_value = Client()
    # The fake client raises NotFound for this name, so the dataset has to
    # come from the (mocked) bigquery.Dataset constructor.
    mock_dataset.return_value = Dataset('new_dataset')
    client = utils.BigqueryLoggerClient('', '')
    dataset = client.create_dataset('new_dataset')
    assert dataset == Dataset('new_dataset')
340
341
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.bigquery, 'Table')
@patch.object(utils.BigqueryLoggerClient, 'create_dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_table_already_exists(mock_client, mock_dataset, mock_table):
    """create_table should return the existing table, not a new one."""
    mock_client.return_value = Client()
    mock_dataset.return_value = Dataset('existing_dataset')
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('existing_table', schema)
    client = utils.BigqueryLoggerClient('', '')
    table = client.create_table('existing_dataset', 'existing_table', schema)
    # The fake client's existing table has an empty schema, so an empty
    # schema here shows the result came from get_table rather than from the
    # mocked Table constructor.
    assert table == Table('existing_table', [])
359
360
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.bigquery, 'Table')
@patch.object(utils.BigqueryLoggerClient, 'create_dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_table_does_not_exist(mock_client, mock_dataset, mock_table):
    """create_table should create and return a table that is not yet present."""
    mock_client.return_value = Client()
    mock_dataset.return_value = Dataset('existing_dataset')
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('new_table', schema)
    client = utils.BigqueryLoggerClient('', '')
    table = client.create_table('existing_dataset', 'new_table', schema)
    assert table == Table('new_table', schema)
378
379
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.BigqueryLoggerClient, 'create_table')
@patch.object(utils.bigquery, 'Client')
def test_update_table_schema(mock_client, mock_table):
    """A conflicting field type should be added under a suffixed name.

    FIELD1 changes from STRING to INTEGER, so the updated table is expected
    to keep FIELD1 as STRING, gain FIELD1_INTEGER, and report that rename.
    """
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    # `mock_table` replaces BigqueryLoggerClient.create_table, which supplies
    # the table whose schema is being updated.
    mock_table.return_value = Table('existing_table', schema)
    new_schema = {
        bigquery.SchemaField('FIELD1', 'INTEGER', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD5', 'FLOAT', mode='nullable')
    }
    client = utils.BigqueryLoggerClient('', '')
    table, changed_fields = client.update_table_schema(
        'existing_dataset', 'existing_table', new_schema)
    # The '_changed' suffix is added by the fake Client.update_table, so it
    # shows that an update was actually issued.
    assert table == Table(
        'existing_table_changed', {
            bigquery.SchemaField('FIELD1_INTEGER', 'INTEGER', mode='nullable'),
            bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
            bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
            bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable'),
            bigquery.SchemaField('FIELD5', 'FLOAT', mode='nullable')
        })
    assert set(changed_fields.items()) == set({
        'FIELD1': 'FIELD1_INTEGER'
    }.items())
412
413
# Patch through the imported `utils` module so the mocks are applied to the
# exact objects the code under test uses, independent of its package path.
@patch.object(utils.BigqueryLoggerClient, 'create_table')
@patch.object(utils.bigquery, 'Client')
def test_update_table_schema_no_change(mock_client, mock_table):
    """A new schema that is a subset of the old one should change nothing."""
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    # `mock_table` replaces BigqueryLoggerClient.create_table, which supplies
    # the table whose schema is being updated.
    mock_table.return_value = Table('existing_table', schema)
    new_schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable')
    }
    client = utils.BigqueryLoggerClient('', '')
    table, changed_fields = client.update_table_schema(
        'existing_dataset', 'existing_table', new_schema)
    # The name carries no '_changed' suffix, i.e. the fake client's
    # update_table was not the source of the returned table.
    assert table == Table(
        'existing_table', {
            bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
            bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
            bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
        })
    assert set(changed_fields.items()) == set()
441
442
# Patch the fake Client class and the imported `utils` module directly so
# the mocks are applied to the exact objects in use, independent of the
# package path this test module or the module under test is installed at.
@patch.object(Client, 'delete_dataset')
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_delete_dataset(mock_client, mock_dataset, mock_delete_dataset):
    """delete() with only a dataset name should delete that dataset."""
    mock_client.return_value = Client()
    ds = Dataset('existing_dataset')
    mock_dataset.return_value = ds
    client = utils.BigqueryLoggerClient('', '')
    client.delete('existing_dataset')
    mock_delete_dataset.assert_called_with(ds)
453
454
# Patch the fake Client class and the imported `utils` module directly so
# the mocks are applied to the exact objects in use, independent of the
# package path this test module or the module under test is installed at.
@patch.object(Client, 'delete_table')
@patch.object(utils.bigquery, 'Table')
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_delete_table(mock_client, mock_dataset, mock_table,
                      mock_delete_table):
    """delete() with a dataset and a table name should delete that table.

    This test must not share a name with the dataset-deletion test: a second
    module-level function of the same name would replace the first, and only
    one of the two would ever be collected and run.
    """
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    tb = Table('existing_table', schema)
    mock_table.return_value = tb
    client = utils.BigqueryLoggerClient('', '')
    client.delete('existing_dataset', 'existing_table')
    mock_delete_table.assert_called_with(tb)
472
473
# Patch the fake Client class and the imported `utils` module directly so
# the mocks are applied to the exact objects in use, independent of the
# package path this test module or the module under test is installed at.
@patch.object(Client, 'create_rows')
@patch.object(utils, 'get_schema_from_rows_list')
@patch.object(utils, 'change_field_name')
@patch.object(utils.BigqueryLoggerClient, 'update_table_schema')
@patch.object(utils.bigquery, 'Client')
def test_flush(mock_client, mock_update_table_schema, mock_change_field_name,
               mock_get_schema, mock_create_rows):
    """flush() should apply reported field renames and insert the rows once.

    update_table_schema is mocked to report that FIELD1 was renamed to
    NEW_NAME1; the test checks that the rename is applied to the rows, that
    rows are inserted exactly once, and that no errors are returned.
    """
    mock_create_rows.return_value = []
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    tb = Table('existing_table', schema)
    mock_update_table_schema.return_value = tb, {'FIELD1': 'NEW_NAME1'}
    row_list = [{
        'FIELD1': 1,
        'FIELD2': False,
        'FIELD3': 'result'
    }, {
        'FIELD1': 2,
        'FIELD2': True
    }, {
        'FIELD1': 3,
        'FIELD3': 'result'
    }]
    client = utils.BigqueryLoggerClient('', '')
    errors = client.flush(row_list, 'existing_dataset', 'existing_table')
    mock_change_field_name.assert_called_with('FIELD1', 'NEW_NAME1', row_list)
    mock_create_rows.assert_called_once()
    assert errors == []
511