#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object.
    """
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset, tolerating the case where it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print 'Warning: The dataset %s already exists' % dataset_id
        else:
            # Note: For more debugging info, print "http_error.content"
            print 'Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error)
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table; table_schema is a list of
    (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, the table is partitioned by
    day and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table from an already-built fields schema, optionally
    time-partitioned.
    """
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print 'Successfully created %s "%s"' % (res['kind'], res['id'])
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print 'Warning: Table %s already exists' % table_id
        else:
            print 'Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error)
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Patches the schema of an existing table."""
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
    except HttpError as http_error:
        print 'Error in patching table: %s. Err: %s' % (table_id, http_error)
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows_list (as built by make_row) into the given table."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print 'Error inserting rows! Response: %s' % res
            is_success = False
    except HttpError as http_error:
        print 'Error inserting rows to the table %s. Err: %s' % (table_id,
                                                                 http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query and returns the query job, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print 'Query execute job failed with error: %s' % http_error
        print http_error.content
    return query_job


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}