• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * fio - the flexible io tester
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6  *
7  * The license below covers all files distributed with fio unless otherwise
8  * noted in the file itself.
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License version 2 as
12  *  published by the Free Software Foundation.
13  *
14  *  This program is distributed in the hope that it will be useful,
15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *  GNU General Public License for more details.
18  *
19  *  You should have received a copy of the GNU General Public License
20  *  along with this program; if not, write to the Free Software
21  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  *
23  */
24 #include <unistd.h>
25 #include <fcntl.h>
26 #include <string.h>
27 #include <limits.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <locale.h>
31 #include <assert.h>
32 #include <time.h>
33 #include <inttypes.h>
34 #include <sys/stat.h>
35 #include <sys/wait.h>
36 #include <sys/ipc.h>
37 #include <sys/mman.h>
38 
39 #include "fio.h"
40 #ifndef FIO_NO_HAVE_SHM_H
41 #include <sys/shm.h>
42 #endif
43 #include "hash.h"
44 #include "smalloc.h"
45 #include "verify.h"
46 #include "trim.h"
47 #include "diskutil.h"
48 #include "cgroup.h"
49 #include "profile.h"
50 #include "lib/rand.h"
51 #include "memalign.h"
52 #include "server.h"
53 #include "lib/getrusage.h"
54 #include "idletime.h"
55 #include "err.h"
56 #include "lib/tp.h"
57 
/*
 * Helper thread handle, with the mutex/condition variable and flags
 * declared alongside it. Only the declarations are visible here; the
 * code that uses them lives further down in the file.
 */
static pthread_t helper_thread;
static pthread_mutex_t helper_lock;
pthread_cond_t helper_cond;
int helper_do_stat = 0;

static struct fio_mutex *startup_mutex;
static struct flist_head *cgroup_list;
static char *cgroup_mnt;
/* set to 128 by sig_int() when a non-backend run receives a signal */
static int exit_value;
static volatile int fio_abort;
static unsigned int nr_process = 0;
static unsigned int nr_thread = 0;

/* one aggregate io log pointer per data direction */
struct io_log *agg_io_log[DDIR_RWDIR_CNT];

int groupid = 0;
unsigned int thread_number = 0;
unsigned int stat_number = 0;
int shm_id = 0;
int temp_stall_ts;
unsigned long done_secs = 0;
volatile int helper_exit = 0;

/*
 * Round buf up by adding page_mask and then clearing the page_mask bits.
 * NOTE(review): page_mask is defined elsewhere; presumably page size - 1.
 */
#define PAGE_ALIGN(buf)	\
	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)

/* NOTE(review): presumably milliseconds (i.e. 5 seconds) - confirm at use site */
#define JOB_START_TIMEOUT	(5 * 1000)
85 
sig_int(int sig)86 static void sig_int(int sig)
87 {
88 	if (threads) {
89 		if (is_backend)
90 			fio_server_got_signal(sig);
91 		else {
92 			log_info("\nfio: terminating on signal %d\n", sig);
93 			log_info_flush();
94 			exit_value = 128;
95 		}
96 
97 		fio_terminate_threads(TERMINATE_ALL);
98 	}
99 }
100 
/*
 * Signal handler that dumps the stats of the currently running jobs.
 * The signal number itself is not used.
 */
static void sig_show_status(int sig)
{
	(void) sig;

	show_running_run_stats();
}
105 
set_sig_handlers(void)106 static void set_sig_handlers(void)
107 {
108 	struct sigaction act;
109 
110 	memset(&act, 0, sizeof(act));
111 	act.sa_handler = sig_int;
112 	act.sa_flags = SA_RESTART;
113 	sigaction(SIGINT, &act, NULL);
114 
115 	memset(&act, 0, sizeof(act));
116 	act.sa_handler = sig_int;
117 	act.sa_flags = SA_RESTART;
118 	sigaction(SIGTERM, &act, NULL);
119 
120 /* Windows uses SIGBREAK as a quit signal from other applications */
121 #ifdef WIN32
122 	memset(&act, 0, sizeof(act));
123 	act.sa_handler = sig_int;
124 	act.sa_flags = SA_RESTART;
125 	sigaction(SIGBREAK, &act, NULL);
126 #endif
127 
128 	memset(&act, 0, sizeof(act));
129 	act.sa_handler = sig_show_status;
130 	act.sa_flags = SA_RESTART;
131 	sigaction(SIGUSR1, &act, NULL);
132 
133 	if (is_backend) {
134 		memset(&act, 0, sizeof(act));
135 		act.sa_handler = sig_int;
136 		act.sa_flags = SA_RESTART;
137 		sigaction(SIGPIPE, &act, NULL);
138 	}
139 }
140 
141 /*
142  * Check if we are above the minimum rate given.
143  */
__check_min_rate(struct thread_data * td,struct timeval * now,enum fio_ddir ddir)144 static int __check_min_rate(struct thread_data *td, struct timeval *now,
145 			    enum fio_ddir ddir)
146 {
147 	unsigned long long bytes = 0;
148 	unsigned long iops = 0;
149 	unsigned long spent;
150 	unsigned long rate;
151 	unsigned int ratemin = 0;
152 	unsigned int rate_iops = 0;
153 	unsigned int rate_iops_min = 0;
154 
155 	assert(ddir_rw(ddir));
156 
157 	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
158 		return 0;
159 
160 	/*
161 	 * allow a 2 second settle period in the beginning
162 	 */
163 	if (mtime_since(&td->start, now) < 2000)
164 		return 0;
165 
166 	iops += td->this_io_blocks[ddir];
167 	bytes += td->this_io_bytes[ddir];
168 	ratemin += td->o.ratemin[ddir];
169 	rate_iops += td->o.rate_iops[ddir];
170 	rate_iops_min += td->o.rate_iops_min[ddir];
171 
172 	/*
173 	 * if rate blocks is set, sample is running
174 	 */
175 	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
176 		spent = mtime_since(&td->lastrate[ddir], now);
177 		if (spent < td->o.ratecycle)
178 			return 0;
179 
180 		if (td->o.rate[ddir]) {
181 			/*
182 			 * check bandwidth specified rate
183 			 */
184 			if (bytes < td->rate_bytes[ddir]) {
185 				log_err("%s: min rate %u not met\n", td->o.name,
186 								ratemin);
187 				return 1;
188 			} else {
189 				if (spent)
190 					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
191 				else
192 					rate = 0;
193 
194 				if (rate < ratemin ||
195 				    bytes < td->rate_bytes[ddir]) {
196 					log_err("%s: min rate %u not met, got"
197 						" %luKB/sec\n", td->o.name,
198 							ratemin, rate);
199 					return 1;
200 				}
201 			}
202 		} else {
203 			/*
204 			 * checks iops specified rate
205 			 */
206 			if (iops < rate_iops) {
207 				log_err("%s: min iops rate %u not met\n",
208 						td->o.name, rate_iops);
209 				return 1;
210 			} else {
211 				if (spent)
212 					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
213 				else
214 					rate = 0;
215 
216 				if (rate < rate_iops_min ||
217 				    iops < td->rate_blocks[ddir]) {
218 					log_err("%s: min iops rate %u not met,"
219 						" got %lu\n", td->o.name,
220 							rate_iops_min, rate);
221 				}
222 			}
223 		}
224 	}
225 
226 	td->rate_bytes[ddir] = bytes;
227 	td->rate_blocks[ddir] = iops;
228 	memcpy(&td->lastrate[ddir], now, sizeof(*now));
229 	return 0;
230 }
231 
check_min_rate(struct thread_data * td,struct timeval * now,uint64_t * bytes_done)232 static int check_min_rate(struct thread_data *td, struct timeval *now,
233 			  uint64_t *bytes_done)
234 {
235 	int ret = 0;
236 
237 	if (bytes_done[DDIR_READ])
238 		ret |= __check_min_rate(td, now, DDIR_READ);
239 	if (bytes_done[DDIR_WRITE])
240 		ret |= __check_min_rate(td, now, DDIR_WRITE);
241 	if (bytes_done[DDIR_TRIM])
242 		ret |= __check_min_rate(td, now, DDIR_TRIM);
243 
244 	return ret;
245 }
246 
247 /*
248  * When job exits, we can cancel the in-flight IO if we are using async
249  * io. Attempt to do so.
250  */
cleanup_pending_aio(struct thread_data * td)251 static void cleanup_pending_aio(struct thread_data *td)
252 {
253 	int r;
254 
255 	/*
256 	 * get immediately available events, if any
257 	 */
258 	r = io_u_queued_complete(td, 0, NULL);
259 	if (r < 0)
260 		return;
261 
262 	/*
263 	 * now cancel remaining active events
264 	 */
265 	if (td->io_ops->cancel) {
266 		struct io_u *io_u;
267 		int i;
268 
269 		io_u_qiter(&td->io_u_all, io_u, i) {
270 			if (io_u->flags & IO_U_F_FLIGHT) {
271 				r = td->io_ops->cancel(td, io_u);
272 				if (!r)
273 					put_io_u(td, io_u);
274 			}
275 		}
276 	}
277 
278 	if (td->cur_depth)
279 		r = io_u_queued_complete(td, td->cur_depth, NULL);
280 }
281 
282 /*
283  * Helper to handle the final sync of a file. Works just like the normal
284  * io path, just does everything sync.
285  */
fio_io_sync(struct thread_data * td,struct fio_file * f)286 static int fio_io_sync(struct thread_data *td, struct fio_file *f)
287 {
288 	struct io_u *io_u = __get_io_u(td);
289 	int ret;
290 
291 	if (!io_u)
292 		return 1;
293 
294 	io_u->ddir = DDIR_SYNC;
295 	io_u->file = f;
296 
297 	if (td_io_prep(td, io_u)) {
298 		put_io_u(td, io_u);
299 		return 1;
300 	}
301 
302 requeue:
303 	ret = td_io_queue(td, io_u);
304 	if (ret < 0) {
305 		td_verror(td, io_u->error, "td_io_queue");
306 		put_io_u(td, io_u);
307 		return 1;
308 	} else if (ret == FIO_Q_QUEUED) {
309 		if (io_u_queued_complete(td, 1, NULL) < 0)
310 			return 1;
311 	} else if (ret == FIO_Q_COMPLETED) {
312 		if (io_u->error) {
313 			td_verror(td, io_u->error, "td_io_queue");
314 			return 1;
315 		}
316 
317 		if (io_u_sync_complete(td, io_u, NULL) < 0)
318 			return 1;
319 	} else if (ret == FIO_Q_BUSY) {
320 		if (td_io_commit(td))
321 			return 1;
322 		goto requeue;
323 	}
324 
325 	return 0;
326 }
327 
/*
 * Sync file 'f'. An already open file is synced directly; otherwise the
 * file is opened, synced and closed again. Returns 1 if the open fails,
 * else the result of fio_io_sync().
 */
static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
{
	int ret;

	if (!fio_file_open(f)) {
		if (td_io_open_file(td, f))
			return 1;

		ret = fio_io_sync(td, f);
		td_io_close_file(td, f);
		return ret;
	}

	return fio_io_sync(td, f);
}
342 
__update_tv_cache(struct thread_data * td)343 static inline void __update_tv_cache(struct thread_data *td)
344 {
345 	fio_gettime(&td->tv_cache, NULL);
346 }
347 
update_tv_cache(struct thread_data * td)348 static inline void update_tv_cache(struct thread_data *td)
349 {
350 	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
351 		__update_tv_cache(td);
352 }
353 
runtime_exceeded(struct thread_data * td,struct timeval * t)354 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
355 {
356 	if (in_ramp_time(td))
357 		return 0;
358 	if (!td->o.timeout)
359 		return 0;
360 	if (utime_since(&td->epoch, t) >= td->o.timeout)
361 		return 1;
362 
363 	return 0;
364 }
365 
break_on_this_error(struct thread_data * td,enum fio_ddir ddir,int * retptr)366 static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
367 			       int *retptr)
368 {
369 	int ret = *retptr;
370 
371 	if (ret < 0 || td->error) {
372 		int err = td->error;
373 		enum error_type_bit eb;
374 
375 		if (ret < 0)
376 			err = -ret;
377 
378 		eb = td_error_type(ddir, err);
379 		if (!(td->o.continue_on_error & (1 << eb)))
380 			return 1;
381 
382 		if (td_non_fatal_error(td, eb, err)) {
383 		        /*
384 		         * Continue with the I/Os in case of
385 			 * a non fatal error.
386 			 */
387 			update_error_count(td, err);
388 			td_clear_error(td);
389 			*retptr = 0;
390 			return 0;
391 		} else if (td->o.fill_device && err == ENOSPC) {
392 			/*
393 			 * We expect to hit this error if
394 			 * fill_device option is set.
395 			 */
396 			td_clear_error(td);
397 			fio_mark_td_terminate(td);
398 			return 1;
399 		} else {
400 			/*
401 			 * Stop the I/O in case of a fatal
402 			 * error.
403 			 */
404 			update_error_count(td, err);
405 			return 1;
406 		}
407 	}
408 
409 	return 0;
410 }
411 
check_update_rusage(struct thread_data * td)412 static void check_update_rusage(struct thread_data *td)
413 {
414 	if (td->update_rusage) {
415 		td->update_rusage = 0;
416 		update_rusage_stat(td);
417 		fio_mutex_up(td->rusage_sem);
418 	}
419 }
420 
wait_for_completions(struct thread_data * td,struct timeval * time,uint64_t * bytes_done)421 static int wait_for_completions(struct thread_data *td, struct timeval *time,
422 				uint64_t *bytes_done)
423 {
424 	const int full = queue_full(td);
425 	int min_evts = 0;
426 	int ret;
427 
428 	/*
429 	 * if the queue is full, we MUST reap at least 1 event
430 	 */
431 	min_evts = min(td->o.iodepth_batch_complete, td->cur_depth);
432 	if (full && !min_evts)
433 		min_evts = 1;
434 
435 	if (time && (__should_check_rate(td, DDIR_READ) ||
436 	    __should_check_rate(td, DDIR_WRITE) ||
437 	    __should_check_rate(td, DDIR_TRIM)))
438 		fio_gettime(time, NULL);
439 
440 	do {
441 		ret = io_u_queued_complete(td, min_evts, bytes_done);
442 		if (ret < 0)
443 			break;
444 	} while (full && (td->cur_depth > td->o.iodepth_low));
445 
446 	return ret;
447 }
448 
/*
 * The main verify engine. Runs over the writes we previously submitted,
 * reads the blocks back in, and checks the crc/md5 of the data.
 *
 * @td:           job whose data is verified
 * @verify_bytes: only consulted in experimental_verify mode, where it
 *                bounds the amount of data that is verified
 *
 * The job is put in TD_VERIFYING while the loop runs and set back to
 * TD_RUNNING before returning. Errors are reported through td->error.
 */
static void do_verify(struct thread_data *td, uint64_t verify_bytes)
{
	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
	struct fio_file *f;
	struct io_u *io_u;
	int ret, min_events;
	unsigned int i;

	dprint(FD_VERIFY, "starting loop\n");

	/*
	 * sync io first and invalidate cache, to make sure we really
	 * read from disk.
	 */
	for_each_file(td, f, i) {
		if (!fio_file_open(f))
			continue;
		if (fio_io_sync(td, f))
			break;
		if (file_invalidate_cache(td, f))
			break;
	}

	check_update_rusage(td);

	/* a failed sync/invalidate above shows up as td->error */
	if (td->error)
		return;

	td_set_runstate(td, TD_VERIFYING);

	io_u = NULL;
	while (!td->terminate) {
		enum fio_ddir ddir;
		int ret2, full;

		update_tv_cache(td);
		check_update_rusage(td);

		/*
		 * the cached time may be stale, so re-check with a fresh
		 * timestamp before terminating on timeout
		 */
		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				fio_mark_td_terminate(td);
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		if (!td->o.experimental_verify) {
			/*
			 * regular verify: take a free io_u and let
			 * get_next_verify() fill it in; a non-zero return
			 * from it ends the loop
			 */
			io_u = __get_io_u(td);
			if (!io_u)
				break;

			if (get_next_verify(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}

			if (td_io_prep(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}
		} else {
			/*
			 * experimental verify: stop once another minimum
			 * sized block would exceed verify_bytes
			 */
			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
				break;

			while ((io_u = get_io_u(td)) != NULL) {
				if (IS_ERR(io_u)) {
					io_u = NULL;
					ret = FIO_Q_BUSY;
					goto reap;
				}

				/*
				 * We are only interested in the places where
				 * we wrote or trimmed IOs. Turn those into
				 * reads for verification purposes.
				 */
				if (io_u->ddir == DDIR_READ) {
					/*
					 * Pretend we issued it for rwmix
					 * accounting
					 */
					td->io_issues[DDIR_READ]++;
					put_io_u(td, io_u);
					continue;
				} else if (io_u->ddir == DDIR_TRIM) {
					io_u->ddir = DDIR_READ;
					io_u->flags |= IO_U_F_TRIMMED;
					break;
				} else if (io_u->ddir == DDIR_WRITE) {
					io_u->ddir = DDIR_READ;
					break;
				} else {
					put_io_u(td, io_u);
					continue;
				}
			}

			if (!io_u)
				break;
		}

		if (verify_state_should_stop(td, io_u)) {
			put_io_u(td, io_u);
			break;
		}

		/* pick the completion hook that performs the verification */
		if (td->o.verify_async)
			io_u->end_io = verify_io_u_async;
		else
			io_u->end_io = verify_io_u;

		/* remember the direction; io_u may be handed off below */
		ddir = io_u->ddir;
		if (!td->o.disable_slat)
			fio_gettime(&io_u->start_time, NULL);

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			/*
			 * NOTE(review): in the error case ret is set, but the
			 * 'continue' at the end of this case skips
			 * break_on_this_error() - confirm this is intended.
			 */
			if (io_u->error) {
				ret = -io_u->error;
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				/* short transfer: bytes is what did complete */
				int bytes = io_u->xfer_buflen - io_u->resid;

				/*
				 * zero read, fail
				 */
				if (!bytes) {
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					/* leaves the switch, not the loop */
					break;
				}

				/* advance past the completed part and retry the rest */
				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				/* at end of file there is nothing left to retry */
				f = io_u->file;
				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				ret = io_u_sync_complete(td, io_u, bytes_done);
				if (ret < 0)
					break;
			}
			continue;
		case FIO_Q_QUEUED:
			break;
		case FIO_Q_BUSY:
			/* put the io_u back and push out what is pending */
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			td_verror(td, -ret, "td_io_queue");
			break;
		}

		if (break_on_this_error(td, ddir, &ret))
			break;

		/*
		 * if we can queue more, do so. but check if there are
		 * completed io_u's first. Note that we can get BUSY even
		 * without IO queued, if the system is resource starved.
		 */
reap:
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete)
			ret = wait_for_completions(td, NULL, bytes_done);

		if (ret < 0)
			break;
	}

	check_update_rusage(td);

	/*
	 * without an error, wait for everything still queued; with an
	 * error, try to cancel what is pending instead
	 */
	if (!td->error) {
		min_events = td->cur_depth;

		if (min_events)
			ret = io_u_queued_complete(td, min_events, NULL);
	} else
		cleanup_pending_aio(td);

	td_set_runstate(td, TD_RUNNING);

	dprint(FD_VERIFY, "exiting loop\n");
}
653 
exceeds_number_ios(struct thread_data * td)654 static unsigned int exceeds_number_ios(struct thread_data *td)
655 {
656 	unsigned long long number_ios;
657 
658 	if (!td->o.number_ios)
659 		return 0;
660 
661 	number_ios = ddir_rw_sum(td->io_blocks);
662 	number_ios += td->io_u_queued + td->io_u_in_flight;
663 
664 	return number_ios >= (td->o.number_ios * td->loops);
665 }
666 
io_issue_bytes_exceeded(struct thread_data * td)667 static int io_issue_bytes_exceeded(struct thread_data *td)
668 {
669 	unsigned long long bytes, limit;
670 
671 	if (td_rw(td))
672 		bytes = td->io_issue_bytes[DDIR_READ] + td->io_issue_bytes[DDIR_WRITE];
673 	else if (td_write(td))
674 		bytes = td->io_issue_bytes[DDIR_WRITE];
675 	else if (td_read(td))
676 		bytes = td->io_issue_bytes[DDIR_READ];
677 	else
678 		bytes = td->io_issue_bytes[DDIR_TRIM];
679 
680 	if (td->o.io_limit)
681 		limit = td->o.io_limit;
682 	else
683 		limit = td->o.size;
684 
685 	limit *= td->loops;
686 	return bytes >= limit || exceeds_number_ios(td);
687 }
688 
io_complete_bytes_exceeded(struct thread_data * td)689 static int io_complete_bytes_exceeded(struct thread_data *td)
690 {
691 	unsigned long long bytes, limit;
692 
693 	if (td_rw(td))
694 		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
695 	else if (td_write(td))
696 		bytes = td->this_io_bytes[DDIR_WRITE];
697 	else if (td_read(td))
698 		bytes = td->this_io_bytes[DDIR_READ];
699 	else
700 		bytes = td->this_io_bytes[DDIR_TRIM];
701 
702 	if (td->o.io_limit)
703 		limit = td->o.io_limit;
704 	else
705 		limit = td->o.size;
706 
707 	limit *= td->loops;
708 	return bytes >= limit || exceeds_number_ios(td);
709 }
710 
/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * The loop keeps going while there are iolog or trim entries left, while
 * the issued bytes are below the limit, or while the job is time based;
 * it also stops on terminate/done, runtime expiry, errors, or once
 * bytes_issued reaches total_bytes. After the loop, outstanding IO is
 * either completed (no error) or cancelled (error), and an optional
 * end_fsync is performed.
 *
 * Returns number of bytes written and trimmed.
 */
static uint64_t do_io(struct thread_data *td)
{
	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
	unsigned int i;
	int ret = 0;
	uint64_t total_bytes, bytes_issued = 0;

	if (in_ramp_time(td))
		td_set_runstate(td, TD_RAMP);
	else
		td_set_runstate(td, TD_RUNNING);

	lat_target_init(td);

	total_bytes = td->o.size;
	/*
	* Allow random overwrite workloads to write up to io_limit
	* before starting verification phase as 'size' doesn't apply.
	*/
	if (td_write(td) && td_random(td) && td->o.norandommap)
		total_bytes = max(total_bytes, (uint64_t) td->o.io_limit);
	/*
	 * If verify_backlog is enabled, we'll run the verify in this
	 * handler as well. For that case, we may need up to twice the
	 * amount of bytes.
	 */
	if (td->o.verify != VERIFY_NONE &&
	   (td_write(td) && td->o.verify_backlog))
		total_bytes += td->o.size;

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
		(!flist_empty(&td->trim_list)) || !io_issue_bytes_exceeded(td) ||
		td->o.time_based) {
		/* only written when a rate check is enabled for some direction */
		struct timeval comp_time;
		struct io_u *io_u;
		int ret2, full;
		enum fio_ddir ddir;

		check_update_rusage(td);

		if (td->terminate || td->done)
			break;

		update_tv_cache(td);

		/*
		 * the cached time may be stale, so re-check with a fresh
		 * timestamp before terminating on timeout
		 */
		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				fio_mark_td_terminate(td);
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		if (bytes_issued >= total_bytes)
			break;

		io_u = get_io_u(td);
		if (IS_ERR_OR_NULL(io_u)) {
			int err = PTR_ERR(io_u);

			io_u = NULL;
			/* -EBUSY: nothing to issue right now, go reap instead */
			if (err == -EBUSY) {
				ret = FIO_Q_BUSY;
				goto reap;
			}
			if (td->o.latency_target)
				goto reap;
			break;
		}

		/* remember the direction; io_u may be handed off below */
		ddir = io_u->ddir;

		/*
		 * Add verification end_io handler if:
		 *	- Asked to verify (!td_rw(td))
		 *	- Or the io_u is from our verify list (mixed write/ver)
		 */
		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {

			/*
			 * no fixed verify pattern: seed the io_u from the
			 * verify random state; a second draw is multiplied
			 * in when int and pointer sizes differ
			 */
			if (!td->o.verify_pattern_bytes) {
				io_u->rand_seed = __rand(&td->verify_state);
				if (sizeof(int) != sizeof(long *))
					io_u->rand_seed *= __rand(&td->verify_state);
			}

			if (verify_state_should_stop(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}

			if (td->o.verify_async)
				io_u->end_io = verify_io_u_async;
			else
				io_u->end_io = verify_io_u;
			td_set_runstate(td, TD_VERIFYING);
		} else if (in_ramp_time(td))
			td_set_runstate(td, TD_RAMP);
		else
			td_set_runstate(td, TD_RUNNING);

		/*
		 * Always log IO before it's issued, so we know the specific
		 * order of it. The logged unit will track when the IO has
		 * completed.
		 */
		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
		    td->o.do_verify &&
		    td->o.verify != VERIFY_NONE &&
		    !td->o.experimental_verify)
			log_io_piece(td, io_u);

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			if (io_u->error) {
				ret = -io_u->error;
				unlog_io_piece(td, io_u);
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				/* short transfer: bytes is what did complete */
				int bytes = io_u->xfer_buflen - io_u->resid;
				struct fio_file *f = io_u->file;

				bytes_issued += bytes;

				trim_io_piece(td, io_u);

				/*
				 * zero read, fail
				 */
				if (!bytes) {
					unlog_io_piece(td, io_u);
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					break;
				}

				/* advance past the completed part and retry the rest */
				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				/* at end of file there is nothing left to retry */
				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				/* take the completion timestamp for the rate check */
				if (__should_check_rate(td, DDIR_READ) ||
				    __should_check_rate(td, DDIR_WRITE) ||
				    __should_check_rate(td, DDIR_TRIM))
					fio_gettime(&comp_time, NULL);

				ret = io_u_sync_complete(td, io_u, bytes_done);
				if (ret < 0)
					break;
				bytes_issued += io_u->xfer_buflen;
			}
			break;
		case FIO_Q_QUEUED:
			/*
			 * if the engine doesn't have a commit hook,
			 * the io_u is really queued. if it does have such
			 * a hook, it has to call io_u_queued() itself.
			 */
			if (td->io_ops->commit == NULL)
				io_u_queued(td, io_u);
			bytes_issued += io_u->xfer_buflen;
			break;
		case FIO_Q_BUSY:
			/* undo the log entry, put the io_u back, flush pending */
			unlog_io_piece(td, io_u);
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			put_io_u(td, io_u);
			break;
		}

		if (break_on_this_error(td, ddir, &ret))
			break;

		/*
		 * See if we need to complete some commands. Note that we
		 * can get BUSY even without IO queued, if the system is
		 * resource starved.
		 */
reap:
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete)
			ret = wait_for_completions(td, &comp_time, bytes_done);
		if (ret < 0)
			break;
		/* nothing completed yet: skip rate/latency/thinktime handling */
		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
			continue;

		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
			if (check_min_rate(td, &comp_time, bytes_done)) {
				if (exitall_on_terminate)
					fio_terminate_threads(td->groupid);
				td_verror(td, EIO, "check_min_rate");
				break;
			}
		}
		if (!in_ramp_time(td) && td->o.latency_target)
			lat_target_check(td);

		if (td->o.thinktime) {
			unsigned long long b;

			/* think every thinktime_blocks completed blocks */
			b = ddir_rw_sum(td->io_blocks);
			if (!(b % td->o.thinktime_blocks)) {
				int left;

				io_u_quiesce(td);

				/* spin for the spin part, sleep for the remainder */
				if (td->o.thinktime_spin)
					usec_spin(td->o.thinktime_spin);

				left = td->o.thinktime - td->o.thinktime_spin;
				if (left)
					usec_sleep(td, left);
			}
		}
	}

	check_update_rusage(td);

	if (td->trim_entries)
		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);

	/* ENOSPC is the expected outcome with fill_device, not an error */
	if (td->o.fill_device && td->error == ENOSPC) {
		td->error = 0;
		fio_mark_td_terminate(td);
	}
	if (!td->error) {
		struct fio_file *f;

		/* wait for everything that is still queued */
		i = td->cur_depth;
		if (i) {
			ret = io_u_queued_complete(td, i, bytes_done);
			if (td->o.fill_device && td->error == ENOSPC)
				td->error = 0;
		}

		if (should_fsync(td) && td->o.end_fsync) {
			td_set_runstate(td, TD_FSYNCING);

			/* a failed fsync is logged, but does not fail the job here */
			for_each_file(td, f, i) {
				if (!fio_file_fsync(td, f))
					continue;

				log_err("fio: end_fsync failed for file %s\n",
								f->file_name);
			}
		}
	} else
		cleanup_pending_aio(td);

	/*
	 * stop job if we failed doing any IO
	 */
	if (!ddir_rw_sum(td->this_io_bytes))
		td->done = 1;

	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
}
992 
cleanup_io_u(struct thread_data * td)993 static void cleanup_io_u(struct thread_data *td)
994 {
995 	struct io_u *io_u;
996 
997 	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
998 
999 		if (td->io_ops->io_u_free)
1000 			td->io_ops->io_u_free(td, io_u);
1001 
1002 		fio_memfree(io_u, sizeof(*io_u));
1003 	}
1004 
1005 	free_io_mem(td);
1006 
1007 	io_u_rexit(&td->io_u_requeues);
1008 	io_u_qexit(&td->io_u_freelist);
1009 	io_u_qexit(&td->io_u_all);
1010 
1011 	if (td->last_write_comp)
1012 		sfree(td->last_write_comp);
1013 }
1014 
/*
 * Set up the io_u queues, the IO buffer memory and one io_u per unit of
 * iodepth for job 'td'.
 *
 * Each io_u gets a max_bs sized slice of the job's buffer (when the job
 * transfers data) and is pushed on both the freelist and the 'all' queue.
 * When verify is enabled, an array for write completions is allocated.
 *
 * Returns 0 on success and 1 on failure. Nothing is released here on
 * failure.
 */
static int init_io_u(struct thread_data *td)
{
	struct io_u *io_u;
	unsigned int max_bs, min_write;
	int cl_align, i, max_units;
	int data_xfer = 1, err;
	char *p;

	max_units = td->o.iodepth;
	max_bs = td_max_bs(td);
	min_write = td->o.min_bs[DDIR_WRITE];
	/* one max_bs sized buffer per io_u; widened to avoid overflow */
	td->orig_buffer_size = (unsigned long long) max_bs
					* (unsigned long long) max_units;

	/* no data buffers for FIO_NOIO engines or jobs that neither read nor write */
	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
		data_xfer = 0;

	err = 0;
	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
	err += io_u_qinit(&td->io_u_all, td->o.iodepth);

	if (err) {
		log_err("fio: failed setting up IO queues\n");
		return 1;
	}

	/*
	 * if we may later need to do address alignment, then add any
	 * possible adjustment here so that we don't cause a buffer
	 * overflow later. this adjustment may be too much if we get
	 * lucky and the allocator gives us an aligned address.
	 */
	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    (td->io_ops->flags & FIO_RAWIO))
		td->orig_buffer_size += page_mask + td->o.mem_align;

	/* huge page backed memory: round the size up to a hugepage multiple */
	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
		/*
		 * NOTE(review): 'bs' is unsigned long, so a size that does not
		 * fit is truncated here before the size_t check below - confirm.
		 */
		unsigned long bs;

		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
	}

	/* reject sizes that cannot be represented as a size_t */
	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
		return 1;
	}

	if (data_xfer && allocate_io_mem(td))
		return 1;

	/* same condition as the size adjustment above */
	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    (td->io_ops->flags & FIO_RAWIO))
		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
	else
		p = td->orig_buffer;

	cl_align = os_cache_line_size();

	for (i = 0; i < max_units; i++) {
		void *ptr;

		if (td->terminate)
			return 1;

		/*
		 * NOTE(review): on allocation failure the loop is left with
		 * 'break', so the function can still return 0 with fewer than
		 * max_units io_u's - confirm this is intended.
		 */
		ptr = fio_memalign(cl_align, sizeof(*io_u));
		if (!ptr) {
			log_err("fio: unable to allocate aligned memory\n");
			break;
		}

		io_u = ptr;
		memset(io_u, 0, sizeof(*io_u));
		INIT_FLIST_HEAD(&io_u->verify_list);
		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);

		if (data_xfer) {
			/* this io_u's slice of the job buffer */
			io_u->buf = p;
			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);

			if (td_write(td))
				io_u_fill_buffer(td, io_u, min_write, max_bs);
			if (td_write(td) && td->o.verify_pattern_bytes) {
				/*
				 * Fill the buffer with the pattern if we are
				 * going to be doing writes.
				 */
				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
			}
		}

		io_u->index = i;
		io_u->flags = IO_U_F_FREE;
		io_u_qpush(&td->io_u_freelist, io_u);

		/*
		 * io_u never leaves this stack, used for iteration of all
		 * io_u buffers.
		 */
		io_u_qpush(&td->io_u_all, io_u);

		/* let the engine attach its own per-io_u data, if it wants to */
		if (td->io_ops->io_u_init) {
			int ret = td->io_ops->io_u_init(td, io_u);

			if (ret) {
				log_err("fio: failed to init engine data: %d\n", ret);
				return 1;
			}
		}

		/* advance to the next io_u's slice */
		p += max_bs;
	}

	/* one 64-bit slot per io_u for write completions when verifying */
	if (td->o.verify != VERIFY_NONE) {
		td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
		if (!td->last_write_comp) {
			log_err("fio: failed to alloc write comp data\n");
			return 1;
		}
	}

	return 0;
}
1139 
switch_ioscheduler(struct thread_data * td)1140 static int switch_ioscheduler(struct thread_data *td)
1141 {
1142 	char tmp[256], tmp2[128];
1143 	FILE *f;
1144 	int ret;
1145 
1146 	if (td->io_ops->flags & FIO_DISKLESSIO)
1147 		return 0;
1148 
1149 	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1150 
1151 	f = fopen(tmp, "r+");
1152 	if (!f) {
1153 		if (errno == ENOENT) {
1154 			log_err("fio: os or kernel doesn't support IO scheduler"
1155 				" switching\n");
1156 			return 0;
1157 		}
1158 		td_verror(td, errno, "fopen iosched");
1159 		return 1;
1160 	}
1161 
1162 	/*
1163 	 * Set io scheduler.
1164 	 */
1165 	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1166 	if (ferror(f) || ret != 1) {
1167 		td_verror(td, errno, "fwrite");
1168 		fclose(f);
1169 		return 1;
1170 	}
1171 
1172 	rewind(f);
1173 
1174 	/*
1175 	 * Read back and check that the selected scheduler is now the default.
1176 	 */
1177 	ret = fread(tmp, sizeof(tmp), 1, f);
1178 	if (ferror(f) || ret < 0) {
1179 		td_verror(td, errno, "fread");
1180 		fclose(f);
1181 		return 1;
1182 	}
1183 	tmp[sizeof(tmp) - 1] = '\0';
1184 
1185 
1186 	sprintf(tmp2, "[%s]", td->o.ioscheduler);
1187 	if (!strstr(tmp, tmp2)) {
1188 		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1189 		td_verror(td, EINVAL, "iosched_switch");
1190 		fclose(f);
1191 		return 1;
1192 	}
1193 
1194 	fclose(f);
1195 	return 0;
1196 }
1197 
keep_running(struct thread_data * td)1198 static int keep_running(struct thread_data *td)
1199 {
1200 	unsigned long long limit;
1201 
1202 	if (td->done)
1203 		return 0;
1204 	if (td->o.time_based)
1205 		return 1;
1206 	if (td->o.loops) {
1207 		td->o.loops--;
1208 		return 1;
1209 	}
1210 	if (exceeds_number_ios(td))
1211 		return 0;
1212 
1213 	if (td->o.io_limit)
1214 		limit = td->o.io_limit;
1215 	else
1216 		limit = td->o.size;
1217 
1218 	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1219 		uint64_t diff;
1220 
1221 		/*
1222 		 * If the difference is less than the minimum IO size, we
1223 		 * are done.
1224 		 */
1225 		diff = limit - ddir_rw_sum(td->io_bytes);
1226 		if (diff < td_max_bs(td))
1227 			return 0;
1228 
1229 		if (fio_files_done(td))
1230 			return 0;
1231 
1232 		return 1;
1233 	}
1234 
1235 	return 0;
1236 }
1237 
exec_string(struct thread_options * o,const char * string,const char * mode)1238 static int exec_string(struct thread_options *o, const char *string, const char *mode)
1239 {
1240 	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1241 	char *str;
1242 
1243 	str = malloc(newlen);
1244 	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1245 
1246 	log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
1247 	ret = system(str);
1248 	if (ret == -1)
1249 		log_err("fio: exec of cmd <%s> failed\n", str);
1250 
1251 	free(str);
1252 	return ret;
1253 }
1254 
1255 /*
1256  * Dry run to compute correct state of numberio for verification.
1257  */
do_dry_run(struct thread_data * td)1258 static uint64_t do_dry_run(struct thread_data *td)
1259 {
1260 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1261 
1262 	td_set_runstate(td, TD_RUNNING);
1263 
1264 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1265 		(!flist_empty(&td->trim_list)) || !io_complete_bytes_exceeded(td)) {
1266 		struct io_u *io_u;
1267 		int ret;
1268 
1269 		if (td->terminate || td->done)
1270 			break;
1271 
1272 		io_u = get_io_u(td);
1273 		if (!io_u)
1274 			break;
1275 
1276 		io_u->flags |= IO_U_F_FLIGHT;
1277 		io_u->error = 0;
1278 		io_u->resid = 0;
1279 		if (ddir_rw(acct_ddir(io_u)))
1280 			td->io_issues[acct_ddir(io_u)]++;
1281 		if (ddir_rw(io_u->ddir)) {
1282 			io_u_mark_depth(td, 1);
1283 			td->ts.total_io_u[io_u->ddir]++;
1284 		}
1285 
1286 		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1287 		    td->o.do_verify &&
1288 		    td->o.verify != VERIFY_NONE &&
1289 		    !td->o.experimental_verify)
1290 			log_io_piece(td, io_u);
1291 
1292 		ret = io_u_sync_complete(td, io_u, bytes_done);
1293 		(void) ret;
1294 	}
1295 
1296 	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1297 }
1298 
1299 /*
1300  * Entry point for the thread based jobs. The process based jobs end up
1301  * here as well, after a little setup.
1302  */
thread_main(void * data)1303 static void *thread_main(void *data)
1304 {
1305 	unsigned long long elapsed;
1306 	struct thread_data *td = data;
1307 	struct thread_options *o = &td->o;
1308 	pthread_condattr_t attr;
1309 	int clear_state;
1310 	int ret;
1311 
1312 	if (!o->use_thread) {
1313 		setsid();
1314 		td->pid = getpid();
1315 	} else
1316 		td->pid = gettid();
1317 
1318 	fio_local_clock_init(o->use_thread);
1319 
1320 	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1321 
1322 	if (is_backend)
1323 		fio_server_send_start(td);
1324 
1325 	INIT_FLIST_HEAD(&td->io_log_list);
1326 	INIT_FLIST_HEAD(&td->io_hist_list);
1327 	INIT_FLIST_HEAD(&td->verify_list);
1328 	INIT_FLIST_HEAD(&td->trim_list);
1329 	INIT_FLIST_HEAD(&td->next_rand_list);
1330 	pthread_mutex_init(&td->io_u_lock, NULL);
1331 	td->io_hist_tree = RB_ROOT;
1332 
1333 	pthread_condattr_init(&attr);
1334 	pthread_cond_init(&td->verify_cond, &attr);
1335 	pthread_cond_init(&td->free_cond, &attr);
1336 
1337 	td_set_runstate(td, TD_INITIALIZED);
1338 	dprint(FD_MUTEX, "up startup_mutex\n");
1339 	fio_mutex_up(startup_mutex);
1340 	dprint(FD_MUTEX, "wait on td->mutex\n");
1341 	fio_mutex_down(td->mutex);
1342 	dprint(FD_MUTEX, "done waiting on td->mutex\n");
1343 
1344 	/*
1345 	 * A new gid requires privilege, so we need to do this before setting
1346 	 * the uid.
1347 	 */
1348 	if (o->gid != -1U && setgid(o->gid)) {
1349 		td_verror(td, errno, "setgid");
1350 		goto err;
1351 	}
1352 	if (o->uid != -1U && setuid(o->uid)) {
1353 		td_verror(td, errno, "setuid");
1354 		goto err;
1355 	}
1356 
1357 	/*
1358 	 * If we have a gettimeofday() thread, make sure we exclude that
1359 	 * thread from this job
1360 	 */
1361 	if (o->gtod_cpu)
1362 		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1363 
1364 	/*
1365 	 * Set affinity first, in case it has an impact on the memory
1366 	 * allocations.
1367 	 */
1368 	if (fio_option_is_set(o, cpumask)) {
1369 		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1370 			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1371 			if (!ret) {
1372 				log_err("fio: no CPUs set\n");
1373 				log_err("fio: Try increasing number of available CPUs\n");
1374 				td_verror(td, EINVAL, "cpus_split");
1375 				goto err;
1376 			}
1377 		}
1378 		ret = fio_setaffinity(td->pid, o->cpumask);
1379 		if (ret == -1) {
1380 			td_verror(td, errno, "cpu_set_affinity");
1381 			goto err;
1382 		}
1383 	}
1384 
1385 #ifdef CONFIG_LIBNUMA
1386 	/* numa node setup */
1387 	if (fio_option_is_set(o, numa_cpunodes) ||
1388 	    fio_option_is_set(o, numa_memnodes)) {
1389 		struct bitmask *mask;
1390 
1391 		if (numa_available() < 0) {
1392 			td_verror(td, errno, "Does not support NUMA API\n");
1393 			goto err;
1394 		}
1395 
1396 		if (fio_option_is_set(o, numa_cpunodes)) {
1397 			mask = numa_parse_nodestring(o->numa_cpunodes);
1398 			ret = numa_run_on_node_mask(mask);
1399 			numa_free_nodemask(mask);
1400 			if (ret == -1) {
1401 				td_verror(td, errno, \
1402 					"numa_run_on_node_mask failed\n");
1403 				goto err;
1404 			}
1405 		}
1406 
1407 		if (fio_option_is_set(o, numa_memnodes)) {
1408 			mask = NULL;
1409 			if (o->numa_memnodes)
1410 				mask = numa_parse_nodestring(o->numa_memnodes);
1411 
1412 			switch (o->numa_mem_mode) {
1413 			case MPOL_INTERLEAVE:
1414 				numa_set_interleave_mask(mask);
1415 				break;
1416 			case MPOL_BIND:
1417 				numa_set_membind(mask);
1418 				break;
1419 			case MPOL_LOCAL:
1420 				numa_set_localalloc();
1421 				break;
1422 			case MPOL_PREFERRED:
1423 				numa_set_preferred(o->numa_mem_prefer_node);
1424 				break;
1425 			case MPOL_DEFAULT:
1426 			default:
1427 				break;
1428 			}
1429 
1430 			if (mask)
1431 				numa_free_nodemask(mask);
1432 
1433 		}
1434 	}
1435 #endif
1436 
1437 	if (fio_pin_memory(td))
1438 		goto err;
1439 
1440 	/*
1441 	 * May alter parameters that init_io_u() will use, so we need to
1442 	 * do this first.
1443 	 */
1444 	if (init_iolog(td))
1445 		goto err;
1446 
1447 	if (init_io_u(td))
1448 		goto err;
1449 
1450 	if (o->verify_async && verify_async_init(td))
1451 		goto err;
1452 
1453 	if (fio_option_is_set(o, ioprio) ||
1454 	    fio_option_is_set(o, ioprio_class)) {
1455 		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1456 		if (ret == -1) {
1457 			td_verror(td, errno, "ioprio_set");
1458 			goto err;
1459 		}
1460 	}
1461 
1462 	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1463 		goto err;
1464 
1465 	errno = 0;
1466 	if (nice(o->nice) == -1 && errno != 0) {
1467 		td_verror(td, errno, "nice");
1468 		goto err;
1469 	}
1470 
1471 	if (o->ioscheduler && switch_ioscheduler(td))
1472 		goto err;
1473 
1474 	if (!o->create_serialize && setup_files(td))
1475 		goto err;
1476 
1477 	if (td_io_init(td))
1478 		goto err;
1479 
1480 	if (init_random_map(td))
1481 		goto err;
1482 
1483 	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1484 		goto err;
1485 
1486 	if (o->pre_read) {
1487 		if (pre_read_files(td) < 0)
1488 			goto err;
1489 	}
1490 
1491 	if (td->flags & TD_F_COMPRESS_LOG)
1492 		tp_init(&td->tp_data);
1493 
1494 	fio_verify_init(td);
1495 
1496 	fio_gettime(&td->epoch, NULL);
1497 	fio_getrusage(&td->ru_start);
1498 	clear_state = 0;
1499 	while (keep_running(td)) {
1500 		uint64_t verify_bytes;
1501 
1502 		fio_gettime(&td->start, NULL);
1503 		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1504 		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1505 		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1506 
1507 		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1508 				o->ratemin[DDIR_TRIM]) {
1509 		        memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1510 						sizeof(td->bw_sample_time));
1511 		        memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1512 						sizeof(td->bw_sample_time));
1513 		        memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1514 						sizeof(td->bw_sample_time));
1515 		}
1516 
1517 		if (clear_state)
1518 			clear_io_state(td);
1519 
1520 		prune_io_piece_log(td);
1521 
1522 		if (td->o.verify_only && (td_write(td) || td_rw(td)))
1523 			verify_bytes = do_dry_run(td);
1524 		else
1525 			verify_bytes = do_io(td);
1526 
1527 		clear_state = 1;
1528 
1529 		fio_mutex_down(stat_mutex);
1530 		if (td_read(td) && td->io_bytes[DDIR_READ]) {
1531 			elapsed = mtime_since_now(&td->start);
1532 			td->ts.runtime[DDIR_READ] += elapsed;
1533 		}
1534 		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1535 			elapsed = mtime_since_now(&td->start);
1536 			td->ts.runtime[DDIR_WRITE] += elapsed;
1537 		}
1538 		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1539 			elapsed = mtime_since_now(&td->start);
1540 			td->ts.runtime[DDIR_TRIM] += elapsed;
1541 		}
1542 		fio_gettime(&td->start, NULL);
1543 		fio_mutex_up(stat_mutex);
1544 
1545 		if (td->error || td->terminate)
1546 			break;
1547 
1548 		if (!o->do_verify ||
1549 		    o->verify == VERIFY_NONE ||
1550 		    (td->io_ops->flags & FIO_UNIDIR))
1551 			continue;
1552 
1553 		clear_io_state(td);
1554 
1555 		fio_gettime(&td->start, NULL);
1556 
1557 		do_verify(td, verify_bytes);
1558 
1559 		fio_mutex_down(stat_mutex);
1560 		td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start);
1561 		fio_gettime(&td->start, NULL);
1562 		fio_mutex_up(stat_mutex);
1563 
1564 		if (td->error || td->terminate)
1565 			break;
1566 	}
1567 
1568 	update_rusage_stat(td);
1569 	td->ts.total_run_time = mtime_since_now(&td->epoch);
1570 	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1571 	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1572 	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1573 
1574 	if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
1575 	    (td->o.verify != VERIFY_NONE && td_write(td))) {
1576 		struct all_io_list *state;
1577 		size_t sz;
1578 
1579 		state = get_all_io_list(td->thread_number, &sz);
1580 		if (state) {
1581 			__verify_save_state(state, "local");
1582 			free(state);
1583 		}
1584 	}
1585 
1586 	fio_unpin_memory(td);
1587 
1588 	fio_writeout_logs(td);
1589 
1590 	if (td->flags & TD_F_COMPRESS_LOG)
1591 		tp_exit(&td->tp_data);
1592 
1593 	if (o->exec_postrun)
1594 		exec_string(o, o->exec_postrun, (const char *)"postrun");
1595 
1596 	if (exitall_on_terminate)
1597 		fio_terminate_threads(td->groupid);
1598 
1599 err:
1600 	if (td->error)
1601 		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1602 							td->verror);
1603 
1604 	if (o->verify_async)
1605 		verify_async_exit(td);
1606 
1607 	close_and_free_files(td);
1608 	cleanup_io_u(td);
1609 	close_ioengine(td);
1610 	cgroup_shutdown(td, &cgroup_mnt);
1611 	verify_free_state(td);
1612 
1613 	if (fio_option_is_set(o, cpumask)) {
1614 		ret = fio_cpuset_exit(&o->cpumask);
1615 		if (ret)
1616 			td_verror(td, ret, "fio_cpuset_exit");
1617 	}
1618 
1619 	/*
1620 	 * do this very late, it will log file closing as well
1621 	 */
1622 	if (o->write_iolog_file)
1623 		write_iolog_close(td);
1624 
1625 	fio_mutex_remove(td->mutex);
1626 	td->mutex = NULL;
1627 
1628 	td_set_runstate(td, TD_EXITED);
1629 
1630 	/*
1631 	 * Do this last after setting our runstate to exited, so we
1632 	 * know that the stat thread is signaled.
1633 	 */
1634 	check_update_rusage(td);
1635 
1636 	return (void *) (uintptr_t) td->error;
1637 }
1638 
1639 
1640 /*
1641  * We cannot pass the td data into a forked process, so attach the td and
1642  * pass it to the thread worker.
1643  */
fork_main(int shmid,int offset)1644 static int fork_main(int shmid, int offset)
1645 {
1646 	struct thread_data *td;
1647 	void *data, *ret;
1648 
1649 #if !defined(__hpux) && !defined(CONFIG_NO_SHM)
1650 	data = shmat(shmid, NULL, 0);
1651 	if (data == (void *) -1) {
1652 		int __err = errno;
1653 
1654 		perror("shmat");
1655 		return __err;
1656 	}
1657 #else
1658 	/*
1659 	 * HP-UX inherits shm mappings?
1660 	 */
1661 	data = threads;
1662 #endif
1663 
1664 	td = data + offset * sizeof(struct thread_data);
1665 	ret = thread_main(td);
1666 	shmdt(data);
1667 	return (int) (uintptr_t) ret;
1668 }
1669 
dump_td_info(struct thread_data * td)1670 static void dump_td_info(struct thread_data *td)
1671 {
1672 	log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
1673 		"be stuck. Doing forceful exit of this job.\n", td->o.name,
1674 			(unsigned long) time_since_now(&td->terminate_time));
1675 }
1676 
1677 /*
1678  * Run over the job map and reap the threads that have exited, if any.
1679  */
reap_threads(unsigned int * nr_running,unsigned int * t_rate,unsigned int * m_rate)1680 static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1681 			 unsigned int *m_rate)
1682 {
1683 	struct thread_data *td;
1684 	unsigned int cputhreads, realthreads, pending;
1685 	int i, status, ret;
1686 
1687 	/*
1688 	 * reap exited threads (TD_EXITED -> TD_REAPED)
1689 	 */
1690 	realthreads = pending = cputhreads = 0;
1691 	for_each_td(td, i) {
1692 		int flags = 0;
1693 
1694 		/*
1695 		 * ->io_ops is NULL for a thread that has closed its
1696 		 * io engine
1697 		 */
1698 		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1699 			cputhreads++;
1700 		else
1701 			realthreads++;
1702 
1703 		if (!td->pid) {
1704 			pending++;
1705 			continue;
1706 		}
1707 		if (td->runstate == TD_REAPED)
1708 			continue;
1709 		if (td->o.use_thread) {
1710 			if (td->runstate == TD_EXITED) {
1711 				td_set_runstate(td, TD_REAPED);
1712 				goto reaped;
1713 			}
1714 			continue;
1715 		}
1716 
1717 		flags = WNOHANG;
1718 		if (td->runstate == TD_EXITED)
1719 			flags = 0;
1720 
1721 		/*
1722 		 * check if someone quit or got killed in an unusual way
1723 		 */
1724 		ret = waitpid(td->pid, &status, flags);
1725 		if (ret < 0) {
1726 			if (errno == ECHILD) {
1727 				log_err("fio: pid=%d disappeared %d\n",
1728 						(int) td->pid, td->runstate);
1729 				td->sig = ECHILD;
1730 				td_set_runstate(td, TD_REAPED);
1731 				goto reaped;
1732 			}
1733 			perror("waitpid");
1734 		} else if (ret == td->pid) {
1735 			if (WIFSIGNALED(status)) {
1736 				int sig = WTERMSIG(status);
1737 
1738 				if (sig != SIGTERM && sig != SIGUSR2)
1739 					log_err("fio: pid=%d, got signal=%d\n",
1740 							(int) td->pid, sig);
1741 				td->sig = sig;
1742 				td_set_runstate(td, TD_REAPED);
1743 				goto reaped;
1744 			}
1745 			if (WIFEXITED(status)) {
1746 				if (WEXITSTATUS(status) && !td->error)
1747 					td->error = WEXITSTATUS(status);
1748 
1749 				td_set_runstate(td, TD_REAPED);
1750 				goto reaped;
1751 			}
1752 		}
1753 
1754 		/*
1755 		 * If the job is stuck, do a forceful timeout of it and
1756 		 * move on.
1757 		 */
1758 		if (td->terminate &&
1759 		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
1760 			dump_td_info(td);
1761 			td_set_runstate(td, TD_REAPED);
1762 			goto reaped;
1763 		}
1764 
1765 		/*
1766 		 * thread is not dead, continue
1767 		 */
1768 		pending++;
1769 		continue;
1770 reaped:
1771 		(*nr_running)--;
1772 		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
1773 		(*t_rate) -= ddir_rw_sum(td->o.rate);
1774 		if (!td->pid)
1775 			pending--;
1776 
1777 		if (td->error)
1778 			exit_value++;
1779 
1780 		done_secs += mtime_since_now(&td->epoch) / 1000;
1781 		profile_td_exit(td);
1782 	}
1783 
1784 	if (*nr_running == cputhreads && !pending && realthreads)
1785 		fio_terminate_threads(TERMINATE_ALL);
1786 }
1787 
__check_trigger_file(void)1788 static int __check_trigger_file(void)
1789 {
1790 	struct stat sb;
1791 
1792 	if (!trigger_file)
1793 		return 0;
1794 
1795 	if (stat(trigger_file, &sb))
1796 		return 0;
1797 
1798 	if (unlink(trigger_file) < 0)
1799 		log_err("fio: failed to unlink %s: %s\n", trigger_file,
1800 							strerror(errno));
1801 
1802 	return 1;
1803 }
1804 
trigger_timedout(void)1805 static int trigger_timedout(void)
1806 {
1807 	if (trigger_timeout)
1808 		return time_since_genesis() >= trigger_timeout;
1809 
1810 	return 0;
1811 }
1812 
/*
 * Run the trigger command through system(). A NULL command is ignored and
 * a system() failure is only logged.
 */
void exec_trigger(const char *cmd)
{
	if (cmd && system(cmd) == -1)
		log_err("fio: failed executing %s trigger\n", cmd);
}
1824 
check_trigger_file(void)1825 void check_trigger_file(void)
1826 {
1827 	if (__check_trigger_file() || trigger_timedout()) {
1828 		if (nr_clients)
1829 			fio_clients_send_trigger(trigger_remote_cmd);
1830 		else {
1831 			verify_save_state();
1832 			fio_terminate_threads(TERMINATE_ALL);
1833 			exec_trigger(trigger_cmd);
1834 		}
1835 	}
1836 }
1837 
fio_verify_load_state(struct thread_data * td)1838 static int fio_verify_load_state(struct thread_data *td)
1839 {
1840 	int ret;
1841 
1842 	if (!td->o.verify_state)
1843 		return 0;
1844 
1845 	if (is_backend) {
1846 		void *data;
1847 
1848 		ret = fio_server_get_verify_state(td->o.name,
1849 					td->thread_number - 1, &data);
1850 		if (!ret)
1851 			verify_convert_assign_state(td, data);
1852 	} else
1853 		ret = verify_load_state(td, "local");
1854 
1855 	return ret;
1856 }
1857 
/*
 * Sleep for 'usecs' microseconds, but first give the periodic checks a
 * chance to run: the running-stats check and the trigger file check.
 */
static void do_usleep(unsigned int usecs)
{
	/* housekeeping that the polling loops rely on */
	check_for_running_stats();
	check_trigger_file();

	usleep(usecs);
}
1864 
1865 /*
1866  * Main function for kicking off and reaping jobs, as needed.
1867  */
run_threads(void)1868 static void run_threads(void)
1869 {
1870 	struct thread_data *td;
1871 	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1872 	uint64_t spent;
1873 
1874 	if (fio_gtod_offload && fio_start_gtod_thread())
1875 		return;
1876 
1877 	fio_idle_prof_init();
1878 
1879 	set_sig_handlers();
1880 
1881 	nr_thread = nr_process = 0;
1882 	for_each_td(td, i) {
1883 		if (td->o.use_thread)
1884 			nr_thread++;
1885 		else
1886 			nr_process++;
1887 	}
1888 
1889 	if (output_format == FIO_OUTPUT_NORMAL) {
1890 		log_info("Starting ");
1891 		if (nr_thread)
1892 			log_info("%d thread%s", nr_thread,
1893 						nr_thread > 1 ? "s" : "");
1894 		if (nr_process) {
1895 			if (nr_thread)
1896 				log_info(" and ");
1897 			log_info("%d process%s", nr_process,
1898 						nr_process > 1 ? "es" : "");
1899 		}
1900 		log_info("\n");
1901 		log_info_flush();
1902 	}
1903 
1904 	todo = thread_number;
1905 	nr_running = 0;
1906 	nr_started = 0;
1907 	m_rate = t_rate = 0;
1908 
1909 	for_each_td(td, i) {
1910 		print_status_init(td->thread_number - 1);
1911 
1912 		if (!td->o.create_serialize)
1913 			continue;
1914 
1915 		if (fio_verify_load_state(td))
1916 			goto reap;
1917 
1918 		/*
1919 		 * do file setup here so it happens sequentially,
1920 		 * we don't want X number of threads getting their
1921 		 * client data interspersed on disk
1922 		 */
1923 		if (setup_files(td)) {
1924 reap:
1925 			exit_value++;
1926 			if (td->error)
1927 				log_err("fio: pid=%d, err=%d/%s\n",
1928 					(int) td->pid, td->error, td->verror);
1929 			td_set_runstate(td, TD_REAPED);
1930 			todo--;
1931 		} else {
1932 			struct fio_file *f;
1933 			unsigned int j;
1934 
1935 			/*
1936 			 * for sharing to work, each job must always open
1937 			 * its own files. so close them, if we opened them
1938 			 * for creation
1939 			 */
1940 			for_each_file(td, f, j) {
1941 				if (fio_file_open(f))
1942 					td_io_close_file(td, f);
1943 			}
1944 		}
1945 	}
1946 
1947 	/* start idle threads before io threads start to run */
1948 	fio_idle_prof_start();
1949 
1950 	set_genesis_time();
1951 
1952 	while (todo) {
1953 		struct thread_data *map[REAL_MAX_JOBS];
1954 		struct timeval this_start;
1955 		int this_jobs = 0, left;
1956 
1957 		/*
1958 		 * create threads (TD_NOT_CREATED -> TD_CREATED)
1959 		 */
1960 		for_each_td(td, i) {
1961 			if (td->runstate != TD_NOT_CREATED)
1962 				continue;
1963 
1964 			/*
1965 			 * never got a chance to start, killed by other
1966 			 * thread for some reason
1967 			 */
1968 			if (td->terminate) {
1969 				todo--;
1970 				continue;
1971 			}
1972 
1973 			if (td->o.start_delay) {
1974 				spent = utime_since_genesis();
1975 
1976 				if (td->o.start_delay > spent)
1977 					continue;
1978 			}
1979 
1980 			if (td->o.stonewall && (nr_started || nr_running)) {
1981 				dprint(FD_PROCESS, "%s: stonewall wait\n",
1982 							td->o.name);
1983 				break;
1984 			}
1985 
1986 			init_disk_util(td);
1987 
1988 			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
1989 			td->update_rusage = 0;
1990 
1991 			/*
1992 			 * Set state to created. Thread will transition
1993 			 * to TD_INITIALIZED when it's done setting up.
1994 			 */
1995 			td_set_runstate(td, TD_CREATED);
1996 			map[this_jobs++] = td;
1997 			nr_started++;
1998 
1999 			if (td->o.use_thread) {
2000 				int ret;
2001 
2002 				dprint(FD_PROCESS, "will pthread_create\n");
2003 				ret = pthread_create(&td->thread, NULL,
2004 							thread_main, td);
2005 				if (ret) {
2006 					log_err("pthread_create: %s\n",
2007 							strerror(ret));
2008 					nr_started--;
2009 					break;
2010 				}
2011 				ret = pthread_detach(td->thread);
2012 				if (ret)
2013 					log_err("pthread_detach: %s",
2014 							strerror(ret));
2015 			} else {
2016 				pid_t pid;
2017 				dprint(FD_PROCESS, "will fork\n");
2018 				pid = fork();
2019 				if (!pid) {
2020 					int ret = fork_main(shm_id, i);
2021 
2022 					_exit(ret);
2023 				} else if (i == fio_debug_jobno)
2024 					*fio_debug_jobp = pid;
2025 			}
2026 			dprint(FD_MUTEX, "wait on startup_mutex\n");
2027 			if (fio_mutex_down_timeout(startup_mutex, 10)) {
2028 				log_err("fio: job startup hung? exiting.\n");
2029 				fio_terminate_threads(TERMINATE_ALL);
2030 				fio_abort = 1;
2031 				nr_started--;
2032 				break;
2033 			}
2034 			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2035 		}
2036 
2037 		/*
2038 		 * Wait for the started threads to transition to
2039 		 * TD_INITIALIZED.
2040 		 */
2041 		fio_gettime(&this_start, NULL);
2042 		left = this_jobs;
2043 		while (left && !fio_abort) {
2044 			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
2045 				break;
2046 
2047 			do_usleep(100000);
2048 
2049 			for (i = 0; i < this_jobs; i++) {
2050 				td = map[i];
2051 				if (!td)
2052 					continue;
2053 				if (td->runstate == TD_INITIALIZED) {
2054 					map[i] = NULL;
2055 					left--;
2056 				} else if (td->runstate >= TD_EXITED) {
2057 					map[i] = NULL;
2058 					left--;
2059 					todo--;
2060 					nr_running++; /* work-around... */
2061 				}
2062 			}
2063 		}
2064 
2065 		if (left) {
2066 			log_err("fio: %d job%s failed to start\n", left,
2067 					left > 1 ? "s" : "");
2068 			for (i = 0; i < this_jobs; i++) {
2069 				td = map[i];
2070 				if (!td)
2071 					continue;
2072 				kill(td->pid, SIGTERM);
2073 			}
2074 			break;
2075 		}
2076 
2077 		/*
2078 		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
2079 		 */
2080 		for_each_td(td, i) {
2081 			if (td->runstate != TD_INITIALIZED)
2082 				continue;
2083 
2084 			if (in_ramp_time(td))
2085 				td_set_runstate(td, TD_RAMP);
2086 			else
2087 				td_set_runstate(td, TD_RUNNING);
2088 			nr_running++;
2089 			nr_started--;
2090 			m_rate += ddir_rw_sum(td->o.ratemin);
2091 			t_rate += ddir_rw_sum(td->o.rate);
2092 			todo--;
2093 			fio_mutex_up(td->mutex);
2094 		}
2095 
2096 		reap_threads(&nr_running, &t_rate, &m_rate);
2097 
2098 		if (todo)
2099 			do_usleep(100000);
2100 	}
2101 
2102 	while (nr_running) {
2103 		reap_threads(&nr_running, &t_rate, &m_rate);
2104 		do_usleep(10000);
2105 	}
2106 
2107 	fio_idle_prof_stop();
2108 
2109 	update_io_ticks();
2110 }
2111 
wait_for_helper_thread_exit(void)2112 static void wait_for_helper_thread_exit(void)
2113 {
2114 	void *ret;
2115 
2116 	helper_exit = 1;
2117 	pthread_cond_signal(&helper_cond);
2118 	pthread_join(helper_thread, &ret);
2119 }
2120 
free_disk_util(void)2121 static void free_disk_util(void)
2122 {
2123 	disk_util_prune_entries();
2124 
2125 	pthread_cond_destroy(&helper_cond);
2126 }
2127 
helper_thread_main(void * data)2128 static void *helper_thread_main(void *data)
2129 {
2130 	int ret = 0;
2131 
2132 	fio_mutex_up(startup_mutex);
2133 
2134 	while (!ret) {
2135 		uint64_t sec = DISK_UTIL_MSEC / 1000;
2136 		uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000;
2137 		struct timespec ts;
2138 		struct timeval tv;
2139 
2140 		gettimeofday(&tv, NULL);
2141 		ts.tv_sec = tv.tv_sec + sec;
2142 		ts.tv_nsec = (tv.tv_usec * 1000) + nsec;
2143 
2144 		if (ts.tv_nsec >= 1000000000ULL) {
2145 			ts.tv_nsec -= 1000000000ULL;
2146 			ts.tv_sec++;
2147 		}
2148 
2149 		pthread_cond_timedwait(&helper_cond, &helper_lock, &ts);
2150 
2151 		ret = update_io_ticks();
2152 
2153 		if (helper_do_stat) {
2154 			helper_do_stat = 0;
2155 			__show_running_run_stats();
2156 		}
2157 
2158 		if (!is_backend)
2159 			print_thread_status();
2160 	}
2161 
2162 	return NULL;
2163 }
2164 
create_helper_thread(void)2165 static int create_helper_thread(void)
2166 {
2167 	int ret;
2168 
2169 	setup_disk_util();
2170 
2171 	pthread_cond_init(&helper_cond, NULL);
2172 	pthread_mutex_init(&helper_lock, NULL);
2173 
2174 	ret = pthread_create(&helper_thread, NULL, helper_thread_main, NULL);
2175 	if (ret) {
2176 		log_err("Can't create helper thread: %s\n", strerror(ret));
2177 		return 1;
2178 	}
2179 
2180 	dprint(FD_MUTEX, "wait on startup_mutex\n");
2181 	fio_mutex_down(startup_mutex);
2182 	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2183 	return 0;
2184 }
2185 
fio_backend(void)2186 int fio_backend(void)
2187 {
2188 	struct thread_data *td;
2189 	int i;
2190 
2191 	if (exec_profile) {
2192 		if (load_profile(exec_profile))
2193 			return 1;
2194 		free(exec_profile);
2195 		exec_profile = NULL;
2196 	}
2197 	if (!thread_number)
2198 		return 0;
2199 
2200 	if (write_bw_log) {
2201 		struct log_params p = {
2202 			.log_type = IO_LOG_TYPE_BW,
2203 		};
2204 
2205 		setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
2206 		setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
2207 		setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
2208 	}
2209 
2210 	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2211 	if (startup_mutex == NULL)
2212 		return 1;
2213 
2214 	set_genesis_time();
2215 	stat_init();
2216 	create_helper_thread();
2217 
2218 	cgroup_list = smalloc(sizeof(*cgroup_list));
2219 	INIT_FLIST_HEAD(cgroup_list);
2220 
2221 	run_threads();
2222 
2223 	wait_for_helper_thread_exit();
2224 
2225 	if (!fio_abort) {
2226 		__show_run_stats();
2227 		if (write_bw_log) {
2228 			for (i = 0; i < DDIR_RWDIR_CNT; i++) {
2229 				struct io_log *log = agg_io_log[i];
2230 
2231 				flush_log(log);
2232 				free_log(log);
2233 			}
2234 		}
2235 	}
2236 
2237 	for_each_td(td, i) {
2238 		fio_options_free(td);
2239 		if (td->rusage_sem) {
2240 			fio_mutex_remove(td->rusage_sem);
2241 			td->rusage_sem = NULL;
2242 		}
2243 	}
2244 
2245 	free_disk_util();
2246 	cgroup_kill(cgroup_list);
2247 	sfree(cgroup_list);
2248 	sfree(cgroup_mnt);
2249 
2250 	fio_mutex_remove(startup_mutex);
2251 	stat_exit();
2252 	return exit_value;
2253 }
2254