#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object."""
    creds = GoogleCredentials.get_application_default()
    return discovery.build('bigquery',
                           'v2',
                           credentials=creds,
                           cache_discovery=False)
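
# Example (illustrative): obtaining a client object. Assumes application
# default credentials are available in the environment, e.g. via
# `gcloud auth application-default login` or a service account key.
#
#   big_query = create_big_query()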


def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(projectId=project_id,
                                                  body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: The dataset %s already exists' % dataset_id)
        else:
            # Note: For more debugging info, print "http_error.content"
            print('Error in creating dataset: %s. Err: %s' %
                  (dataset_id, http_error))
            is_success = False
    return is_success
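
# Example (illustrative; 'my-project' and 'test_dataset' are placeholder
# names):
#
#   create_dataset(big_query, 'my-project', 'test_dataset')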


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)
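
# Example (illustrative): table_schema is a list of
# (column name, column type, description) tuples, e.g.
#
#   schema = [('name', 'STRING', 'User name'),
#             ('count', 'INTEGER', 'Number of requests')]
#   create_table(big_query, 'my-project', 'test_dataset', 'my_table', schema,
#                'Example table')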


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table.

    By default, a date-partitioned table is created and each partition expires
    30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)
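
# Example (illustrative; reuses the schema list from the earlier example):
# creates a day-partitioned table whose partitions expire 30 days
# (the default _EXPIRATION_MS) after they were last modified.
#
#   create_partitioned_table(big_query, 'my-project', 'test_dataset',
#                            'partitioned_table', schema, 'Example table')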


def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body['timePartitioning'] = {
            'type': partition_type,
            'expirationMs': expiration_ms
        }

    try:
        table_req = big_query.tables().insert(projectId=project_id,
                                              datasetId=dataset_id,
                                              body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully created %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print('Warning: Table %s already exists' % table_id)
        else:
            print('Error in creating table: %s. Err: %s' %
                  (table_id, http_error))
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(projectId=project_id,
                                             datasetId=dataset_id,
                                             tableId=table_id,
                                             body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print('Successfully patched %s "%s"' % (res['kind'], res['id']))
    except HttpError as http_error:
        print('Error in patching table: %s. Err: %s' % (table_id, http_error))
        is_success = False
    return is_success
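
# Example (illustrative; `fields` is assumed to hold the table's current
# schema in the same dict format used above): extends the schema with a new
# column, since BigQuery generally permits only additive schema changes.
#
#   new_fields = fields + [{'name': 'region', 'type': 'STRING',
#                           'description': 'Deployment region'}]
#   patch_table(big_query, 'my-project', 'test_dataset', 'my_table', new_fields)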


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     tableId=table_id,
                                                     body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print('Error inserting rows! Response: %s' % res)
            is_success = False
    except HttpError as http_error:
        print('Error inserting rows into table %s' % table_id)
        print('Error message: %s' % http_error)
        is_success = False

    return is_success
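
# Example (illustrative): rows_list entries follow the BigQuery insertAll
# format; make_row (defined below) builds them, with a uuid serving as the
# deduplication insertId.
#
#   rows = [make_row(str(uuid.uuid4()), {'name': 'alice', 'count': 2})]
#   insert_rows(big_query, 'my-project', 'test_dataset', 'my_table', rows)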


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print('Query execute job failed with error: %s' % http_error)
        print(http_error.content)
    return query_job
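
# Example (illustrative): runs a synchronous query and prints the result rows.
# The response shape comes from the BigQuery v2 jobs.query API; 'rows' is only
# present when the query completes within the timeout and returns data.
#
#   job = sync_query_job(big_query, 'my-project',
#                        'SELECT name, count FROM [my-project:test_dataset.my_table]')
#   if job and job.get('jobComplete'):
#       for row in job.get('rows', []):
#           print([cell['v'] for cell in row['f']])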


def make_row(unique_row_id, row_values_dict):
    """row_values_dict is a dictionary of column name and column value."""
    return {'insertId': unique_row_id, 'json': row_values_dict}
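
if __name__ == '__main__':
    # Minimal smoke-test sketch (illustrative, not part of the library API).
    # It creates a dataset and table under the names supplied on the command
    # line and inserts a single example row. Assumes application default
    # credentials are configured and the caller has BigQuery access to the
    # given project; the default dataset/table names below are placeholders.
    argp = argparse.ArgumentParser(description='big_query_utils smoke test')
    argp.add_argument('--project_id', required=True)
    argp.add_argument('--dataset_id', default='big_query_utils_test')
    argp.add_argument('--table_id', default='example_table')
    args = argp.parse_args()

    bq = create_big_query()
    create_dataset(bq, args.project_id, args.dataset_id)
    create_table(bq, args.project_id, args.dataset_id, args.table_id,
                 [('name', 'STRING', 'Example name column'),
                  ('count', 'INTEGER', 'Example counter column')],
                 'Table created by the big_query_utils smoke test')
    row = make_row(str(uuid.uuid4()), {'name': 'example', 'count': 1})
    insert_rows(bq, args.project_id, args.dataset_id, args.table_id, [row])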