#!/usr/bin/env python2.7

from __future__ import print_function
import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with Cloud Platform and returns a BigQuery service
    object.
    """
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)


def create_dataset(big_query, project_id, dataset_id):
    """Creates a dataset, returning False if the insert call fails."""
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: for more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success


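# A minimal usage sketch (not part of the original module); 'my-gcp-project'
# and 'example_dataset' are placeholder names, and application-default
# credentials are assumed to be configured in the environment.
def _example_create_dataset():
    big_query = create_big_query()
    create_dataset(big_query, 'my-gcp-project', 'example_dataset')

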
def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table from a list of (column name, column type, description)
    tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


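# A usage sketch, assuming the placeholder project and dataset above exist;
# each schema entry is a (column name, column type, description) tuple.
def _example_create_table():
    big_query = create_big_query()
    schema = [
        ('test_name', 'STRING', 'Name of the test case'),
        ('elapsed_ms', 'INTEGER', 'Elapsed time in milliseconds'),
    ]
    create_table(big_query, 'my-gcp-project', 'example_dataset',
                 'example_table', schema, 'Example results table')

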
def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


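# A sketch of a day-partitioned table whose partitions expire after 7 days
# instead of the 30-day default; all names are placeholders.
def _example_create_partitioned_table():
    big_query = create_big_query()
    schema = [('run_id', 'STRING', 'Unique id of the test run')]
    create_partitioned_table(big_query, 'my-gcp-project', 'example_dataset',
                             'partitioned_table', schema,
                             'Partitioned results table',
                             expiration_ms=7 * 24 * 60 * 60 * 1000)

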
def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows to the table %s. Err: %s' % (table_id,
                                                                 http_error))
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


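# A sketch of issuing a synchronous query and printing the result rows; the
# 'rows'/'f'/'v' access follows the BigQuery v2 jobs.query response shape,
# and the table name is a placeholder.
def _example_sync_query():
    big_query = create_big_query()
    job = sync_query_job(big_query, 'my-gcp-project',
                         'SELECT test_name FROM example_dataset.example_table')
    if job:
        for row in job.get('rows', []):
            print([field['v'] for field in row['f']])

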
def make_row(unique_row_id, row_values_dict):
    """Builds an insertAll row; row_values_dict maps column names to values."""
    return {'insertId': unique_row_id, 'json': row_values_dict}
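

# A sketch of streaming rows with a uuid-based insertId so BigQuery can
# deduplicate retried inserts; all project/dataset/table names and column
# values are placeholders.
def _example_insert_rows():
    big_query = create_big_query()
    rows = [
        make_row(str(uuid.uuid4()), {
            'test_name': 'smoke_test',
            'elapsed_ms': 42
        })
    ]
    insert_rows(big_query, 'my-gcp-project', 'example_dataset',
                'example_table', rows)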