• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2004, Bull S.A..  All rights reserved.
3  * Created by: Sebastien Decugis
4 
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of version 2 of the GNU General Public License as
7  * published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it would be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write the Free Software Foundation, Inc.,
15  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 
17  * This scalability sample aims to test the following assertions:
18  *  -> If pthread_create fails because of a lack of a resource, or
19  	PTHREAD_THREADS_MAX threads already exist, EAGAIN shall be returned.
20  *  -> It also tests that the thread creation time does not depend on the # of threads
21  *     already created.
22 
23  * The steps are:
24  * -> Create threads until failure
25 
26  */
27 
28 /********************************************************************************************/
29 /****************************** standard includes *****************************************/
30 /********************************************************************************************/
31 #include <pthread.h>
32 #include <stdarg.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <unistd.h>
37 
38 #include <sched.h>
39 #include <semaphore.h>
40 #include <errno.h>
41 #include <assert.h>
42 #include <sys/wait.h>
43 #include <math.h>
44 
45 /********************************************************************************************/
46 /******************************   Test framework   *****************************************/
47 /********************************************************************************************/
48 #include "testfrmw.h"
49 #include "testfrmw.c"
50  /* This header is responsible for defining the following macros:
51   * UNRESOLVED(ret, descr);
52   *    where descr is a description of the error and ret is an int (error code for example)
53   * FAILED(descr);
54   *    where descr is a short text saying why the test has failed.
55   * PASSED();
56   *    No parameter.
57   *
58   * All three macros shall terminate the calling process.
59   * The testcase shall not terminate in any other manner.
60   *
61   * The other file defines the functions
62   * void output_init()
63   * void output(char * string, ...)
64   *
65   * Those may be used to output information.
66   */
67 
68 /********************************************************************************************/
69 /********************************** Configuration ******************************************/
70 /********************************************************************************************/
/* Multiplier applied to the sizes used by this test; may be overridden at build time. */
#ifndef SCALABILITY_FACTOR
#define SCALABILITY_FACTOR 1
#endif

/* Verbosity level: the higher the value, the more output() calls are compiled in. */
#ifndef VERBOSE
#define VERBOSE 1
#endif

/*
 * A measure is recorded each time the thread count is a multiple of RESOLUTION.
 * The factor is parenthesized so that an expression such as "1+1" given on the
 * command line is scaled as a whole.
 */
#define RESOLUTION (5 * (SCALABILITY_FACTOR))

/* Plot output must not be mixed with the regular messages: force VERBOSE to 0. */
#ifdef PLOT_OUTPUT
#undef VERBOSE
#define VERBOSE 0
#endif
84 
85 /********************************************************************************************/
86 /***********************************    Test cases  *****************************************/
87 /********************************************************************************************/
88 
89 #include "threads_scenarii.c"
90 
91 /* This file will define the following objects:
92  * scenarii: array of struct __scenario type.
93  * NSCENAR : macro giving the total # of scenarii
94  * scenar_init(): function to call before using the scenarii array.
95  * scenar_fini(): function to call after end of use of the scenarii array.
96  */
97 
98 /********************************************************************************************/
99 /***********************************    Real Test   *****************************************/
100 /********************************************************************************************/
101 
102 /* The next structure is used to save the tests measures */
103 typedef struct __mes_t {
104 	int nthreads;
105 	long _data[NSCENAR];	/* As we store µsec values, a long type should be amply enough. */
106 	struct __mes_t *next;
107 } mes_t;
108 
109 /* Forward declaration */
110 int parse_measure(mes_t * measures);
111 
112 pthread_mutex_t m_synchro = PTHREAD_MUTEX_INITIALIZER;
113 
threaded(void * arg)114 void *threaded(void *arg)
115 {
116 	int ret = 0;
117 
118 	/* Signal we're done */
119 	do {
120 		ret = sem_post(&scenarii[sc].sem);
121 	}
122 	while ((ret == -1) && (errno == EINTR));
123 	if (ret == -1) {
124 		UNRESOLVED(errno, "Failed to wait for the semaphore");
125 	}
126 
127 	/* Wait for all threads being created */
128 	ret = pthread_mutex_lock(&m_synchro);
129 	if (ret != 0) {
130 		UNRESOLVED(ret, "Mutex lock failed");
131 	}
132 	(*(int *)arg) += 1;
133 	ret = pthread_mutex_unlock(&m_synchro);
134 	if (ret != 0) {
135 		UNRESOLVED(ret, "Mutex unlock failed");
136 	}
137 
138 	return arg;
139 }
140 
main(int argc,char * argv[])141 int main(int argc, char *argv[])
142 {
143 	int ret = 0;
144 	pthread_t child;
145 	pthread_t *th;
146 
147 	int nthreads, ctl, i, tmp;
148 
149 	struct timespec ts_ref, ts_fin;
150 
151 	mes_t sentinel;
152 	mes_t *m_cur, *m_tmp;
153 
154 	long PTHREAD_THREADS_MAX = sysconf(_SC_THREAD_THREADS_MAX);
155 	long my_max = 1000 * SCALABILITY_FACTOR;
156 
157 	/* Initialize the measure list */
158 	m_cur = &sentinel;
159 	m_cur->next = NULL;
160 
161 	/* Initialize output routine */
162 	output_init();
163 
164 	if (PTHREAD_THREADS_MAX > 0)
165 		my_max = PTHREAD_THREADS_MAX;
166 
167 	th = (pthread_t *) calloc(1 + my_max, sizeof(pthread_t));
168 	if (th == NULL) {
169 		UNRESOLVED(errno, "Not enough memory for thread storage");
170 	}
171 
172 	/* Initialize thread attribute objects */
173 	scenar_init();
174 
175 #ifdef PLOT_OUTPUT
176 	printf("# COLUMNS %d #threads", NSCENAR + 1);
177 	for (sc = 0; sc < NSCENAR; sc++)
178 		printf(" %i", sc);
179 	printf("\n");
180 #endif
181 
182 	for (sc = 0; sc < NSCENAR; sc++) {
183 		if (scenarii[sc].bottom == NULL) {	/* skip the alternate stacks as we could create only 1 */
184 #if VERBOSE > 0
185 			output("-----\n");
186 			output("Starting test with scenario (%i): %s\n", sc,
187 			       scenarii[sc].descr);
188 #endif
189 
190 			/* Block every (about to be) created threads */
191 			ret = pthread_mutex_lock(&m_synchro);
192 			if (ret != 0) {
193 				UNRESOLVED(ret, "Mutex lock failed");
194 			}
195 
196 			ctl = 0;
197 			nthreads = 0;
198 			m_cur = &sentinel;
199 
200 			/* Create 1 thread for testing purpose */
201 			ret =
202 			    pthread_create(&child, &scenarii[sc].ta, threaded,
203 					   &ctl);
204 			switch (scenarii[sc].result) {
205 			case 0:	/* Operation was expected to succeed */
206 				if (ret != 0) {
207 					UNRESOLVED(ret,
208 						   "Failed to create this thread");
209 				}
210 				break;
211 
212 			case 1:	/* Operation was expected to fail */
213 				if (ret == 0) {
214 					UNRESOLVED(-1,
215 						   "An error was expected but the thread creation succeeded");
216 				}
217 				break;
218 
219 			case 2:	/* We did not know the expected result */
220 			default:
221 #if VERBOSE > 0
222 				if (ret == 0) {
223 					output
224 					    ("Thread has been created successfully for this scenario\n");
225 				} else {
226 					output
227 					    ("Thread creation failed with the error: %s\n",
228 					     strerror(ret));
229 				}
230 #endif
231 				;
232 			}
233 			if (ret == 0) {	/* The new thread is running */
234 
235 				while (1) {	/* we will break */
236 					/* read clock */
237 					ret =
238 					    clock_gettime(CLOCK_REALTIME,
239 							  &ts_ref);
240 					if (ret != 0) {
241 						UNRESOLVED(errno,
242 							   "Unable to read clock");
243 					}
244 
245 					/* create a new thread */
246 					ret =
247 					    pthread_create(&th[nthreads],
248 							   &scenarii[sc].ta,
249 							   threaded, &ctl);
250 
251 					/* stop here if we've got EAGAIN */
252 					if (ret == EAGAIN)
253 						break;
254 
255 // temporary hack
256 					if (ret == ENOMEM)
257 						break;
258 					nthreads++;
259 
260 					/* FAILED if error is != EAGAIN or nthreads > PTHREAD_THREADS_MAX */
261 					if (ret != 0) {
262 						output
263 						    ("pthread_create returned: %i (%s)\n",
264 						     ret, strerror(ret));
265 						FAILED
266 						    ("pthread_create did not return EAGAIN on a lack of resource");
267 					}
268 					if (nthreads > my_max) {
269 						if (PTHREAD_THREADS_MAX > 0) {
270 							FAILED
271 							    ("We were able to create more than PTHREAD_THREADS_MAX threads");
272 						} else {
273 							break;
274 						}
275 					}
276 
277 					/* wait for the semaphore */
278 					do {
279 						ret =
280 						    sem_wait(&scenarii[sc].sem);
281 					}
282 					while ((ret == -1) && (errno == EINTR));
283 					if (ret == -1) {
284 						UNRESOLVED(errno,
285 							   "Failed to wait for the semaphore");
286 					}
287 
288 					/* read clock */
289 					ret =
290 					    clock_gettime(CLOCK_REALTIME,
291 							  &ts_fin);
292 					if (ret != 0) {
293 						UNRESOLVED(errno,
294 							   "Unable to read clock");
295 					}
296 
297 					/* add to the measure list if nthreads % resolution == 0 */
298 					if ((nthreads % RESOLUTION) == 0) {
299 						if (m_cur->next == NULL) {
300 							/* Create an empty new element */
301 							m_tmp = malloc(sizeof(mes_t));
302 							if (m_tmp == NULL) {
303 								UNRESOLVED
304 								    (errno,
305 								     "Unable to alloc memory for measure saving");
306 							}
307 							m_tmp->nthreads =
308 							    nthreads;
309 							m_tmp->next = NULL;
310 							for (tmp = 0;
311 							     tmp < NSCENAR;
312 							     tmp++)
313 								m_tmp->
314 								    _data[tmp] =
315 								    0;
316 							m_cur->next = m_tmp;
317 						}
318 
319 						/* Add this measure to the next element */
320 						m_cur = m_cur->next;
321 						m_cur->_data[sc] =
322 						    ((ts_fin.tv_sec -
323 						      ts_ref.tv_sec) *
324 						     1000000) +
325 						    ((ts_fin.tv_nsec -
326 						      ts_ref.tv_nsec) / 1000);
327 
328 #if VERBOSE > 5
329 						output
330 						    ("Added the following measure: sc=%i, n=%i, v=%li\n",
331 						     sc, nthreads,
332 						     m_cur->_data[sc]);
333 #endif
334 					}
335 				}
336 #if VERBOSE > 3
337 				output
338 				    ("Could not create anymore thread. Current count is %i\n",
339 				     nthreads);
340 #endif
341 
342 				/* Unblock every created threads */
343 				ret = pthread_mutex_unlock(&m_synchro);
344 				if (ret != 0) {
345 					UNRESOLVED(ret, "Mutex unlock failed");
346 				}
347 
348 				if (scenarii[sc].detached == 0) {
349 #if VERBOSE > 3
350 					output("Joining the threads\n");
351 #endif
352 					for (i = 0; i < nthreads; i++) {
353 						ret = pthread_join(th[i], NULL);
354 						if (ret != 0) {
355 							UNRESOLVED(ret,
356 								   "Unable to join a thread");
357 						}
358 					}
359 
360 					ret = pthread_join(child, NULL);
361 					if (ret != 0) {
362 						UNRESOLVED(ret,
363 							   "Unalbe to join a thread");
364 					}
365 
366 				}
367 #if VERBOSE > 3
368 				output
369 				    ("Waiting for threads (almost) termination\n");
370 #endif
371 				do {
372 					ret = pthread_mutex_lock(&m_synchro);
373 					if (ret != 0) {
374 						UNRESOLVED(ret,
375 							   "Mutex lock failed");
376 					}
377 
378 					tmp = ctl;
379 
380 					ret = pthread_mutex_unlock(&m_synchro);
381 					if (ret != 0) {
382 						UNRESOLVED(ret,
383 							   "Mutex unlock failed");
384 					}
385 				} while (tmp != nthreads + 1);
386 
387 			}
388 			/* The thread was created */
389 		}
390 	}			/* next scenario */
391 
392 	/* Free some memory before result parsing */
393 	free(th);
394 
395 	/* Compute the results */
396 	ret = parse_measure(&sentinel);
397 
398 	/* Free the resources and output the results */
399 
400 #if VERBOSE > 5
401 	printf("Dump : \n");
402 	printf("%8.8s", "nth");
403 	for (i = 0; i < NSCENAR; i++)
404 		printf("|   %2.2i   ", i);
405 	printf("\n");
406 #endif
407 	while (sentinel.next != NULL) {
408 		m_cur = sentinel.next;
409 #if (VERBOSE > 5) || defined(PLOT_OUTPUT)
410 		printf("%8.8i", m_cur->nthreads);
411 		for (i = 0; i < NSCENAR; i++)
412 			printf(" %1.1li.%6.6li", m_cur->_data[i] / 1000000,
413 			       m_cur->_data[i] % 1000000);
414 		printf("\n");
415 #endif
416 		sentinel.next = m_cur->next;
417 		free(m_cur);
418 	}
419 
420 	scenar_fini();
421 
422 #if VERBOSE > 0
423 	output("-----\n");
424 	output("All test data destroyed\n");
425 	output("Test PASSED\n");
426 #endif
427 
428 	PASSED;
429 }
430 
431 /***
432  * The next function will look for the best-fitting model for each series of measurements.
433  *
434  * The tested models are: -- X = # threads; Y = latency
435  * -> Y = a;      -- Error is r1 = avg((Y - Yavg)²);
436  * -> Y = aX + b; -- Error is r2 = avg((Y -aX -b)²);
437  *                -- where a = avg ((X - Xavg)(Y - Yavg)) / avg((X - Xavg)²)
438  *                --         Note: We will call _q = sum((X - Xavg) * (Y - Yavg));
439  *                --                       and  _d = sum((X - Xavg)²);
440  *                -- and   b = Yavg - a * Xavg
441  * -> Y = c * X^a;-- Same as previous, but with log(Y) = a log(X) + b; and b = log(c). Error is r3
442  * -> Y = exp(aX + b); -- log(Y) = aX + b. Error is r4
443  *
444  * We compute each error factor (r1, r2, r3, r4) then search which is the smallest (with weighting).
445  * The function returns 0 when r1 is the best for all cases (latency is constant) and !0 otherwise.
446  */
447 
448 struct row {
449 	long X;			/* the X values -- copied from function argument */
450 	long Y[NSCENAR];	/* the Y values -- copied from function argument */
451 	long _x[NSCENAR];	/* Value X - Xavg */
452 	long _y[NSCENAR];	/* Value Y - Yavg */
453 	double LnX;		/* Natural logarithm of X values */
454 	double LnY[NSCENAR];	/* Natural logarithm of Y values */
455 	double _lnx[NSCENAR];	/* Value LnX - LnXavg */
456 	double _lny[NSCENAR];	/* Value LnY - LnYavg */
457 };
458 
/*
 * Analyse the measures saved in the list and tell whether the latency looks
 * constant.
 *
 * measures: dummy head of the measure list; the first real element is
 *           measures->next.  The list is only read, never modified or freed.
 *
 * For each scenario the function computes the error of four models (constant,
 * linear, power, exponential).  It returns the number of scenarios for which
 * the constant model is not the best one once the weights 1.1 / 1.2 / 1.3 are
 * applied, i.e. 0 when the latency is constant for every scenario.
 *
 * The process is terminated through UNRESOLVED if the working table cannot be
 * allocated.
 */
int parse_measure(mes_t * measures)
{
	int ret, i, r;

	mes_t *cur;

	double Xavg[NSCENAR], Yavg[NSCENAR];
	double LnXavg[NSCENAR], LnYavg[NSCENAR];

	int N;

	double r1[NSCENAR], r2[NSCENAR], r3[NSCENAR], r4[NSCENAR];

	/* Some more intermediate vars: index 0 = linear, 1 = power, 2 = exponential */
	long double _q[3][NSCENAR];
	long double _d[3][NSCENAR];

	long double t;		/* temp value */

	struct row *Table = NULL;

	/* This array contains the last element of each serie */
	int array_max[NSCENAR];

	/* Initialize the datas */
	for (i = 0; i < NSCENAR; i++) {
		array_max[i] = -1;	/* means no data */
		Xavg[i] = 0.0;
		LnXavg[i] = 0.0;
		Yavg[i] = 0.0;
		LnYavg[i] = 0.0;
		r1[i] = 0.0;
		r2[i] = 0.0;
		r3[i] = 0.0;
		r4[i] = 0.0;
		_q[0][i] = 0.0;
		_q[1][i] = 0.0;
		_q[2][i] = 0.0;
		_d[0][i] = 0.0;
		_d[1][i] = 0.0;
		_d[2][i] = 0.0;
	}
	N = 0;
	cur = measures;

#if VERBOSE > 1
	output("Data analysis starting\n");
#endif

	/* We start with reading the list to find:
	 * -> number of elements, to assign an array.
	 * -> average values
	 */
	while (cur->next != NULL) {
		cur = cur->next;

		N++;

		for (i = 0; i < NSCENAR; i++) {
			/* A null value means this scenario has no measure in this element */
			if (cur->_data[i] != 0) {
				array_max[i] = N;
				Xavg[i] += (double)cur->nthreads;
				LnXavg[i] += log((double)cur->nthreads);
				Yavg[i] += (double)cur->_data[i];
				LnYavg[i] += log((double)cur->_data[i]);
			}
		}
	}

	/* We have the sum; we can divide to obtain the average values */
	for (i = 0; i < NSCENAR; i++) {
		if (array_max[i] != -1) {
			Xavg[i] /= array_max[i];
			LnXavg[i] /= array_max[i];
			Yavg[i] /= array_max[i];
			LnYavg[i] /= array_max[i];
		}
	}

#if VERBOSE > 1
	output(" Found %d rows and %d columns\n", N, NSCENAR);
#endif

	/*
	 * We will now alloc the array ...
	 * A NULL result is an error only when elements were requested: with an
	 * empty list the table is never accessed and calloc(0, ...) is allowed
	 * to return NULL.
	 */
	Table = calloc(N, sizeof(struct row));
	if ((Table == NULL) && (N > 0)) {
		UNRESOLVED(errno, "Unable to alloc space for results parsing");
	}

	/* ... and fill it */
	N = 0;
	cur = measures;

	while (cur->next != NULL) {
		cur = cur->next;

		Table[N].X = (long)cur->nthreads;
		Table[N].LnX = log((double)cur->nthreads);
		for (i = 0; i < NSCENAR; i++) {
			/* Only the rows up to the last measure of the scenario are filled */
			if (array_max[i] > N) {
				Table[N]._x[i] = Table[N].X - Xavg[i];
				Table[N]._lnx[i] = Table[N].LnX - LnXavg[i];
				Table[N].Y[i] = cur->_data[i];
				Table[N]._y[i] = Table[N].Y[i] - Yavg[i];
				Table[N].LnY[i] = log((double)cur->_data[i]);
				Table[N]._lny[i] = Table[N].LnY[i] - LnYavg[i];
			}
		}

		N++;
	}

	/* We won't need the list anymore -- we'll work with the array which should be faster. */
#if VERBOSE > 1
	output(" Data was stored in an array.\n");
#endif

	/* We need to read the full array at least twice to compute all the error factors */

	/* In the first pass, we'll compute:
	 * -> r1 for each scenar.
	 * -> "a" factor for linear (0), power (1) and exponential (2) approximations -- with using the _d and _q vars.
	 */
#if VERBOSE > 1
	output("Starting first pass...\n");
#endif
	for (i = 0; i < NSCENAR; i++) {
		for (r = 0; r < array_max[i]; r++) {
			r1[i] +=
			    ((double)Table[r]._y[i] / array_max[i]) *
			    (double)Table[r]._y[i];

			_q[0][i] += Table[r]._y[i] * Table[r]._x[i];
			_d[0][i] += Table[r]._x[i] * Table[r]._x[i];

			_q[1][i] += Table[r]._lny[i] * Table[r]._lnx[i];
			_d[1][i] += Table[r]._lnx[i] * Table[r]._lnx[i];

			_q[2][i] += Table[r]._lny[i] * Table[r]._x[i];
			_d[2][i] += Table[r]._x[i] * Table[r]._x[i];
		}
	}

	/* First pass is terminated; a2 = _q[0]/_d[0]; a3 = _q[1]/_d[1]; a4 = _q[2]/_d[2] */

	/* In the second pass, we'll compute:
	 * -> r2, r3, r4 for each scenar.
	 */

#if VERBOSE > 1
	output("Starting second pass...\n");
#endif
	for (i = 0; i < NSCENAR; i++) {
		for (r = 0; r < array_max[i]; r++) {
			/* r2 = avg((y - ax -b)²);  t = (y - ax - b) = (y - yavg) - a (x - xavg); */
			t = (Table[r]._y[i] -
			     ((_q[0][i] * Table[r]._x[i]) / _d[0][i]));
			r2[i] += t * t / array_max[i];

			/* r3 = avg((y - c.x^a) ²);
			   t = y - c * x ^ a
			   = y - exp (LnYavg - (_q[1]/_d[1]) * LnXavg) * x ^ (_q[1]/_d[1])
			   (b = log(c), hence c = exp(b))
			 */
			t = (Table[r].Y[i]
			     -
			     (expl
			      (LnYavg[i] - (_q[1][i] / _d[1][i]) * LnXavg[i])
			      * powl(Table[r].X, (_q[1][i] / _d[1][i]))
			     ));
			r3[i] += t * t / array_max[i];

			/* r4 = avg((y - exp(ax+b))²);
			   t = y - exp(ax+b)
			   = y - exp(_q[2]/_d[2] * x + (LnYavg - (_q[2]/_d[2] * Xavg)));
			   = y - exp(_q[2]/_d[2] * (x - Xavg) + LnYavg);
			 */
			t = (Table[r].Y[i]
			     - expl((_q[2][i] / _d[2][i]) * Table[r]._x[i] +
				    LnYavg[i]));
			r4[i] += t * t / array_max[i];

		}
	}

#if VERBOSE > 1
	output("All computing terminated.\n");
#endif
	ret = 0;
	for (i = 0; i < NSCENAR; i++) {
#if VERBOSE > 1
		output("\nScenario: %s\n", scenarii[i].descr);

		output(" # of data: %i\n", array_max[i]);

		output("  Model: Y = k\n");
		output("       k = %g\n", Yavg[i]);
		output("    Divergence %g\n", r1[i]);

		output("  Model: Y = a * X + b\n");
		output("       a = %Lg\n", _q[0][i] / _d[0][i]);
		output("       b = %Lg\n",
		       Yavg[i] - ((_q[0][i] / _d[0][i]) * Xavg[i]));
		output("    Divergence %g\n", r2[i]);

		output("  Model: Y = c * X ^ a\n");
		output("       a = %Lg\n", _q[1][i] / _d[1][i]);
		output("       c = %Lg\n",
		       expl(LnYavg[i] - (_q[1][i] / _d[1][i]) * LnXavg[i]));
		output("    Divergence %g\n", r3[i]);

		output("  Model: Y = exp(a * X + b)\n");
		output("       a = %Lg\n", _q[2][i] / _d[2][i]);
		output("       b = %Lg\n",
		       LnYavg[i] - ((_q[2][i] / _d[2][i]) * Xavg[i]));
		output("    Divergence %g\n", r4[i]);
#endif

		if (array_max[i] != -1) {
			/* Compare r1 to other values, with some ponderations */
			if ((r1[i] > 1.1 * r2[i]) || (r1[i] > 1.2 * r3[i])
			    || (r1[i] > 1.3 * r4[i]))
				ret++;
#if VERBOSE > 1
			else
				output(" Sanction: OK\n");
#endif
		}
	}

	/* We need to free the array */
	free(Table);

	/* We're done */
	return ret;
}
694