/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_CLOUD_BIGQUERY_PARTITION_ACCESSOR_H_
#define TENSORFLOW_CORE_KERNELS_CLOUD_BIGQUERY_PARTITION_ACCESSOR_H_

#include <map>
#include <memory>
#include <set>
#include <vector>

#include "tensorflow/contrib/cloud/kernels/bigquery_table_partition.pb.h"
#include "tensorflow/core/example/example.pb.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/platform/cloud/curl_http_request.h"
#include "tensorflow/core/platform/cloud/google_auth_provider.h"

namespace tensorflow {

/// This class facilitates accessing BigQuery tables.
///
/// Notes:
///  - Nested fields are not supported.
///  - BigQuery 'Record's are automatically flattened.
///  - The BigQuery float type is a double, but it is converted to a C++ float
///    by this class.
///  - A table snapshot can go out of scope in the BigQuery service while the
///    table is being accessed if a very old timestamp is used. For exact
///    details, see 'Table Decorators' in the BigQuery docs.
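///
/// A minimal usage sketch (illustrative only; "my-project", "my_dataset",
/// "my_table", and the numeric arguments below are hypothetical values):
///
///   std::unique_ptr<BigQueryTableAccessor> accessor;
///   BigQueryTablePartition partition;
///   TF_RETURN_IF_ERROR(BigQueryTableAccessor::New(
///       "my-project", "my_dataset", "my_table",
///       /*timestamp_millis=*/1234567890000, /*row_buffer_size=*/1000,
///       /*end_point=*/"", /*columns=*/{}, partition, &accessor));
///   while (!accessor->Done()) {
///     int64 row_id;
///     Example example;
///     TF_RETURN_IF_ERROR(accessor->ReadRow(&row_id, &example));
///     // Consume 'example'.
///   }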
class BigQueryTableAccessor {
 public:
  // Column types supported by BigQuery.
  enum class ColumnType {
    kString = 0,
    kBytes,
    kInteger,
    kFloat,
    kBoolean,
    kTimestamp,
    kDate,
    kTime,
    kDatetime,
    kRecord,
    kNone
  };

  /// \brief Creates a new BigQueryTableAccessor object.
  ///
  /// We do not allow relative (negative or zero) snapshot times here, since
  /// we want a consistent snapshot of the table for the lifetime of this
  /// object.
  /// Use 'end_point' to connect to an endpoint other than the official
  /// BigQuery endpoint; otherwise, pass an empty string.
  static Status New(const string& project_id, const string& dataset_id,
                    const string& table_id, int64 timestamp_millis,
                    int64 row_buffer_size, const string& end_point,
                    const std::vector<string>& columns,
                    const BigQueryTablePartition& partition,
                    std::unique_ptr<BigQueryTableAccessor>* accessor);

  /// \brief Starts reading a new partition.
  Status SetPartition(const BigQueryTablePartition& partition);

  /// \brief Returns true if there are no more rows available in the current
  /// partition.
  bool Done();

  /// \brief Returns a single row as an Example proto.
  ///
  /// Returns an error if the table snapshot goes out of scope in the
  /// BigQuery service.
  Status ReadRow(int64* row_id, Example* example);

  /// \brief Returns the total number of rows in the table.
  int64 total_num_rows() { return total_num_rows_; }

  virtual ~BigQueryTableAccessor() {}

 private:
  friend class BigQueryTableAccessorTest;

  // This struct encapsulates schema nodes for a BigQuery table.
  struct SchemaNode {
    SchemaNode() {}
    SchemaNode(const string& name, ColumnType type) : name(name), type(type) {}

    string name;
    ColumnType type;
    std::vector<SchemaNode> schema_nodes;
  };

  /// If nullptr is passed for http_request_factory or auth_provider, the
  /// default production ones are used. This can be used by tests to override
  /// these two variables.
  static Status New(const string& project_id, const string& dataset_id,
                    const string& table_id, int64 timestamp_millis,
                    int64 row_buffer_size, const string& end_point,
                    const std::vector<string>& columns,
                    const BigQueryTablePartition& partition,
                    std::unique_ptr<AuthProvider> auth_provider,
                    std::unique_ptr<HttpRequest::Factory> http_request_factory,
                    std::unique_ptr<BigQueryTableAccessor>* accessor);

  /// \brief Constructs an object for a given table and partition.
  BigQueryTableAccessor(const string& project_id, const string& dataset_id,
                        const string& table_id, int64 timestamp_millis,
                        int64 row_buffer_size, const string& end_point,
                        const std::vector<string>& columns,
                        const BigQueryTablePartition& partition);

  /// Used for unit testing.
  BigQueryTableAccessor(
      const string& project_id, const string& dataset_id,
      const string& table_id, int64 timestamp_millis, int64 row_buffer_size,
      const string& end_point, const std::vector<string>& columns,
      const BigQueryTablePartition& partition,
      std::unique_ptr<AuthProvider> auth_provider,
      std::unique_ptr<HttpRequest::Factory> http_request_factory);

  /// \brief Parses column values for a given row.
  Status ParseColumnValues(const Json::Value& value,
                           const SchemaNode& root_schema_node,
                           Example* example);

  /// \brief Reads the table schema and stores it.
  Status ReadSchema();

  /// \brief Extracts the column type from a column in the schema.
  Status ExtractColumnType(const Json::Value& columns,
                           const string& column_name_prefix, SchemaNode* root);

  /// \brief Appends a single BigQuery column value to 'example' for a given
  /// column.
  Status AppendValueToExample(const string& column_name,
                              const Json::Value& column_value,
                              const BigQueryTableAccessor::ColumnType type,
                              Example* example);

  /// \brief Resets internal counters for reading a partition.
  void Reset();

  /// \brief Helper function that returns the BigQuery HTTP endpoint prefix.
  string BigQueryUriPrefix();

  /// \brief Computes the maxResults argument to send to BigQuery.
  int64 ComputeMaxResultsArg();

  /// \brief Returns the full name of the underlying table, including the
  /// snapshot timestamp as a table decorator.
  string FullTableName() {
    return strings::StrCat(project_id_, ":", dataset_id_, ".", table_id_, "@",
                           timestamp_millis_);
  }
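
  // For illustration (hypothetical values): with project_id_ == "my-project",
  // dataset_id_ == "samples", table_id_ == "shakespeare", and
  // timestamp_millis_ == 1234567890000, FullTableName() returns
  // "my-project:samples.shakespeare@1234567890000".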

  const string project_id_;
  const string dataset_id_;
  const string table_id_;

  // Snapshot timestamp.
  const int64 timestamp_millis_;

  // Columns that should be read. Empty means all columns.
  const std::set<string> columns_;

  // HTTP address of the BigQuery endpoint to use.
  const string bigquery_end_point_;

  // Describes the portion of the table that we are currently accessing.
  BigQueryTablePartition partition_;

  // Total number of rows in the underlying table.
  int64 total_num_rows_ = 0;

  // Offset of the first row in the underlying row_buffer_.
  int64 first_buffered_row_index_ = 0;

  // Offset of the next row in the row_buffer_. -1 indicates that this index
  // is invalid.
  int next_row_in_buffer_ = -1;

  // This buffer holds the upcoming rows to improve performance. Its size is
  // determined by how much buffering was requested.
  std::vector<Example> row_buffer_;

  // If set, next_page_token_ is used to read the next batch of data.
  string next_page_token_;

  // A tree representing the schema of the underlying table.
  SchemaNode schema_root_;

  std::unique_ptr<AuthProvider> auth_provider_;
  std::unique_ptr<HttpRequest::Factory> http_request_factory_;

  TF_DISALLOW_COPY_AND_ASSIGN(BigQueryTableAccessor);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_CLOUD_BIGQUERY_PARTITION_ACCESSOR_H_