#!/usr/bin/env python2.7

from __future__ import print_function
import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset, returning True on success or if it already exists."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table.

    table_schema is a list of (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table.

    By default a date-partitioned table is created and each partition expires
    after 30 days. table_schema is a list of (column name, column type,
    description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    """Creates a table from an already-expanded fields schema."""
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    """Patches the schema of an existing table."""
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    """Streams rows (built with make_row) into the given table."""
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s. Err: %s' % (table_id,
                                                                 http_error))
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    """Runs a synchronous query and returns the response, or None on error."""
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


def make_row(unique_row_id, row_values_dict):
    """Builds a single streaming-insert row.

    row_values_dict is a dictionary of column name and column value.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}
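

# --- Example usage ---------------------------------------------------------
# Illustrative sketch only: the project, dataset, and table names below
# ('example-project', 'example_dataset', 'example_results') and the schema are
# placeholders, not part of this module. The sketch shows how the helpers fit
# together: build the service object, create a dataset and table, then stream
# one row whose insertId is a fresh uuid.
if __name__ == '__main__':
    bq = create_big_query()
    project = 'example-project'  # placeholder project id
    schema = [
        ('test_name', 'STRING', 'Name of the test'),
        ('elapsed_time', 'FLOAT', 'Elapsed time in seconds'),
    ]
    if create_dataset(bq, project, 'example_dataset'):
        create_table(bq, project, 'example_dataset', 'example_results',
                     schema, 'Example results table')
        row = make_row(str(uuid.uuid4()),
                       {'test_name': 'smoke_test', 'elapsed_time': 1.23})
        insert_rows(bq, project, 'example_dataset', 'example_results', [row])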