/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#define EIGEN_USE_THREADS

#include "tensorflow/core/kernels/sdca_internal.h"

#include <limits>
#include <numeric>
#include <random>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/lib/gtl/flatset.h"
#include "tensorflow/core/lib/math/math_util.h"
#include "tensorflow/core/lib/random/simple_philox.h"

#if defined(TENSORFLOW_USE_CUSTOM_CONTRACTION_KERNEL)
#include "tensorflow/core/kernels/eigen_contraction_kernel.h"
#endif

namespace tensorflow {
namespace sdca {

using UnalignedFloatVector = TTypes<const float>::UnalignedConstVec;
using UnalignedInt64Vector = TTypes<const int64>::UnalignedConstVec;

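// Adds a dense example's contribution to the accumulated delta weights:
// deltas_ has shape (num_weight_vectors, feature_dimension), and row l
// accumulates dense_vector * normalized_bounded_dual_delta[l].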
void FeatureWeightsDenseStorage::UpdateDenseDeltaWeights(
    const Eigen::ThreadPoolDevice& device,
    const Example::DenseVector& dense_vector,
    const std::vector<double>& normalized_bounded_dual_delta) {
  const size_t num_weight_vectors = normalized_bounded_dual_delta.size();
  if (num_weight_vectors == 1) {
    deltas_.device(device) =
        deltas_ + dense_vector.RowAsMatrix() *
                      deltas_.constant(normalized_bounded_dual_delta[0]);
  } else {
    // Transform the dual vector into a column matrix.
    const Eigen::TensorMap<Eigen::Tensor<const double, 2, Eigen::RowMajor>>
        dual_matrix(normalized_bounded_dual_delta.data(), num_weight_vectors,
                    1);
    const Eigen::array<Eigen::IndexPair<int>, 1> product_dims = {
        Eigen::IndexPair<int>(1, 0)};
    // This computes delta_w += delta_vector / \lambda * N.
    deltas_.device(device) =
        (deltas_.cast<double>() +
         dual_matrix.contract(dense_vector.RowAsMatrix().cast<double>(),
                              product_dims))
            .cast<float>();
  }
}

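// Scatters the same per-example update into the sparse delta weights: for
// each active feature index, deltas_(l, column) += feature_value *
// normalized_bounded_dual_delta[l], where column is the dense column assigned
// to that feature index in indices_to_id_.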
void FeatureWeightsSparseStorage::UpdateSparseDeltaWeights(
    const Eigen::ThreadPoolDevice& device,
    const Example::SparseFeatures& sparse_features,
    const std::vector<double>& normalized_bounded_dual_delta) {
  for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
    const double feature_value =
        sparse_features.values == nullptr ? 1.0 : (*sparse_features.values)(k);
    auto it = indices_to_id_.find((*sparse_features.indices)(k));
    for (size_t l = 0; l < normalized_bounded_dual_delta.size(); ++l) {
      deltas_(l, it->second) +=
          feature_value * normalized_bounded_dual_delta[l];
    }
  }
}

void ModelWeights::UpdateDeltaWeights(
    const Eigen::ThreadPoolDevice& device, const Example& example,
    const std::vector<double>& normalized_bounded_dual_delta) {
  // Sparse weights.
  for (size_t j = 0; j < sparse_weights_.size(); ++j) {
    sparse_weights_[j].UpdateSparseDeltaWeights(
        device, example.sparse_features_[j], normalized_bounded_dual_delta);
  }

  // Dense weights.
  for (size_t j = 0; j < dense_weights_.size(); ++j) {
    dense_weights_[j].UpdateDenseDeltaWeights(
        device, *example.dense_vectors_[j], normalized_bounded_dual_delta);
  }
}

Status ModelWeights::Initialize(OpKernelContext* const context) {
  OpInputList sparse_indices_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("sparse_indices", &sparse_indices_inputs));
  OpInputList sparse_weights_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("sparse_weights", &sparse_weights_inputs));
  OpInputList dense_weights_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("dense_weights", &dense_weights_inputs));

  OpOutputList sparse_weights_outputs;
  TF_RETURN_IF_ERROR(context->output_list("out_delta_sparse_weights",
                                          &sparse_weights_outputs));

  OpOutputList dense_weights_outputs;
  TF_RETURN_IF_ERROR(
      context->output_list("out_delta_dense_weights", &dense_weights_outputs));

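  // Reads the sparse weights and allocates matching, zero-initialized delta
  // weights; both are viewed internally as 1 x num_elements row matrices.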
  for (int i = 0; i < sparse_weights_inputs.size(); ++i) {
    Tensor* delta_t;
    TF_RETURN_IF_ERROR(sparse_weights_outputs.allocate(
        i, sparse_weights_inputs[i].shape(), &delta_t));
    // Convert the input vector to a row matrix in internal representation.
    auto deltas = delta_t->shaped<float, 2>({1, delta_t->NumElements()});
    deltas.setZero();
    sparse_weights_.emplace_back(FeatureWeightsSparseStorage{
        sparse_indices_inputs[i].flat<int64>(),
        sparse_weights_inputs[i].shaped<float, 2>(
            {1, sparse_weights_inputs[i].NumElements()}),
        deltas});
  }

  // Reads in the weights, and allocates and initializes the delta weights.
  const auto initialize_weights =
      [&](const OpInputList& weight_inputs, OpOutputList* const weight_outputs,
          std::vector<FeatureWeightsDenseStorage>* const feature_weights) {
        for (int i = 0; i < weight_inputs.size(); ++i) {
          Tensor* delta_t;
          TF_RETURN_IF_ERROR(
              weight_outputs->allocate(i, weight_inputs[i].shape(), &delta_t));
          // Convert the input vector to a row matrix in internal
          // representation.
          auto deltas = delta_t->shaped<float, 2>({1, delta_t->NumElements()});
          deltas.setZero();
          feature_weights->emplace_back(FeatureWeightsDenseStorage{
              weight_inputs[i].shaped<float, 2>(
                  {1, weight_inputs[i].NumElements()}),
              deltas});
        }
        return Status::OK();
      };

  return initialize_weights(dense_weights_inputs, &dense_weights_outputs,
                            &dense_weights_);
}

// Computes the example statistics for the given example and model. Defined
// here as we need the definitions of ModelWeights and Regularizations.
const ExampleStatistics Example::ComputeWxAndWeightedExampleNorm(
    const int num_loss_partitions, const ModelWeights& model_weights,
    const Regularizations& regularization, const int num_weight_vectors) const {
  ExampleStatistics result(num_weight_vectors);

  result.normalized_squared_norm =
      squared_norm_ / regularization.symmetric_l2();

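  // Throughout, the effective weight is the nominal weight plus the local
  // delta scaled by num_loss_partitions; the prev_* quantities use only the
  // nominal weights.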
  // Compute w \dot x and prev_w \dot x.
  // This is the sparse features' contribution to the logit.
  for (size_t j = 0; j < sparse_features_.size(); ++j) {
    const Example::SparseFeatures& sparse_features = sparse_features_[j];
    const FeatureWeightsSparseStorage& sparse_weights =
        model_weights.sparse_weights()[j];

    for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
      const int64 feature_index = (*sparse_features.indices)(k);
      const double feature_value = sparse_features.values == nullptr
                                       ? 1.0
                                       : (*sparse_features.values)(k);
      for (int l = 0; l < num_weight_vectors; ++l) {
        const float sparse_weight = sparse_weights.nominals(l, feature_index);
        const double feature_weight =
            sparse_weight +
            sparse_weights.deltas(l, feature_index) * num_loss_partitions;
        result.prev_wx[l] +=
            feature_value * regularization.Shrink(sparse_weight);
        result.wx[l] += feature_value * regularization.Shrink(feature_weight);
      }
    }
  }

  // Compute w \dot x and prev_w \dot x.
  // This is the dense features' contribution to the logit.
  for (size_t j = 0; j < dense_vectors_.size(); ++j) {
    const Example::DenseVector& dense_vector = *dense_vectors_[j];
    const FeatureWeightsDenseStorage& dense_weights =
        model_weights.dense_weights()[j];

    const Eigen::Tensor<float, 2, Eigen::RowMajor> feature_weights =
        dense_weights.nominals() +
        dense_weights.deltas() *
            dense_weights.deltas().constant(num_loss_partitions);
    if (num_weight_vectors == 1) {
      const Eigen::Tensor<float, 0, Eigen::RowMajor> prev_prediction =
          (dense_vector.Row() *
           regularization.EigenShrinkVector(
               Eigen::TensorMap<Eigen::Tensor<const float, 1, Eigen::RowMajor>>(
                   dense_weights.nominals().data(),
                   dense_weights.nominals().dimension(1))))
              .sum();
      const Eigen::Tensor<float, 0, Eigen::RowMajor> prediction =
          (dense_vector.Row() *
           regularization.EigenShrinkVector(
               Eigen::TensorMap<Eigen::Tensor<const float, 1, Eigen::RowMajor>>(
                   feature_weights.data(), feature_weights.dimension(1))))
              .sum();
      result.prev_wx[0] += prev_prediction();
      result.wx[0] += prediction();
    } else {
      const Eigen::array<Eigen::IndexPair<int>, 1> product_dims = {
          Eigen::IndexPair<int>(1, 1)};
      const Eigen::Tensor<float, 2, Eigen::RowMajor> prev_prediction =
          regularization.EigenShrinkMatrix(dense_weights.nominals())
              .contract(dense_vector.RowAsMatrix(), product_dims);
      const Eigen::Tensor<float, 2, Eigen::RowMajor> prediction =
          regularization.EigenShrinkMatrix(feature_weights)
              .contract(dense_vector.RowAsMatrix(), product_dims);
      // The result of the tensor contraction (matrix multiplication) above is
      // of dimension num_weight_vectors x 1.
      for (int l = 0; l < num_weight_vectors; ++l) {
        result.prev_wx[l] += prev_prediction(l, 0);
        result.wx[l] += prediction(l, 0);
      }
    }
  }

  return result;
}

// Examples contains all the training examples that SDCA uses for a mini-batch.
Status Examples::SampleAdaptiveProbabilities(
    const int num_loss_partitions, const Regularizations& regularization,
    const ModelWeights& model_weights,
    const TTypes<float>::Matrix example_state_data,
    const std::unique_ptr<DualLossUpdater>& loss_updater,
    const int num_weight_vectors) {
  if (num_weight_vectors != 1) {
    return errors::InvalidArgument(
        "Adaptive SDCA only works with binary SDCA, "
        "where num_weight_vectors should be 1.");
  }
  // Compute the sampling probability for each example.
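  // The probability of picking example i is proportional to
  //   example_weight_i * sqrt(||x_i||^2 + lambda * gamma) * |kappa_i|,
  // where lambda is the symmetric L2 strength, gamma is the loss smoothness
  // constant, and kappa_i is the stored example state (column 0) plus the
  // primal loss derivative at the current prediction w \dot x_i.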
  for (int example_id = 0; example_id < num_examples(); ++example_id) {
    const Example& example = examples_[example_id];
    const double example_weight = example.example_weight();
    float label = example.example_label();
    const Status conversion_status = loss_updater->ConvertLabel(&label);
    if (!conversion_status.ok()) return conversion_status;
    const ExampleStatistics example_statistics =
        example.ComputeWxAndWeightedExampleNorm(num_loss_partitions,
                                                model_weights, regularization,
                                                num_weight_vectors);
    const double kappa = example_state_data(example_id, 0) +
                         loss_updater->PrimalLossDerivative(
                             example_statistics.wx[0], label, 1.0);
    probabilities_[example_id] = example_weight *
                                 sqrt(examples_[example_id].squared_norm_ +
                                      regularization.symmetric_l2() *
                                          loss_updater->SmoothnessConstant()) *
                                 std::abs(kappa);
  }

  // Sample the indices.
  random::DistributionSampler sampler(probabilities_);
  GuardedPhiloxRandom generator;
  generator.Init(0, 0);
  auto local_gen = generator.ReserveSamples32(num_examples());
  random::SimplePhilox random(&local_gen);
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_real_distribution<> dis(0, 1);

  // We use a decay of 10: the probability of an example is divided by 10
  // once that example is picked. A good approximation of that is to only
  // keep a picked example with probability (1 / 10) ^ k where k is the
  // number of times we already picked that example. We add a num_retries
  // to avoid taking too long to sample. We then fill the sampled_index with
  // unseen examples sorted by probabilities.
  int id = 0;
  int num_retries = 0;
  while (id < num_examples() && num_retries < num_examples()) {
    int picked_id = sampler.Sample(&random);
    if (dis(gen) > MathUtil::IPow(0.1, sampled_count_[picked_id])) {
      num_retries++;
      continue;
    }
    sampled_count_[picked_id]++;
    sampled_index_[id++] = picked_id;
  }

  std::vector<std::pair<int, float>> examples_not_seen;
  examples_not_seen.reserve(num_examples());
  for (int i = 0; i < num_examples(); ++i) {
    if (sampled_count_[i] == 0)
      examples_not_seen.emplace_back(i, probabilities_[i]);
  }
  std::sort(
      examples_not_seen.begin(), examples_not_seen.end(),
      [](const std::pair<int, float>& lhs, const std::pair<int, float>& rhs) {
        return lhs.second > rhs.second;
      });
  for (int i = id; i < num_examples(); ++i) {
    sampled_index_[i] = examples_not_seen[i - id].first;
  }
  return Status::OK();
}

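// Resets the visiting order to the identity permutation and shuffles it
// uniformly at random.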
void Examples::RandomShuffle() {
  std::iota(sampled_index_.begin(), sampled_index_.end(), 0);

  std::random_device rd;
  std::mt19937 rng(rd());
  std::shuffle(sampled_index_.begin(), sampled_index_.end(), rng);
}

// TODO(sibyl-Aix6ihai): Refactor/shorten this function.
Status Examples::Initialize(OpKernelContext* const context,
                            const ModelWeights& weights,
                            const int num_sparse_features,
                            const int num_sparse_features_with_values,
                            const int num_dense_features) {
  num_features_ = num_sparse_features + num_dense_features;

  OpInputList sparse_example_indices_inputs;
  TF_RETURN_IF_ERROR(context->input_list("sparse_example_indices",
                                         &sparse_example_indices_inputs));
  OpInputList sparse_feature_indices_inputs;
  TF_RETURN_IF_ERROR(context->input_list("sparse_feature_indices",
                                         &sparse_feature_indices_inputs));
  OpInputList sparse_feature_values_inputs;
  if (num_sparse_features_with_values > 0) {
    TF_RETURN_IF_ERROR(context->input_list("sparse_feature_values",
                                           &sparse_feature_values_inputs));
  }

  const Tensor* example_weights_t;
  TF_RETURN_IF_ERROR(context->input("example_weights", &example_weights_t));
  auto example_weights = example_weights_t->flat<float>();

  if (example_weights.size() >= std::numeric_limits<int>::max()) {
    return errors::InvalidArgument(strings::Printf(
        "Too many examples in a mini-batch: %zu > %d", example_weights.size(),
        std::numeric_limits<int>::max()));
  }

  // The static_cast here is safe since num_examples can be at most an int.
  const int num_examples = static_cast<int>(example_weights.size());
  const Tensor* example_labels_t;
  TF_RETURN_IF_ERROR(context->input("example_labels", &example_labels_t));
  auto example_labels = example_labels_t->flat<float>();

  OpInputList dense_features_inputs;
  TF_RETURN_IF_ERROR(
      context->input_list("dense_features", &dense_features_inputs));

  examples_.clear();
  examples_.resize(num_examples);
  probabilities_.resize(num_examples);
  sampled_index_.resize(num_examples);
  sampled_count_.resize(num_examples);
  for (int example_id = 0; example_id < num_examples; ++example_id) {
    Example* const example = &examples_[example_id];
    example->sparse_features_.resize(num_sparse_features);
    example->dense_vectors_.resize(num_dense_features);
    example->example_weight_ = example_weights(example_id);
    example->example_label_ = example_labels(example_id);
  }
  const DeviceBase::CpuWorkerThreads& worker_threads =
      *context->device()->tensorflow_cpu_worker_threads();
  TF_RETURN_IF_ERROR(CreateSparseFeatureRepresentation(
      worker_threads, num_examples, num_sparse_features, weights,
      sparse_example_indices_inputs, sparse_feature_indices_inputs,
      sparse_feature_values_inputs, &examples_));
  TF_RETURN_IF_ERROR(CreateDenseFeatureRepresentation(
      worker_threads, num_examples, num_dense_features, weights,
      dense_features_inputs, &examples_));
  TF_RETURN_IF_ERROR(ComputeSquaredNormPerExample(
      worker_threads, num_examples, num_sparse_features, num_dense_features,
      &examples_));
  return Status::OK();
}

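// Parses each sparse feature group in parallel (one shard per feature group)
// into per-example index/value views over the input tensors, validating that
// every feature index is within range for the corresponding sparse weights.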
Status Examples::CreateSparseFeatureRepresentation(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_sparse_features, const ModelWeights& weights,
    const OpInputList& sparse_example_indices_inputs,
    const OpInputList& sparse_feature_indices_inputs,
    const OpInputList& sparse_feature_values_inputs,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  auto parse_partition = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    for (int i = static_cast<int>(begin); i < end; ++i) {
      auto example_indices =
          sparse_example_indices_inputs[i].template flat<int64>();
      auto feature_indices =
          sparse_feature_indices_inputs[i].template flat<int64>();

      // Parse features for each example. Features for a particular example
      // are at the offsets [start_id, end_id).
      int start_id = -1;
      int end_id = 0;
      for (int example_id = 0; example_id < num_examples; ++example_id) {
        start_id = end_id;
        while (end_id < example_indices.size() &&
               example_indices(end_id) == example_id) {
          ++end_id;
        }
        Example::SparseFeatures* const sparse_features =
            &(*examples)[example_id].sparse_features_[i];
        if (start_id < example_indices.size() &&
            example_indices(start_id) == example_id) {
          sparse_features->indices.reset(new UnalignedInt64Vector(
              &(feature_indices(start_id)), end_id - start_id));
          if (sparse_feature_values_inputs.size() > i) {
            auto feature_weights =
                sparse_feature_values_inputs[i].flat<float>();
            sparse_features->values.reset(new UnalignedFloatVector(
                &(feature_weights(start_id)), end_id - start_id));
          }
          // If features are non-empty.
          if (end_id - start_id > 0) {
            // TODO(sibyl-Aix6ihai): Write this efficiently using vectorized
            // operations from eigen.
            for (int64 k = 0; k < sparse_features->indices->size(); ++k) {
              const int64 feature_index = (*sparse_features->indices)(k);
              if (!weights.SparseIndexValid(i, feature_index)) {
                mutex_lock l(mu);
                result = errors::InvalidArgument(
                    "Found sparse feature indices out of valid range: ",
                    (*sparse_features->indices)(k));
                return;
              }
            }
          }
        } else {
          // Add a Tensor that has size 0.
          sparse_features->indices.reset(
              new UnalignedInt64Vector(&(feature_indices(0)), 0));
          // If values exist for this feature group.
          if (sparse_feature_values_inputs.size() > i) {
            auto feature_weights =
                sparse_feature_values_inputs[i].flat<float>();
            sparse_features->values.reset(
                new UnalignedFloatVector(&(feature_weights(0)), 0));
          }
        }
      }
    }
  };
  // For each column, the cost of parsing it is O(num_examples). We use
  // num_examples here, as empirically Shard() creates the right number of
  // threads based on the problem size.
  // TODO(sibyl-Aix6ihai): Tune this as a function of dataset size.
  const int64 kCostPerUnit = num_examples;
  Shard(worker_threads.num_threads, worker_threads.workers,
        num_sparse_features, kCostPerUnit, parse_partition);
  return result;
}

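// Wraps each example's row of every dense feature tensor in a DenseVector
// view, and checks that the feature dimension does not exceed the number of
// dense parameters in the model.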
Status Examples::CreateDenseFeatureRepresentation(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_dense_features, const ModelWeights& weights,
    const OpInputList& dense_features_inputs,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  auto parse_partition = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    for (int i = static_cast<int>(begin); i < end; ++i) {
      auto dense_features = dense_features_inputs[i].template matrix<float>();
      for (int example_id = 0; example_id < num_examples; ++example_id) {
        (*examples)[example_id].dense_vectors_[i].reset(
            new Example::DenseVector{dense_features, example_id});
      }
      if (!weights.DenseIndexValid(i, dense_features.dimension(1) - 1)) {
        mutex_lock l(mu);
        result = errors::InvalidArgument(
            "More dense features than we have parameters for: ",
            dense_features.dimension(1));
        return;
      }
    }
  };
  // TODO(sibyl-Aix6ihai): Tune this as a function of dataset size.
  const int64 kCostPerUnit = num_examples;
  Shard(worker_threads.num_threads, worker_threads.workers, num_dense_features,
        kCostPerUnit, parse_partition);
  return result;
}

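// Computes the squared L2 norm of every example by summing the squared sparse
// feature values and the squared dense rows; duplicate indices within a
// sparse feature group are rejected as invalid input.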
Status Examples::ComputeSquaredNormPerExample(
    const DeviceBase::CpuWorkerThreads& worker_threads, const int num_examples,
    const int num_sparse_features, const int num_dense_features,
    std::vector<Example>* const examples) {
  mutex mu;
  Status result;  // Guarded by mu
  // Compute norm of examples.
  auto compute_example_norm = [&](const int64 begin, const int64 end) {
    // The static_cast here is safe since begin and end can be at most
    // num_examples which is an int.
    gtl::FlatSet<int64> previous_indices;
    for (int example_id = static_cast<int>(begin); example_id < end;
         ++example_id) {
      double squared_norm = 0;
      Example* const example = &(*examples)[example_id];
      for (int j = 0; j < num_sparse_features; ++j) {
        const Example::SparseFeatures& sparse_features =
            example->sparse_features_[j];
        previous_indices.clear();
        for (int64 k = 0; k < sparse_features.indices->size(); ++k) {
          const int64 feature_index = (*sparse_features.indices)(k);
          if (previous_indices.insert(feature_index).second == false) {
            mutex_lock l(mu);
            result =
                errors::InvalidArgument("Duplicate index in sparse vector.");
            return;
          }
          const double feature_value = sparse_features.values == nullptr
                                           ? 1.0
                                           : (*sparse_features.values)(k);
          squared_norm += feature_value * feature_value;
        }
      }
      for (int j = 0; j < num_dense_features; ++j) {
        const Eigen::Tensor<float, 0, Eigen::RowMajor> sn =
            example->dense_vectors_[j]->Row().square().sum();
        squared_norm += sn();
      }
      example->squared_norm_ = squared_norm;
    }
  };
  // TODO(sibyl-Aix6ihai): Compute the cost optimally.
  const int64 kCostPerUnit = num_dense_features + num_sparse_features;
  Shard(worker_threads.num_threads, worker_threads.workers, num_examples,
        kCostPerUnit, compute_example_norm);
  return result;
}

}  // namespace sdca
}  // namespace tensorflow