1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "common/common.h"
18 #include "gtest/gtest.h"
19 #include "minddata/dataset/util/task_manager.h"
20 #include "minddata/dataset/util/queue.h"
21 #include <atomic>
22 #include <chrono>
23 #include <random>
24 #include "utils/log_adapter.h"
25
26 using namespace mindspore::dataset;
27 using mindspore::LogStream;
28 using mindspore::ExceptionType::NoExceptionType;
29 using mindspore::MsLogLevel::INFO;
30
31 class MindDataTestQueue : public UT::Common {
32 public:
MindDataTestQueue()33 MindDataTestQueue() {}
34
SetUp()35 void SetUp() {}
36 };
37
38 int gRefCountDestructorCalled;
39
40 class RefCount {
41 public:
RefCount()42 RefCount() : v_(nullptr) {}
RefCount(int x)43 explicit RefCount(int x) : v_(std::make_shared<int>(x)) {}
RefCount(const RefCount & o)44 RefCount(const RefCount &o) : v_(o.v_) {}
~RefCount()45 ~RefCount() {
46 MS_LOG(DEBUG) << "Destructor of RefCount called" << std::endl;
47 gRefCountDestructorCalled++;
48 }
operator =(const RefCount & o)49 RefCount &operator=(const RefCount &o) {
50 v_ = o.v_;
51 return *this;
52 }
RefCount(RefCount && o)53 RefCount(RefCount &&o) : v_(std::move(o.v_)) {}
operator =(RefCount && o)54 RefCount &operator=(RefCount &&o) {
55 if (&o != this) {
56 v_ = std::move(o.v_);
57 }
58 return *this;
59 }
60
61 std::shared_ptr<int> v_;
62 };
63
TEST_F(MindDataTestQueue,Test1)64 TEST_F(MindDataTestQueue, Test1) {
65 // Passing shared pointer along the queue
66 Queue<std::shared_ptr<int>> que(3);
67 std::shared_ptr<int> a = std::make_shared<int>(20);
68 Status rc = que.Add(a);
69 ASSERT_TRUE(rc.IsOk());
70 // Use count should be 2 right now. a plus the one in the queue.
71 ASSERT_EQ(a.use_count(), 2);
72 std::shared_ptr<int> b;
73 rc = que.PopFront(&b);
74 ASSERT_TRUE(rc.IsOk());
75 ASSERT_EQ(*b, 20);
76 // Use count should remain 2. a and b. No copy in the queue.
77 ASSERT_EQ(a.use_count(), 2);
78 a.reset(new int(5));
79 ASSERT_EQ(a.use_count(), 1);
80 // Push again but expect a is nullptr after push
81 rc = que.Add(std::move(a));
82 ASSERT_TRUE(rc.IsOk());
83 ASSERT_EQ(a.use_count(), 0);
84 rc = que.PopFront(&b);
85 ASSERT_TRUE(rc.IsOk());
86 ASSERT_EQ(*b, 5);
87 ASSERT_EQ(b.use_count(), 1);
88 // Test construct in place
89 rc = que.EmplaceBack(std::make_shared<int>(100));
90 ASSERT_TRUE(rc.IsOk());
91 rc = que.PopFront(&b);
92 ASSERT_TRUE(rc.IsOk());
93 ASSERT_EQ(*b, 100);
94 ASSERT_EQ(b.use_count(), 1);
95 // Test the destructor of the Queue by add an element in the queue without popping it and let the queue go
96 // out of scope.
97 rc = que.EmplaceBack(std::make_shared<int>(2000));
98 ASSERT_TRUE(rc.IsOk());
99 }
100
TEST_F(MindDataTestQueue,Test2)101 TEST_F(MindDataTestQueue, Test2) {
102 // Passing status object
103 Queue<Status> que(3);
104 Status rc_send(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Oops");
105 Status rc = que.Add(rc_send);
106 ASSERT_TRUE(rc.IsOk());
107 Status rc_recv;
108 rc = que.PopFront(&rc_recv);
109 ASSERT_TRUE(rc.IsOk());
110 ASSERT_EQ(rc_recv, rc_send);
111 rc = que.EmplaceBack(StatusCode::kMDOutOfMemory, "Test emplace");
112 ASSERT_TRUE(rc.IsOk());
113 Status rc_recv2;
114 rc = que.PopFront(&rc_recv2);
115 ASSERT_TRUE(rc.IsOk());
116 ASSERT_TRUE(rc_recv2 == StatusCode::kMDOutOfMemory);
117 }
118
TEST_F(MindDataTestQueue,Test3)119 TEST_F(MindDataTestQueue, Test3) {
120 Queue<std::unique_ptr<int>> que(3);
121 std::unique_ptr<int> a(new int(3));
122 Status rc = que.Add(std::move(a));
123 ASSERT_TRUE(rc.IsOk());
124 ASSERT_EQ(a.get(), nullptr);
125 std::unique_ptr<int> b;
126 rc = que.PopFront(&b);
127 ASSERT_TRUE(rc.IsOk());
128 ASSERT_EQ(*b, 3);
129 rc = que.EmplaceBack(new int(40));
130 ASSERT_TRUE(rc.IsOk());
131 rc = que.PopFront(&b);
132 ASSERT_TRUE(rc.IsOk());
133 ASSERT_EQ(*b, 40);
134 }
135
test4()136 void test4() {
137 gRefCountDestructorCalled = 0;
138 // Pass a structure along the queue.
139 Queue<RefCount> que(3);
140 RefCount a(3);
141 Status rc = que.Add(a);
142 ASSERT_TRUE(rc.IsOk());
143 RefCount b;
144 rc = que.PopFront(&b);
145 ASSERT_TRUE(rc.IsOk());
146 ASSERT_EQ(b.v_.use_count(), 2);
147 ASSERT_EQ(*(b.v_.get()), 3);
148 // Test the destructor of the Queue by adding an element without popping.
149 rc = que.EmplaceBack(10);
150 ASSERT_TRUE(rc.IsOk());
151 }
152
TEST_F(MindDataTestQueue,Test4)153 TEST_F(MindDataTestQueue, Test4) { test4(); }
154
TEST_F(MindDataTestQueue,Test5)155 TEST_F(MindDataTestQueue, Test5) {
156 test4();
157 // Assume we have run Test4. The destructor of the RefCount should be called 4 times.
158 // One for a. One for b. One for the stale element in the queue. 3 more for
159 // the one in the queue (but they are empty).
160 ASSERT_EQ(gRefCountDestructorCalled, 6);
161 }
162
TEST_F(MindDataTestQueue,Test6)163 TEST_F(MindDataTestQueue, Test6) {
164 // Create a list of queues
165 QueueList<std::unique_ptr<int>> my_list_of_queues;
166 const int chosen_queue_index = 2;
167 const int num_queues = 4;
168 const int queue_capacity = 3;
169 my_list_of_queues.Init(num_queues, queue_capacity);
170 // Now try to insert a number into a specific queue and pop it
171 std::unique_ptr<int> a(new int(99));
172 Status rc = my_list_of_queues[chosen_queue_index]->Add(std::move(a));
173 ASSERT_TRUE(rc.IsOk());
174 std::unique_ptr<int> pepped_value;
175 rc = my_list_of_queues[chosen_queue_index]->PopFront(&pepped_value);
176 ASSERT_TRUE(rc.IsOk());
177 MS_LOG(INFO) << "Popped value " << *pepped_value << " from queue index " << chosen_queue_index;
178 ASSERT_EQ(*pepped_value, 99);
179 }
180