1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_ 18 19 #include <memory> 20 #include <string> 21 #include <utility> 22 #include <vector> 23 #include "minddata/dataset/util/task_manager.h" 24 #include "minddata/dataset/util/queue.h" 25 #include "minddata/dataset/util/services.h" 26 #include "minddata/dataset/util/cond_var.h" 27 28 namespace mindspore { 29 namespace dataset { 30 // Connector is a communication data structure between two group of threads that 31 // preserve the order. 32 // 33 // Example use case: 34 // An initial tasks-list of [1,2,3,4,5,6,7,8,9] with 5 threads getting/processing elements from that list, 35 // and pushing the processed elements to a Connector in any order whoever finishes processing first. 36 // If the consumer of the Connector is single threaded, when the consumer pop() the 37 // element from the Connector one by one, it will get [1,2,3,4,5,6,7,8,9]. 38 // 39 // Requirements: 40 // 1. Each thread in the group of consumer or producer threads must be assigned ids starting from 0. 41 // 2. If your multi-threads program is not reading from a Connector class but 42 // want to push to a Connector class, you must follow roundrobin element distribution, 43 // i.e., the thread-id0 must have the first element, thread-id1 has the second element, 44 // and so on; then each of this worker can push to the Connector class async in parallel. 45 // 46 // Blocking conditions: 47 // 1. Connector.push(int, T) can block when the internal queue it's trying to push is full. 48 // 2. Connector.pop(int) can block when 49 // - The internal queue it's trying to pop is empty. 50 // - The caller thread of pop() is not equal to the _expectConsumer. This is to enforce 51 // the ordering. 52 // 53 // Future improvement: 54 // 1. Fault tolerant: Right now, if one of the worker dies, the Connector will not work 55 // properly. 56 template <class T> 57 class Connector { 58 public: 59 // Name: Constructor 60 // Description: Initializing private members with the given input arguments. 61 // expect_consumer_ and pop_from_ is initialized to 0 as part of 62 // our requirements. We instantiate nProducers number of internal 63 // queues so that each producer thread can push to its queue without 64 // any sync overhead. 65 // Constructor of Connector 66 // Initializing private members with the given input arguments. 67 // _expectConsumer and _popFrom is initialized to 0 as part of 68 // our requirements. We instantiate nProducers number of internal 69 // queues so that each producer thread can push to its queue without 70 // any sync overhead. 71 // @param n_producers The number of threads producing data into this DbConnector. 72 // @param n_consumers The number of thread consuming data from this DbConnector. 73 // @param queue_capacity The number of element for each queue. Connector(int32_t n_producers,int32_t n_consumers,int32_t queue_capacity)74 Connector(int32_t n_producers, int32_t n_consumers, int32_t queue_capacity) 75 : num_producers_(n_producers), num_consumers_(n_consumers) { 76 MS_LOG(DEBUG) << "A connector is created with " << n_producers << " producers and " << n_consumers << " consumers."; 77 my_name_ = Services::GetUniqueID(); 78 // We require the consumers to have ids sequentially from 0 to the num_consumers_-1, 79 // Otherwise a ordered list of consumer ids have to be passed here. (not implemented yet) 80 expect_consumer_ = 0; 81 82 // Roundrobin pop starts from index 0 of the queues_. 83 pop_from_ = 0; 84 85 // Initialize the queues_ to have num_producers_ number of queues. 86 // Each queue is a blocking queue and has the same queue_capacity. 87 queues_.Init(num_producers_, queue_capacity); 88 } 89 90 // Destructor of Connector 91 virtual ~Connector() = default; 92 93 // Get an element from the Connector. 94 // @not Call to pop() can block the caller thread, see the blocking condition at the top of this file. 95 // @param worker_id The id of a worker thread calling this method. 96 // @param result The address of an object where the popped element will be placed. Pop(int32_t worker_id,T * result)97 virtual Status Pop(int32_t worker_id, // The worker-id of the caller. See the requirement at the top of this file. 98 T *result) noexcept { 99 { 100 MS_ASSERT(worker_id < num_consumers_); 101 std::unique_lock<std::mutex> lk(m_); 102 RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; })); 103 RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result)); 104 pop_from_ = (pop_from_ + 1) % num_producers_; 105 out_buffers_count_++; 106 expect_consumer_ = (expect_consumer_ + 1) % num_consumers_; 107 } 108 109 cv_.NotifyAll(); 110 return Status::OK(); 111 } 112 113 // Add an element into the DbConnector without the overhead of synchronization. 114 // It may block when the internal queue is full. 115 // The element passed to this function will be copied into the internal queue. 116 // @param worker_id The id of a worker thread calling this method. 117 // @param el A const lvalue element to be passed/added/pushed. Push(int32_t worker_id,const T & el)118 Status Push(int32_t worker_id, const T &el) noexcept { 119 MS_ASSERT(worker_id < static_cast<int32_t>(queues_.size())); 120 MS_ASSERT(queues_[worker_id] != nullptr); 121 return (queues_[worker_id]->Add(el)); 122 } 123 out_rows_count()124 auto out_rows_count() const { return out_buffers_count_.load(); } 125 126 // Add an element into the DbConnector without the overhead of synchronization. 127 // It may block when the internal queue is full. 128 // The element passed to this function will be forwarded into the internal queue. 129 // @param worker_id The id of a worker thread calling this method. 130 // @param el An element to be passed/added/pushed. Push(int32_t worker_id,T && el)131 virtual Status Push(int32_t worker_id, T &&el) noexcept { 132 MS_ASSERT(worker_id < static_cast<int32_t>(queues_.size())); 133 MS_ASSERT(queues_[worker_id] != nullptr); 134 return (queues_[worker_id]->Add(std::forward<T>(el))); 135 } 136 137 // Resets the internal index tracking of the queue so that it can be used again with new inputs, 138 // starting from the beginning. Reset()139 void Reset() { 140 for (int i = 0; i < queues_.size(); ++i) { 141 queues_[i]->ResetQue(); 142 } 143 expect_consumer_ = 0; 144 pop_from_ = 0; 145 out_buffers_count_ = 0; 146 MS_LOG(DEBUG) << "Connector counters reset."; 147 } 148 Print(std::ostream & out,bool showAll)149 void Print(std::ostream &out, bool showAll) const { 150 out << "\n--------- Connector ------------" 151 << "\nConnector Name : " << my_name_ << "\nNumber of consumers : " << num_consumers_ 152 << "\nNumber of producers : " << num_producers_ << "\n"; 153 } 154 155 friend std::ostream &operator<<(std::ostream &out, const Connector &con) { 156 con.print(out, false); 157 return out; 158 } 159 160 // Get current size of connector. size()161 int32_t size() const { 162 int32_t size = 0; 163 for (size_t i = 0; i < queues_.size(); ++i) { 164 size += queues_[i]->size(); 165 } 166 return size; 167 } 168 capacity()169 int32_t capacity() const { 170 int32_t capacity = 0; 171 for (size_t i = 0; i < queues_.size(); ++i) { 172 capacity += queues_[i]->capacity(); 173 } 174 return capacity; 175 } 176 177 // Register the internal resources with Task group for interruption service. 178 // @param vg 179 // @return Register(TaskGroup * vg)180 Status Register(TaskGroup *vg) { 181 Status rc = queues_.Register(vg); 182 if (rc.IsOk()) { 183 rc = cv_.Register(vg->GetIntrpService()); 184 } 185 return rc; 186 } 187 188 protected: 189 std::string my_name_; 190 191 // A list of Queues that are thread safe. 192 QueueList<T> queues_; 193 194 // The consumer that we allow to get the next data from pop() 195 int32_t expect_consumer_; 196 197 // The index to the queues_ where the next data should be popped. 198 int32_t pop_from_; 199 200 int32_t num_producers_; 201 int32_t num_consumers_; 202 203 // Used in the Pop(), when a thread call pop() but it is not the expect_consumer_. 204 std::mutex m_; 205 CondVar cv_; 206 std::atomic<std::int64_t> out_buffers_count_ = 0; 207 }; 208 } // namespace dataset 209 } // namespace mindspore 210 211 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_ 212