• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of version 2 of the GNU General Public License as
6  * published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it would be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  *
12  * Further, this software is distributed without any warranty that it is
13  * free of the rightful claim of any third person regarding infringement
14  * or the like.  Any license provided herein, whether implied or
15  * otherwise, applies only to this software file.  Patent licenses, if
16  * any, provided herein do not apply to combinations of this program with
17  * other software, or any other product whatsoever.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write the Free Software Foundation, Inc.,
21  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22  *
23  * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24  * Mountain View, CA  94043, or:
25  *
26  *
27  * aio-stress
28  *
29  * will open or create each file on the command line, and start a series
30  * of aio to it.
31  *
32  * aio is done in a rotating loop.  first file1 gets 8 requests, then
33  * file2, then file3 etc.  As each file finishes writing, it is switched
34  * to reads
35  *
36  * io buffers are aligned in case you want to do raw io
37  *
38  * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39  *
40  * run aio-stress -h to see the options
41  *
42  * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43  */
44 #define _FILE_OFFSET_BITS 64
45 #define PROG_VERSION "0.21"
46 #define NEW_GETEVENTS
47 
48 #define _GNU_SOURCE
49 #include <stdio.h>
50 #include <errno.h>
51 #include <assert.h>
52 #include <stdlib.h>
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <fcntl.h>
56 #include <unistd.h>
57 #include <sys/time.h>
58 #include <sys/ipc.h>
59 #include <sys/shm.h>
60 #include <sys/mman.h>
61 #include <string.h>
62 #include <pthread.h>
63 
64 #include "config.h"
65 #include "tst_res_flags.h"
66 
67 #ifdef HAVE_LIBAIO
68 #include <libaio.h>
69 
70 #define IO_FREE 0
71 #define IO_PENDING 1
72 #define RUN_FOREVER -1
73 
/* the io stages an operation can run, in the order they may be chained */
enum {
	WRITE,
	READ,
	RWRITE,		/* random write */
	RREAD,		/* random read */
	LAST_STAGE,
};

/* allocation strategies for the shared io buffer pool */
#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2
85 
/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
long stages = 0;		/* bitmask of stages to run (1 << WRITE etc) */
unsigned long page_size_mask;	/* page size - 1, used to align buffers and offsets */
int o_direct = 0;		/* option flag; presumably O_DIRECT opens -- set outside this chunk */
int o_sync = 0;			/* option flag; presumably O_SYNC opens -- set outside this chunk */
int latency_stats = 0;		/* print io_submit latency stats after each stage */
int completion_latency_stats = 0;	/* print completion latency stats after each stage */
int io_iter = 8;		/* ios built/reaped per file in one pass */
int iterations = RUN_FOREVER;	/* passes over the active list per stage */
int max_io_submit = 0;		/* cap on iocbs sent down in one io_submit batch */
long rec_len = 64 * 1024;	/* io buffer (record) size -- set by option parsing */
int depth = 64;			/* default per-operation pending request limit */
int num_threads = 1;		/* number of worker threads */
int num_contexts = 1;		/* contexts per file; presumably set by options outside this chunk */
off_t context_offset = 2 * 1024 * 1024;	/* presumably file space between contexts -- set outside this chunk */
int fsync_stages = 1;		/* fsync each file between stages */
int use_shm = 0;		/* USE_MALLOC, USE_SHM or USE_SHMFS */
int shm_id;			/* sysv shm id (or shmfs fd) backing the buffers */
char *unaligned_buffer = NULL;	/* raw allocation backing all io buffers */
char *aligned_buffer = NULL;	/* page aligned start of the buffer pool */
int padded_reclen = 0;		/* rec_len rounded up to a whole number of pages */
int stonewall = 1;		/* first thread to finish a stage stops the others */
int verify = 0;			/* compare read buffers against verify_buf */
char *verify_buf = NULL;	/* pattern buffer reads are compared against */
int unlink_files = 0;		/* presumably unlink data files when done -- set outside this chunk */
114 
115 struct io_unit;
116 struct thread_info;
117 
/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
int threads_ending = 0;		/* set once a thread finishes a stage (stonewall) */
int threads_starting = 0;	/* barrier count of threads ready for the next stage */
struct timeval global_stage_start_time;	/* when the current stage started */
struct thread_info *global_thread_info;	/* array of per-thread state, one per worker */
125 
/*
 * latencies during io_submit are measured, these are the
 * granularities for deviations
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };

/* running latency statistics, all times in milliseconds */
struct io_latency {
	double max;		/* largest single latency seen */
	double min;		/* smallest nonzero latency seen */
	double total_io;	/* number of samples recorded */
	double total_lat;	/* sum of all samples, for computing the average */
	double deviations[DEVIATIONS];	/* histogram: count of samples < deviations[i] */
};
140 
/* container for a series of operations to a file */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/* max number of pending requests before a wait is triggered */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error, zero if there were none */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* read,write, random, etc */
	int rw;

	/* number of I/O that will get sent to aio */
	int total_ios;

	/* number of I/O we've already sent */
	int started_ios;

	/* last offset used in an io operation */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our I/O */
	int stonewalled;

	/* list management: opers live on circular doubly linked lists */
	struct io_oper *next;
	struct io_oper *prev;

	/* when the first io of the current stage was built, for throughput */
	struct timeval start_time;

	/* name of the file; pointer is borrowed, not freed with the oper */
	char *file_name;
};
190 
/* a single io, and all the tracking needed for it */
struct io_unit {
	/* note, iocb must go first!  The obj pointer io_getevents returns
	 * is cast straight back to a struct io_unit *.
	 */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit (free, pending, done) */
	int busy;

	/* result of last operation */
	long res;

	/* link for the thread's free_ious list */
	struct io_unit *next;

	struct timeval io_start_time;	/* time of io_submit */
};
215 
/* all the state one worker thread needs, one of these per thread */
struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the I/O array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage */
	double stage_mb_trans;

	/* latency completion stats i/o time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};
257 
258 /*
259  * return seconds between start_tv and stop_tv in double precision
260  */
time_since(struct timeval * start_tv,struct timeval * stop_tv)261 static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
262 {
263 	double sec, usec;
264 	double ret;
265 	sec = stop_tv->tv_sec - start_tv->tv_sec;
266 	usec = stop_tv->tv_usec - start_tv->tv_usec;
267 	if (sec > 0 && usec < 0) {
268 		sec--;
269 		usec += 1000000;
270 	}
271 	ret = sec + usec / (double)1000000;
272 	if (ret < 0)
273 		ret = 0;
274 	return ret;
275 }
276 
277 /*
278  * return seconds between start_tv and now in double precision
279  */
time_since_now(struct timeval * start_tv)280 static double time_since_now(struct timeval *start_tv)
281 {
282 	struct timeval stop_time;
283 	gettimeofday(&stop_time, NULL);
284 	return time_since(start_tv, &stop_time);
285 }
286 
287 /*
288  * Add latency info to latency struct
289  */
calc_latency(struct timeval * start_tv,struct timeval * stop_tv,struct io_latency * lat)290 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
291 			 struct io_latency *lat)
292 {
293 	double delta;
294 	int i;
295 	delta = time_since(start_tv, stop_tv);
296 	delta = delta * 1000;
297 
298 	if (delta > lat->max)
299 		lat->max = delta;
300 	if (!lat->min || delta < lat->min)
301 		lat->min = delta;
302 	lat->total_io++;
303 	lat->total_lat += delta;
304 	for (i = 0; i < DEVIATIONS; i++) {
305 		if (delta < deviations[i]) {
306 			lat->deviations[i]++;
307 			break;
308 		}
309 	}
310 }
311 
oper_list_add(struct io_oper * oper,struct io_oper ** list)312 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
313 {
314 	if (!*list) {
315 		*list = oper;
316 		oper->prev = oper->next = oper;
317 		return;
318 	}
319 	oper->prev = (*list)->prev;
320 	oper->next = *list;
321 	(*list)->prev->next = oper;
322 	(*list)->prev = oper;
323 	return;
324 }
325 
/* remove oper from a circular doubly linked list it is assumed to be on */
static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
	/* a single-entry circular list points back at itself on both sides;
	 * removing that entry empties the list
	 */
	if ((*list)->next == (*list)->prev && *list == (*list)->next) {
		*list = NULL;
		return;
	}
	oper->prev->next = oper->next;
	oper->next->prev = oper->prev;
	/* don't leave the list head pointing at the removed entry */
	if (*list == oper)
		*list = oper->next;
}
337 
/* worker func to check error fields in the io unit */
static int check_finished_io(struct io_unit *io)
{
	int i;
	/* res smaller than the buffer is either an error code or a short io */
	if (io->res != io->buf_size) {

		struct stat s;
		fstat(io->io_oper->fd, &s);

		/*
		 * If file size is large enough for the read, then this short
		 * read is an error.
		 */
		if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
		    s.st_size > (io->iocb.u.c.offset + io->res)) {

			fprintf(stderr,
				"io err %lu (%s) op %d, off %Lu size %d\n",
				io->res, strerror(-io->res),
				io->iocb.aio_lio_opcode, io->iocb.u.c.offset,
				io->buf_size);
			io->io_oper->last_err = io->res;
			io->io_oper->num_err++;
			return -1;
		}
	}
	/* compare sequential-read buffers against the expected pattern */
	if (verify && io->io_oper->rw == READ) {
		if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
			fprintf(stderr,
				"verify error, file %s offset %Lu contents (offset:bad:good):\n",
				io->io_oper->file_name, io->iocb.u.c.offset);

			/* dump every mismatching byte */
			for (i = 0; i < io->io_oper->reclen; i++) {
				if (io->buf[i] != verify_buf[i]) {
					fprintf(stderr, "%d:%c:%c ", i,
						io->buf[i], verify_buf[i]);
				}
			}
			fprintf(stderr, "\n");
		}

	}
	return 0;
}
382 
383 /* worker func to check the busy bits and get an io unit ready for use */
grab_iou(struct io_unit * io,struct io_oper * oper)384 static int grab_iou(struct io_unit *io, struct io_oper *oper)
385 {
386 	if (io->busy == IO_PENDING)
387 		return -1;
388 
389 	io->busy = IO_PENDING;
390 	io->res = 0;
391 	io->io_oper = oper;
392 	return 0;
393 }
394 
stage_name(int rw)395 char *stage_name(int rw)
396 {
397 	switch (rw) {
398 	case WRITE:
399 		return "write";
400 	case READ:
401 		return "read";
402 	case RWRITE:
403 		return "random write";
404 	case RREAD:
405 		return "random read";
406 	}
407 	return "unknown";
408 }
409 
oper_mb_trans(struct io_oper * oper)410 static inline double oper_mb_trans(struct io_oper *oper)
411 {
412 	return ((double)oper->started_ios * (double)oper->reclen) /
413 	    (double)(1024 * 1024);
414 }
415 
print_time(struct io_oper * oper)416 static void print_time(struct io_oper *oper)
417 {
418 	double runtime;
419 	double tput;
420 	double mb;
421 
422 	runtime = time_since_now(&oper->start_time);
423 	mb = oper_mb_trans(oper);
424 	tput = mb / runtime;
425 	fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
426 		stage_name(oper->rw), oper->file_name, tput, mb, runtime);
427 }
428 
print_lat(char * str,struct io_latency * lat)429 static void print_lat(char *str, struct io_latency *lat)
430 {
431 	double avg = lat->total_lat / lat->total_io;
432 	int i;
433 	double total_counted = 0;
434 	fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
435 		str, lat->min, avg, lat->max);
436 
437 	for (i = 0; i < DEVIATIONS; i++) {
438 		fprintf(stderr, " %.0f < %d", lat->deviations[i],
439 			deviations[i]);
440 		total_counted += lat->deviations[i];
441 	}
442 	if (total_counted && lat->total_io - total_counted)
443 		fprintf(stderr, " < %.0f", lat->total_io - total_counted);
444 	fprintf(stderr, "\n");
445 	memset(lat, 0, sizeof(*lat));
446 }
447 
print_latency(struct thread_info * t)448 static void print_latency(struct thread_info *t)
449 {
450 	struct io_latency *lat = &t->io_submit_latency;
451 	print_lat("latency", lat);
452 }
453 
print_completion_latency(struct thread_info * t)454 static void print_completion_latency(struct thread_info *t)
455 {
456 	struct io_latency *lat = &t->io_completion_latency;
457 	print_lat("completion latency", lat);
458 }
459 
/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and make the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result,
	       struct timeval *tv_now)
{
	struct io_oper *oper = io->io_oper;

	/* record the io_submit-to-completion time */
	calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
	io->res = result;
	io->busy = IO_FREE;
	/* push the unit back onto the thread's free list */
	io->next = t->free_ious;
	t->free_ious = io;
	oper->num_pending--;
	t->num_global_pending--;
	check_finished_io(io);
	/* last pending io of a fully-submitted (or stonewalled) oper:
	 * report its throughput now
	 */
	if (oper->num_pending == 0 &&
	    (oper->started_ios == oper->total_ios || oper->stonewalled)) {
		print_time(oper);
	}
}
482 
/*
 * reap available completion events (at least min_nr, at most the whole
 * events array) and recycle their io units.  Returns the number of events
 * processed, or the io_getevents result (<= 0) on failure.
 */
int read_some_events(struct thread_info *t)
{
	struct io_unit *event_io;
	struct io_event *event;
	int nr;
	int i;
	int min_nr = io_iter;
	struct timeval stop_time;

	/* never wait for more events than can still complete */
	if (t->num_global_pending < io_iter)
		min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
	nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,
			  NULL);
#else
	nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
	if (nr <= 0)
		return nr;

	gettimeofday(&stop_time, NULL);
	for (i = 0; i < nr; i++) {
		event = t->events + i;
		/* the iocb is first in struct io_unit, so the obj pointer
		 * is also the io unit pointer
		 */
		event_io = (struct io_unit *)((unsigned long)event->obj);
		finish_io(t, event_io, event->res, &stop_time);
	}
	return nr;
}
512 
513 /*
514  * finds a free io unit, waiting for pending requests if required.  returns
515  * null if none could be found
516  */
find_iou(struct thread_info * t,struct io_oper * oper)517 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
518 {
519 	struct io_unit *event_io;
520 	int nr;
521 
522 retry:
523 	if (t->free_ious) {
524 		event_io = t->free_ious;
525 		t->free_ious = t->free_ious->next;
526 		if (grab_iou(event_io, oper)) {
527 			fprintf(stderr, "io unit on free list but not free\n");
528 			abort();
529 		}
530 		return event_io;
531 	}
532 	nr = read_some_events(t);
533 	if (nr > 0)
534 		goto retry;
535 	else
536 		fprintf(stderr, "no free ious after read_some_events\n");
537 	return NULL;
538 }
539 
/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper)
{
	struct io_event event;
	struct io_unit *event_io;

	if (oper == NULL) {
		return 0;
	}

	if (oper->num_pending == 0)
		goto done;

	/* this func is not speed sensitive, no need to go wild reading
	 * more than one event at a time
	 */
#ifdef NEW_GETEVENTS
	while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
	while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
		struct timeval tv_now;
		/* obj points at the iocb, which is first in struct io_unit */
		event_io = (struct io_unit *)((unsigned long)event.obj);

		gettimeofday(&tv_now, NULL);
		finish_io(t, event_io, event.res, &tv_now);

		/* events for other opers get processed too; stop once this
		 * oper has drained
		 */
		if (oper->num_pending == 0)
			break;
	}
done:
	if (oper->num_err) {
		fprintf(stderr, "%u errors on oper, last %u\n",
			oper->num_err, oper->last_err);
	}
	return 0;
}
579 
/*
 * pick a page aligned pseudo-random offset inside [oper->start, oper->end)
 * with room left for one full record
 */
off_t random_byte_offset(struct io_oper * oper)
{
	off_t num;
	off_t rand_byte = oper->start;
	off_t range;
	off_t offset = 1;

	/* work in whole megabytes so the range fits rand()'s result */
	range = (oper->end - oper->start) / (1024 * 1024);
	if ((page_size_mask + 1) > (1024 * 1024))
		offset = (page_size_mask + 1) / (1024 * 1024);
	if (range < offset)
		range = 0;
	else
		range -= offset;

	/* find a random mb offset */
	num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
	rand_byte += num * 1024 * 1024;

	/* find a random byte offset */
	num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

	/* page align */
	num = (num + page_size_mask) & ~page_size_mask;
	rand_byte += num;

	/* pull back when the record would run past the end of the area */
	if (rand_byte + oper->reclen > oper->end) {
		rand_byte -= oper->reclen;
	}
	return rand_byte;
}
611 
/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *io;
	off_t rand_byte;

	io = find_iou(t, oper);
	if (!io) {
		fprintf(stderr, "unable to find io unit\n");
		return NULL;
	}

	switch (oper->rw) {
	case WRITE:
		/* sequential write: advance the cursor one record */
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case READ:
		/* sequential read: advance the cursor one record */
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case RREAD:
		/* random read at a fresh page aligned offset */
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
			      rand_byte);
		break;
	case RWRITE:
		/* random write at a fresh page aligned offset */
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
			       rand_byte);

		break;
	}

	return io;
}
659 
660 /*
661  * wait for any pending requests, and then free all ram associated with
662  * an operation.  returns the last error the operation hit (zero means none)
663  */
664 static int finish_oper(struct thread_info *t, struct io_oper *oper)
665 {
666 	unsigned long last_err;
667 
668 	io_oper_wait(t, oper);
669 	last_err = oper->last_err;
670 	if (oper->num_pending > 0) {
671 		fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
672 	}
673 	close(oper->fd);
674 	free(oper);
675 	return last_err;
676 }
677 
678 /*
679  * allocates an io operation and fills in all the fields.  returns
680  * null on error
681  */
682 static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end,
683 				   int reclen, int depth, int iter,
684 				   char *file_name)
685 {
686 	struct io_oper *oper;
687 
688 	oper = malloc(sizeof(*oper));
689 	if (!oper) {
690 		fprintf(stderr, "unable to allocate io oper\n");
691 		return NULL;
692 	}
693 	memset(oper, 0, sizeof(*oper));
694 
695 	oper->depth = depth;
696 	oper->start = start;
697 	oper->end = end;
698 	oper->last_offset = oper->start;
699 	oper->fd = fd;
700 	oper->reclen = reclen;
701 	oper->rw = rw;
702 	oper->total_ios = (oper->end - oper->start) / oper->reclen;
703 	oper->file_name = file_name;
704 
705 	return oper;
706 }
707 
708 /*
709  * does setup on num_ios worth of iocbs, but does not actually
710  * start any io
711  */
712 int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
713 	       struct iocb **my_iocbs)
714 {
715 	int i;
716 	struct io_unit *io;
717 
718 	if (oper->started_ios == 0)
719 		gettimeofday(&oper->start_time, NULL);
720 
721 	if (num_ios == 0)
722 		num_ios = oper->total_ios;
723 
724 	if ((oper->started_ios + num_ios) > oper->total_ios)
725 		num_ios = oper->total_ios - oper->started_ios;
726 
727 	for (i = 0; i < num_ios; i++) {
728 		io = build_iocb(t, oper);
729 		if (!io) {
730 			return -1;
731 		}
732 		my_iocbs[i] = &io->iocb;
733 	}
734 	return num_ios;
735 }
736 
737 /*
738  * runs through the iocbs in the array provided and updates
739  * counters in the associated oper struct
740  */
741 static void update_iou_counters(struct iocb **my_iocbs, int nr,
742 				struct timeval *tv_now)
743 {
744 	struct io_unit *io;
745 	int i;
746 	for (i = 0; i < nr; i++) {
747 		io = (struct io_unit *)(my_iocbs[i]);
748 		io->io_oper->num_pending++;
749 		io->io_oper->started_ios++;
750 		io->io_start_time = *tv_now;	/* set time of io_submit */
751 	}
752 }
753 
754 /* starts some io for a given file, returns zero if all went well */
755 int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
756 {
757 	int ret;
758 	struct timeval start_time;
759 	struct timeval stop_time;
760 
761 resubmit:
762 	gettimeofday(&start_time, NULL);
763 	ret = io_submit(t->io_ctx, num_ios, my_iocbs);
764 	gettimeofday(&stop_time, NULL);
765 	calc_latency(&start_time, &stop_time, &t->io_submit_latency);
766 
767 	if (ret != num_ios) {
768 		/* some I/O got through */
769 		if (ret > 0) {
770 			update_iou_counters(my_iocbs, ret, &stop_time);
771 			my_iocbs += ret;
772 			t->num_global_pending += ret;
773 			num_ios -= ret;
774 		}
775 		/*
776 		 * we've used all the requests allocated in aio_init, wait and
777 		 * retry
778 		 */
779 		if (ret > 0 || ret == -EAGAIN) {
780 			int old_ret = ret;
781 			if ((ret = read_some_events(t) > 0)) {
782 				goto resubmit;
783 			} else {
784 				fprintf(stderr, "ret was %d and now is %d\n",
785 					ret, old_ret);
786 				abort();
787 			}
788 		}
789 
790 		fprintf(stderr, "ret %d (%s) on io_submit\n", ret,
791 			strerror(-ret));
792 		return -1;
793 	}
794 	update_iou_counters(my_iocbs, ret, &stop_time);
795 	t->num_global_pending += ret;
796 	return 0;
797 }
798 
/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper)
{
	int new_rw = 0;
	if (oper->last_err)
		return 0;

	/* this switch falls through */
	switch (oper->rw) {
	case WRITE:
		if (stages & (1 << READ))
			new_rw = READ;
		/* fallthrough */
	case READ:
		if (!new_rw && stages & (1 << RWRITE))
			new_rw = RWRITE;
		/* fallthrough */
	case RWRITE:
		if (!new_rw && stages & (1 << RREAD))
			new_rw = RREAD;
	}

	if (new_rw) {
		/* rewind the operation so the new stage starts fresh */
		oper->started_ios = 0;
		oper->last_offset = oper->start;
		oper->stonewalled = 0;

		/*
		 * we're restarting an operation with pending requests, so the
		 * timing info won't be printed by finish_io.  Printing it here
		 */
		if (oper->num_pending)
			print_time(oper);

		oper->rw = new_rw;
		return 1;
	}
	return 0;
}
839 
840 static int oper_runnable(struct io_oper *oper)
841 {
842 	struct stat buf;
843 	int ret;
844 
845 	/* first context is always runnable, if started_ios > 0, no need to
846 	 * redo the calculations
847 	 */
848 	if (oper->started_ios || oper->start == 0)
849 		return 1;
850 	/*
851 	 * only the sequential phases force delays in starting */
852 	if (oper->rw >= RWRITE)
853 		return 1;
854 	ret = fstat(oper->fd, &buf);
855 	if (ret < 0) {
856 		perror("fstat");
857 		exit(1);
858 	}
859 	if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
860 		return 0;
861 	return 1;
862 }
863 
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
			   int io_iter, int max_io_submit)
{
	struct io_oper *oper;
	struct io_oper *built_opers = NULL;
	struct iocb **my_iocbs = t->iocbs;
	int ret = 0;
	int num_built = 0;

	oper = t->active_opers;
	while (oper) {
		if (!oper_runnable(oper)) {
			/* skip it; stop once we wrap back to the list head */
			oper = oper->next;
			if (oper == t->active_opers)
				break;
			continue;
		}
		ret = build_oper(t, oper, io_iter, my_iocbs);
		if (ret >= 0) {
			my_iocbs += ret;
			num_built += ret;
			/* park the oper on a private list so this pass can't
			 * build from it twice
			 */
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &built_opers);
			oper = t->active_opers;
			/* stay under the submission batch limit */
			if (num_built + io_iter > max_io_submit)
				break;
		} else
			break;
	}
	if (num_built) {
		ret = run_built(t, num_built, t->iocbs);
		if (ret < 0) {
			fprintf(stderr, "error %d on run_built\n", ret);
			exit(1);
		}
		/* return built opers to the active list, moving any that are
		 * fully submitted onto the finished list
		 */
		while (built_opers) {
			oper = built_opers;
			oper_list_del(oper, &built_opers);
			oper_list_add(oper, &t->active_opers);
			if (oper->started_ios == oper->total_ios) {
				oper_list_del(oper, &t->active_opers);
				oper_list_add(oper, &t->finished_opers);
			}
		}
	}
	return 0;
}
922 
923 void drop_shm()
924 {
925 	int ret;
926 	struct shmid_ds ds;
927 	if (use_shm != USE_SHM)
928 		return;
929 
930 	ret = shmctl(shm_id, IPC_RMID, &ds);
931 	if (ret) {
932 		perror("shmctl IPC_RMID");
933 	}
934 }
935 
936 void aio_setup(io_context_t * io_ctx, int n)
937 {
938 	int res = io_queue_init(n, io_ctx);
939 	if (res != 0) {
940 		fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
941 			n, res, strerror(-res));
942 		exit(3);
943 	}
944 }
945 
/*
 * allocate io operation and event arrays for a given thread
 */
int setup_ious(struct thread_info *t,
	       int num_files, int depth, int reclen, int max_io_submit)
{
	int i;
	size_t bytes = num_files * depth * sizeof(*t->ios);

	t->ios = malloc(bytes);
	if (!t->ios) {
		fprintf(stderr, "unable to allocate io units\n");
		return -1;
	}
	memset(t->ios, 0, bytes);

	/* carve each io buffer out of the global aligned buffer pool */
	for (i = 0; i < depth * num_files; i++) {
		t->ios[i].buf = aligned_buffer;
		aligned_buffer += padded_reclen;
		t->ios[i].buf_size = reclen;
		if (verify)
			memset(t->ios[i].buf, 'b', reclen);
		else
			memset(t->ios[i].buf, 0, reclen);
		/* every unit starts out on the free list */
		t->ios[i].next = t->free_ious;
		t->free_ious = t->ios + i;
	}
	if (verify) {
		/* reads are later compared against this pattern buffer */
		verify_buf = aligned_buffer;
		memset(verify_buf, 'b', reclen);
	}

	t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
	if (!t->iocbs) {
		fprintf(stderr, "unable to allocate iocbs\n");
		goto free_buffers;
	}

	memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

	t->events = malloc(sizeof(struct io_event) * depth * num_files);
	if (!t->events) {
		fprintf(stderr, "unable to allocate ram for events\n");
		goto free_buffers;
	}
	memset(t->events, 0, num_files * sizeof(struct io_event) * depth);

	t->num_global_ios = num_files * depth;
	t->num_global_events = t->num_global_ios;
	return 0;

free_buffers:
	/* NOTE(review): this assumes the caller zeroed *t; otherwise
	 * t->events is freed uninitialized when the iocb alloc fails --
	 * confirm against the allocation site of thread_info
	 */
	free(t->ios);
	free(t->iocbs);
	free(t->events);
	return -1;
}
1003 
/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.  This lets us do a large shm or bigpages alloc
 * and without trying to find a special place in each thread to map the
 * buffers to
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
		     int reclen, int max_io_submit)
{
	char *p = NULL;
	size_t total_ram;

	/* round the record size up to a whole number of pages */
	padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1);
	padded_reclen = padded_reclen * (page_size_mask + 1);
	total_ram = num_files * depth * padded_reclen + num_threads;
	if (verify)
		total_ram += padded_reclen;

	/* for aligning buffer after the allocation */
	total_ram += page_size_mask;

	if (use_shm == USE_MALLOC) {
		p = malloc(total_ram);
	} else if (use_shm == USE_SHM) {
		shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
		if (shm_id < 0) {
			perror("shmget");
			drop_shm();
			goto free_buffers;
		}
		p = shmat(shm_id, (char *)0x50000000, 0);
		if ((long)p == -1) {
			perror("shmat");
			goto free_buffers;
		}
		/* won't really be dropped until we shmdt */
		drop_shm();
	} else if (use_shm == USE_SHMFS) {
		char mmap_name[16];	/* /dev/shm/ + null + XXXXXX */
		int fd;

		strcpy(mmap_name, "/dev/shm/XXXXXX");
		fd = mkstemp(mmap_name);
		if (fd < 0) {
			perror("mkstemp");
			goto free_buffers;
		}
		/* the name was only needed to create the file */
		unlink(mmap_name);
		ftruncate(fd, total_ram);
		shm_id = fd;
		p = mmap((char *)0x50000000, total_ram,
			 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		if (p == MAP_FAILED) {
			perror("mmap");
			goto free_buffers;
		}
	}
	if (!p) {
		fprintf(stderr, "unable to allocate buffers\n");
		goto free_buffers;
	}
	unaligned_buffer = p;
	/* round up to the next page boundary; the top-of-file comment says
	 * buffers are aligned in case raw io is wanted
	 */
	p = (char *)((intptr_t) (p + page_size_mask) & ~page_size_mask);
	aligned_buffer = p;
	return 0;

free_buffers:
	drop_shm();
	if (unaligned_buffer)
		free(unaligned_buffer);
	return -1;
}
1078 
1079 /*
1080  * runs through all the thread_info structs and calculates a combined
1081  * throughput
1082  */
1083 void global_thread_throughput(struct thread_info *t, char *this_stage)
1084 {
1085 	int i;
1086 	double runtime = time_since_now(&global_stage_start_time);
1087 	double total_mb = 0;
1088 	double min_trans = 0;
1089 
1090 	for (i = 0; i < num_threads; i++) {
1091 		total_mb += global_thread_info[i].stage_mb_trans;
1092 		if (!min_trans || t->stage_mb_trans < min_trans)
1093 			min_trans = t->stage_mb_trans;
1094 	}
1095 	if (total_mb) {
1096 		fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1097 			total_mb / runtime);
1098 		fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1099 		if (stonewall)
1100 			fprintf(stderr, " min transfer %.2fMB", min_trans);
1101 		fprintf(stderr, "\n");
1102 	}
1103 }
1104 
/* this is the meat of the state machine.  There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations.  Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there are more than one threads.
 *
 * Returns the status of the last finish_oper() call (0 on success).
 */
int worker(struct thread_info *t)
{
	struct io_oper *oper;
	char *this_stage = NULL;
	struct timeval stage_time;
	int status = 0;		/* overwritten by each finish_oper() below */
	int iteration = 0;
	int cnt;

	/* 512 presumably caps outstanding events for this io context --
	 * NOTE(review): confirm against aio_setup's definition */
	aio_setup(&t->io_ctx, 512);

restart:
	if (num_threads > 1) {
		/* stage-start barrier: the last thread to arrive resets the
		 * end-of-stage counter, stamps the global stage start time
		 * and wakes everyone else */
		pthread_mutex_lock(&stage_mutex);
		threads_starting++;
		if (threads_starting == num_threads) {
			threads_ending = 0;
			gettimeofday(&global_stage_start_time, NULL);
			pthread_cond_broadcast(&stage_cond);
		}
		while (threads_starting != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}
	if (t->active_opers) {
		this_stage = stage_name(t->active_opers->rw);
		gettimeofday(&stage_time, NULL);
		t->stage_mb_trans = 0;
	}

	cnt = 0;
	/* first we send everything through aio */
	while (t->active_opers
	       && (cnt < iterations || iterations == RUN_FOREVER)) {
		if (stonewall && threads_ending) {
			/* another thread already finished this stage: retire
			 * our opers instead of submitting more io, so all
			 * threads measure comparable time windows */
			oper = t->active_opers;
			oper->stonewalled = 1;
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &t->finished_opers);
		} else {
			run_active_list(t, io_iter, max_io_submit);
		}
		cnt++;
	}
	if (latency_stats)
		print_latency(t);

	if (completion_latency_stats)
		print_completion_latency(t);

	/* then we wait for all the operations to finish */
	oper = t->finished_opers;
	do {
		if (!oper)
			break;
		io_oper_wait(t, oper);
		oper = oper->next;
	} while (oper != t->finished_opers);

	/* then we do an fsync to get the timing for any future operations
	 * right, and check to see if any of these need to get restarted
	 */
	oper = t->finished_opers;
	while (oper) {
		if (fsync_stages)
			fsync(oper->fd);
		t->stage_mb_trans += oper_mb_trans(oper);
		if (restart_oper(oper)) {
			/* oper moved back to the active list; restart the
			 * scan of the (circular) finished list from its
			 * new head */
			oper_list_del(oper, &t->finished_opers);
			oper_list_add(oper, &t->active_opers);
			oper = t->finished_opers;
			continue;
		}
		oper = oper->next;
		/* circular list: stop once we wrap back to the head */
		if (oper == t->finished_opers)
			break;
	}

	/* per-thread throughput report for the stage that just completed */
	if (t->stage_mb_trans && t->num_files > 0) {
		double seconds = time_since_now(&stage_time);
		fprintf(stderr,
			"thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
			t - global_thread_info, this_stage,
			t->stage_mb_trans / seconds, t->stage_mb_trans,
			seconds);
	}

	if (num_threads > 1) {
		/* stage-end barrier: the last thread out resets the start
		 * counter and prints the combined throughput */
		pthread_mutex_lock(&stage_mutex);
		threads_ending++;
		if (threads_ending == num_threads) {
			threads_starting = 0;
			pthread_cond_broadcast(&stage_cond);
			global_thread_throughput(t, this_stage);
		}
		while (threads_ending != num_threads)
			pthread_cond_wait(&stage_cond, &stage_mutex);
		pthread_mutex_unlock(&stage_mutex);
	}

	/* someone got restarted, go back to the beginning */
	if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
		iteration++;
		goto restart;
	}

	/* finally, free all the ram */
	while (t->finished_opers) {
		oper = t->finished_opers;
		oper_list_del(oper, &t->finished_opers);
		status = finish_oper(t, oper);
	}

	if (t->num_global_pending) {
		fprintf(stderr, "global num pending is %d\n",
			t->num_global_pending);
	}
	io_queue_release(t->io_ctx);

	return status;
}
1235 
1236 typedef void *(*start_routine) (void *);
1237 int run_workers(struct thread_info *t, int num_threads)
1238 {
1239 	int ret;
1240 	int i;
1241 
1242 	for (i = 0; i < num_threads; i++) {
1243 		ret =
1244 		    pthread_create(&t[i].tid, NULL, (start_routine) worker,
1245 				   t + i);
1246 		if (ret) {
1247 			perror("pthread_create");
1248 			exit(1);
1249 		}
1250 	}
1251 	for (i = 0; i < num_threads; i++) {
1252 		ret = pthread_join(t[i].tid, NULL);
1253 		if (ret) {
1254 			perror("pthread_join");
1255 			exit(1);
1256 		}
1257 	}
1258 	return 0;
1259 }
1260 
/*
 * Converts a size string with an optional b/k/m/g suffix (either case)
 * into bytes.  A bare number uses the caller-supplied default multiplier
 * `mult`; an unrecognized alphabetic suffix is stripped but leaves the
 * default multiplier in place.  NOTE: strips the suffix in place, so
 * `size_arg` must be writable.
 */
off_t parse_size(char *size_arg, off_t mult)
{
	size_t len = strlen(size_arg);
	char suffix = size_arg[len - 1];

	/* anything above '9' is treated as a unit suffix and removed */
	if (suffix > '9')
		size_arg[len - 1] = '\0';

	switch (suffix) {
	case 'b':
	case 'B':
		mult = 1;
		break;
	case 'k':
	case 'K':
		mult = 1024;
		break;
	case 'm':
	case 'M':
		mult = 1024 * 1024;
		break;
	case 'g':
	case 'G':
		mult = 1024 * 1024 * 1024;
		break;
	}

	return mult * atoi(size_arg);
}
1292 
1293 void print_usage(void)
1294 {
1295 	printf
1296 	    ("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1297 	printf
1298 	    ("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1299 	printf("                  file1 [file2 ...]\n");
1300 	printf("\t-a size in KB at which to align buffers\n");
1301 	printf("\t-b max number of iocbs to give io_submit at once\n");
1302 	printf("\t-c number of io contexts per file\n");
1303 	printf("\t-C offset between contexts, default 2MB\n");
1304 	printf("\t-s size in MB of the test file(s), default 1024MB\n");
1305 	printf("\t-r record size in KB used for each io, default 64KB\n");
1306 	printf
1307 	    ("\t-d number of pending aio requests for each file, default 64\n");
1308 	printf("\t-i number of I/O per file sent before switching\n"
1309 	       "\t   to the next file, default 8\n");
1310 	printf("\t-I total number of ayncs I/O the program will run, "
1311 	       "default is run until Cntl-C\n");
1312 	printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1313 	printf("\t-S Use O_SYNC for writes\n");
1314 	printf("\t-o add an operation to the list: write=0, read=1,\n");
1315 	printf("\t   random write=2, random read=3.\n");
1316 	printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1317 	printf
1318 	    ("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1319 	printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1320 	printf("\t-n no fsyncs between write stage and read stage\n");
1321 	printf("\t-l print io_submit latencies after each stage\n");
1322 	printf("\t-L print io completion latencies after each stage\n");
1323 	printf("\t-t number of threads to run\n");
1324 	printf("\t-u unlink files after completion\n");
1325 	printf("\t-v verification of bytes written\n");
1326 	printf("\t-x turn off thread stonewalling\n");
1327 	printf("\t-h this message\n");
1328 	printf
1329 	    ("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1330 	printf("\t   translate to 400KB, 400MB and 400GB\n");
1331 	printf("version %s\n", PROG_VERSION);
1332 }
1333 
1334 int main(int ac, char **av)
1335 {
1336 	int rwfd;
1337 	int i;
1338 	int j;
1339 	int c;
1340 
1341 	off_t file_size = 1 * 1024 * 1024 * 1024;
1342 	int first_stage = WRITE;
1343 	struct io_oper *oper;
1344 	int status = 0;
1345 	int num_files = 0;
1346 	int open_fds = 0;
1347 	struct thread_info *t;
1348 
1349 	page_size_mask = getpagesize() - 1;
1350 
1351 	while (1) {
1352 		c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1353 		if (c < 0)
1354 			break;
1355 
1356 		switch (c) {
1357 		case 'a':
1358 			page_size_mask = parse_size(optarg, 1024);
1359 			page_size_mask--;
1360 			break;
1361 		case 'c':
1362 			num_contexts = atoi(optarg);
1363 			break;
1364 		case 'C':
1365 			context_offset = parse_size(optarg, 1024 * 1024);
1366 		case 'b':
1367 			max_io_submit = atoi(optarg);
1368 			break;
1369 		case 's':
1370 			file_size = parse_size(optarg, 1024 * 1024);
1371 			break;
1372 		case 'd':
1373 			depth = atoi(optarg);
1374 			break;
1375 		case 'r':
1376 			rec_len = parse_size(optarg, 1024);
1377 			break;
1378 		case 'i':
1379 			io_iter = atoi(optarg);
1380 			break;
1381 		case 'I':
1382 			iterations = atoi(optarg);
1383 			break;
1384 		case 'n':
1385 			fsync_stages = 0;
1386 			break;
1387 		case 'l':
1388 			latency_stats = 1;
1389 			break;
1390 		case 'L':
1391 			completion_latency_stats = 1;
1392 			break;
1393 		case 'm':
1394 			if (!strcmp(optarg, "shm")) {
1395 				fprintf(stderr, "using ipc shm\n");
1396 				use_shm = USE_SHM;
1397 			} else if (!strcmp(optarg, "shmfs")) {
1398 				fprintf(stderr, "using /dev/shm for buffers\n");
1399 				use_shm = USE_SHMFS;
1400 			}
1401 			break;
1402 		case 'o':
1403 			i = atoi(optarg);
1404 			stages |= 1 << i;
1405 			fprintf(stderr, "adding stage %s\n", stage_name(i));
1406 			break;
1407 		case 'O':
1408 			o_direct = O_DIRECT;
1409 			break;
1410 		case 'S':
1411 			o_sync = O_SYNC;
1412 			break;
1413 		case 't':
1414 			num_threads = atoi(optarg);
1415 			break;
1416 		case 'x':
1417 			stonewall = 0;
1418 			break;
1419 		case 'u':
1420 			unlink_files = 1;
1421 			break;
1422 		case 'v':
1423 			verify = 1;
1424 			break;
1425 		case 'h':
1426 		default:
1427 			print_usage();
1428 			exit(1);
1429 		}
1430 	}
1431 
1432 	/*
1433 	 * make sure we don't try to submit more I/O than we have allocated
1434 	 * memory for
1435 	 */
1436 	if (depth < io_iter) {
1437 		io_iter = depth;
1438 		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1439 	}
1440 
1441 	if (optind >= ac) {
1442 		print_usage();
1443 		exit(1);
1444 	}
1445 
1446 	num_files = ac - optind;
1447 
1448 	if (num_threads > (num_files * num_contexts)) {
1449 		num_threads = num_files * num_contexts;
1450 		fprintf(stderr,
1451 			"dropping thread count to the number of contexts %d\n",
1452 			num_threads);
1453 	}
1454 
1455 	t = malloc(num_threads * sizeof(*t));
1456 	if (!t) {
1457 		perror("malloc");
1458 		exit(1);
1459 	}
1460 	memset(t, 0, num_threads * sizeof(*t));
1461 	global_thread_info = t;
1462 
1463 	/* by default, allow a huge number of iocbs to be sent towards
1464 	 * io_submit
1465 	 */
1466 	if (!max_io_submit)
1467 		max_io_submit = num_files * io_iter * num_contexts;
1468 
1469 	/*
1470 	 * make sure we don't try to submit more I/O than max_io_submit allows
1471 	 */
1472 	if (max_io_submit < io_iter) {
1473 		io_iter = max_io_submit;
1474 		fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1475 	}
1476 
1477 	if (!stages) {
1478 		stages =
1479 		    (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1480 	} else {
1481 		for (i = 0; i < LAST_STAGE; i++) {
1482 			if (stages & (1 << i)) {
1483 				first_stage = i;
1484 				fprintf(stderr, "starting with %s\n",
1485 					stage_name(i));
1486 				break;
1487 			}
1488 		}
1489 	}
1490 
1491 	if (file_size < num_contexts * context_offset) {
1492 		fprintf(stderr, "file size %ld too small for %d contexts\n",
1493 			(long)file_size, num_contexts);
1494 		exit(1);
1495 	}
1496 
1497 	fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, "
1498 		"I/O per iteration %d\n",
1499 		(long)(file_size / (1024 * 1024)),
1500 		rec_len / 1024, depth, io_iter);
1501 	fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1502 		max_io_submit, (page_size_mask + 1) / 1024);
1503 	fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB "
1504 		"verification %s\n", num_threads, num_files, num_contexts,
1505 		(long)(context_offset / (1024 * 1024)), verify ? "on" : "off");
1506 	/* open all the files and do any required setup for them */
1507 	for (i = optind; i < ac; i++) {
1508 		int thread_index;
1509 		for (j = 0; j < num_contexts; j++) {
1510 			thread_index = open_fds % num_threads;
1511 			open_fds++;
1512 
1513 			rwfd =
1514 			    open(av[i], O_CREAT | O_RDWR | o_direct | o_sync,
1515 				 0600);
1516 			if (rwfd == -1) {
1517 				fprintf(stderr,
1518 					"error while creating file %s: %s",
1519 					av[i], strerror(errno));
1520 				exit(1);
1521 			}
1522 
1523 			oper =
1524 			    create_oper(rwfd, first_stage, j * context_offset,
1525 					file_size - j * context_offset, rec_len,
1526 					depth, io_iter, av[i]);
1527 			if (!oper) {
1528 				fprintf(stderr, "error in create_oper\n");
1529 				exit(-1);
1530 			}
1531 			oper_list_add(oper, &t[thread_index].active_opers);
1532 			t[thread_index].num_files++;
1533 		}
1534 	}
1535 	if (setup_shared_mem(num_threads, num_files * num_contexts,
1536 			     depth, rec_len, max_io_submit)) {
1537 		exit(1);
1538 	}
1539 	for (i = 0; i < num_threads; i++) {
1540 		if (setup_ious
1541 		    (&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1542 			exit(1);
1543 	}
1544 	if (num_threads > 1) {
1545 		printf("Running multi thread version num_threads:%d\n",
1546 		       num_threads);
1547 		run_workers(t, num_threads);
1548 	} else {
1549 		printf("Running single thread version \n");
1550 		status = worker(t);
1551 	}
1552 	if (unlink_files) {
1553 		for (i = optind; i < ac; i++) {
1554 			printf("Cleaning up file %s \n", av[i]);
1555 			unlink(av[i]);
1556 		}
1557 	}
1558 
1559 	if (status) {
1560 		exit(1);
1561 	}
1562 	return status;
1563 }
1564 #else
/* Fallback entry point built when libaio support is not compiled in:
 * just report why the test cannot run.  TCONF is presumably the test
 * harness's "configuration not applicable" exit code -- NOTE(review):
 * defined outside this chunk, confirm against the #if at file top. */
int main(void)
{
	fprintf(stderr, "test requires libaio and it's development packages\n");
	return TCONF;
}
1570 #endif
1571