// Copyright 2017 The TensorFlow Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ============================================================================= #include #include #include #include #include "tensorflow/contrib/boosted_trees/lib/quantiles/weighted_quantiles_stream.h" #include "tensorflow/contrib/boosted_trees/lib/utils/parallel_for.h" #include "tensorflow/contrib/boosted_trees/lib/utils/tensor_utils.h" #include "tensorflow/contrib/boosted_trees/proto/quantiles.pb.h" #include "tensorflow/contrib/boosted_trees/resources/quantile_stream_resource.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/framework/resource_mgr.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/framework/tensor_shape.h" #include "tensorflow/core/framework/types.h" #include "tensorflow/core/lib/core/errors.h" #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/strings/stringprintf.h" #include "tensorflow/core/platform/types.h" #include "tensorflow/core/util/work_sharder.h" namespace tensorflow { using ::boosted_trees::QuantileConfig; using boosted_trees::QuantileStreamResource; using boosted_trees::utils::TensorUtils; namespace { const char* const kExampleWeightsName = "example_weights"; const char* const kMaxElementsName = "max_elements"; const char* const kNextStampTokenName = "next_stamp_token"; const char* const kStampTokenName = "stamp_token"; const char* const kAreBucketsReadyName = "are_buckets_ready"; const char* const kGenerateQuantiles = "generate_quantiles"; // Names for sparse arguments. const char* const kNumSparseFeaturesName = "num_sparse_features"; const char* const kSparseBucketsName = "sparse_buckets"; const char* const kSparseValuesName = "sparse_values"; const char* const kSparseIndicesName = "sparse_indices"; const char* const kSparseSummariesName = "sparse_summaries"; const char* const kSparseConfigName = "sparse_config"; const char* const kSparseOutputTensorName = "sparse_quantiles"; // Names for dense arguments. const char* const kDenseBucketsName = "dense_buckets"; const char* const kDenseConfigName = "dense_config"; const char* const kDenseOutputTensorName = "dense_quantiles"; const char* const kDenseSummariesName = "dense_summaries"; const char* const kDenseValuesName = "dense_values"; const char* const kNumDenseFeaturesName = "num_dense_features"; const char* const kResourceHandlesName = "quantile_accumulator_handles"; const char* const kNumQuantilesName = "num_quantiles"; const char* const kEpsilonName = "epsilon"; const char* const kBucketsName = "buckets"; const char* const kStreamStateName = "stream_state"; const char* const kSummariesName = "summaries"; using QuantileStream = boosted_trees::quantiles::WeightedQuantilesStream; using QuantileSummary = boosted_trees::quantiles::WeightedQuantilesSummary; using QuantileSummaryEntry = boosted_trees::quantiles::WeightedQuantilesSummary::SummaryEntry; std::vector GetBuckets(const int32 feature, const OpInputList& buckets_list) { const auto& buckets = buckets_list[feature].flat(); std::vector buckets_vector(buckets.data(), buckets.data() + buckets.size()); return buckets_vector; } int32 GetFeatureDimension(const int32 feature_index, const int64 instance, const OpInputList* const indices_list) { if (indices_list != nullptr) { // Sparse multidimensional. return (*indices_list)[feature_index].matrix()(instance, 1); } // No indices, assume one-dimensional tensor. return 0; } // Allows quantization for each of multiple dimensions of a sparse feature. void QuantizeFeatures( const string& output_name, const OpInputList& values_list, const OpInputList& buckets_list, const OpInputList* const indices_list /** Optional, provide for sparse features **/, OpKernelContext* const context) { if (values_list.size() == 0) { return; } OpOutputList output_list; OP_REQUIRES_OK(context, context->output_list(output_name, &output_list)); for (int32 feature_index = 0; feature_index < values_list.size(); ++feature_index) { const Tensor& values_tensor = values_list[feature_index]; const int64 num_values = values_tensor.dim_size(0); Tensor* output_t = nullptr; // Output will have bucket id and dimension of the features for that bucket. OP_REQUIRES_OK( context, output_list.allocate(feature_index, TensorShape({num_values, 2}), &output_t)); auto output = output_t->matrix(); const std::vector& buckets_vector = GetBuckets(feature_index, buckets_list); auto flat_values = values_tensor.flat(); for (int64 instance = 0; instance < num_values; ++instance) { const float value = flat_values(instance); CHECK(!buckets_vector.empty()) << "Got empty buckets for feature " << feature_index; auto bucket_iter = std::lower_bound(buckets_vector.begin(), buckets_vector.end(), value); if (bucket_iter == buckets_vector.end()) { --bucket_iter; } const int32 bucket = static_cast(bucket_iter - buckets_vector.begin()); // Bucket id. output(instance, 0) = bucket; // Dimension. output(instance, 1) = GetFeatureDimension(feature_index, instance, indices_list); } } } // Validates attributes for the quantile ops. Status ReadAndValidateAttributes(OpKernelConstruction* const context, int* num_dense_features, int* num_sparse_features) { TF_RETURN_IF_ERROR( context->GetAttr(kNumDenseFeaturesName, num_dense_features)); TF_RETURN_IF_ERROR( context->GetAttr(kNumSparseFeaturesName, num_sparse_features)); if ((*num_dense_features) + (*num_sparse_features) == 0) { return errors::InvalidArgument( "Please provide at least sparse or dense features."); } return Status::OK(); } void ParseConfig(OpKernelConstruction* const context, const string& name, std::vector* output) { std::vector serialized_config; OP_REQUIRES_OK(context, context->GetAttr(name, &serialized_config)); output->reserve(serialized_config.size()); QuantileConfig tmp; for (const auto& serialized_string : serialized_config) { OP_REQUIRES(context, tmp.ParseFromString(serialized_string), errors::InvalidArgument("Malformed QuantileConfig passed in.")); output->push_back(tmp); } } // Generates quantiles on a finalized QuantileStream. std::vector GenerateBoundaries(const QuantileStream& stream, int num_boundaries) { std::vector boundaries = stream.GenerateBoundaries(num_boundaries); // Uniquify elements as we may get dupes. auto end_it = std::unique(boundaries.begin(), boundaries.end()); boundaries.resize(std::distance(boundaries.begin(), end_it)); return boundaries; } // Generates quantiles on a finalized QuantileStream. std::vector GenerateQuantiles(const QuantileStream& stream, int num_quantiles) { // Do not de-dup boundaries. Exactly num_quantiles+1 boundary values // will be returned. std::vector boundaries = stream.GenerateQuantiles(num_quantiles); CHECK_EQ(boundaries.size(), num_quantiles + 1); return boundaries; } // Copies quantiles to output list. void CopyBoundaries(OpKernelContext* const context, const std::vector& boundaries, const int64 index, OpOutputList* output_list) { // Output to tensor. Tensor* output_t = nullptr; OP_REQUIRES_OK( context, output_list->allocate( index, {static_cast(boundaries.size())}, &output_t)); auto* quantiles_flat = output_t->flat().data(); memcpy(quantiles_flat, boundaries.data(), sizeof(float) * boundaries.size()); } void CopySummaryToProto(const QuantileSummary& summary, ::boosted_trees::QuantileSummaryState* summary_proto) { summary_proto->mutable_entries()->Reserve(summary.Size()); for (const auto& entry : summary.GetEntryList()) { auto* new_entry = summary_proto->add_entries(); new_entry->set_value(entry.value); new_entry->set_weight(entry.weight); new_entry->set_min_rank(entry.min_rank); new_entry->set_max_rank(entry.max_rank); } } } // namespace // Accumulator for Quantile Summaries. REGISTER_RESOURCE_HANDLE_KERNEL(QuantileStreamResource); REGISTER_KERNEL_BUILDER( Name("QuantileAccumulatorIsInitialized").Device(DEVICE_CPU), IsResourceInitialized); class CreateQuantileAccumulatorOp : public OpKernel { public: explicit CreateQuantileAccumulatorOp(OpKernelConstruction* const context) : OpKernel(context) { OP_REQUIRES_OK(context, context->GetAttr(kEpsilonName, &epsilon_)); OP_REQUIRES_OK(context, context->GetAttr(kNumQuantilesName, &num_quantiles_)); OP_REQUIRES_OK(context, context->GetAttr(kMaxElementsName, &max_elements_)); OP_REQUIRES_OK(context, context->GetAttr(kGenerateQuantiles, &generate_quantiles_)); } void Compute(OpKernelContext* context) override { // Only create one, if one does not exist already. Report status for all // other exceptions. If one already exists, it unrefs the new one. const Tensor* stamp_token_t; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); // An epsilon value of zero could cause perfoamance issues and is therefore, // disallowed. OP_REQUIRES( context, epsilon_ > 0, errors::InvalidArgument("An epsilon value of zero is not allowed.")); auto result = new QuantileStreamResource(epsilon_, num_quantiles_, max_elements_, generate_quantiles_, stamp_token_t->scalar()()); auto status = CreateResource(context, HandleFromInput(context, 0), result); if (!status.ok() && status.code() != tensorflow::error::ALREADY_EXISTS) { OP_REQUIRES(context, false, status); } } private: float epsilon_; int32 num_quantiles_; // An upper bound on the number of entries that the summaries might have // for a feature. int64 max_elements_; bool generate_quantiles_; }; REGISTER_KERNEL_BUILDER(Name("CreateQuantileAccumulator").Device(DEVICE_CPU), CreateQuantileAccumulatorOp); // Adds a summary to the quantile summary stream. class QuantileAccumulatorAddSummariesOp : public OpKernel { public: explicit QuantileAccumulatorAddSummariesOp( OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { OpInputList resource_handle_list; OP_REQUIRES_OK(context, context->input_list(kResourceHandlesName, &resource_handle_list)); OpInputList summary_list; OP_REQUIRES_OK(context, context->input_list(kSummariesName, &summary_list)); const Tensor* stamp_token_t; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); int64 stamp_token = stamp_token_t->scalar()(); thread::ThreadPool* const worker_threads = context->device()->tensorflow_cpu_worker_threads()->workers; boosted_trees::utils::ParallelFor( resource_handle_list.size(), worker_threads->NumThreads(), worker_threads, [&context, &resource_handle_list, &summary_list, stamp_token]( int64 start, int64 end) { for (int resource_handle_idx = start; resource_handle_idx < end; ++resource_handle_idx) { const ResourceHandle& handle = resource_handle_list[resource_handle_idx] .flat()(0); QuantileStreamResource* streams_resource; // Create a reference to the underlying resource using the handle. OP_REQUIRES_OK(context, LookupResource(context, handle, &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); // If the stamp is invalid we drop the update. if (!streams_resource->is_stamp_valid(stamp_token)) { VLOG(1) << "Invalid stamp token in QuantileAccumulatorAddSummariesOp." << " Passed stamp token: " << stamp_token << " " << "Current token: " << streams_resource->stamp(); return; } protobuf::Arena arena; ::boosted_trees::QuantileSummaryState* summary_proto = protobuf::Arena::CreateMessage< ::boosted_trees::QuantileSummaryState>(&arena); OP_REQUIRES( context, ParseProtoUnlimited( summary_proto, summary_list[resource_handle_idx].scalar()()), errors::InvalidArgument("Unable to parse quantile summary.")); std::vector entries; entries.reserve(summary_proto->entries_size()); for (const auto& entry : summary_proto->entries()) { entries.emplace_back(entry.value(), entry.weight(), entry.min_rank(), entry.max_rank()); } // Add the summary to the quantile stream. streams_resource->stream(stamp_token)->PushSummary(entries); } }); } }; REGISTER_KERNEL_BUILDER( Name("QuantileAccumulatorAddSummaries").Device(DEVICE_CPU), QuantileAccumulatorAddSummariesOp); // Generates summaries for given set of float values, and the given config. class MakeQuantileSummariesOp : public OpKernel { public: explicit MakeQuantileSummariesOp(OpKernelConstruction* const context) : OpKernel(context) { OP_REQUIRES_OK(context, ReadAndValidateAttributes(context, &num_dense_features_, &num_sparse_features_)); OP_REQUIRES_OK(context, context->GetAttr(kEpsilonName, &epsilon_)); } void Compute(OpKernelContext* const context) override { // Read dense float features list; OpInputList dense_float_features_list; OP_REQUIRES_OK(context, TensorUtils::ReadDenseFloatFeatures( context, &dense_float_features_list)); // Read sparse float features list; OpInputList sparse_float_feature_indices_list; OpInputList sparse_float_feature_values_list; OpInputList sparse_float_feature_shapes_list; OP_REQUIRES_OK(context, TensorUtils::ReadSparseFloatFeatures( context, &sparse_float_feature_indices_list, &sparse_float_feature_values_list, &sparse_float_feature_shapes_list)); // Parse example weights and get batch size. const Tensor* example_weights_t; OP_REQUIRES_OK(context, context->input(kExampleWeightsName, &example_weights_t)); auto example_weights = example_weights_t->flat(); const int64 batch_size = example_weights.size(); OpOutputList sparse_summaries_output_list; OP_REQUIRES_OK(context, context->output_list(kSparseSummariesName, &sparse_summaries_output_list)); OpOutputList dense_summaries_output_list; OP_REQUIRES_OK(context, context->output_list(kDenseSummariesName, &dense_summaries_output_list)); auto do_quantile_summary_gen = [&](const int64 begin, const int64 end) { auto copy_over_summaries = [&](const QuantileStream& stream, const int64 index, OpOutputList* output_list) { protobuf::Arena arena; ::boosted_trees::QuantileSummaryState* summary_proto = protobuf::Arena::CreateMessage< ::boosted_trees::QuantileSummaryState>(&arena); const auto& summary = stream.GetFinalSummary(); CopySummaryToProto(summary, summary_proto); // Output to tensor. Tensor* output_t = nullptr; OP_REQUIRES_OK(context, output_list->allocate(index, {}, &output_t)); summary_proto->SerializeToString(&output_t->scalar()()); }; // These are blocks of ranges. We are iterating over both sparse and // dense features i.e. [0, sparse_features.size() + dense_features.size()] for (int64 i = begin; i < end; ++i) { if (i < num_dense_features_) { const int64 dense_index = i; const auto dense_values = dense_float_features_list[dense_index].flat(); QuantileStream stream(epsilon_, batch_size + 1); // Run quantile summary generation. for (int64 j = 0; j < batch_size; ++j) { stream.PushEntry(dense_values(j), example_weights(j)); } stream.Finalize(); // Copy summaries to output. copy_over_summaries(stream, dense_index, &dense_summaries_output_list); } else { const int64 sparse_index = i - num_dense_features_; const auto sparse_values = sparse_float_feature_values_list[sparse_index].flat(); const auto sparse_indices = sparse_float_feature_indices_list[sparse_index].matrix(); const auto dense_shape = sparse_float_feature_shapes_list[sparse_index].flat(); OP_REQUIRES(context, batch_size == dense_shape(0), errors::InvalidArgument( "Sparse column shape doesn't match the batch size.")); QuantileStream stream(epsilon_, batch_size + 1); // Run quantile summary generation. const int64 num_sparse_rows = sparse_float_feature_indices_list[sparse_index].dim_size(0); for (int64 j = 0; j < num_sparse_rows; ++j) { const int64 example_id = sparse_indices(j, 0); stream.PushEntry(sparse_values(j), example_weights(example_id)); } stream.Finalize(); // Copy summaries to output. copy_over_summaries(stream, sparse_index, &sparse_summaries_output_list); } } }; const int64 kCostPerUnit = 500 * batch_size; const int64 num_features = num_sparse_features_ + num_dense_features_; const DeviceBase::CpuWorkerThreads& worker_threads = *context->device()->tensorflow_cpu_worker_threads(); Shard(worker_threads.num_threads, worker_threads.workers, num_features, kCostPerUnit, do_quantile_summary_gen); } private: int num_dense_features_; int num_sparse_features_; float epsilon_; }; REGISTER_KERNEL_BUILDER(Name("MakeQuantileSummaries").Device(DEVICE_CPU), MakeQuantileSummariesOp); // Serializes the state of streams. class QuantileAccumulatorSerializeOp : public OpKernel { public: explicit QuantileAccumulatorSerializeOp(OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { QuantileStreamResource* streams_resource; // Create a reference to the underlying resource using the handle. OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0), &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); int64 stamp_token = streams_resource->stamp(); Tensor* stream_state_t; OP_REQUIRES_OK(context, context->allocate_output(kStreamStateName, TensorShape({}), &stream_state_t)); bool are_buckets_ready = streams_resource->are_buckets_ready(); // We are iterating over both dense and sparse features. First we go // through the dense features and then the sparse features. const QuantileStream& stream = *streams_resource->stream(stamp_token); const std::vector& boundaries = are_buckets_ready ? streams_resource->boundaries(stamp_token) : std::vector(); protobuf::Arena arena; ::boosted_trees::QuantileStreamState* stream_proto = protobuf::Arena::CreateMessage<::boosted_trees::QuantileStreamState>( &arena); for (const auto& summary : stream.SerializeInternalSummaries()) { CopySummaryToProto(summary, stream_proto->add_summaries()); } stream_proto->SerializeToString(&stream_state_t->scalar()()); Tensor* buckets_t = nullptr; OP_REQUIRES_OK( context, context->allocate_output( kBucketsName, {static_cast(boundaries.size())}, &buckets_t)); auto* quantiles_flat = buckets_t->flat().data(); memcpy(quantiles_flat, boundaries.data(), sizeof(float) * boundaries.size()); Tensor* stamp_token_t = nullptr; OP_REQUIRES_OK(context, context->allocate_output(kStampTokenName, TensorShape({}), &stamp_token_t)); stamp_token_t->scalar()() = stamp_token; Tensor* are_buckets_ready_t = nullptr; OP_REQUIRES_OK(context, context->allocate_output(kAreBucketsReadyName, {}, &are_buckets_ready_t)); are_buckets_ready_t->scalar()() = are_buckets_ready; } }; REGISTER_KERNEL_BUILDER(Name("QuantileAccumulatorSerialize").Device(DEVICE_CPU), QuantileAccumulatorSerializeOp); // Serializes the state of streams. class QuantileAccumulatorDeserializeOp : public OpKernel { public: explicit QuantileAccumulatorDeserializeOp(OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { QuantileStreamResource* streams_resource; // Create a reference to the underlying resource using the handle. OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0), &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); int64 old_stamp_token = streams_resource->stamp(); const Tensor* stream_state_t; OP_REQUIRES_OK(context, context->input(kStreamStateName, &stream_state_t)); const Tensor* buckets_t; OP_REQUIRES_OK(context, context->input(kBucketsName, &buckets_t)); QuantileStream* stream = streams_resource->stream(old_stamp_token); ::boosted_trees::QuantileStreamState state_proto; OP_REQUIRES( context, ParseProtoUnlimited(&state_proto, stream_state_t->scalar()()), errors::InvalidArgument("Unabnle to parse quantile stream state.")); std::vector summaries; summaries.reserve(state_proto.summaries_size()); std::vector entries; for (const auto& summary : state_proto.summaries()) { entries.clear(); entries.reserve(summary.entries_size()); for (const auto& entry : summary.entries()) { entries.emplace_back(entry.value(), entry.weight(), entry.min_rank(), entry.max_rank()); } summaries.emplace_back(); summaries[summaries.size() - 1].BuildFromSummaryEntries(entries); } stream->DeserializeInternalSummaries(summaries); const auto& buckets = buckets_t->vec(); std::vector result; result.reserve(buckets.size()); for (size_t i = 0; i < buckets.size(); ++i) { result.push_back(buckets(i)); } streams_resource->set_boundaries(old_stamp_token, result); // Reset the stamp token. const Tensor* stamp_token_t = nullptr; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); int64 stamp_token = stamp_token_t->scalar()(); streams_resource->set_stamp(stamp_token); const Tensor* are_buckets_ready_t = nullptr; OP_REQUIRES_OK(context, context->input(kAreBucketsReadyName, &are_buckets_ready_t)); streams_resource->set_buckets_ready(are_buckets_ready_t->scalar()()); } }; REGISTER_KERNEL_BUILDER( Name("QuantileAccumulatorDeserialize").Device(DEVICE_CPU), QuantileAccumulatorDeserializeOp); // Flushes the quantile summary stream resource. class QuantileAccumulatorFlushOp : public OpKernel { public: explicit QuantileAccumulatorFlushOp(OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { QuantileStreamResource* streams_resource; // Create a reference to the underlying resource using the handle. OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0), &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); const Tensor* next_stamp_token_t; OP_REQUIRES_OK(context, context->input(kNextStampTokenName, &next_stamp_token_t)); int64 next_stamp_token = next_stamp_token_t->scalar()(); const Tensor* stamp_token_t; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); int64 stamp_token = stamp_token_t->scalar()(); CHECK(streams_resource->is_stamp_valid(stamp_token)) << "Invalid stamp token in QuantileAccumulatorFlushOp. " << "Passed stamp token: " << stamp_token << " " << "Current token: " << streams_resource->stamp(); QuantileStream* stream = streams_resource->stream(stamp_token); bool generate_quantiles = streams_resource->generate_quantiles(); stream->Finalize(); streams_resource->set_boundaries( stamp_token, generate_quantiles ? GenerateQuantiles(*stream, streams_resource->num_quantiles()) : GenerateBoundaries(*stream, streams_resource->num_quantiles())); streams_resource->Reset(next_stamp_token); } }; REGISTER_KERNEL_BUILDER(Name("QuantileAccumulatorFlush").Device(DEVICE_CPU), QuantileAccumulatorFlushOp); // Flushes the quantile summary stream resource. This version computes the // summary. class QuantileAccumulatorFlushSummaryOp : public OpKernel { public: explicit QuantileAccumulatorFlushSummaryOp( OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { QuantileStreamResource* streams_resource; // Create a reference to the underlying resource using the handle. OP_REQUIRES_OK(context, LookupResource(context, HandleFromInput(context, 0), &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); const Tensor* next_stamp_token_t; OP_REQUIRES_OK(context, context->input(kNextStampTokenName, &next_stamp_token_t)); int64 next_stamp_token = next_stamp_token_t->scalar()(); const Tensor* stamp_token_t; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); int64 stamp_token = stamp_token_t->scalar()(); CHECK(streams_resource->is_stamp_valid(stamp_token)) << "Invalid stamp token in QuantileAccumulatorFlushSummaryOp. " << "Passed stamp token: " << stamp_token << " " << "Current token: " << streams_resource->stamp(); QuantileStream* stream = streams_resource->stream(stamp_token); stream->Finalize(); protobuf::Arena arena; ::boosted_trees::QuantileSummaryState* summary_proto = protobuf::Arena::CreateMessage<::boosted_trees::QuantileSummaryState>( &arena); const auto& summary = stream->GetFinalSummary(); CopySummaryToProto(summary, summary_proto); // Output to tensor. Tensor* output_t = nullptr; OP_REQUIRES_OK(context, context->allocate_output(0, TensorShape({}), &output_t)); summary_proto->SerializeToString(&output_t->scalar()()); streams_resource->Reset(next_stamp_token); } }; REGISTER_KERNEL_BUILDER( Name("QuantileAccumulatorFlushSummary").Device(DEVICE_CPU), QuantileAccumulatorFlushSummaryOp); // Get bucket boundaries from summaries. class QuantileAccumulatorGetBucketsOp : public OpKernel { public: explicit QuantileAccumulatorGetBucketsOp(OpKernelConstruction* const context) : OpKernel(context) {} void Compute(OpKernelContext* const context) override { OpInputList resource_handle_list; OP_REQUIRES_OK(context, context->input_list(kResourceHandlesName, &resource_handle_list)); OpOutputList are_buckets_ready_list; OP_REQUIRES_OK(context, context->output_list(kAreBucketsReadyName, &are_buckets_ready_list)); OpOutputList buckets_list; OP_REQUIRES_OK(context, context->output_list(kBucketsName, &buckets_list)); const Tensor* stamp_token_t; OP_REQUIRES_OK(context, context->input(kStampTokenName, &stamp_token_t)); int64 stamp_token = stamp_token_t->scalar()(); thread::ThreadPool* const worker_threads = context->device()->tensorflow_cpu_worker_threads()->workers; boosted_trees::utils::ParallelFor( resource_handle_list.size(), worker_threads->NumThreads(), worker_threads, [&context, &resource_handle_list, &are_buckets_ready_list, &buckets_list, stamp_token](int64 start, int64 end) { for (int resource_handle_idx = start; resource_handle_idx < end; ++resource_handle_idx) { const ResourceHandle& handle = resource_handle_list[resource_handle_idx] .flat()(0); QuantileStreamResource* streams_resource; OP_REQUIRES_OK(context, LookupResource(context, handle, &streams_resource)); // Remove the reference at the end of this scope. mutex_lock l(*streams_resource->mutex()); core::ScopedUnref unref_me(streams_resource); bool are_buckets_ready = streams_resource->is_stamp_valid(stamp_token) && streams_resource->are_buckets_ready(); Tensor* are_buckets_ready_t = nullptr; OP_REQUIRES_OK(context, are_buckets_ready_list.allocate( resource_handle_idx, {}, &are_buckets_ready_t)); are_buckets_ready_t->scalar()() = are_buckets_ready; const std::vector& boundaries = are_buckets_ready ? streams_resource->boundaries(stamp_token) : std::vector(); Tensor* output_t = nullptr; OP_REQUIRES_OK(context, buckets_list.allocate( resource_handle_idx, {static_cast(boundaries.size())}, &output_t)); auto* quantiles_flat = output_t->flat().data(); memcpy(quantiles_flat, boundaries.data(), sizeof(float) * boundaries.size()); } }); } }; REGISTER_KERNEL_BUILDER( Name("QuantileAccumulatorGetBuckets").Device(DEVICE_CPU), QuantileAccumulatorGetBucketsOp); // Generates buckets for given set of float values, and the given config. class QuantileBucketsOp : public OpKernel { public: explicit QuantileBucketsOp(OpKernelConstruction* const context) : OpKernel(context) { OP_REQUIRES_OK(context, ReadAndValidateAttributes(context, &num_dense_features_, &num_sparse_features_)); ParseConfig(context, kDenseConfigName, &dense_configs_); OP_REQUIRES(context, dense_configs_.size() == num_dense_features_, errors::InvalidArgument( "Mismatch in number of dense quantile configs.")); ParseConfig(context, kSparseConfigName, &sparse_configs_); OP_REQUIRES(context, sparse_configs_.size() == num_sparse_features_, errors::InvalidArgument( "Mismatch in number of sparse quantile configs.")); } void Compute(OpKernelContext* const context) override { // Read dense float features list; OpInputList dense_float_features_list; OP_REQUIRES_OK(context, TensorUtils::ReadDenseFloatFeatures( context, &dense_float_features_list)); // Read sparse float features list; OpInputList sparse_float_feature_indices_list; OpInputList sparse_float_feature_values_list; OpInputList sparse_float_feature_shapes_list; OP_REQUIRES_OK(context, TensorUtils::ReadSparseFloatFeatures( context, &sparse_float_feature_indices_list, &sparse_float_feature_values_list, &sparse_float_feature_shapes_list)); // Parse example weights and get batch size. const Tensor* example_weights_t; OP_REQUIRES_OK(context, context->input(kExampleWeightsName, &example_weights_t)); auto example_weights = example_weights_t->flat(); const int64 batch_size = example_weights.size(); OpOutputList sparse_buckets_output_list; OP_REQUIRES_OK(context, context->output_list(kSparseBucketsName, &sparse_buckets_output_list)); OpOutputList dense_buckets_output_list; OP_REQUIRES_OK(context, context->output_list(kDenseBucketsName, &dense_buckets_output_list)); auto do_quantile_bucket_gen = [&](const int64 begin, const int64 end) { // These are blocks of ranges. We are iterating over both sparse and // dense features i.e. [0, sparse_features.size() + dense_features.size()] for (int64 i = begin; i < end; ++i) { if (i < sparse_configs_.size()) { const int64 sparse_index = i; const auto sparse_values = sparse_float_feature_values_list[sparse_index].flat(); const auto sparse_indices = sparse_float_feature_indices_list[sparse_index].matrix(); QuantileStream stream(sparse_configs_[sparse_index].eps(), batch_size); // Run quantile summary generation. const int64 num_sparse_rows = sparse_float_feature_indices_list[sparse_index].dim_size(0); for (int64 j = 0; j < num_sparse_rows; ++j) { const int64 example_id = sparse_indices(j, 0); stream.PushEntry(sparse_values(j), example_weights(example_id)); } stream.Finalize(); // Create buckets. const auto boundaries = GenerateBoundaries( stream, sparse_configs_[sparse_index].num_quantiles()); CopyBoundaries(context, boundaries, sparse_index, &sparse_buckets_output_list); } else { const int64 dense_index = i - sparse_configs_.size(); const auto dense_values = dense_float_features_list[dense_index].flat(); QuantileStream stream(dense_configs_[dense_index].eps(), batch_size); // Run quantile summary generation. for (int64 j = 0; j < batch_size; ++j) { stream.PushEntry(dense_values(j), example_weights(j)); } stream.Finalize(); // Create buckets. const auto boundaries = GenerateBoundaries( stream, dense_configs_[dense_index].num_quantiles()); CopyBoundaries(context, boundaries, dense_index, &dense_buckets_output_list); } } }; const int64 kCostPerUnit = 500 * batch_size; const int64 num_features = sparse_configs_.size() + dense_configs_.size(); const DeviceBase::CpuWorkerThreads& worker_threads = *context->device()->tensorflow_cpu_worker_threads(); Shard(worker_threads.num_threads, worker_threads.workers, num_features, kCostPerUnit, do_quantile_bucket_gen); } private: int num_dense_features_; int num_sparse_features_; std::vector dense_configs_; std::vector sparse_configs_; }; REGISTER_KERNEL_BUILDER(Name("QuantileBuckets").Device(DEVICE_CPU), QuantileBucketsOp); // Given the calculated quantiles thresholds and input data, this operation // converts the input features into the buckets (categorical values), depending // on which quantile they fall into. class QuantilesOp : public OpKernel { public: explicit QuantilesOp(OpKernelConstruction* const context) : OpKernel(context) { int num_dense_features; int num_sparse_features; OP_REQUIRES_OK(context, ReadAndValidateAttributes(context, &num_dense_features, &num_sparse_features)); } void Compute(OpKernelContext* const context) override { // Dense features inputs OpInputList dense_float_features_list; OP_REQUIRES_OK(context, context->input_list(kDenseValuesName, &dense_float_features_list)); OpInputList dense_buckets_list; OP_REQUIRES_OK(context, context->input_list(kDenseBucketsName, &dense_buckets_list)); if (dense_buckets_list.size() > 0) { // Check the first tensor to make sure it is the right shape OP_REQUIRES( context, tensorflow::TensorShapeUtils::IsVector(dense_buckets_list[0].shape()), errors::InvalidArgument( strings::Printf("Dense buckets should be flat vectors"))); } // Sparse features inputs OpInputList sparse_float_feature_values_list; OP_REQUIRES_OK(context, context->input_list(kSparseValuesName, &sparse_float_feature_values_list)); OpInputList sparse_float_indices_list; OP_REQUIRES_OK(context, context->input_list(kSparseIndicesName, &sparse_float_indices_list)); OpInputList sparse_buckets_list; OP_REQUIRES_OK( context, context->input_list(kSparseBucketsName, &sparse_buckets_list)); if (sparse_buckets_list.size() > 0) { OP_REQUIRES( context, tensorflow::TensorShapeUtils::IsVector( sparse_buckets_list[0].shape()), errors::InvalidArgument("Sparse buckets should be flat vectors")); } // Quantize the feature values QuantizeFeatures(kDenseOutputTensorName, dense_float_features_list, dense_buckets_list, nullptr, context); QuantizeFeatures(kSparseOutputTensorName, sparse_float_feature_values_list, sparse_buckets_list, &sparse_float_indices_list, context); } }; REGISTER_KERNEL_BUILDER(Name("Quantiles").Device(DEVICE_CPU), QuantilesOp); template class BucketizeWithInputBoundariesOp : public OpKernel { public: explicit BucketizeWithInputBoundariesOp(OpKernelConstruction* context) : OpKernel(context) {} void Compute(OpKernelContext* context) override { const Tensor& boundaries_tensor = context->input(1); VLOG(1) << "boundaries has shape: " << boundaries_tensor.shape().DebugString(); auto boundaries = boundaries_tensor.flat(); std::vector boundaries_vector; boundaries_vector.reserve(boundaries.size()); for (size_t i = 0; i < boundaries.size(); i++) { boundaries_vector.push_back(boundaries(i)); VLOG(1) << "boundaries(" << i << ") : " << boundaries(i); } OP_REQUIRES( context, std::is_sorted(boundaries_vector.begin(), boundaries_vector.end()), errors::InvalidArgument("Expected sorted boundaries")); const Tensor& input_tensor = context->input(0); VLOG(1) << "Inputs has shape: " << input_tensor.shape().DebugString() << " Dtype: " << tensorflow::DataTypeString(input_tensor.dtype()); auto input = input_tensor.flat(); Tensor* output_tensor = nullptr; OP_REQUIRES_OK(context, context->allocate_output(0, input_tensor.shape(), &output_tensor)); auto output = output_tensor->template flat(); for (size_t i = 0; i < input.size(); i++) { output(i) = CalculateBucketIndex(input(i), boundaries_vector); } } private: int32 CalculateBucketIndex(const T value, std::vector& boundaries_vector) { auto first_bigger_it = std::upper_bound(boundaries_vector.begin(), boundaries_vector.end(), value); int32 index = first_bigger_it - boundaries_vector.begin(); CHECK(index >= 0 && index <= boundaries_vector.size()) << "Invalid bucket index: " << index << " boundaries_vector.size(): " << boundaries_vector.size(); return index; } }; #define REGISTER_KERNEL(T) \ REGISTER_KERNEL_BUILDER(Name("BucketizeWithInputBoundaries") \ .Device(DEVICE_CPU) \ .TypeConstraint("T"), \ BucketizeWithInputBoundariesOp); REGISTER_KERNEL(int32); REGISTER_KERNEL(int64); REGISTER_KERNEL(float); REGISTER_KERNEL(double); #undef REGISTER_KERNEL } // namespace tensorflow