/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/runtime_fork_join.h"

#define EIGEN_USE_THREADS

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/executable_run_options.h"
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/platform/dynamic_annotations.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

using tensorflow::int32;
using tensorflow::int64;
using tensorflow::uint64;

using ComputeFunctionType = void (*)(void*, const void*, const void**, void**,
                                     int64*, uint64*);
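// (Editorial note, inferred from the call sites below rather than stated in
// the original source: the compute function's parameters appear to correspond,
// in order, to the result buffer, the executable run options, the parameter
// buffers, the buffer table, the per-partition dimension start/limit bounds,
// and the profile counters.)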

// Dispatches 'num_partitions - 1' calls to 'function_ptr' in parallel.
// Calls 'function_ptr' for the first partition inline.
// Uses a blocking counter to synchronize threads after the parallel calls
// complete.
//
// The 'partitions' array has a total number of elements equal to
// 'num_partitions * num_partitioned_dims * 2' (the '2' is necessary to specify
// dimension start and limit indices).
//
// The 'partitions' array layout stores elements in memory with the dimension
// start/limit pair as the most-minor dimension, followed by dimension, then
// partition.
//
// EX: Layout of 'partitions' array with 'num_partitions = 2' and
//     'num_partitioned_dims = 3'
//
//   [partition0_dim0_start]
//   [partition0_dim0_limit]
//   [partition0_dim1_start]
//   [partition0_dim1_limit]
//   [partition0_dim2_start]
//   [partition0_dim2_limit]
//   [partition1_dim0_start]
//   [partition1_dim0_limit]
//   [partition1_dim1_start]
//   [partition1_dim1_limit]
//   [partition1_dim2_start]
//   [partition1_dim2_limit]
//
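// For illustration only (an editorial sketch, not part of the original
// source): under this layout, the start/limit pair for partition 'i' and
// partitioned dimension 'd' can be read as
//
//   const int64 stride = 2 * num_partitioned_dims;
//   const int64 start = partitions[i * stride + 2 * d];
//   const int64 limit = partitions[i * stride + 2 * d + 1];
//
// which is the same 'stride'/'offset' arithmetic used by the function below.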
TF_ATTRIBUTE_NO_SANITIZE_MEMORY void __xla_cpu_runtime_ParallelForkJoin(
    void* result_ptr, const void* run_options_ptr, const void** params,
    void** buffer_table, uint64* prof_counters, int32 num_partitions,
    int64* partitions, int32 num_partitioned_dims, void* function_ptr) {
  VLOG(2) << "ParallelForkJoin ENTRY"
          << " num_partitions: " << num_partitions
          << " num_partitioned_dims: " << num_partitioned_dims;
  CHECK_EQ(params, nullptr);
  CHECK_GT(num_partitions, 1);
  CHECK_GT(num_partitioned_dims, 0);
  CHECK_NE(function_ptr, nullptr);
  CHECK_NE(partitions, nullptr);
  const xla::ExecutableRunOptions* run_options =
      static_cast<const xla::ExecutableRunOptions*>(run_options_ptr);
  CHECK_NE(run_options, nullptr);
  CHECK_NE(run_options->intra_op_thread_pool(), nullptr);

  ComputeFunctionType function =
      reinterpret_cast<ComputeFunctionType>(function_ptr);
  // Compute partition stride in 'partitions' array.
  const int64 stride = 2 * num_partitioned_dims;

  // Dispatch 'num_partitions - 1' compute functions to run in parallel.
  tensorflow::BlockingCounter bc(num_partitions - 1);
  for (int32 i = 1; i < num_partitions; ++i) {
    const int64 offset = i * stride;
    run_options->intra_op_thread_pool()->enqueueNoNotification(
        [i, function, result_ptr, run_options_ptr, buffer_table, prof_counters,
         partitions, offset, &bc]() {
          function(result_ptr, run_options_ptr, nullptr, buffer_table,
                   &partitions[offset], prof_counters);
          bc.DecrementCount();
          VLOG(3) << "ParallelForkJoin partition " << i << " done.";
        });
  }

  // Call first compute function inline.
  function(result_ptr, run_options_ptr, params, buffer_table, &partitions[0],
           prof_counters);
  VLOG(3) << "ParallelForkJoin partition 0 done.";
  bc.Wait();
  VLOG(2) << "ParallelForkJoin EXIT";
}
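
// Example (hypothetical, for illustration only; 'MyComputeFunction' and the
// literal argument values below are assumptions, not part of this file):
// XLA-generated code calls this entry point with a 'partitions' buffer laid
// out as described above. With 'num_partitions = 2' and
// 'num_partitioned_dims = 3', a call might look roughly like:
//
//   int64 partitions[2 * 3 * 2] = {/* start/limit pairs per partition/dim */};
//   __xla_cpu_runtime_ParallelForkJoin(
//       result_ptr, run_options_ptr, /*params=*/nullptr, buffer_table,
//       prof_counters, /*num_partitions=*/2, partitions,
//       /*num_partitioned_dims=*/3,
//       reinterpret_cast<void*>(&MyComputeFunction));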