1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CONTRIB_CLOUD_KERNELS_BIGQUERY_TABLE_ACCESSOR_H_ 17 #define TENSORFLOW_CONTRIB_CLOUD_KERNELS_BIGQUERY_TABLE_ACCESSOR_H_ 18 19 #include <map> 20 #include <memory> 21 #include <vector> 22 23 #include "tensorflow/contrib/cloud/kernels/bigquery_table_partition.pb.h" 24 #include "tensorflow/core/example/example.pb.h" 25 #include "tensorflow/core/lib/core/errors.h" 26 #include "tensorflow/core/platform/cloud/curl_http_request.h" 27 #include "tensorflow/core/platform/cloud/google_auth_provider.h" 28 29 namespace tensorflow { 30 31 /// This class facilitates accessing BigQuery tables. 32 /// 33 /// Notes: 34 /// - Nested fields are not supported. 35 /// - BigQuery 'Record's are automatically flattened, 36 /// - BigQuery float type is a double but is converted to a C++ float in this 37 /// class. 38 /// - It is possible for a table snapshot to go out-of-scope in the BigQuery 39 /// service while accessing the table if a very old timestamp is used. For 40 /// exact details, see 'Table Decorators' in BigQuery docs. 41 class BigQueryTableAccessor { 42 public: 43 // Column types supported by BigQuery. 44 enum class ColumnType { 45 kString = 0, 46 kBytes, 47 kInteger, 48 kFloat, 49 kBoolean, 50 kTimestamp, 51 kDate, 52 kTime, 53 kDatetime, 54 kRecord, 55 kNone 56 }; 57 58 /// \brief Creates a new BigQueryTableAccessor object. 59 // 60 // We do not allow relative (negative or zero) snapshot times here since we 61 // want to have a consistent snapshot of the table for the lifetime of this 62 // object. 63 // Use end_point if you want to connect to a different end point than the 64 // official BigQuery end point. Otherwise send an empty string. 65 static Status New(const string& project_id, const string& dataset_id, 66 const string& table_id, int64 timestamp_millis, 67 int64 row_buffer_size, const string& end_point, 68 const std::vector<string>& columns, 69 const BigQueryTablePartition& partition, 70 std::unique_ptr<BigQueryTableAccessor>* accessor); 71 72 /// \brief Starts reading a new partition. 73 Status SetPartition(const BigQueryTablePartition& partition); 74 75 /// \brief Returns true if there are more rows available in the current 76 /// partition. 77 bool Done(); 78 79 /// \brief Returns a single row as example proto. 80 /// 81 /// This function will return an error if the table snapshot goes out of scope 82 /// in the BigQuery service. 83 Status ReadRow(int64* row_id, Example* example); 84 85 /// \brief Returns total number of rows in the table. total_num_rows()86 int64 total_num_rows() { return total_num_rows_; } 87 ~BigQueryTableAccessor()88 virtual ~BigQueryTableAccessor() {} 89 90 private: 91 friend class BigQueryTableAccessorTest; 92 93 // This struct encapsulates schema nodes for a BigQuery table. 94 struct SchemaNode { SchemaNodeSchemaNode95 SchemaNode() {} SchemaNodeSchemaNode96 SchemaNode(const string& name, ColumnType type) : name(name), type(type) {} 97 98 string name; 99 ColumnType type; 100 std::vector<SchemaNode> schema_nodes; 101 }; 102 103 /// If nullptr is passed for http_request_factory and auth_provider the 104 /// default production ones are used. This can be used by tests to override 105 /// these two variables. 106 static Status New(const string& project_id, const string& dataset_id, 107 const string& table_id, int64 timestamp_millis, 108 int64 row_buffer_size, const string& end_point, 109 const std::vector<string>& columns, 110 const BigQueryTablePartition& partition, 111 std::unique_ptr<AuthProvider> auth_provider, 112 std::shared_ptr<HttpRequest::Factory> http_request_factory, 113 std::unique_ptr<BigQueryTableAccessor>* accessor); 114 115 /// \brief Constructs an object for a given table and partition. 116 BigQueryTableAccessor( 117 const string& project_id, const string& dataset_id, 118 const string& table_id, int64 timestamp_millis, int64 row_buffer_size, 119 const string& end_point, const std::vector<string>& columns, 120 const BigQueryTablePartition& partition, 121 std::unique_ptr<AuthProvider> auth_provider, 122 std::shared_ptr<HttpRequest::Factory> http_request_factory); 123 124 /// \brief Parses column values for a given row. 125 Status ParseColumnValues(const Json::Value& value, 126 const SchemaNode& root_schema_node, 127 Example* example); 128 129 /// \brief Reads the table schema and stores it. 130 Status ReadSchema(); 131 132 /// \brief Extracts column type from a column in schema. 133 Status ExtractColumnType(const Json::Value& columns, 134 const string& column_name_prefix, SchemaNode* root); 135 136 /// \brief Appends a single BigQuery column Value to 'example' for a given 137 /// column. 138 Status AppendValueToExample(const string& column_name, 139 const Json::Value& column_value, 140 const BigQueryTableAccessor::ColumnType type, 141 Example* example); 142 143 /// \brief Resets internal counters for reading a partition. 144 void Reset(); 145 146 /// \brief Helper function that returns BigQuery http endpoint prefix. 147 string BigQueryUriPrefix(); 148 149 /// \brief Computes the maxResults arg to send to BigQuery. 150 int64 ComputeMaxResultsArg(); 151 152 /// \brief Returns full name of the underlying table name. FullTableName()153 string FullTableName() { 154 return strings::StrCat(project_id_, ":", dataset_id_, ".", table_id_, "@", 155 timestamp_millis_); 156 } 157 158 const string project_id_; 159 const string dataset_id_; 160 const string table_id_; 161 162 // Snapshot timestamp. 163 const int64 timestamp_millis_; 164 165 // Columns that should be read. Empty means all columns. 166 const std::set<string> columns_; 167 168 // HTTP address of BigQuery end point to use. 169 const string bigquery_end_point_; 170 171 // Describes the portion of the table that we are currently accessing. 172 BigQueryTablePartition partition_; 173 174 // Total number of rows in the underlying table. 175 int64 total_num_rows_ = 0; 176 177 // Offset of the first row in the underlying row_buffer_. 178 int64 first_buffered_row_index_ = 0; 179 180 // Offset of the next row in the row_buffer_. -1 indicates that this index 181 // is invalid. 182 int next_row_in_buffer_ = -1; 183 184 // This buffer holds next rows to improve performance. Its size will be 185 // based on how much buffering was requested. 186 std::vector<Example> row_buffer_; 187 188 // If next_page is set, it will used to read next batch of data. 189 string next_page_token_; 190 191 // A tree representing the schema for the underlying table. 192 SchemaNode schema_root_; 193 194 std::unique_ptr<AuthProvider> auth_provider_; 195 std::shared_ptr<HttpRequest::Factory> http_request_factory_; 196 197 TF_DISALLOW_COPY_AND_ASSIGN(BigQueryTableAccessor); 198 }; 199 200 } // namespace tensorflow 201 #endif // TENSORFLOW_CONTRIB_CLOUD_KERNELS_BIGQUERY_TABLE_ACCESSOR_H_ 202