• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Rated submission helpers
 *
 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk>
 *
 */
7 #include "fio.h"
8 #include "ioengines.h"
9 #include "lib/getrusage.h"
10 #include "rate-submit.h"
11 
io_workqueue_fn(struct submit_worker * sw,struct workqueue_work * work)12 static int io_workqueue_fn(struct submit_worker *sw,
13 			   struct workqueue_work *work)
14 {
15 	struct io_u *io_u = container_of(work, struct io_u, work);
16 	const enum fio_ddir ddir = io_u->ddir;
17 	struct thread_data *td = sw->priv;
18 	int ret;
19 
20 	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
21 
22 	io_u_set(td, io_u, IO_U_F_NO_FILE_PUT);
23 
24 	td->cur_depth++;
25 
26 	do {
27 		ret = td_io_queue(td, io_u);
28 		if (ret != FIO_Q_BUSY)
29 			break;
30 		ret = io_u_queued_complete(td, 1);
31 		if (ret > 0)
32 			td->cur_depth -= ret;
33 		io_u_clear(td, io_u, IO_U_F_FLIGHT);
34 	} while (1);
35 
36 	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
37 
38 	io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
39 
40 	if (ret == FIO_Q_COMPLETED)
41 		td->cur_depth--;
42 	else if (ret == FIO_Q_QUEUED) {
43 		unsigned int min_evts;
44 
45 		if (td->o.iodepth == 1)
46 			min_evts = 1;
47 		else
48 			min_evts = 0;
49 
50 		ret = io_u_queued_complete(td, min_evts);
51 		if (ret > 0)
52 			td->cur_depth -= ret;
53 	} else if (ret == FIO_Q_BUSY) {
54 		ret = io_u_queued_complete(td, td->cur_depth);
55 		if (ret > 0)
56 			td->cur_depth -= ret;
57 	}
58 
59 	return 0;
60 }
61 
io_workqueue_pre_sleep_flush_fn(struct submit_worker * sw)62 static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw)
63 {
64 	struct thread_data *td = sw->priv;
65 
66 	if (td->io_u_queued || td->cur_depth || td->io_u_in_flight)
67 		return true;
68 
69 	return false;
70 }
71 
io_workqueue_pre_sleep_fn(struct submit_worker * sw)72 static void io_workqueue_pre_sleep_fn(struct submit_worker *sw)
73 {
74 	struct thread_data *td = sw->priv;
75 	int ret;
76 
77 	ret = io_u_quiesce(td);
78 	if (ret > 0)
79 		td->cur_depth -= ret;
80 }
81 
io_workqueue_alloc_fn(struct submit_worker * sw)82 static int io_workqueue_alloc_fn(struct submit_worker *sw)
83 {
84 	struct thread_data *td;
85 
86 	td = calloc(1, sizeof(*td));
87 	sw->priv = td;
88 	return 0;
89 }
90 
io_workqueue_free_fn(struct submit_worker * sw)91 static void io_workqueue_free_fn(struct submit_worker *sw)
92 {
93 	free(sw->priv);
94 	sw->priv = NULL;
95 }
96 
io_workqueue_init_worker_fn(struct submit_worker * sw)97 static int io_workqueue_init_worker_fn(struct submit_worker *sw)
98 {
99 	struct thread_data *parent = sw->wq->td;
100 	struct thread_data *td = sw->priv;
101 
102 	memcpy(&td->o, &parent->o, sizeof(td->o));
103 	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
104 	td->o.uid = td->o.gid = -1U;
105 	dup_files(td, parent);
106 	td->eo = parent->eo;
107 	fio_options_mem_dupe(td);
108 
109 	if (ioengine_load(td))
110 		goto err;
111 
112 	td->pid = gettid();
113 
114 	INIT_FLIST_HEAD(&td->io_log_list);
115 	INIT_FLIST_HEAD(&td->io_hist_list);
116 	INIT_FLIST_HEAD(&td->verify_list);
117 	INIT_FLIST_HEAD(&td->trim_list);
118 	INIT_FLIST_HEAD(&td->next_rand_list);
119 	td->io_hist_tree = RB_ROOT;
120 
121 	td->o.iodepth = 1;
122 	if (td_io_init(td))
123 		goto err_io_init;
124 
125 	set_epoch_time(td, td->o.log_unix_epoch);
126 	fio_getrusage(&td->ru_start);
127 	clear_io_state(td, 1);
128 
129 	td_set_runstate(td, TD_RUNNING);
130 	td->flags |= TD_F_CHILD;
131 	td->parent = parent;
132 	return 0;
133 
134 err_io_init:
135 	close_ioengine(td);
136 err:
137 	return 1;
138 
139 }
140 
io_workqueue_exit_worker_fn(struct submit_worker * sw,unsigned int * sum_cnt)141 static void io_workqueue_exit_worker_fn(struct submit_worker *sw,
142 					unsigned int *sum_cnt)
143 {
144 	struct thread_data *td = sw->priv;
145 
146 	(*sum_cnt)++;
147 	sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1);
148 
149 	fio_options_free(td);
150 	close_and_free_files(td);
151 	if (td->io_ops)
152 		close_ioengine(td);
153 	td_set_runstate(td, TD_EXITED);
154 }
155 
/*
 * Move a counter: add *src to *dst and reset *src to zero. Nothing is
 * touched when *src is already zero. With CONFIG_SFAA the addition uses
 * __sync_fetch_and_add(); otherwise it is a plain add.
 */
static void sum_val(uint64_t *dst, uint64_t *src)
{
	if (!*src)
		return;

#ifdef CONFIG_SFAA
	__sync_fetch_and_add(dst, *src);
#else
	*dst += *src;
#endif
	*src = 0;
}
173 
/*
 * Release both mutexes, lock1 first. Compiles to a no-op when
 * CONFIG_SFAA is defined.
 */
static void pthread_double_unlock(pthread_mutex_t *lock1,
				  pthread_mutex_t *lock2)
{
#ifdef CONFIG_SFAA
	/* no locking in this configuration */
#else
	pthread_mutex_unlock(lock1);
	pthread_mutex_unlock(lock2);
#endif
}
182 
/*
 * Acquire both mutexes, always taking the one at the lower address
 * first so that two callers passing the same pair in opposite order
 * lock them in the same sequence. Compiles to a no-op when CONFIG_SFAA
 * is defined.
 */
static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2)
{
#ifndef CONFIG_SFAA
	pthread_mutex_t *first = lock2;
	pthread_mutex_t *second = lock1;

	if (lock1 < lock2) {
		first = lock1;
		second = lock2;
	}

	pthread_mutex_lock(first);
	pthread_mutex_lock(second);
#endif
}
195 
sum_ddir(struct thread_data * dst,struct thread_data * src,enum fio_ddir ddir)196 static void sum_ddir(struct thread_data *dst, struct thread_data *src,
197 		     enum fio_ddir ddir)
198 {
199 	pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
200 
201 	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
202 	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
203 	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
204 	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
205 	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
206 
207 	pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock);
208 }
209 
io_workqueue_update_acct_fn(struct submit_worker * sw)210 static void io_workqueue_update_acct_fn(struct submit_worker *sw)
211 {
212 	struct thread_data *src = sw->priv;
213 	struct thread_data *dst = sw->wq->td;
214 
215 	if (td_read(src))
216 		sum_ddir(dst, src, DDIR_READ);
217 	if (td_write(src))
218 		sum_ddir(dst, src, DDIR_WRITE);
219 	if (td_trim(src))
220 		sum_ddir(dst, src, DDIR_TRIM);
221 
222 }
223 
224 static struct workqueue_ops rated_wq_ops = {
225 	.fn			= io_workqueue_fn,
226 	.pre_sleep_flush_fn	= io_workqueue_pre_sleep_flush_fn,
227 	.pre_sleep_fn		= io_workqueue_pre_sleep_fn,
228 	.update_acct_fn		= io_workqueue_update_acct_fn,
229 	.alloc_worker_fn	= io_workqueue_alloc_fn,
230 	.free_worker_fn		= io_workqueue_free_fn,
231 	.init_worker_fn		= io_workqueue_init_worker_fn,
232 	.exit_worker_fn		= io_workqueue_exit_worker_fn,
233 };
234 
rate_submit_init(struct thread_data * td,struct sk_out * sk_out)235 int rate_submit_init(struct thread_data *td, struct sk_out *sk_out)
236 {
237 	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
238 		return 0;
239 
240 	return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out);
241 }
242 
rate_submit_exit(struct thread_data * td)243 void rate_submit_exit(struct thread_data *td)
244 {
245 	if (td->o.io_submit_mode != IO_MODE_OFFLOAD)
246 		return;
247 
248 	workqueue_exit(&td->io_wq);
249 }
250