• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Yann Collet, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10 
11 
12 #include "pool.h"
13 #include "threading.h"
14 #include "util.h"
15 #include "timefn.h"
16 #include <stddef.h>
17 #include <stdio.h>
18 
19 #define ASSERT_TRUE(p)                                                       \
20   do {                                                                       \
21     if (!(p)) {                                                              \
22       return 1;                                                              \
23     }                                                                        \
24   } while (0)
25 #define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
26 #define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
27 
28 struct data {
29   ZSTD_pthread_mutex_t mutex;
30   unsigned data[16];
31   size_t i;
32 };
33 
fn(void * opaque)34 static void fn(void *opaque)
35 {
36   struct data *data = (struct data *)opaque;
37   ZSTD_pthread_mutex_lock(&data->mutex);
38   data->data[data->i] = (unsigned)(data->i);
39   ++data->i;
40   ZSTD_pthread_mutex_unlock(&data->mutex);
41 }
42 
testOrder(size_t numThreads,size_t queueSize)43 static int testOrder(size_t numThreads, size_t queueSize)
44 {
45   struct data data;
46   POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
47   ASSERT_TRUE(ctx);
48   data.i = 0;
49   ASSERT_FALSE(ZSTD_pthread_mutex_init(&data.mutex, NULL));
50   { size_t i;
51     for (i = 0; i < 16; ++i) {
52       POOL_add(ctx, &fn, &data);
53     }
54   }
55   POOL_free(ctx);
56   ASSERT_EQ(16, data.i);
57   { size_t i;
58     for (i = 0; i < data.i; ++i) {
59       ASSERT_EQ(i, data.data[i]);
60     }
61   }
62   ZSTD_pthread_mutex_destroy(&data.mutex);
63   return 0;
64 }
65 
66 
67 /* --- test deadlocks --- */
68 
waitFn(void * opaque)69 static void waitFn(void *opaque) {
70   (void)opaque;
71   UTIL_sleepMilli(1);
72 }
73 
74 /* Tests for deadlock */
testWait(size_t numThreads,size_t queueSize)75 static int testWait(size_t numThreads, size_t queueSize) {
76   struct data data;
77   POOL_ctx* const ctx = POOL_create(numThreads, queueSize);
78   ASSERT_TRUE(ctx);
79   { size_t i;
80     for (i = 0; i < 16; ++i) {
81         POOL_add(ctx, &waitFn, &data);
82     }
83   }
84   POOL_free(ctx);
85   return 0;
86 }
87 
88 
89 /* --- test POOL_resize() --- */
90 
91 typedef struct {
92     ZSTD_pthread_mutex_t mut;
93     int countdown;
94     int val;
95     int max;
96     ZSTD_pthread_cond_t cond;
97 } poolTest_t;
98 
waitLongFn(void * opaque)99 static void waitLongFn(void *opaque) {
100   poolTest_t* const test = (poolTest_t*) opaque;
101   ZSTD_pthread_mutex_lock(&test->mut);
102   test->val++;
103   if (test->val > test->max)
104       test->max = test->val;
105   ZSTD_pthread_mutex_unlock(&test->mut);
106 
107   UTIL_sleepMilli(10);
108 
109   ZSTD_pthread_mutex_lock(&test->mut);
110   test->val--;
111   test->countdown--;
112   if (test->countdown == 0)
113       ZSTD_pthread_cond_signal(&test->cond);
114   ZSTD_pthread_mutex_unlock(&test->mut);
115 }
116 
testThreadReduction_internal(POOL_ctx * ctx,poolTest_t test)117 static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test)
118 {
119     int const nbWaits = 16;
120 
121     test.countdown = nbWaits;
122     test.val = 0;
123     test.max = 0;
124 
125     {   int i;
126         for (i=0; i<nbWaits; i++)
127             POOL_add(ctx, &waitLongFn, &test);
128     }
129     ZSTD_pthread_mutex_lock(&test.mut);
130     while (test.countdown > 0)
131         ZSTD_pthread_cond_wait(&test.cond, &test.mut);
132     ASSERT_EQ(test.val, 0);
133     ASSERT_EQ(test.max, 4);
134     ZSTD_pthread_mutex_unlock(&test.mut);
135 
136     ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 );
137     test.countdown = nbWaits;
138     test.val = 0;
139     test.max = 0;
140     {   int i;
141         for (i=0; i<nbWaits; i++)
142             POOL_add(ctx, &waitLongFn, &test);
143     }
144     ZSTD_pthread_mutex_lock(&test.mut);
145     while (test.countdown > 0)
146         ZSTD_pthread_cond_wait(&test.cond, &test.mut);
147     ASSERT_EQ(test.val, 0);
148     ASSERT_EQ(test.max, 2);
149     ZSTD_pthread_mutex_unlock(&test.mut);
150 
151     return 0;
152 }
153 
testThreadReduction(void)154 static int testThreadReduction(void) {
155     int result;
156     poolTest_t test;
157     POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
158 
159     ASSERT_TRUE(ctx);
160 
161     memset(&test, 0, sizeof(test));
162     ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
163     ASSERT_FALSE( ZSTD_pthread_cond_init(&test.cond, NULL) );
164 
165     result = testThreadReduction_internal(ctx, test);
166 
167     ZSTD_pthread_mutex_destroy(&test.mut);
168     ZSTD_pthread_cond_destroy(&test.cond);
169     POOL_free(ctx);
170 
171     return result;
172 }
173 
174 
175 /* --- test abrupt ending --- */
176 
177 typedef struct {
178     ZSTD_pthread_mutex_t mut;
179     int val;
180 } abruptEndCanary_t;
181 
waitIncFn(void * opaque)182 static void waitIncFn(void *opaque) {
183   abruptEndCanary_t* test = (abruptEndCanary_t*) opaque;
184   UTIL_sleepMilli(10);
185   ZSTD_pthread_mutex_lock(&test->mut);
186   test->val = test->val + 1;
187   ZSTD_pthread_mutex_unlock(&test->mut);
188 }
189 
testAbruptEnding_internal(abruptEndCanary_t test)190 static int testAbruptEnding_internal(abruptEndCanary_t test)
191 {
192     int const nbWaits = 16;
193 
194     POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/);
195     ASSERT_TRUE(ctx);
196     test.val = 0;
197 
198     {   int i;
199         for (i=0; i<nbWaits; i++)
200             POOL_add(ctx, &waitIncFn, &test);  /* all jobs pushed into queue */
201     }
202     ASSERT_EQ( POOL_resize(ctx, 1 /*numThreads*/) , 0 );   /* downsize numThreads, to try to break end condition */
203 
204     POOL_free(ctx);  /* must finish all jobs in queue before giving back control */
205     ASSERT_EQ(test.val, nbWaits);
206     return 0;
207 }
208 
testAbruptEnding(void)209 static int testAbruptEnding(void) {
210     int result;
211     abruptEndCanary_t test;
212 
213     memset(&test, 0, sizeof(test));
214     ASSERT_FALSE( ZSTD_pthread_mutex_init(&test.mut, NULL) );
215 
216     result = testAbruptEnding_internal(test);
217 
218     ZSTD_pthread_mutex_destroy(&test.mut);
219     return result;
220 }
221 
222 
223 
224 /* --- test launcher --- */
225 
main(int argc,const char ** argv)226 int main(int argc, const char **argv) {
227   size_t numThreads;
228   (void)argc;
229   (void)argv;
230 
231   if (POOL_create(0, 1)) {   /* should not be possible */
232     printf("FAIL: should not create POOL with 0 threads\n");
233     return 1;
234   }
235 
236   for (numThreads = 1; numThreads <= 4; ++numThreads) {
237     size_t queueSize;
238     for (queueSize = 0; queueSize <= 2; ++queueSize) {
239       printf("queueSize==%u, numThreads=%u \n",
240             (unsigned)queueSize, (unsigned)numThreads);
241       if (testOrder(numThreads, queueSize)) {
242         printf("FAIL: testOrder\n");
243         return 1;
244       }
245       printf("SUCCESS: testOrder\n");
246       if (testWait(numThreads, queueSize)) {
247         printf("FAIL: testWait\n");
248         return 1;
249       }
250       printf("SUCCESS: testWait\n");
251     }
252   }
253 
254   if (testThreadReduction()) {
255       printf("FAIL: thread reduction not effective \n");
256       return 1;
257   } else {
258       printf("SUCCESS: thread reduction effective \n");
259   }
260 
261   if (testAbruptEnding()) {
262       printf("FAIL: jobs in queue not completed on early end \n");
263       return 1;
264   } else {
265       printf("SUCCESS: all jobs in queue completed on early end \n");
266   }
267 
268   printf("PASS: all POOL tests\n");
269 
270   return 0;
271 }
272