• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //  Copyright (C) 2011 Tim Blechmann
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See
4 //  accompanying file LICENSE_1_0.txt or copy at
5 //  http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include <cassert>
8 #include <iostream>
9 #include "test_helpers.hpp"
10 
11 #include <boost/array.hpp>
12 #include <boost/thread.hpp>
13 
14 namespace impl {
15 
16 using boost::array;
17 using namespace boost;
18 using namespace std;
19 
20 template <bool Bounded = false>
21 struct queue_stress_tester
22 {
23     static const unsigned int buckets = 1<<13;
24 #ifndef BOOST_LOCKFREE_STRESS_TEST
25     static const long node_count =  5000;
26 #else
27     static const long node_count = 500000;
28 #endif
29     const int reader_threads;
30     const int writer_threads;
31 
32     boost::lockfree::detail::atomic<int> writers_finished;
33 
34     static_hashed_set<long, buckets> data;
35     static_hashed_set<long, buckets> dequeued;
36     array<std::set<long>, buckets> returned;
37 
38     boost::lockfree::detail::atomic<int> push_count, pop_count;
39 
queue_stress_testerimpl::queue_stress_tester40     queue_stress_tester(int reader, int writer):
41         reader_threads(reader), writer_threads(writer), push_count(0), pop_count(0)
42     {}
43 
44     template <typename queue>
add_itemsimpl::queue_stress_tester45     void add_items(queue & stk)
46     {
47         for (long i = 0; i != node_count; ++i) {
48             long id = generate_id<long>();
49 
50             bool inserted = data.insert(id);
51             assert(inserted);
52 
53             if (Bounded)
54                 while(stk.bounded_push(id) == false) {
55 #ifdef __VXWORKS__
56                     thread::yield();
57 #endif
58                 }
59             else
60                 while(stk.push(id) == false) {
61 #ifdef __VXWORKS__
62                     thread::yield();
63 #endif
64                 }
65             ++push_count;
66         }
67         writers_finished += 1;
68     }
69 
70     boost::lockfree::detail::atomic<bool> running;
71 
72     template <typename queue>
consume_elementimpl::queue_stress_tester73     bool consume_element(queue & q)
74     {
75         long id;
76         bool ret = q.pop(id);
77 
78         if (!ret)
79             return false;
80 
81         bool erased = data.erase(id);
82         bool inserted = dequeued.insert(id);
83         assert(erased);
84         assert(inserted);
85         ++pop_count;
86         return true;
87     }
88 
89     template <typename queue>
get_itemsimpl::queue_stress_tester90     void get_items(queue & q)
91     {
92         for (;;) {
93             bool received_element = consume_element(q);
94             if (received_element)
95                 continue;
96 
97             if ( writers_finished.load() == writer_threads )
98                 break;
99 
100 #ifdef __VXWORKS__
101             thread::yield();
102 #endif
103         }
104 
105         while (consume_element(q));
106     }
107 
108     template <typename queue>
runimpl::queue_stress_tester109     void run(queue & stk)
110     {
111         BOOST_WARN(stk.is_lock_free());
112         writers_finished.store(0);
113 
114         thread_group writer;
115         thread_group reader;
116 
117         BOOST_REQUIRE(stk.empty());
118 
119         for (int i = 0; i != reader_threads; ++i)
120             reader.create_thread(boost::bind(&queue_stress_tester::template get_items<queue>, this, boost::ref(stk)));
121 
122         for (int i = 0; i != writer_threads; ++i)
123             writer.create_thread(boost::bind(&queue_stress_tester::template add_items<queue>, this, boost::ref(stk)));
124 
125         std::cout << "threads created" << std::endl;
126 
127         writer.join_all();
128 
129         std::cout << "writer threads joined, waiting for readers" << std::endl;
130 
131         reader.join_all();
132 
133         std::cout << "reader threads joined" << std::endl;
134 
135         BOOST_REQUIRE_EQUAL(data.count_nodes(), (size_t)0);
136         BOOST_REQUIRE(stk.empty());
137 
138         BOOST_REQUIRE_EQUAL(push_count, pop_count);
139         BOOST_REQUIRE_EQUAL(push_count, writer_threads * node_count);
140     }
141 };
142 
143 }
144 
145 using impl::queue_stress_tester;
146