/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/util/task_manager.h"
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/services.h"
#include "minddata/dataset/util/cond_var.h"

namespace mindspore {
namespace dataset {
// Connector is a communication data structure between two groups of threads that
// preserves the order of the elements passing through it.
//
// Example use case:
// An initial task list of [1,2,3,4,5,6,7,8,9] with 5 threads getting/processing elements from that list
// and pushing the processed elements to a Connector in any order, whichever thread finishes first.
// If the consumer of the Connector is single-threaded and pops the elements from the Connector
// one by one, it will receive [1,2,3,4,5,6,7,8,9].
//
// Requirements:
//   1. Each thread in the group of consumer or producer threads must be assigned an id starting from 0.
//   2. If your multi-threaded program is not reading from a Connector but
//      wants to push to a Connector, you must follow a round-robin element distribution,
//      i.e., thread-id 0 must have the first element, thread-id 1 the second element,
//      and so on; each worker can then push to the Connector asynchronously in parallel.
//      (See the usage sketch below.)
//
// Blocking conditions:
//   1. Connector::Push(int, T) can block when the internal queue it is pushing to is full.
//   2. Connector::Pop(int) can block when
//        - The internal queue it is popping from is empty.
//        - The calling thread of Pop() is not the expected consumer (expect_consumer_). This is to
//          enforce the ordering.
//
// Future improvement:
//   1. Fault tolerance: right now, if one of the workers dies, the Connector will not work
//      properly.
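//
// Illustrative usage (a sketch only; my_task_group and the element names are hypothetical,
// and error handling is omitted):
//
//   Connector<int> connector(4 /* n_producers */, 1 /* n_consumers */, 8 /* queue_capacity */);
//   RETURN_IF_NOT_OK(connector.Register(&my_task_group));
//   // Producer thread i (i in [0, 4)) processes elements i, i+4, i+8, ... of the task list,
//   // so the round-robin distribution requirement above holds, and pushes with its own id:
//   //   RETURN_IF_NOT_OK(connector.Push(i, processed_element));
//   // The single consumer (worker-id 0) then receives the elements in the original order:
//   //   int element;
//   //   RETURN_IF_NOT_OK(connector.Pop(0, &element));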
template <class T>
class Connector {
 public:
  // Constructor of Connector.
  // Initializes private members with the given input arguments.
  // expect_consumer_ and pop_from_ are initialized to 0 as part of
  // our requirements. We instantiate n_producers internal queues so that
  // each producer thread can push to its own queue without any sync overhead.
  // @param n_producers The number of threads producing data into this Connector.
  // @param n_consumers The number of threads consuming data from this Connector.
  // @param queue_capacity The number of elements each queue can hold.
  Connector(int32_t n_producers, int32_t n_consumers, int32_t queue_capacity)
      : num_producers_(n_producers), num_consumers_(n_consumers) {
    MS_LOG(DEBUG) << "A connector is created with " << n_producers << " producers and " << n_consumers
                  << " consumers.";
    my_name_ = Services::GetUniqueID();
    // We require the consumers to have ids sequentially from 0 to num_consumers_ - 1;
    // otherwise an ordered list of consumer ids would have to be passed here (not implemented yet).
    expect_consumer_ = 0;

    // Round-robin pop starts from index 0 of the queues_.
    pop_from_ = 0;

    // Initialize queues_ to hold num_producers_ queues.
    // Each queue is a blocking queue with the same queue_capacity.
    queues_.Init(num_producers_, queue_capacity);
  }

  // Destructor of Connector.
  virtual ~Connector() = default;

  // Get an element from the Connector.
  // @note A call to Pop() can block the caller thread; see the blocking conditions at the top of this file.
  // @param worker_id The id of the worker thread calling this method. See the requirements at the top of this file.
  // @param result The address of an object where the popped element will be placed.
  virtual Status Pop(int32_t worker_id, T *result) noexcept {
    {
      MS_ASSERT(worker_id < num_consumers_);
      std::unique_lock<std::mutex> lk(m_);
      // Block until it is this worker's turn, so consumers receive elements in order.
      RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; }));
      // Pop from the producer queues in round-robin order to preserve the original ordering.
      RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
      pop_from_ = (pop_from_ + 1) % num_producers_;
      out_buffers_count_++;
      // Hand the turn over to the next consumer.
      expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
    }

    cv_.NotifyAll();
    return Status::OK();
  }

  // Add an element into the Connector without the overhead of synchronization.
  // It may block when the internal queue is full.
  // The element passed to this function will be copied into the internal queue.
  // @param worker_id The id of the worker thread calling this method.
  // @param el A const lvalue element to be passed/added/pushed.
  Status Push(int32_t worker_id, const T &el) noexcept {
    MS_ASSERT(worker_id < static_cast<int32_t>(queues_.size()));
    MS_ASSERT(queues_[worker_id] != nullptr);
    return (queues_[worker_id]->Add(el));
  }

  // Return the number of elements popped from this Connector so far.
  auto out_rows_count() const { return out_buffers_count_.load(); }

  // Add an element into the Connector without the overhead of synchronization.
  // It may block when the internal queue is full.
  // The element passed to this function will be forwarded into the internal queue.
  // @param worker_id The id of the worker thread calling this method.
  // @param el An rvalue element to be passed/added/pushed.
  virtual Status Push(int32_t worker_id, T &&el) noexcept {
    MS_ASSERT(worker_id < static_cast<int32_t>(queues_.size()));
    MS_ASSERT(queues_[worker_id] != nullptr);
    return (queues_[worker_id]->Add(std::forward<T>(el)));
  }

  // Resets the internal index tracking of the queues so that the Connector can be used again
  // with new inputs, starting from the beginning.
  void Reset() {
    for (size_t i = 0; i < queues_.size(); ++i) {
      queues_[i]->ResetQue();
    }
    expect_consumer_ = 0;
    pop_from_ = 0;
    out_buffers_count_ = 0;
    MS_LOG(DEBUG) << "Connector counters reset.";
  }

  // Print the status of this Connector to the output stream.
  void Print(std::ostream &out, bool show_all) const {
    out << "\n--------- Connector ------------"
        << "\nConnector Name           : " << my_name_ << "\nNumber of consumers      : " << num_consumers_
        << "\nNumber of producers      : " << num_producers_ << "\n";
  }

  friend std::ostream &operator<<(std::ostream &out, const Connector &con) {
    con.Print(out, false);
    return out;
  }

  // Get the current size of the Connector.
  int32_t size() const {
    int32_t size = 0;
    for (size_t i = 0; i < queues_.size(); ++i) {
      size += queues_[i]->size();
    }
    return size;
  }

  // Get the total capacity across all internal queues.
  int32_t capacity() const {
    int32_t capacity = 0;
    for (size_t i = 0; i < queues_.size(); ++i) {
      capacity += queues_[i]->capacity();
    }
    return capacity;
  }

  // Register the internal resources with a TaskGroup for the interruption service.
  // @param vg The TaskGroup to register with.
  // @return Status The error code returned.
  Status Register(TaskGroup *vg) {
    Status rc = queues_.Register(vg);
    if (rc.IsOk()) {
      rc = cv_.Register(vg->GetIntrpService());
    }
    return rc;
  }

 protected:
  // The unique name of this Connector.
  std::string my_name_;

  // A list of Queues that are thread safe.
  QueueList<T> queues_;

  // The consumer that we allow to get the next data from Pop().
  int32_t expect_consumer_;

  // The index into queues_ from which the next data should be popped.
  int32_t pop_from_;

  int32_t num_producers_;
  int32_t num_consumers_;

  // Used in Pop() to block a thread that calls Pop() but is not the expect_consumer_.
  std::mutex m_;
  CondVar cv_;

  // The number of elements popped from this Connector so far.
  std::atomic<std::int64_t> out_buffers_count_ = 0;
};
}  // namespace dataset
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CONNECTOR_H_