1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #ifndef TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_
17 #define TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_
18
19 #define EIGEN_USE_THREADS
20
21 #include <vector>
22 #include "tensorflow/core/framework/register_types.h"
23 #include "tensorflow/core/kernels/concat_lib.h"
24 #include "tensorflow/core/util/work_sharder.h"
25
26 namespace tensorflow {
27
28 // ElementCopier must be a struct with a single Copy function, which is passed
29 // the output pointer, input pointer, input index, and number of elements to
30 // copy from input to output.
31 template <typename T, typename ElementCopier>
ConcatCPUImpl(DeviceBase * d,const std::vector<std::unique_ptr<typename TTypes<T,2>::ConstMatrix>> & inputs,int64 cost_per_unit,ElementCopier copier,typename TTypes<T,2>::Matrix * output)32 void ConcatCPUImpl(
33 DeviceBase* d,
34 const std::vector<std::unique_ptr<typename TTypes<T, 2>::ConstMatrix>>&
35 inputs,
36 int64 cost_per_unit, ElementCopier copier,
37 typename TTypes<T, 2>::Matrix* output) {
38 size_t num_inputs = inputs.size();
39
40 std::vector<ptrdiff_t> sizes;
41 sizes.reserve(num_inputs);
42 int64 row_size = 0;
43 for (const auto& input : inputs) {
44 sizes.push_back(input->dimension(1));
45 row_size += sizes.back();
46 }
47
48 auto worker_threads = d->tensorflow_cpu_worker_threads();
49 int num_threads = std::min(4, worker_threads->num_threads);
50 // strings define a different amount of work (generally much more) compared
51 // with standard POD, so we parallelize differently.
52 if (!std::is_same<T, string>::value) {
53 num_threads =
54 static_cast<int>(std::min<int64>(num_threads, output->size() / 4096));
55 }
56 // Single threaded mode.
57 // TODO(dga): Deduplicate this code w.r.t. sharded code below.
58 if (num_threads == 0) {
59 T* out = &(*output)(0, 0);
60 std::vector<const T*> inp;
61 inp.reserve(num_inputs);
62 for (const auto& input : inputs) {
63 inp.push_back(&(*input)(0, 0));
64 }
65 const int64 dim0 = output->dimension(0);
66 for (int64 i = 0; i < dim0; ++i) {
67 for (int64 j = 0; j < num_inputs; ++j) {
68 auto size = sizes[j];
69 copier.Copy(out, inp[j], j, size);
70 out += size;
71 inp[j] += size;
72 }
73 }
74 return;
75 }
76
77 // Sharded mode.
78 auto work = [&row_size, &sizes, &inputs, &output, &copier, &num_inputs](
79 int64 start, int64 end) {
80 int64 skipped_rows = start / row_size;
81 T* out = output->data() + skipped_rows * row_size;
82 T* out_start = output->data() + start;
83 T* out_end = output->data() + end;
84
85 // Handle partial row at start
86 if (out < out_start) {
87 for (size_t j = 0; j < num_inputs; ++j) {
88 ptrdiff_t size = sizes[j];
89 ptrdiff_t offset = out_start - out;
90 if (size <= offset) {
91 out += size;
92 continue;
93 }
94 const T* inp = &(*inputs[j])(skipped_rows, 0);
95 if (offset > 0) {
96 out += offset;
97 inp += offset;
98 size -= offset;
99 }
100 size = std::min(size, out_end - out);
101 if (size <= 0) break;
102 copier.Copy(out, inp, j, size);
103 out += size;
104 }
105 ++skipped_rows;
106 }
107 if (out == out_end) return;
108 CHECK(out >= out_start);
109 CHECK(out < out_end);
110
111 // Copy remaining data.
112 std::vector<const T*> inp;
113 inp.reserve(num_inputs);
114 for (const auto& input : inputs) {
115 inp.push_back(&(*input)(skipped_rows, 0));
116 }
117 const int64 dim0 = output->dimension(0);
118 for (int64 i = skipped_rows; i < dim0; ++i) {
119 for (int64 j = 0; j < num_inputs; ++j) {
120 ptrdiff_t size = std::min(sizes[j], out_end - out);
121 copier.Copy(out, inp[j], j, size);
122 out += size;
123 inp[j] += size;
124 if (out == out_end) return;
125 }
126 }
127 };
128 Shard(worker_threads->num_threads, worker_threads->workers, output->size(),
129 cost_per_unit, work);
130 }
131
#ifdef TENSORFLOW_USE_SYCL
// SYCL variant of ConcatCPUImpl: single-threaded; copies each input's row
// segment into `output` via the device's memcpy rather than `copier`.
// NOTE(review): `cost_per_unit` and `copier` are accepted for signature
// parity with ConcatCPUImpl but are not used by this implementation.
template <typename T, typename ElementCopier>
void ConcatSYCLImpl(
    const Eigen::SyclDevice& d,
    const std::vector<std::unique_ptr<typename TTypes<T, 2>::ConstMatrix>>&
        inputs,
    int64 cost_per_unit, ElementCopier copier,
    typename TTypes<T, 2>::Matrix* output) {
  size_t num_inputs = inputs.size();

  // Column width of each input; one output row is the concatenation of one
  // row from every input.  (The total row width is not needed here, so it is
  // not accumulated.)
  std::vector<ptrdiff_t> sizes;
  sizes.reserve(num_inputs);
  for (const auto& input : inputs) {
    sizes.push_back(input->dimension(1));
  }

  T* out = &(*output)(0, 0);
  std::vector<const T*> inp;  // per-input read cursor
  inp.reserve(num_inputs);
  for (const auto& input : inputs) {
    inp.push_back(&(*input)(0, 0));
  }
  const int64 dim0 = output->dimension(0);
  for (int64 i = 0; i < dim0; ++i) {
    // Copy one row: one contiguous segment from each input in turn.
    for (size_t j = 0; j < num_inputs; ++j) {
      const ptrdiff_t size = sizes[j];
      d.memcpy(out, inp[j], size * sizeof(T));
      out += size;
      inp[j] += size;
    }
  }
}
#endif  // TENSORFLOW_USE_SYCL
167 } // namespace tensorflow
168
169 #endif // TENSORFLOW_CORE_KERNELS_CONCAT_LIB_CPU_H_
170