/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#define EIGEN_USE_THREADS

#include "tensorflow/core/kernels/sdca_internal.h"

#include <limits>
#include <numeric>
#include <random>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/lib/gtl/flatset.h"
#include "tensorflow/core/lib/math/math_util.h"
#include "tensorflow/core/lib/random/simple_philox.h"

#if defined(TENSORFLOW_USE_CUSTOM_CONTRACTION_KERNEL)
#include "tensorflow/core/kernels/eigen_contraction_kernel.h"
#endif

namespace tensorflow {
namespace sdca {

using UnalignedFloatVector = TTypes<const float>::UnalignedConstVec;
using UnalignedInt64Vector = TTypes<const int64>::UnalignedConstVec;

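// Accumulates the contribution of one example's dense feature column into
// the delta weights. As far as can be read off the code below, this is, in
// effect, deltas_ += x * normalized_bounded_dual_delta, where x is the
// example's dense row and the dual delta has one entry per weight vector
// (class). The multi-class branch forms the outer product via a tensor
// contraction in double precision before casting back to float.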
void FeatureWeightsDenseStorage::UpdateDenseDeltaWeights(
    const Eigen::ThreadPoolDevice& device,
    const Example::DenseVector& dense_vector,
    const std::vector<double>& normalized_bounded_dual_delta) {
  const size_t num_weight_vectors = normalized_bounded_dual_delta.size();
  if (num_weight_vectors == 1) {
    deltas_.device(device) =
        deltas_ + dense_vector.RowAsMatrix() *
                      deltas_.constant(normalized_bounded_dual_delta[0]);
  } else {
    // Transform the dual vector into a column matrix.
    const Eigen::TensorMap<Eigen::Tensor<const double, 2, Eigen::RowMajor>>
        dual_matrix(normalized_bounded_dual_delta.data(), num_weight_vectors,
                    1);
    const Eigen::array<Eigen::IndexPair<int>, 1> product_dims = {
        Eigen::IndexPair<int>(1, 0)};
    // This computes delta_w += delta_vector / \lambda * N.
    deltas_.device(device) =
        (deltas_.cast<double>() +
         dual_matrix.contract(dense_vector.RowAsMatrix().cast<double>(),
                              product_dims))
            .cast<float>();
  }
}

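// Accumulates one example's sparse feature group into the delta weights. The
// sparse deltas are stored compactly: indices_to_id_ appears to map a global
// feature index to its column in deltas_, so the lookup below translates the
// example's feature index before updating each weight vector's entry.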
void FeatureWeightsSparseStorage::UpdateSparseDeltaWeights(
    const Eigen::ThreadPoolDevice& device,
    const Example::SparseFeatures& sparse_features,
    const std::vector<double>& normalized_bounded_dual_delta) {
  for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
    const double feature_value =
        sparse_features.values == nullptr ? 1.0 : (*sparse_features.values)(k);
    auto it = indices_to_id_.find((*sparse_features.indices)(k));
    for (size_t l = 0; l < normalized_bounded_dual_delta.size(); ++l) {
      deltas_(l, it->second) +=
          feature_value * normalized_bounded_dual_delta[l];
    }
  }
}

void ModelWeights::UpdateDeltaWeights(
    const Eigen::ThreadPoolDevice& device, const Example& example,
    const std::vector<double>& normalized_bounded_dual_delta) {
  // Sparse weights.
  for (size_t j = 0; j < sparse_weights_.size(); ++j) {
    sparse_weights_[j].UpdateSparseDeltaWeights(
        device, example.sparse_features_[j], normalized_bounded_dual_delta);
  }

  // Dense weights.
  for (size_t j = 0; j < dense_weights_.size(); ++j) {
    dense_weights_[j].UpdateDenseDeltaWeights(
        device, *example.dense_vectors_[j], normalized_bounded_dual_delta);
  }
}

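// Wires the op's weight inputs and delta-weight outputs into the internal
// storage classes. For every sparse and dense weight group this allocates a
// matching output tensor, views it as a 1 x num_elements row matrix, and
// zero-initializes it; the nominal weights are wrapped as views rather than
// copied.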
Status ModelWeights::Initialize(OpKernelContext* const context) {
  OpInputList sparse_indices_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("sparse_indices", &sparse_indices_inputs));
  OpInputList sparse_weights_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("sparse_weights", &sparse_weights_inputs));
  OpInputList dense_weights_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("dense_weights", &dense_weights_inputs));

  OpOutputList sparse_weights_outputs;
  TF_RETURN_IF_ERROR(context->output_list("out_delta_sparse_weights",
                                          &sparse_weights_outputs));

  OpOutputList dense_weights_outputs;
  TF_RETURN_IF_ERROR(
      context->output_list("out_delta_dense_weights", &dense_weights_outputs));

  for (int i = 0; i < sparse_weights_inputs.size(); ++i) {
    Tensor* delta_t;
    TF_RETURN_IF_ERROR(sparse_weights_outputs.allocate(
        i, sparse_weights_inputs[i].shape(), &delta_t));
    // Convert the input vector to a row matrix in internal representation.
    auto deltas = delta_t->shaped<float, 2>({1, delta_t->NumElements()});
    deltas.setZero();
    sparse_weights_.emplace_back(FeatureWeightsSparseStorage{
        sparse_indices_inputs[i].flat<int64>(),
        sparse_weights_inputs[i].shaped<float, 2>(
            {1, sparse_weights_inputs[i].NumElements()}),
        deltas});
  }

  // Reads in the weights, and allocates and initializes the delta weights.
  const auto initialize_weights =
      [&](const OpInputList& weight_inputs, OpOutputList* const weight_outputs,
          std::vector<FeatureWeightsDenseStorage>* const feature_weights) {
        for (int i = 0; i < weight_inputs.size(); ++i) {
          Tensor* delta_t;
          TF_RETURN_IF_ERROR(
              weight_outputs->allocate(i, weight_inputs[i].shape(), &delta_t));
          // Convert the input vector to a row matrix in internal
          // representation.
          auto deltas = delta_t->shaped<float, 2>({1, delta_t->NumElements()});
          deltas.setZero();
          feature_weights->emplace_back(FeatureWeightsDenseStorage{
              weight_inputs[i].shaped<float, 2>(
                  {1, weight_inputs[i].NumElements()}),
              deltas});
        }
        return Status::OK();
      };

  return initialize_weights(dense_weights_inputs, &dense_weights_outputs,
                            &dense_weights_);
}

// Computes the example statistics for the given example and model. Defined
// here as we need the definitions of ModelWeights and Regularizations.
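// In the statistics below, wx is the prediction using the current weights
// (nominal weights plus the local deltas scaled by num_loss_partitions) and
// prev_wx is the prediction using the nominal weights alone. Shrink() /
// EigenShrink*() presumably apply the regularizer's L1 shrinkage to the
// weights before the dot product is taken.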
const ExampleStatistics Example::ComputeWxAndWeightedExampleNorm(
    const int num_loss_partitions, const ModelWeights& model_weights,
    const Regularizations& regularization, const int num_weight_vectors) const {
  ExampleStatistics result(num_weight_vectors);

  result.normalized_squared_norm =
      squared_norm_ / regularization.symmetric_l2();

  // Compute w \dot x and prev_w \dot x.
  // This is the sparse features' contribution to the logit.
  for (size_t j = 0; j < sparse_features_.size(); ++j) {
    const Example::SparseFeatures& sparse_features = sparse_features_[j];
    const FeatureWeightsSparseStorage& sparse_weights =
        model_weights.sparse_weights()[j];

    for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
      const int64 feature_index = (*sparse_features.indices)(k);
      const double feature_value = sparse_features.values == nullptr
                                       ? 1.0
                                       : (*sparse_features.values)(k);
      for (int l = 0; l < num_weight_vectors; ++l) {
        const float sparse_weight = sparse_weights.nominals(l, feature_index);
        const double feature_weight =
            sparse_weight +
            sparse_weights.deltas(l, feature_index) * num_loss_partitions;
        result.prev_wx[l] +=
            feature_value * regularization.Shrink(sparse_weight);
        result.wx[l] += feature_value * regularization.Shrink(feature_weight);
      }
    }
  }

  // Compute w \dot x and prev_w \dot x.
  // This is the dense features' contribution to the logit.
  for (size_t j = 0; j < dense_vectors_.size(); ++j) {
    const Example::DenseVector& dense_vector = *dense_vectors_[j];
    const FeatureWeightsDenseStorage& dense_weights =
        model_weights.dense_weights()[j];

    const Eigen::Tensor<float, 2, Eigen::RowMajor> feature_weights =
        dense_weights.nominals() +
        dense_weights.deltas() *
            dense_weights.deltas().constant(num_loss_partitions);
    if (num_weight_vectors == 1) {
      const Eigen::Tensor<float, 0, Eigen::RowMajor> prev_prediction =
          (dense_vector.Row() *
           regularization.EigenShrinkVector(
               Eigen::TensorMap<Eigen::Tensor<const float, 1, Eigen::RowMajor>>(
                   dense_weights.nominals().data(),
                   dense_weights.nominals().dimension(1))))
              .sum();
      const Eigen::Tensor<float, 0, Eigen::RowMajor> prediction =
          (dense_vector.Row() *
           regularization.EigenShrinkVector(
               Eigen::TensorMap<Eigen::Tensor<const float, 1, Eigen::RowMajor>>(
                   feature_weights.data(), feature_weights.dimension(1))))
              .sum();
      result.prev_wx[0] += prev_prediction();
      result.wx[0] += prediction();
    } else {
      const Eigen::array<Eigen::IndexPair<int>, 1> product_dims = {
          Eigen::IndexPair<int>(1, 1)};
      const Eigen::Tensor<float, 2, Eigen::RowMajor> prev_prediction =
          regularization.EigenShrinkMatrix(dense_weights.nominals())
              .contract(dense_vector.RowAsMatrix(), product_dims);
      const Eigen::Tensor<float, 2, Eigen::RowMajor> prediction =
          regularization.EigenShrinkMatrix(feature_weights)
              .contract(dense_vector.RowAsMatrix(), product_dims);
      // The result of the tensor contraction (matrix multiplication) above is
      // of dimension num_weight_vectors x 1.
      for (int l = 0; l < num_weight_vectors; ++l) {
        result.prev_wx[l] += prev_prediction(l, 0);
        result.wx[l] += prediction(l, 0);
      }
    }
  }

  return result;
}


// Examples contains all the training examples that SDCA uses for a mini-batch.
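// Assigns each example a sampling probability for adaptive SDCA. Reading the
// loop below: kappa combines the example's stored dual value with the primal
// loss derivative at the current prediction wx, so it is roughly a measure of
// how far that example's dual variable is from optimality; the probability is
// that quantity scaled by the example weight and by a curvature-style bound
// sqrt(||x||^2 + lambda * smoothness).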
Status Examples::SampleAdaptiveProbabilities(
    const int num_loss_partitions, const Regularizations& regularization,
    const ModelWeights& model_weights,
    const TTypes<float>::Matrix example_state_data,
    const std::unique_ptr<DualLossUpdater>& loss_updater,
    const int num_weight_vectors) {
  if (num_weight_vectors != 1) {
    return errors::InvalidArgument(
        "Adaptive SDCA only works with binary SDCA, "
        "where num_weight_vectors should be 1.");
  }
  // Compute the probabilities.
  for (int example_id = 0; example_id < num_examples(); ++example_id) {
    const Example& example = examples_[example_id];
    const double example_weight = example.example_weight();
    float label = example.example_label();
    const Status conversion_status = loss_updater->ConvertLabel(&label);
    if (!conversion_status.ok()) return conversion_status;
    const ExampleStatistics example_statistics =
        example.ComputeWxAndWeightedExampleNorm(num_loss_partitions,
                                                model_weights, regularization,
                                                num_weight_vectors);
    const double kappa = example_state_data(example_id, 0) +
                         loss_updater->PrimalLossDerivative(
                             example_statistics.wx[0], label, 1.0);
    probabilities_[example_id] = example_weight *
                                 sqrt(examples_[example_id].squared_norm_ +
                                      regularization.symmetric_l2() *
                                          loss_updater->SmoothnessConstant()) *
                                 std::abs(kappa);
  }

  // Sample the indices.
  random::DistributionSampler sampler(probabilities_);
  GuardedPhiloxRandom generator;
  generator.Init(0, 0);
  auto local_gen = generator.ReserveSamples32(num_examples());
  random::SimplePhilox random(&local_gen);
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_real_distribution<> dis(0, 1);

  // We use a decay of 10: the probability of an example is divided by 10
  // once that example is picked. A good approximation of that is to only
  // keep a picked example with probability (1 / 10) ^ k where k is the
  // number of times we already picked that example. We add a num_retries
  // cap to avoid taking too long to sample. We then fill sampled_index with
  // the unseen examples, sorted by probability.
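  // For instance, an example that has already been picked twice is kept with
  // probability (1/10)^2 = 0.01 on its next draw, so repeated picks become
  // rapidly less likely.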
  int id = 0;
  int num_retries = 0;
  while (id < num_examples() && num_retries < num_examples()) {
    int picked_id = sampler.Sample(&random);
    if (dis(gen) > MathUtil::IPow(0.1, sampled_count_[picked_id])) {
      num_retries++;
      continue;
    }
    sampled_count_[picked_id]++;
    sampled_index_[id++] = picked_id;
  }

  std::vector<std::pair<int, float>> examples_not_seen;
  examples_not_seen.reserve(num_examples());
  for (int i = 0; i < num_examples(); ++i) {
    if (sampled_count_[i] == 0)
      examples_not_seen.emplace_back(sampled_index_[i], probabilities_[i]);
  }
  std::sort(
      examples_not_seen.begin(), examples_not_seen.end(),
      [](const std::pair<int, float>& lhs, const std::pair<int, float>& rhs) {
        return lhs.second > rhs.second;
      });
  for (int i = id; i < num_examples(); ++i) {
    sampled_count_[i] = examples_not_seen[i - id].first;
  }
  return Status::OK();
}

void Examples::RandomShuffle() {
  std::iota(sampled_index_.begin(), sampled_index_.end(), 0);

  std::random_device rd;
  std::mt19937 rng(rd());
  std::shuffle(sampled_index_.begin(), sampled_index_.end(), rng);
}

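// Builds the in-memory representation of a mini-batch from the op's inputs:
// per-example weights and labels are copied, while the sparse and dense
// feature groups are parsed in parallel (via Shard) and each example's
// squared feature norm is precomputed for use by the SDCA update.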
// TODO(sibyl-Aix6ihai): Refactor/shorten this function.
Status Examples::Initialize(OpKernelContext* const context,
                            const ModelWeights& weights,
                            const int num_sparse_features,
                            const int num_sparse_features_with_values,
                            const int num_dense_features) {
  num_features_ = num_sparse_features + num_dense_features;

  OpInputList sparse_example_indices_inputs;
  TF_RETURN_IF_ERROR(context->input_list("sparse_example_indices",
                                         &sparse_example_indices_inputs));
  OpInputList sparse_feature_indices_inputs;
  TF_RETURN_IF_ERROR(context->input_list("sparse_feature_indices",
                                         &sparse_feature_indices_inputs));
  OpInputList sparse_feature_values_inputs;
  if (num_sparse_features_with_values > 0) {
    TF_RETURN_IF_ERROR(context->input_list("sparse_feature_values",
                                           &sparse_feature_values_inputs));
  }

  const Tensor* example_weights_t;
  TF_RETURN_IF_ERROR(context->input("example_weights", &example_weights_t));
  auto example_weights = example_weights_t->flat<float>();

  if (example_weights.size() >= std::numeric_limits<int>::max()) {
    return errors::InvalidArgument(strings::Printf(
        "Too many examples in a mini-batch: %zu > %d", example_weights.size(),
        std::numeric_limits<int>::max()));
  }

  // The static_cast here is safe since num_examples can be at most an int.
  const int num_examples = static_cast<int>(example_weights.size());
  const Tensor* example_labels_t;
  TF_RETURN_IF_ERROR(context->input("example_labels", &example_labels_t));
  auto example_labels = example_labels_t->flat<float>();

  OpInputList dense_features_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("dense_features", &dense_features_inputs));

  examples_.clear();
  examples_.resize(num_examples);
  probabilities_.resize(num_examples);
  sampled_index_.resize(num_examples);
  sampled_count_.resize(num_examples);
  for (int example_id = 0; example_id < num_examples; ++example_id) {
    Example* const example = &examples_[example_id];
    example->sparse_features_.resize(num_sparse_features);
    example->dense_vectors_.resize(num_dense_features);
    example->example_weight_ = example_weights(example_id);
    example->example_label_ = example_labels(example_id);
  }
  const DeviceBase::CpuWorkerThreads& worker_threads =
      *context->device()->tensorflow_cpu_worker_threads();
  TF_RETURN_IF_ERROR(CreateSparseFeatureRepresentation(
      worker_threads, num_examples, num_sparse_features, weights,
      sparse_example_indices_inputs, sparse_feature_indices_inputs,
      sparse_feature_values_inputs, &examples_));
  TF_RETURN_IF_ERROR(CreateDenseFeatureRepresentation(
      worker_threads, num_examples, num_dense_features, weights,
      dense_features_inputs, &examples_));
  TF_RETURN_IF_ERROR(ComputeSquaredNormPerExample(
      worker_threads, num_examples, num_sparse_features, num_dense_features,
      &examples_));
  return Status::OK();
}

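// Parses the sparse feature groups into per-example representations. Each
// feature group (column) is handled by one Shard task; the indices and values
// for an example are exposed as unaligned Eigen maps over the input tensors,
// so no feature data is copied. Out-of-range feature indices are reported as
// an InvalidArgument error through the mutex-guarded result status.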
Status Examples::CreateSparseFeatureRepresentation(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_sparse_features, const ModelWeights& weights,
    const OpInputList& sparse_example_indices_inputs,
    const OpInputList& sparse_feature_indices_inputs,
    const OpInputList& sparse_feature_values_inputs,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  auto parse_partition = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    for (int i = static_cast<int>(begin); i < end; ++i) {
      auto example_indices =
          sparse_example_indices_inputs[i].template flat<int64>();
      auto feature_indices =
          sparse_feature_indices_inputs[i].template flat<int64>();

      // Parse features for each example. Features for a particular example
      // are at the offsets [start_id, end_id).
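      // Note: the scan below assumes sparse_example_indices is grouped by
      // example id in increasing order; entries for an example that appear
      // out of order would not be picked up by this single forward pass.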
      int start_id = -1;
      int end_id = 0;
      for (int example_id = 0; example_id < num_examples; ++example_id) {
        start_id = end_id;
        while (end_id < example_indices.size() &&
               example_indices(end_id) == example_id) {
          ++end_id;
        }
        Example::SparseFeatures* const sparse_features =
            &(*examples)[example_id].sparse_features_[i];
        if (start_id < example_indices.size() &&
            example_indices(start_id) == example_id) {
          sparse_features->indices.reset(new UnalignedInt64Vector(
              &(feature_indices(start_id)), end_id - start_id));
          if (sparse_feature_values_inputs.size() > i) {
            auto feature_weights =
                sparse_feature_values_inputs[i].flat<float>();
            sparse_features->values.reset(new UnalignedFloatVector(
                &(feature_weights(start_id)), end_id - start_id));
          }
          // If features are non-empty.
          if (end_id - start_id > 0) {
            // TODO(sibyl-Aix6ihai): Write this efficiently using vectorized
            // operations from eigen.
            for (int64 k = 0; k < sparse_features->indices->size(); ++k) {
              const int64 feature_index = (*sparse_features->indices)(k);
              if (!weights.SparseIndexValid(i, feature_index)) {
                mutex_lock l(mu);
                result = errors::InvalidArgument(
                    "Found sparse feature indices out of valid range: ",
                    (*sparse_features->indices)(k));
                return;
              }
            }
          }
        } else {
          // Add a Tensor that has size 0.
          sparse_features->indices.reset(
              new UnalignedInt64Vector(&(feature_indices(0)), 0));
          // If values exist for this feature group.
          if (sparse_feature_values_inputs.size() > i) {
            auto feature_weights =
                sparse_feature_values_inputs[i].flat<float>();
            sparse_features->values.reset(
                new UnalignedFloatVector(&(feature_weights(0)), 0));
          }
        }
      }
    }
  };
  // For each column, the cost of parsing it is O(num_examples). We use
  // num_examples here, as empirically Shard() creates the right number of
  // threads based on the problem size.
  // TODO(sibyl-Aix6ihai): Tune this as a function of dataset size.
  const int64 kCostPerUnit = num_examples;
  Shard(worker_threads.num_threads, worker_threads.workers, num_sparse_features,
        kCostPerUnit, parse_partition);
  return result;
}

Status Examples::CreateDenseFeatureRepresentation(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_dense_features, const ModelWeights& weights,
    const OpInputList& dense_features_inputs,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  auto parse_partition = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    for (int i = static_cast<int>(begin); i < end; ++i) {
      auto dense_features = dense_features_inputs[i].template matrix<float>();
      for (int example_id = 0; example_id < num_examples; ++example_id) {
        (*examples)[example_id].dense_vectors_[i].reset(
            new Example::DenseVector{dense_features, example_id});
      }
      if (!weights.DenseIndexValid(i, dense_features.dimension(1) - 1)) {
        mutex_lock l(mu);
        result = errors::InvalidArgument(
            "More dense features than we have parameters for: ",
            dense_features.dimension(1));
        return;
      }
    }
  };
  // TODO(sibyl-Aix6ihai): Tune this as a function of dataset size.
  const int64 kCostPerUnit = num_examples;
  Shard(worker_threads.num_threads, worker_threads.workers, num_dense_features,
        kCostPerUnit, parse_partition);
  return result;
}

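// Computes, for every example, the squared L2 norm of its features. The sparse
// contribution sums feature_value^2 over each group (rejecting duplicate
// indices within a group), and the dense contribution sums the squared row.
// This norm feeds ExampleStatistics::normalized_squared_norm, which the SDCA
// dual update presumably uses to bound its step size.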
Status Examples::ComputeSquaredNormPerExample(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_sparse_features, const int num_dense_features,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  // Compute norm of examples.
  auto compute_example_norm = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    gtl::FlatSet<int64> previous_indices;
    for (int example_id = static_cast<int>(begin); example_id < end;
         ++example_id) {
      double squared_norm = 0;
      Example* const example = &(*examples)[example_id];
      for (int j = 0; j < num_sparse_features; ++j) {
        const Example::SparseFeatures& sparse_features =
            example->sparse_features_[j];
        previous_indices.clear();
        for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
          const int64 feature_index = (*sparse_features.indices)(k);
          if (previous_indices.insert(feature_index).second == false) {
            mutex_lock l(mu);
            result =
                errors::InvalidArgument("Duplicate index in sparse vector.");
            return;
          }
          const double feature_value = sparse_features.values == nullptr
                                           ? 1.0
                                           : (*sparse_features.values)(k);
          squared_norm += feature_value * feature_value;
        }
      }
      for (int j = 0; j < num_dense_features; ++j) {
        const Eigen::Tensor<float, 0, Eigen::RowMajor> sn =
            example->dense_vectors_[j]->Row().square().sum();
        squared_norm += sn();
      }
      example->squared_norm_ = squared_norm;
    }
  };
  // TODO(sibyl-Aix6ihai): Compute the cost optimally.
  const int64 kCostPerUnit = num_dense_features + num_sparse_features;
  Shard(worker_threads.num_threads, worker_threads.workers, num_examples,
        kCostPerUnit, compute_example_norm);
  return result;
}

}  // namespace sdca
}  // namespace tensorflow