1 /* Test program that performs producer-consumer style communication through
2 * a circular buffer. This test program is a slightly modified version of the
3 * test program made available by Miguel Ojeda
4 * -- see also http://article.gmane.org/gmane.comp.debugging.valgrind/8782.
5 */
6
7
8 #include <stdio.h>
9 #include <string.h>
10 #include <stdlib.h>
11 #include <unistd.h>
12 #include <time.h>
13 #include <pthread.h>
14 #include <semaphore.h>
15 #include <fcntl.h>
16 #include "../../config.h"
17
18
19 /** gcc versions 4.1.0 and later have support for atomic builtins. */
20
21 #ifndef HAVE_BUILTIN_ATOMIC
22 #error Sorry, but this test program can only be compiled by a compiler that\
23 has built-in functions for atomic memory access.
24 #endif
25
26
/* Number of elements in the circular buffer; also the initial count of the
 * "free" semaphore. */
#define BUFFER_MAX (2)
/* Base names handed to create_semaphore() / destroy_semaphore() for the
 * semaphore counting stored items and the one counting free elements. */
#define DATA_SEMAPHORE_NAME "cb-data-semaphore"
#define FREE_SEMAPHORE_NAME "cb-free-semaphore"
30
31
32 typedef int data_t;
33
34 typedef struct {
35 /* Counting semaphore representing the number of data items in the buffer. */
36 sem_t* data;
37 /* Counting semaphore representing the number of free elements. */
38 sem_t* free;
39 /* Position where a new elements should be written. */
40 int in;
41 /* Position from where an element can be removed. */
42 int out;
43 /* Mutex that protects 'in'. */
44 pthread_mutex_t mutex_in;
45 /* Mutex that protects 'out'. */
46 pthread_mutex_t mutex_out;
47 /* Data buffer. */
48 data_t buffer[BUFFER_MAX];
49 } buffer_t;
50
/* Non-zero suppresses the progress messages written to stdout; set by the
 * -q command-line option. */
static int quiet = 0;
/* Non-zero makes buffer_send() and buffer_recv() lock the mutexes around the
 * index update; cleared by the -n command-line option. */
static int use_locking = 1;
53
/*
 * Atomically add 'i' to the integer 'p' points to and return the value that
 * was stored there before the addition.
 */
static __inline__
int fetch_and_add(int *p, int i)
{
  const int previous = __sync_fetch_and_add(p, i);

  return previous;
}
59
/*
 * Create a counting semaphore whose initial count is 'value'.
 *
 * When VGO_darwin is defined a named semaphore is opened under the name
 * "<name>-<pid>"; otherwise an unnamed semaphore is allocated with malloc()
 * and 'name' is not used. Release the result with destroy_semaphore().
 *
 * Returns the semaphore, or NULL on failure after reporting the cause with
 * perror().
 */
static sem_t* create_semaphore(const char* const name, const int value)
{
#ifdef VGO_darwin
  char name_and_pid[32];
  sem_t* p;

  snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid());
  p = sem_open(name_and_pid, O_CREAT | O_EXCL, 0600, value);
  if (p == SEM_FAILED) {
    perror("sem_open");
    return NULL;
  }
  return p;
#else
  sem_t* p = malloc(sizeof(*p));

  (void)name;
  if (p == NULL) {
    perror("malloc");
    return NULL;
  }
  /* Never hand out a semaphore that failed to initialize: waiting on or
   * posting to it would be undefined. */
  if (sem_init(p, 0, value) != 0) {
    perror("sem_init");
    free(p);
    return NULL;
  }
  return p;
#endif
}
78
/*
 * Release a semaphore obtained from create_semaphore().
 *
 * 'name' must be the same base name that was passed to create_semaphore().
 * A NULL 'p' (creation failed) is ignored.
 */
static void destroy_semaphore(const char* const name, sem_t* p)
{
  if (p == NULL)
    return;

#ifdef VGO_darwin
  {
    /* create_semaphore() opens the semaphore as "<name>-<pid>", so that is
     * the name that has to be unlinked; unlinking the bare 'name' would
     * leave the named semaphore behind. */
    char name_and_pid[32];

    snprintf(name_and_pid, sizeof(name_and_pid), "%s-%d", name, getpid());
    sem_close(p);
    sem_unlink(name_and_pid);
  }
#else
  (void)name;
  sem_destroy(p);
  free(p);
#endif
}
89
/*
 * Prepare 'b' for use: create the "data" semaphore with count 0 and the
 * "free" semaphore with count BUFFER_MAX, initialize both mutexes and reset
 * the read and write positions to 0.
 *
 * If a semaphore or mutex cannot be set up, a message is written to stderr
 * and the process exits with EXIT_FAILURE, because every later buffer
 * operation depends on these objects.
 */
static void buffer_init(buffer_t * b)
{
  int rc;

  b->data = create_semaphore(DATA_SEMAPHORE_NAME, 0);
  b->free = create_semaphore(FREE_SEMAPHORE_NAME, BUFFER_MAX);
  if (b->data == NULL || b->free == NULL)
  {
    fprintf(stderr, "buffer_init: semaphore creation failed\n");
    exit(EXIT_FAILURE);
  }

  /* pthread_mutex_init() returns the error number instead of setting errno. */
  if ((rc = pthread_mutex_init(&b->mutex_in, NULL)) != 0
      || (rc = pthread_mutex_init(&b->mutex_out, NULL)) != 0)
  {
    fprintf(stderr, "pthread_mutex_init: %s\n", strerror(rc));
    exit(EXIT_FAILURE);
  }

  b->in = 0;
  b->out = 0;
}
101
/*
 * Take one element out of 'b' and store it in '*d'.
 *
 * Waits on the "data" semaphore first and posts the "free" semaphore when
 * done. The read position is advanced with fetch_and_add(); 'mutex_out' is
 * held around that update and the element read only when 'use_locking' is
 * non-zero.
 */
static void buffer_recv(buffer_t *b, data_t *d)
{
  int slot;

  /* Wait until at least one data item is available. */
  sem_wait(b->data);

  if (use_locking)
  {
    pthread_mutex_lock(&b->mutex_out);
  }

  /* Claim the next read position; once the counter has run past the end of
   * the array, pull both the counter and the local index back by
   * BUFFER_MAX. */
  slot = fetch_and_add(&b->out, 1);
  if (slot >= BUFFER_MAX)
  {
    fetch_and_add(&b->out, -BUFFER_MAX);
    slot -= BUFFER_MAX;
  }
  *d = b->buffer[slot];

  if (use_locking)
  {
    pthread_mutex_unlock(&b->mutex_out);
  }

  if (!quiet)
  {
    printf("received %d from buffer[%d]\n", *d, slot);
    fflush(stdout);
  }

  /* One more element is free for writing. */
  sem_post(b->free);
}
124
/*
 * Store the element '*d' in 'b'.
 *
 * Waits on the "free" semaphore first and posts the "data" semaphore when
 * done. The write position is advanced with fetch_and_add(); 'mutex_in' is
 * held around that update and the element write only when 'use_locking' is
 * non-zero.
 */
static void buffer_send(buffer_t *b, data_t *d)
{
  int slot;

  /* Wait until at least one element is free. */
  sem_wait(b->free);

  if (use_locking)
  {
    pthread_mutex_lock(&b->mutex_in);
  }

  /* Claim the next write position; once the counter has run past the end of
   * the array, pull both the counter and the local index back by
   * BUFFER_MAX. */
  slot = fetch_and_add(&b->in, 1);
  if (slot >= BUFFER_MAX)
  {
    fetch_and_add(&b->in, -BUFFER_MAX);
    slot -= BUFFER_MAX;
  }
  b->buffer[slot] = *d;

  if (use_locking)
  {
    pthread_mutex_unlock(&b->mutex_in);
  }

  if (!quiet)
  {
    printf("sent %d to buffer[%d]\n", *d, slot);
    fflush(stdout);
  }

  /* One more data item is available for reading. */
  sem_post(b->data);
}
147
/*
 * Tear down what buffer_init() set up in 'b': both semaphores first, then
 * both mutexes.
 */
static void buffer_destroy(buffer_t *b)
{
  sem_t *const data_sem = b->data;
  sem_t *const free_sem = b->free;

  destroy_semaphore(DATA_SEMAPHORE_NAME, data_sem);
  destroy_semaphore(FREE_SEMAPHORE_NAME, free_sem);

  pthread_mutex_destroy(&b->mutex_in);
  pthread_mutex_destroy(&b->mutex_out);
}
156
/* The one circular buffer used by main() and by every producer and consumer
 * thread. */
static buffer_t b;
158
producer(int * id)159 static void producer(int* id)
160 {
161 buffer_send(&b, id);
162 pthread_exit(NULL);
163 }
164
165 #define MAXSLEEP (100 * 1000)
166
consumer(int * id)167 static void consumer(int* id)
168 {
169 int d;
170 usleep(rand() % MAXSLEEP);
171 buffer_recv(&b, &d);
172 if (! quiet)
173 {
174 printf("%i: %i\n", *id, d);
175 fflush(stdout);
176 }
177 pthread_exit(NULL);
178 }
179
180 #define THREADS (10)
181
main(int argc,char ** argv)182 int main(int argc, char** argv)
183 {
184 pthread_t producers[THREADS];
185 pthread_t consumers[THREADS];
186 int thread_arg[THREADS];
187 int i;
188 int optchar;
189
190 while ((optchar = getopt(argc, argv, "nq")) != EOF)
191 {
192 switch (optchar)
193 {
194 case 'n':
195 use_locking = 0;
196 break;
197 case 'q':
198 quiet = 1;
199 break;
200 }
201 }
202
203 srand(time(NULL));
204
205 buffer_init(&b);
206
207 for (i = 0; i < THREADS; ++i)
208 {
209 thread_arg[i] = i;
210 pthread_create(producers + i, NULL,
211 (void * (*)(void *)) producer, &thread_arg[i]);
212 }
213
214 for (i = 0; i < THREADS; ++i)
215 pthread_create(consumers + i, NULL,
216 (void * (*)(void *)) consumer, &thread_arg[i]);
217
218 for (i = 0; i < THREADS; ++i)
219 {
220 pthread_join(producers[i], NULL);
221 pthread_join(consumers[i], NULL);
222 }
223
224 buffer_destroy(&b);
225
226 return 0;
227 }
228