1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "common/common.h" 18 #include "gtest/gtest.h" 19 #include "minddata/dataset/util/task_manager.h" 20 #include "minddata/dataset/util/queue.h" 21 #include <atomic> 22 #include <chrono> 23 #include <random> 24 #include "utils/log_adapter.h" 25 26 using namespace mindspore::dataset; 27 using mindspore::LogStream; 28 using mindspore::ExceptionType::NoExceptionType; 29 using mindspore::MsLogLevel::INFO; 30 31 class MindDataTestQueue : public UT::Common { 32 public: 33 MindDataTestQueue() {} 34 35 void SetUp() {} 36 }; 37 38 int gRefCountDestructorCalled; 39 40 class RefCount { 41 public: 42 RefCount() : v_(nullptr) {} 43 explicit RefCount(int x) : v_(std::make_shared<int>(x)) {} 44 RefCount(const RefCount &o) : v_(o.v_) {} 45 ~RefCount() { 46 MS_LOG(DEBUG) << "Destructor of RefCount called" << std::endl; 47 gRefCountDestructorCalled++; 48 } 49 RefCount &operator=(const RefCount &o) { 50 v_ = o.v_; 51 return *this; 52 } 53 RefCount(RefCount &&o) : v_(std::move(o.v_)) {} 54 RefCount &operator=(RefCount &&o) { 55 if (&o != this) { 56 v_ = std::move(o.v_); 57 } 58 return *this; 59 } 60 61 std::shared_ptr<int> v_; 62 }; 63 64 TEST_F(MindDataTestQueue, Test1) { 65 // Passing shared pointer along the queue 66 Queue<std::shared_ptr<int>> que(3); 67 std::shared_ptr<int> a = std::make_shared<int>(20); 68 Status rc = que.Add(a); 69 ASSERT_TRUE(rc.IsOk()); 70 // Use count should be 2 right now. a plus the one in the queue. 71 ASSERT_EQ(a.use_count(), 2); 72 std::shared_ptr<int> b; 73 rc = que.PopFront(&b); 74 ASSERT_TRUE(rc.IsOk()); 75 ASSERT_EQ(*b, 20); 76 // Use count should remain 2. a and b. No copy in the queue. 77 ASSERT_EQ(a.use_count(), 2); 78 a.reset(new int(5)); 79 ASSERT_EQ(a.use_count(), 1); 80 // Push again but expect a is nullptr after push 81 rc = que.Add(std::move(a)); 82 ASSERT_TRUE(rc.IsOk()); 83 ASSERT_EQ(a.use_count(), 0); 84 rc = que.PopFront(&b); 85 ASSERT_TRUE(rc.IsOk()); 86 ASSERT_EQ(*b, 5); 87 ASSERT_EQ(b.use_count(), 1); 88 // Test construct in place 89 rc = que.EmplaceBack(std::make_shared<int>(100)); 90 ASSERT_TRUE(rc.IsOk()); 91 rc = que.PopFront(&b); 92 ASSERT_TRUE(rc.IsOk()); 93 ASSERT_EQ(*b, 100); 94 ASSERT_EQ(b.use_count(), 1); 95 // Test the destructor of the Queue by add an element in the queue without popping it and let the queue go 96 // out of scope. 97 rc = que.EmplaceBack(std::make_shared<int>(2000)); 98 ASSERT_TRUE(rc.IsOk()); 99 } 100 101 TEST_F(MindDataTestQueue, Test2) { 102 // Passing status object 103 Queue<Status> que(3); 104 Status rc_send(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Oops"); 105 Status rc = que.Add(rc_send); 106 ASSERT_TRUE(rc.IsOk()); 107 Status rc_recv; 108 rc = que.PopFront(&rc_recv); 109 ASSERT_TRUE(rc.IsOk()); 110 ASSERT_EQ(rc_recv, rc_send); 111 rc = que.EmplaceBack(StatusCode::kMDOutOfMemory, "Test emplace"); 112 ASSERT_TRUE(rc.IsOk()); 113 Status rc_recv2; 114 rc = que.PopFront(&rc_recv2); 115 ASSERT_TRUE(rc.IsOk()); 116 ASSERT_TRUE(rc_recv2 == StatusCode::kMDOutOfMemory); 117 } 118 119 TEST_F(MindDataTestQueue, Test3) { 120 Queue<std::unique_ptr<int>> que(3); 121 std::unique_ptr<int> a(new int(3)); 122 Status rc = que.Add(std::move(a)); 123 ASSERT_TRUE(rc.IsOk()); 124 ASSERT_EQ(a.get(), nullptr); 125 std::unique_ptr<int> b; 126 rc = que.PopFront(&b); 127 ASSERT_TRUE(rc.IsOk()); 128 ASSERT_EQ(*b, 3); 129 rc = que.EmplaceBack(new int(40)); 130 ASSERT_TRUE(rc.IsOk()); 131 rc = que.PopFront(&b); 132 ASSERT_TRUE(rc.IsOk()); 133 ASSERT_EQ(*b, 40); 134 } 135 136 void test4() { 137 gRefCountDestructorCalled = 0; 138 // Pass a structure along the queue. 139 Queue<RefCount> que(3); 140 RefCount a(3); 141 Status rc = que.Add(a); 142 ASSERT_TRUE(rc.IsOk()); 143 RefCount b; 144 rc = que.PopFront(&b); 145 ASSERT_TRUE(rc.IsOk()); 146 ASSERT_EQ(b.v_.use_count(), 2); 147 ASSERT_EQ(*(b.v_.get()), 3); 148 // Test the destructor of the Queue by adding an element without popping. 149 rc = que.EmplaceBack(10); 150 ASSERT_TRUE(rc.IsOk()); 151 } 152 153 TEST_F(MindDataTestQueue, Test4) { test4(); } 154 155 TEST_F(MindDataTestQueue, Test5) { 156 test4(); 157 // Assume we have run Test4. The destructor of the RefCount should be called 4 times. 158 // One for a. One for b. One for the stale element in the queue. 3 more for 159 // the one in the queue (but they are empty). 160 ASSERT_EQ(gRefCountDestructorCalled, 6); 161 } 162 163 TEST_F(MindDataTestQueue, Test6) { 164 // Create a list of queues 165 QueueList<std::unique_ptr<int>> my_list_of_queues; 166 const int chosen_queue_index = 2; 167 const int num_queues = 4; 168 const int queue_capacity = 3; 169 my_list_of_queues.Init(num_queues, queue_capacity); 170 // Now try to insert a number into a specific queue and pop it 171 std::unique_ptr<int> a(new int(99)); 172 Status rc = my_list_of_queues[chosen_queue_index]->Add(std::move(a)); 173 ASSERT_TRUE(rc.IsOk()); 174 std::unique_ptr<int> pepped_value; 175 rc = my_list_of_queues[chosen_queue_index]->PopFront(&pepped_value); 176 ASSERT_TRUE(rc.IsOk()); 177 MS_LOG(INFO) << "Popped value " << *pepped_value << " from queue index " << chosen_queue_index; 178 ASSERT_EQ(*pepped_value, 99); 179 } 180