1 /*
2  * fio - the flexible io tester
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6  *
7  * The license below covers all files distributed with fio unless otherwise
8  * noted in the file itself.
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License version 2 as
12  *  published by the Free Software Foundation.
13  *
14  *  This program is distributed in the hope that it will be useful,
15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *  GNU General Public License for more details.
18  *
19  *  You should have received a copy of the GNU General Public License
20  *  along with this program; if not, write to the Free Software
21  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22  *
23  */
24 #include <unistd.h>
25 #include <fcntl.h>
26 #include <string.h>
27 #include <limits.h>
28 #include <signal.h>
29 #include <time.h>
30 #include <locale.h>
31 #include <assert.h>
32 #include <time.h>
33 #include <inttypes.h>
34 #include <sys/stat.h>
35 #include <sys/wait.h>
36 #include <sys/ipc.h>
37 #include <sys/mman.h>
38 
39 #include "fio.h"
40 #ifndef FIO_NO_HAVE_SHM_H
41 #include <sys/shm.h>
42 #endif
43 #include "hash.h"
44 #include "smalloc.h"
45 #include "verify.h"
46 #include "trim.h"
47 #include "diskutil.h"
48 #include "cgroup.h"
49 #include "profile.h"
50 #include "lib/rand.h"
51 #include "memalign.h"
52 #include "server.h"
53 #include "lib/getrusage.h"
54 #include "idletime.h"
55 #include "err.h"
56 
57 static pthread_t disk_util_thread;
58 static struct fio_mutex *disk_thread_mutex;
59 static struct fio_mutex *startup_mutex;
60 static struct flist_head *cgroup_list;
61 static char *cgroup_mnt;
62 static int exit_value;
63 static volatile int fio_abort;
64 static unsigned int nr_process = 0;
65 static unsigned int nr_thread = 0;
66 
67 struct io_log *agg_io_log[DDIR_RWDIR_CNT];
68 
69 int groupid = 0;
70 unsigned int thread_number = 0;
71 unsigned int stat_number = 0;
72 int shm_id = 0;
73 int temp_stall_ts;
74 unsigned long done_secs = 0;
75 volatile int disk_util_exit = 0;
76 
77 #define PAGE_ALIGN(buf)	\
78 	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
79 
80 #define JOB_START_TIMEOUT	(5 * 1000)
81 
82 static void sig_int(int sig)
83 {
84 	if (threads) {
85 		if (is_backend)
86 			fio_server_got_signal(sig);
87 		else {
88 			log_info("\nfio: terminating on signal %d\n", sig);
89 			fflush(stdout);
90 			exit_value = 128;
91 		}
92 
93 		fio_terminate_threads(TERMINATE_ALL);
94 	}
95 }
96 
97 static void sig_show_status(int sig)
98 {
99 	show_running_run_stats();
100 }
101 
102 static void set_sig_handlers(void)
103 {
104 	struct sigaction act;
105 
106 	memset(&act, 0, sizeof(act));
107 	act.sa_handler = sig_int;
108 	act.sa_flags = SA_RESTART;
109 	sigaction(SIGINT, &act, NULL);
110 
111 	memset(&act, 0, sizeof(act));
112 	act.sa_handler = sig_int;
113 	act.sa_flags = SA_RESTART;
114 	sigaction(SIGTERM, &act, NULL);
115 
116 /* Windows uses SIGBREAK as a quit signal from other applications */
117 #ifdef WIN32
118 	memset(&act, 0, sizeof(act));
119 	act.sa_handler = sig_int;
120 	act.sa_flags = SA_RESTART;
121 	sigaction(SIGBREAK, &act, NULL);
122 #endif
123 
124 	memset(&act, 0, sizeof(act));
125 	act.sa_handler = sig_show_status;
126 	act.sa_flags = SA_RESTART;
127 	sigaction(SIGUSR1, &act, NULL);
128 
129 	if (is_backend) {
130 		memset(&act, 0, sizeof(act));
131 		act.sa_handler = sig_int;
132 		act.sa_flags = SA_RESTART;
133 		sigaction(SIGPIPE, &act, NULL);
134 	}
135 }
136 
137 /*
138  * Check if we are above the minimum rate given.
139  */
140 static int __check_min_rate(struct thread_data *td, struct timeval *now,
141 			    enum fio_ddir ddir)
142 {
143 	unsigned long long bytes = 0;
144 	unsigned long iops = 0;
145 	unsigned long spent;
146 	unsigned long rate;
147 	unsigned int ratemin = 0;
148 	unsigned int rate_iops = 0;
149 	unsigned int rate_iops_min = 0;
150 
151 	assert(ddir_rw(ddir));
152 
153 	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
154 		return 0;
155 
156 	/*
157 	 * allow a 2 second settle period in the beginning
158 	 */
159 	if (mtime_since(&td->start, now) < 2000)
160 		return 0;
161 
162 	iops += td->this_io_blocks[ddir];
163 	bytes += td->this_io_bytes[ddir];
164 	ratemin += td->o.ratemin[ddir];
165 	rate_iops += td->o.rate_iops[ddir];
166 	rate_iops_min += td->o.rate_iops_min[ddir];
167 
168 	/*
169 	 * if rate blocks is set, sample is running
170 	 */
171 	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
172 		spent = mtime_since(&td->lastrate[ddir], now);
173 		if (spent < td->o.ratecycle)
174 			return 0;
175 
176 		if (td->o.rate[ddir]) {
177 			/*
178 			 * check bandwidth specified rate
179 			 */
180 			if (bytes < td->rate_bytes[ddir]) {
181 				log_err("%s: min rate %u not met\n", td->o.name,
182 								ratemin);
183 				return 1;
184 			} else {
185 				if (spent)
186 					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
187 				else
188 					rate = 0;
189 
190 				if (rate < ratemin ||
191 				    bytes < td->rate_bytes[ddir]) {
192 					log_err("%s: min rate %u not met, got"
193 						" %luKB/sec\n", td->o.name,
194 							ratemin, rate);
195 					return 1;
196 				}
197 			}
198 		} else {
199 			/*
200 			 * checks iops specified rate
201 			 */
202 			if (iops < rate_iops) {
203 				log_err("%s: min iops rate %u not met\n",
204 						td->o.name, rate_iops);
205 				return 1;
206 			} else {
207 				if (spent)
208 					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
209 				else
210 					rate = 0;
211 
212 				if (rate < rate_iops_min ||
213 				    iops < td->rate_blocks[ddir]) {
214 					log_err("%s: min iops rate %u not met,"
215 						" got %lu\n", td->o.name,
216 							rate_iops_min, rate);
					/* fail the job, matching the bandwidth check above */
					return 1;
217 				}
218 			}
219 		}
220 	}
221 
222 	td->rate_bytes[ddir] = bytes;
223 	td->rate_blocks[ddir] = iops;
224 	memcpy(&td->lastrate[ddir], now, sizeof(*now));
225 	return 0;
226 }
227 
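/*
 * Run the minimum rate check for each data direction that moved bytes
 * since the last sample.
 */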
228 static int check_min_rate(struct thread_data *td, struct timeval *now,
229 			  uint64_t *bytes_done)
230 {
231 	int ret = 0;
232 
233 	if (bytes_done[DDIR_READ])
234 		ret |= __check_min_rate(td, now, DDIR_READ);
235 	if (bytes_done[DDIR_WRITE])
236 		ret |= __check_min_rate(td, now, DDIR_WRITE);
237 	if (bytes_done[DDIR_TRIM])
238 		ret |= __check_min_rate(td, now, DDIR_TRIM);
239 
240 	return ret;
241 }
242 
243 /*
244  * When a job exits, we can cancel the in-flight IO if we are using async
245  * IO. Attempt to do so.
246  */
247 static void cleanup_pending_aio(struct thread_data *td)
248 {
249 	int r;
250 
251 	/*
252 	 * get immediately available events, if any
253 	 */
254 	r = io_u_queued_complete(td, 0, NULL);
255 	if (r < 0)
256 		return;
257 
258 	/*
259 	 * now cancel remaining active events
260 	 */
261 	if (td->io_ops->cancel) {
262 		struct io_u *io_u;
263 		int i;
264 
265 		io_u_qiter(&td->io_u_all, io_u, i) {
266 			if (io_u->flags & IO_U_F_FLIGHT) {
267 				r = td->io_ops->cancel(td, io_u);
268 				if (!r)
269 					put_io_u(td, io_u);
270 			}
271 		}
272 	}
273 
274 	if (td->cur_depth)
275 		r = io_u_queued_complete(td, td->cur_depth, NULL);
276 }
277 
278 /*
279  * Helper to handle the final sync of a file. Works just like the normal
280  * io path, just does everything sync.
281  */
282 static int fio_io_sync(struct thread_data *td, struct fio_file *f)
283 {
284 	struct io_u *io_u = __get_io_u(td);
285 	int ret;
286 
287 	if (!io_u)
288 		return 1;
289 
290 	io_u->ddir = DDIR_SYNC;
291 	io_u->file = f;
292 
293 	if (td_io_prep(td, io_u)) {
294 		put_io_u(td, io_u);
295 		return 1;
296 	}
297 
298 requeue:
299 	ret = td_io_queue(td, io_u);
300 	if (ret < 0) {
301 		td_verror(td, io_u->error, "td_io_queue");
302 		put_io_u(td, io_u);
303 		return 1;
304 	} else if (ret == FIO_Q_QUEUED) {
305 		if (io_u_queued_complete(td, 1, NULL) < 0)
306 			return 1;
307 	} else if (ret == FIO_Q_COMPLETED) {
308 		if (io_u->error) {
309 			td_verror(td, io_u->error, "td_io_queue");
310 			return 1;
311 		}
312 
313 		if (io_u_sync_complete(td, io_u, NULL) < 0)
314 			return 1;
315 	} else if (ret == FIO_Q_BUSY) {
316 		if (td_io_commit(td))
317 			return 1;
318 		goto requeue;
319 	}
320 
321 	return 0;
322 }
323 
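/*
 * Sync a file, temporarily opening it first if the job has already closed it.
 */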
324 static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
325 {
326 	int ret;
327 
328 	if (fio_file_open(f))
329 		return fio_io_sync(td, f);
330 
331 	if (td_io_open_file(td, f))
332 		return 1;
333 
334 	ret = fio_io_sync(td, f);
335 	td_io_close_file(td, f);
336 	return ret;
337 }
338 
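/*
 * The job keeps a cached timestamp in ->tv_cache and only refreshes it
 * every (tv_cache_mask + 1) calls, to avoid paying for a clock read on
 * every IO.
 */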
339 static inline void __update_tv_cache(struct thread_data *td)
340 {
341 	fio_gettime(&td->tv_cache, NULL);
342 }
343 
344 static inline void update_tv_cache(struct thread_data *td)
345 {
346 	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
347 		__update_tv_cache(td);
348 }
349 
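/*
 * Check whether the job has exceeded its configured runtime. Never reports
 * true while still in the ramp period.
 */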
350 static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
351 {
352 	if (in_ramp_time(td))
353 		return 0;
354 	if (!td->o.timeout)
355 		return 0;
356 	if (utime_since(&td->epoch, t) >= td->o.timeout)
357 		return 1;
358 
359 	return 0;
360 }
361 
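/*
 * Decide whether an IO error should end the job, honoring the
 * continue_on_error and fill_device settings.
 */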
362 static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
363 			       int *retptr)
364 {
365 	int ret = *retptr;
366 
367 	if (ret < 0 || td->error) {
368 		int err = td->error;
369 		enum error_type_bit eb;
370 
371 		if (ret < 0)
372 			err = -ret;
373 
374 		eb = td_error_type(ddir, err);
375 		if (!(td->o.continue_on_error & (1 << eb)))
376 			return 1;
377 
378 		if (td_non_fatal_error(td, eb, err)) {
379 		        /*
380 		         * Continue with the I/Os in case of
381 			 * a non fatal error.
382 			 */
383 			update_error_count(td, err);
384 			td_clear_error(td);
385 			*retptr = 0;
386 			return 0;
387 		} else if (td->o.fill_device && err == ENOSPC) {
388 			/*
389 			 * We expect to hit this error if
390 			 * fill_device option is set.
391 			 */
392 			td_clear_error(td);
393 			td->terminate = 1;
394 			return 1;
395 		} else {
396 			/*
397 			 * Stop the I/O in case of a fatal
398 			 * error.
399 			 */
400 			update_error_count(td, err);
401 			return 1;
402 		}
403 	}
404 
405 	return 0;
406 }
407 
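/*
 * The stat code may request a resource usage refresh by setting
 * ->update_rusage; satisfy it here, from the job's own context.
 */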
408 static void check_update_rusage(struct thread_data *td)
409 {
410 	if (td->update_rusage) {
411 		td->update_rusage = 0;
412 		update_rusage_stat(td);
413 		fio_mutex_up(td->rusage_sem);
414 	}
415 }
416 
417 /*
418  * The main verify engine. Runs over the writes we previously submitted,
419  * reads the blocks back in, and checks the crc/md5 of the data.
420  */
421 static void do_verify(struct thread_data *td, uint64_t verify_bytes)
422 {
423 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
424 	struct fio_file *f;
425 	struct io_u *io_u;
426 	int ret, min_events;
427 	unsigned int i;
428 
429 	dprint(FD_VERIFY, "starting loop\n");
430 
431 	/*
432 	 * sync io first and invalidate cache, to make sure we really
433 	 * read from disk.
434 	 */
435 	for_each_file(td, f, i) {
436 		if (!fio_file_open(f))
437 			continue;
438 		if (fio_io_sync(td, f))
439 			break;
440 		if (file_invalidate_cache(td, f))
441 			break;
442 	}
443 
444 	check_update_rusage(td);
445 
446 	if (td->error)
447 		return;
448 
449 	td_set_runstate(td, TD_VERIFYING);
450 
451 	io_u = NULL;
452 	while (!td->terminate) {
453 		enum fio_ddir ddir;
454 		int ret2, full;
455 
456 		update_tv_cache(td);
457 		check_update_rusage(td);
458 
459 		if (runtime_exceeded(td, &td->tv_cache)) {
460 			__update_tv_cache(td);
461 			if (runtime_exceeded(td, &td->tv_cache)) {
462 				td->terminate = 1;
463 				break;
464 			}
465 		}
466 
467 		if (flow_threshold_exceeded(td))
468 			continue;
469 
470 		if (!td->o.experimental_verify) {
471 			io_u = __get_io_u(td);
472 			if (!io_u)
473 				break;
474 
475 			if (get_next_verify(td, io_u)) {
476 				put_io_u(td, io_u);
477 				break;
478 			}
479 
480 			if (td_io_prep(td, io_u)) {
481 				put_io_u(td, io_u);
482 				break;
483 			}
484 		} else {
485 			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
486 				break;
487 
488 			while ((io_u = get_io_u(td)) != NULL) {
489 				if (IS_ERR(io_u)) {
490 					io_u = NULL;
491 					ret = FIO_Q_BUSY;
492 					goto reap;
493 				}
494 
495 				/*
496 				 * We are only interested in the places where
497 				 * we wrote or trimmed IOs. Turn those into
498 				 * reads for verification purposes.
499 				 */
500 				if (io_u->ddir == DDIR_READ) {
501 					/*
502 					 * Pretend we issued it for rwmix
503 					 * accounting
504 					 */
505 					td->io_issues[DDIR_READ]++;
506 					put_io_u(td, io_u);
507 					continue;
508 				} else if (io_u->ddir == DDIR_TRIM) {
509 					io_u->ddir = DDIR_READ;
510 					io_u->flags |= IO_U_F_TRIMMED;
511 					break;
512 				} else if (io_u->ddir == DDIR_WRITE) {
513 					io_u->ddir = DDIR_READ;
514 					break;
515 				} else {
516 					put_io_u(td, io_u);
517 					continue;
518 				}
519 			}
520 
521 			if (!io_u)
522 				break;
523 		}
524 
525 		if (td->o.verify_async)
526 			io_u->end_io = verify_io_u_async;
527 		else
528 			io_u->end_io = verify_io_u;
529 
530 		ddir = io_u->ddir;
531 
532 		ret = td_io_queue(td, io_u);
533 		switch (ret) {
534 		case FIO_Q_COMPLETED:
535 			if (io_u->error) {
536 				ret = -io_u->error;
537 				clear_io_u(td, io_u);
538 			} else if (io_u->resid) {
539 				int bytes = io_u->xfer_buflen - io_u->resid;
540 
541 				/*
542 				 * zero read, fail
543 				 */
544 				if (!bytes) {
545 					td_verror(td, EIO, "full resid");
546 					put_io_u(td, io_u);
547 					break;
548 				}
549 
550 				io_u->xfer_buflen = io_u->resid;
551 				io_u->xfer_buf += bytes;
552 				io_u->offset += bytes;
553 
554 				if (ddir_rw(io_u->ddir))
555 					td->ts.short_io_u[io_u->ddir]++;
556 
557 				f = io_u->file;
558 				if (io_u->offset == f->real_file_size)
559 					goto sync_done;
560 
561 				requeue_io_u(td, &io_u);
562 			} else {
563 sync_done:
564 				ret = io_u_sync_complete(td, io_u, bytes_done);
565 				if (ret < 0)
566 					break;
567 			}
568 			continue;
569 		case FIO_Q_QUEUED:
570 			break;
571 		case FIO_Q_BUSY:
572 			requeue_io_u(td, &io_u);
573 			ret2 = td_io_commit(td);
574 			if (ret2 < 0)
575 				ret = ret2;
576 			break;
577 		default:
578 			assert(ret < 0);
579 			td_verror(td, -ret, "td_io_queue");
580 			break;
581 		}
582 
583 		if (break_on_this_error(td, ddir, &ret))
584 			break;
585 
586 		/*
587 		 * if we can queue more, do so. but check if there are
588 		 * completed io_u's first. Note that we can get BUSY even
589 		 * without IO queued, if the system is resource starved.
590 		 */
591 reap:
592 		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
593 		if (full || !td->o.iodepth_batch_complete) {
594 			min_events = min(td->o.iodepth_batch_complete,
595 					 td->cur_depth);
596 			/*
597 			 * if the queue is full, we MUST reap at least 1 event
598 			 */
599 			if (full && !min_events)
600 				min_events = 1;
601 
602 			do {
603 				/*
604 				 * Reap required number of io units, if any,
605 				 * and do the verification on them through
606 				 * the callback handler
607 				 */
608 				if (io_u_queued_complete(td, min_events, bytes_done) < 0) {
609 					ret = -1;
610 					break;
611 				}
612 			} while (full && (td->cur_depth > td->o.iodepth_low));
613 		}
614 		if (ret < 0)
615 			break;
616 	}
617 
618 	check_update_rusage(td);
619 
620 	if (!td->error) {
621 		min_events = td->cur_depth;
622 
623 		if (min_events)
624 			ret = io_u_queued_complete(td, min_events, NULL);
625 	} else
626 		cleanup_pending_aio(td);
627 
628 	td_set_runstate(td, TD_RUNNING);
629 
630 	dprint(FD_VERIFY, "exiting loop\n");
631 }
632 
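/*
 * Return true once the job has issued its number_ios worth of IOs,
 * counting those still queued or in flight.
 */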
633 static unsigned int exceeds_number_ios(struct thread_data *td)
634 {
635 	unsigned long long number_ios;
636 
637 	if (!td->o.number_ios)
638 		return 0;
639 
640 	number_ios = ddir_rw_sum(td->this_io_blocks);
641 	number_ios += td->io_u_queued + td->io_u_in_flight;
642 
643 	return number_ios >= td->o.number_ios;
644 }
645 
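/*
 * Return true once the job has transferred its io_limit (or size) worth of
 * bytes for the relevant data direction(s), or hit its number_ios limit.
 */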
646 static int io_bytes_exceeded(struct thread_data *td)
647 {
648 	unsigned long long bytes, limit;
649 
650 	if (td_rw(td))
651 		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
652 	else if (td_write(td))
653 		bytes = td->this_io_bytes[DDIR_WRITE];
654 	else if (td_read(td))
655 		bytes = td->this_io_bytes[DDIR_READ];
656 	else
657 		bytes = td->this_io_bytes[DDIR_TRIM];
658 
659 	if (td->o.io_limit)
660 		limit = td->o.io_limit;
661 	else
662 		limit = td->o.size;
663 
664 	return bytes >= limit || exceeds_number_ios(td);
665 }
666 
667 /*
668  * Main IO worker function. It retrieves io_u's to process, queues and
669  * reaps them, and checks for rate limits and errors along the way.
670  *
671  * Returns number of bytes written and trimmed.
672  */
673 static uint64_t do_io(struct thread_data *td)
674 {
675 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
676 	unsigned int i;
677 	int ret = 0;
678 	uint64_t total_bytes, bytes_issued = 0;
679 
680 	if (in_ramp_time(td))
681 		td_set_runstate(td, TD_RAMP);
682 	else
683 		td_set_runstate(td, TD_RUNNING);
684 
685 	lat_target_init(td);
686 
687 	/*
688 	 * If verify_backlog is enabled, we'll run the verify in this
689 	 * handler as well. For that case, we may need up to twice the
690 	 * amount of bytes.
691 	 */
692 	total_bytes = td->o.size;
693 	if (td->o.verify != VERIFY_NONE &&
694 	   (td_write(td) && td->o.verify_backlog))
695 		total_bytes += td->o.size;
696 
697 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
698 		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
699 		td->o.time_based) {
700 		struct timeval comp_time;
701 		int min_evts = 0;
702 		struct io_u *io_u;
703 		int ret2, full;
704 		enum fio_ddir ddir;
705 
706 		check_update_rusage(td);
707 
708 		if (td->terminate || td->done)
709 			break;
710 
711 		update_tv_cache(td);
712 
713 		if (runtime_exceeded(td, &td->tv_cache)) {
714 			__update_tv_cache(td);
715 			if (runtime_exceeded(td, &td->tv_cache)) {
716 				td->terminate = 1;
717 				break;
718 			}
719 		}
720 
721 		if (flow_threshold_exceeded(td))
722 			continue;
723 
724 		if (bytes_issued >= total_bytes)
725 			break;
726 
727 		io_u = get_io_u(td);
728 		if (IS_ERR_OR_NULL(io_u)) {
729 			int err = PTR_ERR(io_u);
730 
731 			io_u = NULL;
732 			if (err == -EBUSY) {
733 				ret = FIO_Q_BUSY;
734 				goto reap;
735 			}
736 			if (td->o.latency_target)
737 				goto reap;
738 			break;
739 		}
740 
741 		ddir = io_u->ddir;
742 
743 		/*
744 		 * Add verification end_io handler if:
745 		 *	- Asked to verify (!td_rw(td))
746 		 *	- Or the io_u is from our verify list (mixed write/ver)
747 		 */
748 		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
749 		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
750 
751 			if (!td->o.verify_pattern_bytes) {
752 				io_u->rand_seed = __rand(&td->__verify_state);
753 				if (sizeof(int) != sizeof(long *))
754 					io_u->rand_seed *= __rand(&td->__verify_state);
755 			}
756 
757 			if (td->o.verify_async)
758 				io_u->end_io = verify_io_u_async;
759 			else
760 				io_u->end_io = verify_io_u;
761 			td_set_runstate(td, TD_VERIFYING);
762 		} else if (in_ramp_time(td))
763 			td_set_runstate(td, TD_RAMP);
764 		else
765 			td_set_runstate(td, TD_RUNNING);
766 
767 		/*
768 		 * Always log IO before it's issued, so we know the specific
769 		 * order of it. The logged unit will track when the IO has
770 		 * completed.
771 		 */
772 		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
773 		    td->o.do_verify &&
774 		    td->o.verify != VERIFY_NONE &&
775 		    !td->o.experimental_verify)
776 			log_io_piece(td, io_u);
777 
778 		ret = td_io_queue(td, io_u);
779 		switch (ret) {
780 		case FIO_Q_COMPLETED:
781 			if (io_u->error) {
782 				ret = -io_u->error;
783 				unlog_io_piece(td, io_u);
784 				clear_io_u(td, io_u);
785 			} else if (io_u->resid) {
786 				int bytes = io_u->xfer_buflen - io_u->resid;
787 				struct fio_file *f = io_u->file;
788 
789 				bytes_issued += bytes;
790 
791 				trim_io_piece(td, io_u);
792 
793 				/*
794 				 * zero read, fail
795 				 */
796 				if (!bytes) {
797 					unlog_io_piece(td, io_u);
798 					td_verror(td, EIO, "full resid");
799 					put_io_u(td, io_u);
800 					break;
801 				}
802 
803 				io_u->xfer_buflen = io_u->resid;
804 				io_u->xfer_buf += bytes;
805 				io_u->offset += bytes;
806 
807 				if (ddir_rw(io_u->ddir))
808 					td->ts.short_io_u[io_u->ddir]++;
809 
810 				if (io_u->offset == f->real_file_size)
811 					goto sync_done;
812 
813 				requeue_io_u(td, &io_u);
814 			} else {
815 sync_done:
816 				if (__should_check_rate(td, DDIR_READ) ||
817 				    __should_check_rate(td, DDIR_WRITE) ||
818 				    __should_check_rate(td, DDIR_TRIM))
819 					fio_gettime(&comp_time, NULL);
820 
821 				ret = io_u_sync_complete(td, io_u, bytes_done);
822 				if (ret < 0)
823 					break;
824 				bytes_issued += io_u->xfer_buflen;
825 			}
826 			break;
827 		case FIO_Q_QUEUED:
828 			/*
829 			 * if the engine doesn't have a commit hook,
830 			 * the io_u is really queued. if it does have such
831 			 * a hook, it has to call io_u_queued() itself.
832 			 */
833 			if (td->io_ops->commit == NULL)
834 				io_u_queued(td, io_u);
835 			bytes_issued += io_u->xfer_buflen;
836 			break;
837 		case FIO_Q_BUSY:
838 			unlog_io_piece(td, io_u);
839 			requeue_io_u(td, &io_u);
840 			ret2 = td_io_commit(td);
841 			if (ret2 < 0)
842 				ret = ret2;
843 			break;
844 		default:
845 			assert(ret < 0);
846 			put_io_u(td, io_u);
847 			break;
848 		}
849 
850 		if (break_on_this_error(td, ddir, &ret))
851 			break;
852 
853 		/*
854 		 * See if we need to complete some commands. Note that we
855 		 * can get BUSY even without IO queued, if the system is
856 		 * resource starved.
857 		 */
858 reap:
859 		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
860 		if (full || !td->o.iodepth_batch_complete) {
861 			min_evts = min(td->o.iodepth_batch_complete,
862 					td->cur_depth);
863 			/*
864 			 * if the queue is full, we MUST reap at least 1 event
865 			 */
866 			if (full && !min_evts)
867 				min_evts = 1;
868 
869 			if (__should_check_rate(td, DDIR_READ) ||
870 			    __should_check_rate(td, DDIR_WRITE) ||
871 			    __should_check_rate(td, DDIR_TRIM))
872 				fio_gettime(&comp_time, NULL);
873 
874 			do {
875 				ret = io_u_queued_complete(td, min_evts, bytes_done);
876 				if (ret < 0)
877 					break;
878 
879 			} while (full && (td->cur_depth > td->o.iodepth_low));
880 		}
881 
882 		if (ret < 0)
883 			break;
884 		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
885 			continue;
886 
887 		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
888 			if (check_min_rate(td, &comp_time, bytes_done)) {
889 				if (exitall_on_terminate)
890 					fio_terminate_threads(td->groupid);
891 				td_verror(td, EIO, "check_min_rate");
892 				break;
893 			}
894 		}
895 		if (!in_ramp_time(td) && td->o.latency_target)
896 			lat_target_check(td);
897 
898 		if (td->o.thinktime) {
899 			unsigned long long b;
900 
901 			b = ddir_rw_sum(td->io_blocks);
902 			if (!(b % td->o.thinktime_blocks)) {
903 				int left;
904 
905 				io_u_quiesce(td);
906 
907 				if (td->o.thinktime_spin)
908 					usec_spin(td->o.thinktime_spin);
909 
910 				left = td->o.thinktime - td->o.thinktime_spin;
911 				if (left)
912 					usec_sleep(td, left);
913 			}
914 		}
915 	}
916 
917 	check_update_rusage(td);
918 
919 	if (td->trim_entries)
920 		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
921 
922 	if (td->o.fill_device && td->error == ENOSPC) {
923 		td->error = 0;
924 		td->terminate = 1;
925 	}
926 	if (!td->error) {
927 		struct fio_file *f;
928 
929 		i = td->cur_depth;
930 		if (i) {
931 			ret = io_u_queued_complete(td, i, bytes_done);
932 			if (td->o.fill_device && td->error == ENOSPC)
933 				td->error = 0;
934 		}
935 
936 		if (should_fsync(td) && td->o.end_fsync) {
937 			td_set_runstate(td, TD_FSYNCING);
938 
939 			for_each_file(td, f, i) {
940 				if (!fio_file_fsync(td, f))
941 					continue;
942 
943 				log_err("fio: end_fsync failed for file %s\n",
944 								f->file_name);
945 			}
946 		}
947 	} else
948 		cleanup_pending_aio(td);
949 
950 	/*
951 	 * stop job if we failed doing any IO
952 	 */
953 	if (!ddir_rw_sum(td->this_io_bytes))
954 		td->done = 1;
955 
956 	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
957 }
958 
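/*
 * Free all io_u units and release the backing IO buffer and queues.
 */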
959 static void cleanup_io_u(struct thread_data *td)
960 {
961 	struct io_u *io_u;
962 
963 	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
964 
965 		if (td->io_ops->io_u_free)
966 			td->io_ops->io_u_free(td, io_u);
967 
968 		fio_memfree(io_u, sizeof(*io_u));
969 	}
970 
971 	free_io_mem(td);
972 
973 	io_u_rexit(&td->io_u_requeues);
974 	io_u_qexit(&td->io_u_freelist);
975 	io_u_qexit(&td->io_u_all);
976 }
977 
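/*
 * Allocate the io_u units and their queues, and carve the (possibly
 * alignment-padded) IO buffer into one chunk per unit.
 */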
978 static int init_io_u(struct thread_data *td)
979 {
980 	struct io_u *io_u;
981 	unsigned int max_bs, min_write;
982 	int cl_align, i, max_units;
983 	int data_xfer = 1, err;
984 	char *p;
985 
986 	max_units = td->o.iodepth;
987 	max_bs = td_max_bs(td);
988 	min_write = td->o.min_bs[DDIR_WRITE];
989 	td->orig_buffer_size = (unsigned long long) max_bs
990 					* (unsigned long long) max_units;
991 
992 	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
993 		data_xfer = 0;
994 
995 	err = 0;
996 	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
997 	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
998 	err += io_u_qinit(&td->io_u_all, td->o.iodepth);
999 
1000 	if (err) {
1001 		log_err("fio: failed setting up IO queues\n");
1002 		return 1;
1003 	}
1004 
1005 	/*
1006 	 * if we may later need to do address alignment, then add any
1007 	 * possible adjustment here so that we don't cause a buffer
1008 	 * overflow later. this adjustment may be too much if we get
1009 	 * lucky and the allocator gives us an aligned address.
1010 	 */
1011 	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1012 	    (td->io_ops->flags & FIO_RAWIO))
1013 		td->orig_buffer_size += page_mask + td->o.mem_align;
1014 
1015 	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
1016 		unsigned long bs;
1017 
1018 		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
1019 		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
1020 	}
1021 
1022 	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
1023 		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
1024 		return 1;
1025 	}
1026 
1027 	if (data_xfer && allocate_io_mem(td))
1028 		return 1;
1029 
1030 	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1031 	    (td->io_ops->flags & FIO_RAWIO))
1032 		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
1033 	else
1034 		p = td->orig_buffer;
1035 
1036 	cl_align = os_cache_line_size();
1037 
1038 	for (i = 0; i < max_units; i++) {
1039 		void *ptr;
1040 
1041 		if (td->terminate)
1042 			return 1;
1043 
1044 		ptr = fio_memalign(cl_align, sizeof(*io_u));
1045 		if (!ptr) {
1046 			log_err("fio: unable to allocate aligned memory\n");
1047 			break;
1048 		}
1049 
1050 		io_u = ptr;
1051 		memset(io_u, 0, sizeof(*io_u));
1052 		INIT_FLIST_HEAD(&io_u->verify_list);
1053 		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
1054 
1055 		if (data_xfer) {
1056 			io_u->buf = p;
1057 			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
1058 
1059 			if (td_write(td))
1060 				io_u_fill_buffer(td, io_u, min_write, max_bs);
1061 			if (td_write(td) && td->o.verify_pattern_bytes) {
1062 				/*
1063 				 * Fill the buffer with the pattern if we are
1064 				 * going to be doing writes.
1065 				 */
1066 				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
1067 			}
1068 		}
1069 
1070 		io_u->index = i;
1071 		io_u->flags = IO_U_F_FREE;
1072 		io_u_qpush(&td->io_u_freelist, io_u);
1073 
1074 		/*
1075 		 * io_u never leaves this stack, used for iteration of all
1076 		 * io_u buffers.
1077 		 */
1078 		io_u_qpush(&td->io_u_all, io_u);
1079 
1080 		if (td->io_ops->io_u_init) {
1081 			int ret = td->io_ops->io_u_init(td, io_u);
1082 
1083 			if (ret) {
1084 				log_err("fio: failed to init engine data: %d\n", ret);
1085 				return 1;
1086 			}
1087 		}
1088 
1089 		p += max_bs;
1090 	}
1091 
1092 	return 0;
1093 }
1094 
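/*
 * Switch the block device to the requested IO scheduler via sysfs, then
 * read the file back to verify the kernel accepted it.
 */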
1095 static int switch_ioscheduler(struct thread_data *td)
1096 {
1097 	char tmp[256], tmp2[128];
1098 	FILE *f;
1099 	int ret;
1100 
1101 	if (td->io_ops->flags & FIO_DISKLESSIO)
1102 		return 0;
1103 
1104 	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1105 
1106 	f = fopen(tmp, "r+");
1107 	if (!f) {
1108 		if (errno == ENOENT) {
1109 			log_err("fio: os or kernel doesn't support IO scheduler"
1110 				" switching\n");
1111 			return 0;
1112 		}
1113 		td_verror(td, errno, "fopen iosched");
1114 		return 1;
1115 	}
1116 
1117 	/*
1118 	 * Set io scheduler.
1119 	 */
1120 	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1121 	if (ferror(f) || ret != 1) {
1122 		td_verror(td, errno, "fwrite");
1123 		fclose(f);
1124 		return 1;
1125 	}
1126 
1127 	rewind(f);
1128 
1129 	/*
1130 	 * Read back and check that the selected scheduler is now the default.
1131 	 */
1132 	ret = fread(tmp, sizeof(tmp), 1, f);
1133 	if (ferror(f) || ret < 0) {
1134 		td_verror(td, errno, "fread");
1135 		fclose(f);
1136 		return 1;
1137 	}
1138 	tmp[sizeof(tmp) - 1] = '\0';
1139 
1140 
1141 	sprintf(tmp2, "[%s]", td->o.ioscheduler);
1142 	if (!strstr(tmp, tmp2)) {
1143 		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1144 		td_verror(td, EINVAL, "iosched_switch");
1145 		fclose(f);
1146 		return 1;
1147 	}
1148 
1149 	fclose(f);
1150 	return 0;
1151 }
1152 
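/*
 * Decide whether the main loop should do another pass, based on
 * time_based, loops and the amount of IO still outstanding.
 */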
1153 static int keep_running(struct thread_data *td)
1154 {
1155 	unsigned long long limit;
1156 
1157 	if (td->done)
1158 		return 0;
1159 	if (td->o.time_based)
1160 		return 1;
1161 	if (td->o.loops) {
1162 		td->o.loops--;
1163 		return 1;
1164 	}
1165 	if (exceeds_number_ios(td))
1166 		return 0;
1167 
1168 	if (td->o.io_limit)
1169 		limit = td->o.io_limit;
1170 	else
1171 		limit = td->o.size;
1172 
1173 	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1174 		uint64_t diff;
1175 
1176 		/*
1177 		 * If the difference is less than the minimum IO size, we
1178 		 * are done.
1179 		 */
1180 		diff = limit - ddir_rw_sum(td->io_bytes);
1181 		if (diff < td_max_bs(td))
1182 			return 0;
1183 
1184 		if (fio_files_done(td))
1185 			return 0;
1186 
1187 		return 1;
1188 	}
1189 
1190 	return 0;
1191 }
1192 
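/*
 * Run an exec_prerun/exec_postrun command, redirecting its output to
 * <name>.<mode>.txt.
 */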
1193 static int exec_string(struct thread_options *o, const char *string, const char *mode)
1194 {
1195 	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1196 	char *str;
1197 
1198 	str = malloc(newlen);
1199 	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1200 
1201 	log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
1202 	ret = system(str);
1203 	if (ret == -1)
1204 		log_err("fio: exec of cmd <%s> failed\n", str);
1205 
1206 	free(str);
1207 	return ret;
1208 }
1209 
1210 /*
1211  * Dry run to compute correct state of numberio for verification.
1212  */
1213 static uint64_t do_dry_run(struct thread_data *td)
1214 {
1215 	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1216 
1217 	td_set_runstate(td, TD_RUNNING);
1218 
1219 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1220 		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td)) {
1221 		struct io_u *io_u;
1222 		int ret;
1223 
1224 		if (td->terminate || td->done)
1225 			break;
1226 
1227 		io_u = get_io_u(td);
1228 		if (!io_u)
1229 			break;
1230 
1231 		io_u->flags |= IO_U_F_FLIGHT;
1232 		io_u->error = 0;
1233 		io_u->resid = 0;
1234 		if (ddir_rw(acct_ddir(io_u)))
1235 			td->io_issues[acct_ddir(io_u)]++;
1236 		if (ddir_rw(io_u->ddir)) {
1237 			io_u_mark_depth(td, 1);
1238 			td->ts.total_io_u[io_u->ddir]++;
1239 		}
1240 
1241 		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1242 		    td->o.do_verify &&
1243 		    td->o.verify != VERIFY_NONE &&
1244 		    !td->o.experimental_verify)
1245 			log_io_piece(td, io_u);
1246 
1247 		ret = io_u_sync_complete(td, io_u, bytes_done);
1248 		(void) ret;
1249 	}
1250 
1251 	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1252 }
1253 
1254 /*
1255  * Entry point for the thread based jobs. The process based jobs end up
1256  * here as well, after a little setup.
1257  */
1258 static void *thread_main(void *data)
1259 {
1260 	unsigned long long elapsed;
1261 	struct thread_data *td = data;
1262 	struct thread_options *o = &td->o;
1263 	pthread_condattr_t attr;
1264 	int clear_state;
1265 	int ret;
1266 
1267 	if (!o->use_thread) {
1268 		setsid();
1269 		td->pid = getpid();
1270 	} else
1271 		td->pid = gettid();
1272 
1273 	/*
1274 	 * fio_time_init() may not have been called yet if running as a server
1275 	 */
1276 	fio_time_init();
1277 
1278 	fio_local_clock_init(o->use_thread);
1279 
1280 	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1281 
1282 	if (is_backend)
1283 		fio_server_send_start(td);
1284 
1285 	INIT_FLIST_HEAD(&td->io_log_list);
1286 	INIT_FLIST_HEAD(&td->io_hist_list);
1287 	INIT_FLIST_HEAD(&td->verify_list);
1288 	INIT_FLIST_HEAD(&td->trim_list);
1289 	INIT_FLIST_HEAD(&td->next_rand_list);
1290 	pthread_mutex_init(&td->io_u_lock, NULL);
1291 	td->io_hist_tree = RB_ROOT;
1292 
1293 	pthread_condattr_init(&attr);
1294 	pthread_cond_init(&td->verify_cond, &attr);
1295 	pthread_cond_init(&td->free_cond, &attr);
1296 
1297 	td_set_runstate(td, TD_INITIALIZED);
1298 	dprint(FD_MUTEX, "up startup_mutex\n");
1299 	fio_mutex_up(startup_mutex);
1300 	dprint(FD_MUTEX, "wait on td->mutex\n");
1301 	fio_mutex_down(td->mutex);
1302 	dprint(FD_MUTEX, "done waiting on td->mutex\n");
1303 
1304 	/*
1305 	 * A new gid requires privilege, so we need to do this before setting
1306 	 * the uid.
1307 	 */
1308 	if (o->gid != -1U && setgid(o->gid)) {
1309 		td_verror(td, errno, "setgid");
1310 		goto err;
1311 	}
1312 	if (o->uid != -1U && setuid(o->uid)) {
1313 		td_verror(td, errno, "setuid");
1314 		goto err;
1315 	}
1316 
1317 	/*
1318 	 * If we have a gettimeofday() thread, make sure we exclude that
1319 	 * thread from this job
1320 	 */
1321 	if (o->gtod_cpu)
1322 		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1323 
1324 	/*
1325 	 * Set affinity first, in case it has an impact on the memory
1326 	 * allocations.
1327 	 */
1328 	if (o->cpumask_set) {
1329 		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1330 			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1331 			if (!ret) {
1332 				log_err("fio: no CPUs set\n");
1333 				log_err("fio: Try increasing number of available CPUs\n");
1334 				td_verror(td, EINVAL, "cpus_split");
1335 				goto err;
1336 			}
1337 		}
1338 		ret = fio_setaffinity(td->pid, o->cpumask);
1339 		if (ret == -1) {
1340 			td_verror(td, errno, "cpu_set_affinity");
1341 			goto err;
1342 		}
1343 	}
1344 
1345 #ifdef CONFIG_LIBNUMA
1346 	/* numa node setup */
1347 	if (o->numa_cpumask_set || o->numa_memmask_set) {
1348 		struct bitmask *mask;
1349 		int ret;
1350 
1351 		if (numa_available() < 0) {
1352 			td_verror(td, errno, "Does not support NUMA API\n");
1353 			goto err;
1354 		}
1355 
1356 		if (o->numa_cpumask_set) {
1357 			mask = numa_parse_nodestring(o->numa_cpunodes);
1358 			ret = numa_run_on_node_mask(mask);
1359 			numa_free_nodemask(mask);
1360 			if (ret == -1) {
1361 				td_verror(td, errno, \
1362 					"numa_run_on_node_mask failed\n");
1363 				goto err;
1364 			}
1365 		}
1366 
1367 		if (o->numa_memmask_set) {
1368 
1369 			mask = NULL;
1370 			if (o->numa_memnodes)
1371 				mask = numa_parse_nodestring(o->numa_memnodes);
1372 
1373 			switch (o->numa_mem_mode) {
1374 			case MPOL_INTERLEAVE:
1375 				numa_set_interleave_mask(mask);
1376 				break;
1377 			case MPOL_BIND:
1378 				numa_set_membind(mask);
1379 				break;
1380 			case MPOL_LOCAL:
1381 				numa_set_localalloc();
1382 				break;
1383 			case MPOL_PREFERRED:
1384 				numa_set_preferred(o->numa_mem_prefer_node);
1385 				break;
1386 			case MPOL_DEFAULT:
1387 			default:
1388 				break;
1389 			}
1390 
1391 			if (mask)
1392 				numa_free_nodemask(mask);
1393 
1394 		}
1395 	}
1396 #endif
1397 
1398 	if (fio_pin_memory(td))
1399 		goto err;
1400 
1401 	/*
1402 	 * May alter parameters that init_io_u() will use, so we need to
1403 	 * do this first.
1404 	 */
1405 	if (init_iolog(td))
1406 		goto err;
1407 
1408 	if (init_io_u(td))
1409 		goto err;
1410 
1411 	if (o->verify_async && verify_async_init(td))
1412 		goto err;
1413 
1414 	if (o->ioprio) {
1415 		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1416 		if (ret == -1) {
1417 			td_verror(td, errno, "ioprio_set");
1418 			goto err;
1419 		}
1420 	}
1421 
1422 	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1423 		goto err;
1424 
1425 	errno = 0;
1426 	if (nice(o->nice) == -1 && errno != 0) {
1427 		td_verror(td, errno, "nice");
1428 		goto err;
1429 	}
1430 
1431 	if (o->ioscheduler && switch_ioscheduler(td))
1432 		goto err;
1433 
1434 	if (!o->create_serialize && setup_files(td))
1435 		goto err;
1436 
1437 	if (td_io_init(td))
1438 		goto err;
1439 
1440 	if (init_random_map(td))
1441 		goto err;
1442 
1443 	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1444 		goto err;
1445 
1446 	if (o->pre_read) {
1447 		if (pre_read_files(td) < 0)
1448 			goto err;
1449 	}
1450 
1451 	fio_verify_init(td);
1452 
1453 	fio_gettime(&td->epoch, NULL);
1454 	fio_getrusage(&td->ru_start);
1455 	clear_state = 0;
1456 	while (keep_running(td)) {
1457 		uint64_t verify_bytes;
1458 
1459 		fio_gettime(&td->start, NULL);
1460 		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1461 		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1462 		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1463 
1464 		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1465 				o->ratemin[DDIR_TRIM]) {
1466 		        memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1467 						sizeof(td->bw_sample_time));
1468 		        memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1469 						sizeof(td->bw_sample_time));
1470 		        memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1471 						sizeof(td->bw_sample_time));
1472 		}
1473 
1474 		if (clear_state)
1475 			clear_io_state(td);
1476 
1477 		prune_io_piece_log(td);
1478 
1479 		if (td->o.verify_only && (td_write(td) || td_rw(td)))
1480 			verify_bytes = do_dry_run(td);
1481 		else
1482 			verify_bytes = do_io(td);
1483 
1484 		clear_state = 1;
1485 
1486 		if (td_read(td) && td->io_bytes[DDIR_READ]) {
1487 			elapsed = utime_since_now(&td->start);
1488 			td->ts.runtime[DDIR_READ] += elapsed;
1489 		}
1490 		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1491 			elapsed = utime_since_now(&td->start);
1492 			td->ts.runtime[DDIR_WRITE] += elapsed;
1493 		}
1494 		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1495 			elapsed = utime_since_now(&td->start);
1496 			td->ts.runtime[DDIR_TRIM] += elapsed;
1497 		}
1498 
1499 		if (td->error || td->terminate)
1500 			break;
1501 
1502 		if (!o->do_verify ||
1503 		    o->verify == VERIFY_NONE ||
1504 		    (td->io_ops->flags & FIO_UNIDIR))
1505 			continue;
1506 
1507 		clear_io_state(td);
1508 
1509 		fio_gettime(&td->start, NULL);
1510 
1511 		do_verify(td, verify_bytes);
1512 
1513 		td->ts.runtime[DDIR_READ] += utime_since_now(&td->start);
1514 
1515 		if (td->error || td->terminate)
1516 			break;
1517 	}
1518 
1519 	update_rusage_stat(td);
1520 	td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000;
1521 	td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000;
1522 	td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000;
1523 	td->ts.total_run_time = mtime_since_now(&td->epoch);
1524 	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1525 	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1526 	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1527 
1528 	fio_unpin_memory(td);
1529 
1530 	fio_writeout_logs(td);
1531 
1532 	if (o->exec_postrun)
1533 		exec_string(o, o->exec_postrun, (const char *)"postrun");
1534 
1535 	if (exitall_on_terminate)
1536 		fio_terminate_threads(td->groupid);
1537 
1538 err:
1539 	if (td->error)
1540 		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1541 							td->verror);
1542 
1543 	if (o->verify_async)
1544 		verify_async_exit(td);
1545 
1546 	close_and_free_files(td);
1547 	cleanup_io_u(td);
1548 	close_ioengine(td);
1549 	cgroup_shutdown(td, &cgroup_mnt);
1550 
1551 	if (o->cpumask_set) {
1552 		int ret = fio_cpuset_exit(&o->cpumask);
1553 
1554 		td_verror(td, ret, "fio_cpuset_exit");
1555 	}
1556 
1557 	/*
1558 	 * do this very late, it will log file closing as well
1559 	 */
1560 	if (o->write_iolog_file)
1561 		write_iolog_close(td);
1562 
1563 	fio_mutex_remove(td->rusage_sem);
1564 	td->rusage_sem = NULL;
1565 
1566 	fio_mutex_remove(td->mutex);
1567 	td->mutex = NULL;
1568 
1569 	td_set_runstate(td, TD_EXITED);
1570 	return (void *) (uintptr_t) td->error;
1571 }
1572 
1573 
1574 /*
1575  * We cannot pass the td data into a forked process, so attach the td and
1576  * pass it to the thread worker.
1577  */
1578 static int fork_main(int shmid, int offset)
1579 {
1580 	struct thread_data *td;
1581 	void *data, *ret;
1582 
1583 #ifndef __hpux
1584 	data = shmat(shmid, NULL, 0);
1585 	if (data == (void *) -1) {
1586 		int __err = errno;
1587 
1588 		perror("shmat");
1589 		return __err;
1590 	}
1591 #else
1592 	/*
1593 	 * HP-UX inherits shm mappings?
1594 	 */
1595 	data = threads;
1596 #endif
1597 
1598 	td = data + offset * sizeof(struct thread_data);
1599 	ret = thread_main(td);
1600 	shmdt(data);
1601 	return (int) (uintptr_t) ret;
1602 }
1603 
1604 /*
1605  * Run over the job map and reap the threads that have exited, if any.
1606  */
1607 static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1608 			 unsigned int *m_rate)
1609 {
1610 	struct thread_data *td;
1611 	unsigned int cputhreads, realthreads, pending;
1612 	int i, status, ret;
1613 
1614 	/*
1615 	 * reap exited threads (TD_EXITED -> TD_REAPED)
1616 	 */
1617 	realthreads = pending = cputhreads = 0;
1618 	for_each_td(td, i) {
1619 		int flags = 0;
1620 
1621 		/*
1622 		 * ->io_ops is NULL for a thread that has closed its
1623 		 * io engine
1624 		 */
1625 		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1626 			cputhreads++;
1627 		else
1628 			realthreads++;
1629 
1630 		if (!td->pid) {
1631 			pending++;
1632 			continue;
1633 		}
1634 		if (td->runstate == TD_REAPED)
1635 			continue;
1636 		if (td->o.use_thread) {
1637 			if (td->runstate == TD_EXITED) {
1638 				td_set_runstate(td, TD_REAPED);
1639 				goto reaped;
1640 			}
1641 			continue;
1642 		}
1643 
1644 		flags = WNOHANG;
1645 		if (td->runstate == TD_EXITED)
1646 			flags = 0;
1647 
1648 		/*
1649 		 * check if someone quit or got killed in an unusual way
1650 		 */
1651 		ret = waitpid(td->pid, &status, flags);
1652 		if (ret < 0) {
1653 			if (errno == ECHILD) {
1654 				log_err("fio: pid=%d disappeared %d\n",
1655 						(int) td->pid, td->runstate);
1656 				td->sig = ECHILD;
1657 				td_set_runstate(td, TD_REAPED);
1658 				goto reaped;
1659 			}
1660 			perror("waitpid");
1661 		} else if (ret == td->pid) {
1662 			if (WIFSIGNALED(status)) {
1663 				int sig = WTERMSIG(status);
1664 
1665 				if (sig != SIGTERM && sig != SIGUSR2)
1666 					log_err("fio: pid=%d, got signal=%d\n",
1667 							(int) td->pid, sig);
1668 				td->sig = sig;
1669 				td_set_runstate(td, TD_REAPED);
1670 				goto reaped;
1671 			}
1672 			if (WIFEXITED(status)) {
1673 				if (WEXITSTATUS(status) && !td->error)
1674 					td->error = WEXITSTATUS(status);
1675 
1676 				td_set_runstate(td, TD_REAPED);
1677 				goto reaped;
1678 			}
1679 		}
1680 
1681 		/*
1682 		 * thread is not dead, continue
1683 		 */
1684 		pending++;
1685 		continue;
1686 reaped:
1687 		(*nr_running)--;
1688 		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
1689 		(*t_rate) -= ddir_rw_sum(td->o.rate);
1690 		if (!td->pid)
1691 			pending--;
1692 
1693 		if (td->error)
1694 			exit_value++;
1695 
1696 		done_secs += mtime_since_now(&td->epoch) / 1000;
1697 		profile_td_exit(td);
1698 	}
1699 
1700 	if (*nr_running == cputhreads && !pending && realthreads)
1701 		fio_terminate_threads(TERMINATE_ALL);
1702 }
1703 
1704 static void do_usleep(unsigned int usecs)
1705 {
1706 	check_for_running_stats();
1707 	usleep(usecs);
1708 }
1709 
1710 /*
1711  * Main function for kicking off and reaping jobs, as needed.
1712  */
1713 static void run_threads(void)
1714 {
1715 	struct thread_data *td;
1716 	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1717 	uint64_t spent;
1718 
1719 	if (fio_gtod_offload && fio_start_gtod_thread())
1720 		return;
1721 
1722 	fio_idle_prof_init();
1723 
1724 	set_sig_handlers();
1725 
1726 	nr_thread = nr_process = 0;
1727 	for_each_td(td, i) {
1728 		if (td->o.use_thread)
1729 			nr_thread++;
1730 		else
1731 			nr_process++;
1732 	}
1733 
1734 	if (output_format == FIO_OUTPUT_NORMAL) {
1735 		log_info("Starting ");
1736 		if (nr_thread)
1737 			log_info("%d thread%s", nr_thread,
1738 						nr_thread > 1 ? "s" : "");
1739 		if (nr_process) {
1740 			if (nr_thread)
1741 				log_info(" and ");
1742 			log_info("%d process%s", nr_process,
1743 						nr_process > 1 ? "es" : "");
1744 		}
1745 		log_info("\n");
1746 		fflush(stdout);
1747 	}
1748 
1749 	todo = thread_number;
1750 	nr_running = 0;
1751 	nr_started = 0;
1752 	m_rate = t_rate = 0;
1753 
1754 	for_each_td(td, i) {
1755 		print_status_init(td->thread_number - 1);
1756 
1757 		if (!td->o.create_serialize)
1758 			continue;
1759 
1760 		/*
1761 		 * do file setup here so it happens sequentially,
1762 		 * we don't want X number of threads getting their
1763 		 * client data interspersed on disk
1764 		 */
1765 		if (setup_files(td)) {
1766 			exit_value++;
1767 			if (td->error)
1768 				log_err("fio: pid=%d, err=%d/%s\n",
1769 					(int) td->pid, td->error, td->verror);
1770 			td_set_runstate(td, TD_REAPED);
1771 			todo--;
1772 		} else {
1773 			struct fio_file *f;
1774 			unsigned int j;
1775 
1776 			/*
1777 			 * for sharing to work, each job must always open
1778 			 * its own files. so close them, if we opened them
1779 			 * for creation
1780 			 */
1781 			for_each_file(td, f, j) {
1782 				if (fio_file_open(f))
1783 					td_io_close_file(td, f);
1784 			}
1785 		}
1786 	}
1787 
1788 	/* start idle threads before io threads start to run */
1789 	fio_idle_prof_start();
1790 
1791 	set_genesis_time();
1792 
1793 	while (todo) {
1794 		struct thread_data *map[REAL_MAX_JOBS];
1795 		struct timeval this_start;
1796 		int this_jobs = 0, left;
1797 
1798 		/*
1799 		 * create threads (TD_NOT_CREATED -> TD_CREATED)
1800 		 */
1801 		for_each_td(td, i) {
1802 			if (td->runstate != TD_NOT_CREATED)
1803 				continue;
1804 
1805 			/*
1806 			 * never got a chance to start, killed by other
1807 			 * thread for some reason
1808 			 */
1809 			if (td->terminate) {
1810 				todo--;
1811 				continue;
1812 			}
1813 
1814 			if (td->o.start_delay) {
1815 				spent = utime_since_genesis();
1816 
1817 				if (td->o.start_delay > spent)
1818 					continue;
1819 			}
1820 
1821 			if (td->o.stonewall && (nr_started || nr_running)) {
1822 				dprint(FD_PROCESS, "%s: stonewall wait\n",
1823 							td->o.name);
1824 				break;
1825 			}
1826 
1827 			init_disk_util(td);
1828 
1829 			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
1830 			td->update_rusage = 0;
1831 
1832 			/*
1833 			 * Set state to created. Thread will transition
1834 			 * to TD_INITIALIZED when it's done setting up.
1835 			 */
1836 			td_set_runstate(td, TD_CREATED);
1837 			map[this_jobs++] = td;
1838 			nr_started++;
1839 
1840 			if (td->o.use_thread) {
1841 				int ret;
1842 
1843 				dprint(FD_PROCESS, "will pthread_create\n");
1844 				ret = pthread_create(&td->thread, NULL,
1845 							thread_main, td);
1846 				if (ret) {
1847 					log_err("pthread_create: %s\n",
1848 							strerror(ret));
1849 					nr_started--;
1850 					break;
1851 				}
1852 				ret = pthread_detach(td->thread);
1853 				if (ret)
1854 					log_err("pthread_detach: %s",
1855 							strerror(ret));
1856 			} else {
1857 				pid_t pid;
1858 				dprint(FD_PROCESS, "will fork\n");
1859 				pid = fork();
1860 				if (!pid) {
1861 					int ret = fork_main(shm_id, i);
1862 
1863 					_exit(ret);
1864 				} else if (i == fio_debug_jobno)
1865 					*fio_debug_jobp = pid;
1866 			}
1867 			dprint(FD_MUTEX, "wait on startup_mutex\n");
1868 			if (fio_mutex_down_timeout(startup_mutex, 10)) {
1869 				log_err("fio: job startup hung? exiting.\n");
1870 				fio_terminate_threads(TERMINATE_ALL);
1871 				fio_abort = 1;
1872 				nr_started--;
1873 				break;
1874 			}
1875 			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
1876 		}
1877 
1878 		/*
1879 		 * Wait for the started threads to transition to
1880 		 * TD_INITIALIZED.
1881 		 */
1882 		fio_gettime(&this_start, NULL);
1883 		left = this_jobs;
1884 		while (left && !fio_abort) {
1885 			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
1886 				break;
1887 
1888 			do_usleep(100000);
1889 
1890 			for (i = 0; i < this_jobs; i++) {
1891 				td = map[i];
1892 				if (!td)
1893 					continue;
1894 				if (td->runstate == TD_INITIALIZED) {
1895 					map[i] = NULL;
1896 					left--;
1897 				} else if (td->runstate >= TD_EXITED) {
1898 					map[i] = NULL;
1899 					left--;
1900 					todo--;
1901 					nr_running++; /* work-around... */
1902 				}
1903 			}
1904 		}
1905 
1906 		if (left) {
1907 			log_err("fio: %d job%s failed to start\n", left,
1908 					left > 1 ? "s" : "");
1909 			for (i = 0; i < this_jobs; i++) {
1910 				td = map[i];
1911 				if (!td)
1912 					continue;
1913 				kill(td->pid, SIGTERM);
1914 			}
1915 			break;
1916 		}
1917 
1918 		/*
1919 		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
1920 		 */
1921 		for_each_td(td, i) {
1922 			if (td->runstate != TD_INITIALIZED)
1923 				continue;
1924 
1925 			if (in_ramp_time(td))
1926 				td_set_runstate(td, TD_RAMP);
1927 			else
1928 				td_set_runstate(td, TD_RUNNING);
1929 			nr_running++;
1930 			nr_started--;
1931 			m_rate += ddir_rw_sum(td->o.ratemin);
1932 			t_rate += ddir_rw_sum(td->o.rate);
1933 			todo--;
1934 			fio_mutex_up(td->mutex);
1935 		}
1936 
1937 		reap_threads(&nr_running, &t_rate, &m_rate);
1938 
1939 		if (todo)
1940 			do_usleep(100000);
1941 	}
1942 
1943 	while (nr_running) {
1944 		reap_threads(&nr_running, &t_rate, &m_rate);
1945 		do_usleep(10000);
1946 	}
1947 
1948 	fio_idle_prof_stop();
1949 
1950 	update_io_ticks();
1951 }
1952 
1953 void wait_for_disk_thread_exit(void)
1954 {
1955 	fio_mutex_down(disk_thread_mutex);
1956 }
1957 
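/*
 * Stop the disk utilization thread and drop the entries it collected.
 */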
1958 static void free_disk_util(void)
1959 {
1960 	disk_util_start_exit();
1961 	wait_for_disk_thread_exit();
1962 	disk_util_prune_entries();
1963 }
1964 
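/*
 * Background thread that periodically updates disk utilization stats and,
 * when not running as a backend, prints the running thread status.
 */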
1965 static void *disk_thread_main(void *data)
1966 {
1967 	int ret = 0;
1968 
1969 	fio_mutex_up(startup_mutex);
1970 
1971 	while (threads && !ret) {
1972 		usleep(DISK_UTIL_MSEC * 1000);
1973 		if (!threads)
1974 			break;
1975 		ret = update_io_ticks();
1976 
1977 		if (!is_backend)
1978 			print_thread_status();
1979 	}
1980 
1981 	fio_mutex_up(disk_thread_mutex);
1982 	return NULL;
1983 }
1984 
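/*
 * Spawn the detached disk utilization thread and wait for it to signal
 * that it has started.
 */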
1985 static int create_disk_util_thread(void)
1986 {
1987 	int ret;
1988 
1989 	setup_disk_util();
1990 
1991 	disk_thread_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
1992 
1993 	ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL);
1994 	if (ret) {
1995 		fio_mutex_remove(disk_thread_mutex);
1996 		log_err("Can't create disk util thread: %s\n", strerror(ret));
1997 		return 1;
1998 	}
1999 
2000 	ret = pthread_detach(disk_util_thread);
2001 	if (ret) {
2002 		fio_mutex_remove(disk_thread_mutex);
2003 		log_err("Can't detach disk util thread: %s\n", strerror(ret));
2004 		return 1;
2005 	}
2006 
2007 	dprint(FD_MUTEX, "wait on startup_mutex\n");
2008 	fio_mutex_down(startup_mutex);
2009 	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2010 	return 0;
2011 }
2012 
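/*
 * Backend entry point: set up aggregate logs and helper threads, run all
 * jobs, then report the results and tear everything down.
 */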
2013 int fio_backend(void)
2014 {
2015 	struct thread_data *td;
2016 	int i;
2017 
2018 	if (exec_profile) {
2019 		if (load_profile(exec_profile))
2020 			return 1;
2021 		free(exec_profile);
2022 		exec_profile = NULL;
2023 	}
2024 	if (!thread_number)
2025 		return 0;
2026 
2027 	if (write_bw_log) {
2028 		setup_log(&agg_io_log[DDIR_READ], 0, IO_LOG_TYPE_BW);
2029 		setup_log(&agg_io_log[DDIR_WRITE], 0, IO_LOG_TYPE_BW);
2030 		setup_log(&agg_io_log[DDIR_TRIM], 0, IO_LOG_TYPE_BW);
2031 	}
2032 
2033 	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2034 	if (startup_mutex == NULL)
2035 		return 1;
2036 
2037 	set_genesis_time();
2038 	stat_init();
2039 	create_disk_util_thread();
2040 
2041 	cgroup_list = smalloc(sizeof(*cgroup_list));
2042 	INIT_FLIST_HEAD(cgroup_list);
2043 
2044 	run_threads();
2045 
2046 	if (!fio_abort) {
2047 		show_run_stats();
2048 		if (write_bw_log) {
2049 			__finish_log(agg_io_log[DDIR_READ], "agg-read_bw.log");
2050 			__finish_log(agg_io_log[DDIR_WRITE],
2051 					"agg-write_bw.log");
2052 			__finish_log(agg_io_log[DDIR_TRIM],
2053 					"agg-trim_bw.log");
2054 		}
2055 	}
2056 
2057 	for_each_td(td, i)
2058 		fio_options_free(td);
2059 
2060 	free_disk_util();
2061 	cgroup_kill(cgroup_list);
2062 	sfree(cgroup_list);
2063 	sfree(cgroup_mnt);
2064 
2065 	fio_mutex_remove(startup_mutex);
2066 	fio_mutex_remove(disk_thread_mutex);
2067 	stat_exit();
2068 	return exit_value;
2069 }
2070