1 // Copyright (C) 2011 Tim Blechmann 2 // 3 // Distributed under the Boost Software License, Version 1.0. (See 4 // accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 7 #include <cassert> 8 #include <iostream> 9 #include "test_helpers.hpp" 10 11 #include <boost/array.hpp> 12 #include <boost/thread.hpp> 13 14 namespace impl { 15 16 using boost::array; 17 using namespace boost; 18 using namespace std; 19 20 template <bool Bounded = false> 21 struct queue_stress_tester 22 { 23 static const unsigned int buckets = 1<<13; 24 #ifndef BOOST_LOCKFREE_STRESS_TEST 25 static const long node_count = 5000; 26 #else 27 static const long node_count = 500000; 28 #endif 29 const int reader_threads; 30 const int writer_threads; 31 32 boost::lockfree::detail::atomic<int> writers_finished; 33 34 static_hashed_set<long, buckets> data; 35 static_hashed_set<long, buckets> dequeued; 36 array<std::set<long>, buckets> returned; 37 38 boost::lockfree::detail::atomic<int> push_count, pop_count; 39 queue_stress_testerimpl::queue_stress_tester40 queue_stress_tester(int reader, int writer): 41 reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0) 42 {} 43 44 template <typename queue> add_itemsimpl::queue_stress_tester45 void add_items(queue & stk) 46 { 47 for (long i = 0; i != node_count; ++i) { 48 long id = generate_id<long>(); 49 50 bool inserted = data.insert(id); 51 assert(inserted); 52 53 if (Bounded) 54 while(stk.bounded_push(id) == false) { 55 #ifdef __VXWORKS__ 56 thread::yield(); 57 #endif 58 } 59 else 60 while(stk.push(id) == false) { 61 #ifdef __VXWORKS__ 62 thread::yield(); 63 #endif 64 } 65 ++push_count; 66 } 67 writers_finished += 1; 68 } 69 70 boost::lockfree::detail::atomic<bool> running; 71 72 template <typename queue> consume_elementimpl::queue_stress_tester73 bool consume_element(queue & q) 74 { 75 long id; 76 bool ret = q.pop(id); 77 78 if (!ret) 79 return false; 80 81 bool erased = data.erase(id); 82 bool inserted = dequeued.insert(id); 83 assert(erased); 84 assert(inserted); 85 ++pop_count; 86 return true; 87 } 88 89 template <typename queue> get_itemsimpl::queue_stress_tester90 void get_items(queue & q) 91 { 92 for (;;) { 93 bool received_element = consume_element(q); 94 if (received_element) 95 continue; 96 97 if ( writers_finished.load() == writer_threads ) 98 break; 99 100 #ifdef __VXWORKS__ 101 thread::yield(); 102 #endif 103 } 104 105 while (consume_element(q)); 106 } 107 108 template <typename queue> runimpl::queue_stress_tester109 void run(queue & stk) 110 { 111 BOOST_WARN(stk.is_lock_free()); 112 writers_finished.store(0); 113 114 thread_group writer; 115 thread_group reader; 116 117 BOOST_REQUIRE(stk.empty()); 118 119 for (int i = 0; i != reader_threads; ++i) 120 reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk))); 121 122 for (int i = 0; i != writer_threads; ++i) 123 writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk))); 124 125 std::cout << "threads created" << std::endl; 126 127 writer.join_all(); 128 129 std::cout << "writer threads joined, waiting for readers" << std::endl; 130 131 reader.join_all(); 132 133 std::cout << "reader threads joined" << std::endl; 134 135 BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0); 136 BOOST_REQUIRE(stk.empty()); 137 138 BOOST_REQUIRE_EQUAL(push_count, pop_count); 139 BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count); 140 } 141 }; 142 143 } 144 145 using impl::queue_stress_tester; 146