#!/usr/bin/env python2.7

from __future__ import print_function
import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with Cloud Platform and returns a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)


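# Illustrative note (an assumption about the deployment environment, not part
# of this module): get_application_default() picks up credentials from the
# GOOGLE_APPLICATION_CREDENTIALS environment variable (a service-account JSON
# key) or from the GCE metadata server.
#
#   big_query = create_big_query()

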
def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error))
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    # table_schema is a list of (column name, column type, description) tuples.
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table.

    By default, a date-partitioned table is created and each partition
    expires 30 days after its partition date.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


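# Illustrative table_schema value (hypothetical column names, shown only to
# document the (name, type, description) tuple format the helpers above
# expect):
#
#   _RESULTS_SCHEMA = [
#       ('test_name', 'STRING', 'Name of the test'),
#       ('timestamp', 'TIMESTAMP', 'When the result was recorded'),
#       ('passed', 'BOOLEAN', 'Whether the test passed'),
#   ]

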
def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows into table %s. Err: %s' % (table_id,
                                                               http_error))
        is_success = False

    return is_success


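# Illustrative rows_list value (hypothetical IDs and values): each element has
# the {'insertId': ..., 'json': ...} shape produced by make_row() below.
#
#   rows = [make_row(str(uuid.uuid4()), {'test_name': 'foo', 'passed': True})]
#   insert_rows(big_query, 'my-project', 'my_dataset', 'results', rows)

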
def sync_query_job(big_query, project_id, query, timeout=5000):
    # timeout is in milliseconds (passed through as BigQuery's timeoutMs).
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job


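# Illustrative query call (hypothetical project and table names; result
# parsing omitted):
#
#   job = sync_query_job(big_query, 'my-project',
#                        'SELECT COUNT(*) FROM [my-project:my_dataset.results]')
#   if job:
#       print(job.get('rows'))

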
def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {'insertId': unique_row_id, 'json': row_values_dict}

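
# The block below is an illustrative, hedged smoke test (the dataset and table
# names are assumptions, not defined elsewhere in this module); it simply
# wires the helpers above together.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='BigQuery helper demo')
    parser.add_argument('--project_id', required=True)
    args = parser.parse_args()

    big_query = create_big_query()
    create_dataset(big_query, args.project_id, 'demo_dataset')
    create_table(big_query, args.project_id, 'demo_dataset', 'demo_table',
                 [('name', 'STRING', 'A demo column')], 'Demo table')
    row = make_row(str(uuid.uuid4()), {'name': 'hello'})
    insert_rows(big_query, args.project_id, 'demo_dataset', 'demo_table',
                [row])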