• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Basic workqueue-like code that sets up a thread and allows async
 * processing of some sort. Could be extended to allow for multiple
 * worker threads. But right now fio associates one of these per IO
 * thread, so it should be enough to have just a single thread doing
 * the work.
 */
8 #include <stdio.h>
9 #include <stdlib.h>
10 #include <stdarg.h>
11 #include <unistd.h>
12 #include <errno.h>
13 #include <pthread.h>
14 #include <string.h>
15 
16 #include "../smalloc.h"
17 #include "../log.h"
18 #include "tp.h"
19 
tp_flush_work(struct flist_head * list)20 static void tp_flush_work(struct flist_head *list)
21 {
22 	struct tp_work *work;
23 
24 	while (!flist_empty(list)) {
25 		int prio;
26 
27 		work = flist_entry(list->next, struct tp_work, list);
28 		flist_del(&work->list);
29 
30 		prio = work->prio;
31 		if (nice(prio) < 0)
32 			log_err("fio: nice %s\n", strerror(errno));
33 
34 		work->fn(work);
35 
36 		if (nice(prio) < 0)
37 			log_err("fio: nice %s\n", strerror(errno));
38 	}
39 }
40 
tp_thread(void * data)41 static void *tp_thread(void *data)
42 {
43 	struct tp_data *tdat = data;
44 	struct flist_head work_list;
45 
46 	INIT_FLIST_HEAD(&work_list);
47 
48 	while (1) {
49 		pthread_mutex_lock(&tdat->lock);
50 
51 		if (!tdat->thread_exit && flist_empty(&tdat->work))
52 			pthread_cond_wait(&tdat->cv, &tdat->lock);
53 
54 		if (!flist_empty(&tdat->work))
55 			flist_splice_tail_init(&tdat->work, &work_list);
56 
57 		pthread_mutex_unlock(&tdat->lock);
58 
59 		if (flist_empty(&work_list)) {
60 			if (tdat->thread_exit)
61 				break;
62 			continue;
63 		}
64 
65 		tp_flush_work(&work_list);
66 	}
67 
68 	return NULL;
69 }
70 
tp_queue_work(struct tp_data * tdat,struct tp_work * work)71 void tp_queue_work(struct tp_data *tdat, struct tp_work *work)
72 {
73 	work->done = 0;
74 
75 	pthread_mutex_lock(&tdat->lock);
76 	flist_add_tail(&work->list, &tdat->work);
77 	pthread_mutex_unlock(&tdat->lock);
78 
79 	pthread_cond_signal(&tdat->cv);
80 }
81 
tp_init(struct tp_data ** tdatp)82 void tp_init(struct tp_data **tdatp)
83 {
84 	struct tp_data *tdat;
85 	int ret;
86 
87 	if (*tdatp)
88 		return;
89 
90 	*tdatp = tdat = smalloc(sizeof(*tdat));
91 	pthread_mutex_init(&tdat->lock, NULL);
92 	INIT_FLIST_HEAD(&tdat->work);
93 	pthread_cond_init(&tdat->cv, NULL);
94 	pthread_cond_init(&tdat->sleep_cv, NULL);
95 
96 	ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat);
97 	if (ret)
98 		log_err("fio: failed to create tp thread\n");
99 }
100 
tp_exit(struct tp_data ** tdatp)101 void tp_exit(struct tp_data **tdatp)
102 {
103 	struct tp_data *tdat = *tdatp;
104 	void *ret;
105 
106 	if (!tdat)
107 		return;
108 
109 	pthread_mutex_lock(&tdat->lock);
110 	tdat->thread_exit = 1;
111 	pthread_mutex_unlock(&tdat->lock);
112 
113 	pthread_cond_signal(&tdat->cv);
114 
115 	pthread_join(tdat->thread, &ret);
116 
117 	sfree(tdat);
118 	*tdatp = NULL;
119 }
120