#!/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from mock import patch, Mock

import acts.controllers.buds_lib.data_storage.bigquery.bigquery_logger_utils as utils

_TIMESTAMP_STR_FORMAT = '%Y-%m-%d %H:%M:%S'


def test_make_storable():
    to_make_storable = ['one', 1, 1.0, True, [1]]
    targets = ['one', 1, 1.0, True, str([1])]
    assert [utils.make_storeable(item) for item in to_make_storable] == targets


def test_get_field_name():
    bad_names = [
        'all_lowercase', 'b@d<h4r^c7=r$', '5tarts_with_digit', '_underscore',
        '', 'hyphen-name'
    ]
    targets = [
        'ALL_LOWERCASE', 'BDH4RC7R', 'FIELD_5TARTS_WITH_DIGIT', '_UNDERSCORE',
        'FIELD', 'HYPHENNAME'
    ]
    assert [utils.get_field_name(item) for item in bad_names] == targets


def test_get_bigquery_type():
    items = ['one', '2017-11-03 12:30:00', 1, 1.0, True, utils]
    targets = ['STRING', 'TIMESTAMP', 'INTEGER', 'FLOAT', 'BOOLEAN', 'STRING']
    assert [utils.get_bigquery_type(item) for item in items] == targets


def test_add_row():
    row_list = []
    utils.add_row({'int': 500, 'list': [1, 2, 3], 'float': 5.0}, row_list)
    assert set(row_list[0].items()) == set({
        'INT': 500,
        'LIST': '[1, 2, 3]',
        'FLOAT': 5.0
    }.items())
    utils.add_row({'int': 12, 'time': '2011-12-13 10:00:00'}, row_list)
    assert set(row_list[1].items()) == set({
        'INT': 12,
        'TIME': '2011-12-13 10:00:00'
    }.items())
    utils.add_row({'1string': '1'}, row_list)
    assert set(row_list[2].items()) == set({'FIELD_1STRING': '1'}.items())


def test_change_field_name():
    row_list = [{
        'FIELD1': None,
        'FIELD2': 300,
        'FIELD3': True
    }, {
        'FIELD1': 'a string',
        'FIELD2': 300,
        'FIELD4': False
    }, {
        'FIELD1': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }]
    num_replacements = utils.change_field_name('field1', 'new_name', row_list)
    assert num_replacements == 3
    assert set(row_list[0].items()) == set({
        'NEW_NAME': None,
        'FIELD2': 300,
        'FIELD3': True
    }.items())
    assert set(row_list[1].items()) == set({
        'NEW_NAME': 'a string',
        'FIELD2': 300,
        'FIELD4': False
    }.items())
    assert set(row_list[2].items()) == set({
        'NEW_NAME': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }.items())
    num_replacements = utils.change_field_name('field2', 'new_name2', row_list)
    assert num_replacements == 2
    assert set(row_list[0].items()) == set({
        'NEW_NAME': None,
        'NEW_NAME2': 300,
        'FIELD3': True
    }.items())
    assert set(row_list[1].items()) == set({
        'NEW_NAME': 'a string',
        'NEW_NAME2': 300,
        'FIELD4': False
    }.items())
    assert set(row_list[2].items()) == set({
        'NEW_NAME': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }.items())
    num_replacements = utils.change_field_name('field5', 'new_name3', row_list)
    assert num_replacements == 0
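    # Renaming a field that no row contains should leave every row untouched.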
    assert set(row_list[0].items()) == set({
        'NEW_NAME': None,
        'NEW_NAME2': 300,
        'FIELD3': True
    }.items())
    assert set(row_list[1].items()) == set({
        'NEW_NAME': 'a string',
        'NEW_NAME2': 300,
        'FIELD4': False
    }.items())
    assert set(row_list[2].items()) == set({
        'NEW_NAME': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }.items())


def test_get_schema_from_dict():
    dictionary = {'FIELD': 'STRING', 'IELD': 'BOOLEAN', 'ELD': 'TIMESTAMP'}
    target = [
        bigquery.SchemaField('ELD', 'TIMESTAMP', mode='nullable'),
        bigquery.SchemaField('FIELD', 'STRING', mode='nullable'),
        bigquery.SchemaField('IELD', 'BOOLEAN', mode='nullable')
    ]
    assert utils.get_schema_from_dict(dictionary) == target


def test_get_dict_from_schema():
    schema = [
        bigquery.SchemaField('a_float'.upper(), 'FLOAT'),
        bigquery.SchemaField('an_int'.upper(), 'INTEGER'),
        bigquery.SchemaField('a_string'.upper(), 'STRING'),
        bigquery.SchemaField('a_timestamp'.upper(), 'TIMESTAMP'),
        bigquery.SchemaField('a_boolean'.upper(), 'BOOLEAN'),
        bigquery.SchemaField('unknown'.upper(), 'STRING')
    ]

    dictionary = {
        'a_float'.upper(): 'FLOAT',
        'an_int'.upper(): 'INTEGER',
        'a_string'.upper(): 'STRING',
        'a_timestamp'.upper(): 'TIMESTAMP',
        'a_boolean'.upper(): 'BOOLEAN',
        'unknown'.upper(): 'STRING'
    }

    assert dictionary.items() == utils.get_dict_from_schema(schema).items()


def test_reconcile_schema_differences():
    schema_to_change = {
        'FIELD1': 'TIMESTAMP',
        'FIELD2': 'INTEGER',
        'FIELD3': 'FLOAT',
        'FIELD4': 'STRING',
        'FIELD5': 'BOOLEAN',
        'FIELD6': 'STRING'
    }
    schema_to_preserve = {
        'FIELD1': 'TIMESTAMP',
        'FIELD2': 'FLOAT',
        'FIELD3_FLOAT': 'TIMESTAMP',
        'FIELD3': 'BOOLEAN',
        'FIELD5': 'TIMESTAMP',
        'FIELD7': 'TIMESTAMP'
    }
    target_schema = {
        'FIELD1': 'TIMESTAMP',
        'FIELD2': 'FLOAT',
        'FIELD2_INTEGER': 'INTEGER',
        'FIELD3': 'BOOLEAN',
        'FIELD3_FLOAT': 'TIMESTAMP',
        'FIELD3_FLOAT1': 'FLOAT',
        'FIELD4': 'STRING',
        'FIELD5': 'TIMESTAMP',
        'FIELD5_BOOLEAN': 'BOOLEAN',
        'FIELD6': 'STRING',
        'FIELD7': 'TIMESTAMP'
    }
    assert utils.reconcile_schema_differences(
        schema_to_change,
        schema_to_preserve)[0].items() == target_schema.items()


def test_get_tuple_from_schema():
    schema = [
        bigquery.SchemaField('FIELD1', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'INTEGER', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD4', 'TIMESTAMP', mode='nullable'),
        bigquery.SchemaField('FIELD5', 'FLOAT', mode='nullable')
    ]
    target = ('FIELD1', 'FIELD2', 'FIELD3', 'FIELD4', 'FIELD5')
    assert utils.get_tuple_from_schema(schema) == target


def test_get_schema_from_rows_list():
    row_list = [{
        'FIELD1': None,
        'FIELD2': 300,
        'FIELD3': True
    }, {
        'FIELD1': 'a string',
        'FIELD2': 300.0,
        'FIELD4': False
    }, {
        'FIELD1': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }]
    schema = [
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD4', 'BOOLEAN', mode='nullable')
    ]
    assert utils.get_schema_from_rows_list(row_list) == schema


def test_get_formatted_rows():
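    # Fields missing from a row should surface as None, ordered to match the
    # schema tuple.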
    row_list = [{
        'FIELD1': None,
        'FIELD2': 300,
        'FIELD3': True
    }, {
        'FIELD1': 'a string',
        'FIELD2': 300.0,
        'FIELD4': False
    }, {
        'FIELD1': 'another string',
        'FIELD3': True,
        'FIELD4': False
    }]
    schema = (bigquery.SchemaField('FIELD5', 'TIMESTAMP', mode='nullable'),
              bigquery.SchemaField('FIELD4', 'BOOLEAN', mode='nullable'),
              bigquery.SchemaField('FIELD3.5', 'INTEGER', mode='nullable'),
              bigquery.SchemaField('FIELD3', 'BOOLEAN', mode='nullable'),
              bigquery.SchemaField('FIELD2', 'STRING', mode='nullable'),
              bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'))
    target = [(None, None, None, True, 300, None),
              (None, False, None, None, 300.0, 'a string'),
              (None, False, None, True, None, 'another string')]
    assert utils.get_formatted_rows(row_list, schema) == target


class Client:
    """Fake of google.cloud.bigquery.Client sufficient for these tests."""

    def get_dataset(self, name):
        if name == 'existing_dataset':
            return Dataset(name)
        else:
            raise NotFound('')

    def create_dataset(self, dataset):
        return dataset

    def dataset(self, name):
        return name

    def delete_dataset(self, dataset):
        return 'deleted dataset ' + dataset.name

    def get_table(self, name):
        if name == 'existing_table':
            return Table(name, [])
        else:
            raise NotFound('')

    def create_table(self, table):
        return table

    def update_table(self, table, properties):
        return Table(table.name + '_changed', table.schema)

    def delete_table(self, table):
        return 'deleted table ' + table.name

    def create_rows(self, table, rows):
        if table.name == 'bad_table':
            return ['errors']
        return []


class Dataset:
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return self.name == other.name

    def table(self, name):
        return name


class Table:
    def __init__(self, name, schema):
        self.name = name
        self.schema = schema

    def __eq__(self, other):
        return self.name == other.name and set(self.schema) == set(
            other.schema)

    def __str__(self):
        return 'NAME: %s\nSCHEMA: %s' % (self.name, str(self.schema))


@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_dataset_already_exists(mock_client, mock_dataset):
    mock_client.return_value = Client()
    client = utils.BigqueryLoggerClient('', '')
    dataset = client.create_dataset('existing_dataset')
    assert dataset == Dataset('existing_dataset')


@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_dataset_does_not_exist(mock_client, mock_dataset):
    mock_client.return_value = Client()
    mock_dataset.return_value = Dataset('new_dataset')
    client = utils.BigqueryLoggerClient('', '')
    dataset = client.create_dataset('new_dataset')
    assert dataset == Dataset('new_dataset')


@patch.object(utils.bigquery, 'Table')
@patch.object(utils.BigqueryLoggerClient, 'create_dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_table_already_exists(mock_client, mock_dataset, mock_table):
    mock_client.return_value = Client()
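    # create_dataset is patched out; since the table already exists, the
    # client should return it rather than create a new one.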
    mock_dataset.return_value = Dataset('existing_dataset')
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('existing_table', schema)
    client = utils.BigqueryLoggerClient('', '')
    table = client.create_table('existing_dataset', 'existing_table', schema)
    assert table == Table('existing_table', [])


@patch.object(utils.bigquery, 'Table')
@patch.object(utils.BigqueryLoggerClient, 'create_dataset')
@patch.object(utils.bigquery, 'Client')
def test_create_table_does_not_exist(mock_client, mock_dataset, mock_table):
    mock_client.return_value = Client()
    mock_dataset.return_value = Dataset('existing_dataset')
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('new_table', schema)
    client = utils.BigqueryLoggerClient('', '')
    table = client.create_table('existing_dataset', 'new_table', schema)
    assert table == Table('new_table', schema)


@patch.object(utils.BigqueryLoggerClient, 'create_table')
@patch.object(utils.bigquery, 'Client')
def test_update_table_schema(mock_client, mock_table):
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('existing_table', schema)
    new_schema = {
        bigquery.SchemaField('FIELD1', 'INTEGER', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD5', 'FLOAT', mode='nullable')
    }
    client = utils.BigqueryLoggerClient('', '')
    table, changed_fields = client.update_table_schema(
        'existing_dataset', 'existing_table', new_schema)
    print(table)
    assert table == Table(
        'existing_table_changed', {
            bigquery.SchemaField('FIELD1_INTEGER', 'INTEGER', mode='nullable'),
            bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
            bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
            bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable'),
            bigquery.SchemaField('FIELD5', 'FLOAT', mode='nullable')
        })
    assert set(changed_fields.items()) == set({
        'FIELD1': 'FIELD1_INTEGER'
    }.items())


@patch.object(utils.BigqueryLoggerClient, 'create_table')
@patch.object(utils.bigquery, 'Client')
def test_update_table_schema_no_change(mock_client, mock_table):
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    mock_table.return_value = Table('existing_table', schema)
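    # new_schema is a subset of the existing schema, so no fields need to be
    # renamed and the table should come back unchanged.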
    new_schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable')
    }
    client = utils.BigqueryLoggerClient('', '')
    table, changed_fields = client.update_table_schema(
        'existing_dataset', 'existing_table', new_schema)
    print(table)
    assert table == Table(
        'existing_table', {
            bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
            bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
            bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
        })
    assert set(changed_fields.items()) == set({}.items())


@patch.object(Client, 'delete_dataset')
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_delete_dataset(mock_client, mock_dataset, mock_delete_dataset):
    mock_client.return_value = Client()
    ds = Dataset('existing_dataset')
    mock_dataset.return_value = ds
    client = utils.BigqueryLoggerClient('', '')
    client.delete('existing_dataset')
    mock_delete_dataset.assert_called_with(ds)


@patch.object(Client, 'delete_table')
@patch.object(utils.bigquery, 'Table')
@patch.object(utils.bigquery, 'Dataset')
@patch.object(utils.bigquery, 'Client')
def test_delete_table(mock_client, mock_dataset, mock_table,
                      mock_delete_table):
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    tb = Table('existing_table', schema)
    mock_table.return_value = tb
    client = utils.BigqueryLoggerClient('', '')
    client.delete('existing_dataset', 'existing_table')
    mock_delete_table.assert_called_with(tb)


@patch.object(Client, 'create_rows')
@patch.object(utils, 'get_schema_from_rows_list')
@patch.object(utils, 'change_field_name')
@patch.object(utils.BigqueryLoggerClient, 'update_table_schema')
@patch.object(utils.bigquery, 'Client')
def test_flush(mock_client, mock_update_table_schema, mock_change_field_name,
               mock_get_schema, mock_create_rows):
    mock_create_rows.return_value = []
    mock_client.return_value = Client()
    schema = {
        bigquery.SchemaField('FIELD1', 'STRING', mode='nullable'),
        bigquery.SchemaField('FIELD2', 'BOOLEAN', mode='nullable'),
        bigquery.SchemaField('FIELD3', 'TIMESTAMP', mode='nullable')
    }
    tb = Table('existing_table', schema)
    mock_update_table_schema.return_value = tb, {'FIELD1': 'NEW_NAME1'}
    row_list = [{
        'FIELD1': 1,
        'FIELD2': False,
        'FIELD3': 'result'
    }, {
        'FIELD1': 2,
        'FIELD2': True
    }, {
        'FIELD1': 3,
        'FIELD3': 'result'
    }]
    client = utils.BigqueryLoggerClient('', '')
    errors = client.flush(row_list, 'existing_dataset', 'existing_table')
    mock_change_field_name.assert_called_with('FIELD1', 'NEW_NAME1', row_list)
    mock_create_rows.assert_called_once()
    assert errors == []
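

# A minimal usage sketch, kept out of pytest collection (no 'test_' prefix).
# It shows how the row helpers exercised above are expected to compose before
# a flush(); the field names and values below are hypothetical, and it assumes
# get_formatted_rows() accepts any sequence of SchemaField objects.
def _example_row_pipeline():
    rows = []
    # add_row() normalizes keys into BigQuery-safe field names and stores
    # non-primitive values (such as the list below) as strings.
    utils.add_row({'device': 'walleye', 'passed': True, 'samples': [1, 2]},
                  rows)
    # Derive a schema from the accumulated rows, then order the row values to
    # match it (missing fields would appear as None).
    schema = utils.get_schema_from_rows_list(rows)
    return utils.get_formatted_rows(rows, schema)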