#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset; returns True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table. table_schema is a list of
    (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition's storage is kept for 30 days.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table from a prebuilt fields schema, optionally partitioned."""
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Updates the schema of an existing table."""
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows (built with make_row) into a table via insertAll."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query; returns the response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {'insertId': unique_row_id, 'json': row_values_dict}