1 // Copyright 2011 Google Inc. All Rights Reserved.
2 //
3 // This code is licensed under the same terms as WebM:
4 // Software License Agreement: http://www.webmproject.org/license/software/
5 // Additional IP Rights Grant: http://www.webmproject.org/license/additional/
6 // -----------------------------------------------------------------------------
7 //
8 // Multi-threaded worker
9 //
10 // Author: Skal (pascal.massimino@gmail.com)
11
12 #ifdef HAVE_CONFIG_H
13 #include "config.h"
14 #endif
15
16 #include <assert.h>
17 #include <string.h> // for memset()
18 #include "./thread.h"
19
20 #if defined(__cplusplus) || defined(c_plusplus)
21 extern "C" {
22 #endif
23
24 #ifdef WEBP_USE_THREAD
25
26 #if defined(_WIN32)
27
28 //------------------------------------------------------------------------------
29 // simplistic pthread emulation layer
30
31 #include <process.h>
32
33 // _beginthreadex requires __stdcall
#define THREADFN unsigned int __stdcall
// Converts the value returned by the thread function into the unsigned exit
// code expected by _beginthreadex. The argument and the whole expansion are
// parenthesized so the macro stays correct for any expression.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
36
// Minimal pthread_create() replacement built on _beginthreadex().
// 'attr' is ignored. The new thread handle is stored in '*thread' and the
// thread priority is raised to THREAD_PRIORITY_ABOVE_NORMAL.
// Returns 0 on success, 1 if the thread could not be created.
static int pthread_create(pthread_t* const thread, const void* attr,
                          unsigned int (__stdcall *start)(void*), void* arg) {
  pthread_t handle;
  (void)attr;
  handle = (pthread_t)_beginthreadex(NULL,   /* void *security */
                                     0,      /* unsigned stack_size */
                                     start,
                                     arg,
                                     0,      /* unsigned initflag */
                                     NULL);  /* unsigned *thrdaddr */
  *thread = handle;
  if (handle == NULL) {
    return 1;
  }
  // The result of the priority change is not checked.
  SetThreadPriority(handle, THREAD_PRIORITY_ABOVE_NORMAL);
  return 0;
}
50
// Minimal pthread_join() replacement: blocks until 'thread' terminates and
// then closes its handle. 'value_ptr' is ignored.
// Returns 0 on success, 1 if the wait or the handle release failed.
static int pthread_join(pthread_t thread, void** value_ptr) {
  (void)value_ptr;
  if (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0) {
    return 1;   // the handle is left open when the wait fails
  }
  return (CloseHandle(thread) == 0);
}
56
57 // Mutex
// Initializes 'mutex' as a critical section. 'mutexattr' is ignored.
// Always returns 0.
static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
  InitializeCriticalSection(mutex);
  (void)mutexattr;
  return 0;
}
63
// Acquires 'mutex' by entering the underlying critical section.
// Always returns 0.
static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
  EnterCriticalSection(mutex);
  return 0;
}
68
// Releases 'mutex' by leaving the underlying critical section.
// Always returns 0.
static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
  LeaveCriticalSection(mutex);
  return 0;
}
73
// Releases the critical section backing 'mutex'.
// Always returns 0.
static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
  DeleteCriticalSection(mutex);
  return 0;
}
78
79 // Condition
// Closes the three handles backing 'condition'. All three handles are closed
// even if an earlier CloseHandle() fails.
// Returns 0 if every handle was closed successfully, 1 otherwise.
static int pthread_cond_destroy(pthread_cond_t* const condition) {
  const int waiting_closed = (CloseHandle(condition->waiting_sem_) != 0);
  const int received_closed = (CloseHandle(condition->received_sem_) != 0);
  const int signal_closed = (CloseHandle(condition->signal_event_) != 0);
  return !(waiting_closed && received_closed && signal_closed);
}
87
// Creates the kernel objects backing 'condition': two semaphores (initial
// count 0, maximum count 1) and one event created with
// CreateEvent(NULL, FALSE, FALSE, NULL). 'cond_attr' is ignored.
// Returns 0 on success. On failure, returns 1 after releasing whichever
// objects were created and resetting all three handle fields to NULL.
static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
  (void)cond_attr;
  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
  if (condition->waiting_sem_ == NULL ||
      condition->received_sem_ == NULL ||
      condition->signal_event_ == NULL) {
    // Only close the handles that were actually created: passing a NULL
    // handle to CloseHandle() is an error (and raises an exception when
    // running under a debugger).
    if (condition->waiting_sem_ != NULL) {
      CloseHandle(condition->waiting_sem_);
    }
    if (condition->received_sem_ != NULL) {
      CloseHandle(condition->received_sem_);
    }
    if (condition->signal_event_ != NULL) {
      CloseHandle(condition->signal_event_);
    }
    condition->waiting_sem_ = NULL;
    condition->received_sem_ = NULL;
    condition->signal_event_ = NULL;
    return 1;
  }
  return 0;
}
101
// Wakes up the thread blocked in pthread_cond_wait() on 'condition', if any.
// When no thread has registered itself as waiting, this is a no-op.
// Returns 0 on success (including the no-waiter case), 1 if setting the event
// or waiting for the acknowledgment failed.
static int pthread_cond_signal(pthread_cond_t* const condition) {
  int ok = 1;
  // Non-blocking check: is a thread registered in pthread_cond_wait()?
  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    // a thread is waiting in pthread_cond_wait: allow it to be notified
    ok = (SetEvent(condition->signal_event_) != 0);
    // wait until the event is consumed so the signaler cannot consume
    // the event via its own pthread_cond_wait.
    // Success is the wait completing with WAIT_OBJECT_0.
    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) ==
           WAIT_OBJECT_0);
  }
  return !ok;
}
114
// Blocks the calling thread until 'condition' is signaled. 'mutex' is released
// while waiting and re-acquired before returning.
// Returns 0 on success, 1 on failure. If the waiter cannot be registered the
// function returns immediately without releasing 'mutex'.
static int pthread_cond_wait(pthread_cond_t* const condition,
                             pthread_mutex_t* const mutex) {
  int ok;
  // note that there is a consumer available so the signal isn't dropped in
  // pthread_cond_signal
  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) {
    return 1;
  }
  // now unlock the mutex so pthread_cond_signal may be issued
  pthread_mutex_unlock(mutex);
  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
        WAIT_OBJECT_0);
  // Acknowledge reception so the signaler can proceed. The BOOL result is
  // normalized to 0/1 before being combined with '&='.
  ok &= (ReleaseSemaphore(condition->received_sem_, 1, NULL) != 0);
  pthread_mutex_lock(mutex);
  return !ok;
}
130
131 #else // _WIN32
# define THREADFN void*
// The expansion is parenthesized so the macro is safe with any expression.
# define THREAD_RETURN(val) (val)
134 #endif
135
136 //------------------------------------------------------------------------------
137
WebPWorkerThreadLoop(void * ptr)138 static THREADFN WebPWorkerThreadLoop(void *ptr) { // thread loop
139 WebPWorker* const worker = (WebPWorker*)ptr;
140 int done = 0;
141 while (!done) {
142 pthread_mutex_lock(&worker->mutex_);
143 while (worker->status_ == OK) { // wait in idling mode
144 pthread_cond_wait(&worker->condition_, &worker->mutex_);
145 }
146 if (worker->status_ == WORK) {
147 if (worker->hook) {
148 worker->had_error |= !worker->hook(worker->data1, worker->data2);
149 }
150 worker->status_ = OK;
151 } else if (worker->status_ == NOT_OK) { // finish the worker
152 done = 1;
153 }
154 // signal to the main thread that we're done (for Sync())
155 pthread_cond_signal(&worker->condition_);
156 pthread_mutex_unlock(&worker->mutex_);
157 }
158 return THREAD_RETURN(NULL); // Thread is finished
159 }
160
161 // main thread state control
// Main-thread side of the state machine: waits until the worker is idle
// (status OK) and then, if 'new_status' differs from OK, stores it and wakes
// up the worker thread. Does nothing if the worker thread never came up
// (status below OK).
static void WebPWorkerChangeState(WebPWorker* const worker,
                                  WebPWorkerStatus new_status) {
  if (worker->status_ >= OK) {
    pthread_mutex_lock(&worker->mutex_);
    // block until the worker has finished its current job
    while (worker->status_ != OK) {
      pthread_cond_wait(&worker->condition_, &worker->mutex_);
    }
    // hand over the new state and release the worker thread if needed
    if (new_status != OK) {
      worker->status_ = new_status;
      pthread_cond_signal(&worker->condition_);
    }
    pthread_mutex_unlock(&worker->mutex_);
  }
}
179
180 #endif
181
182 //------------------------------------------------------------------------------
183
// Clears every field of 'worker' and marks it as not started (NOT_OK).
// No thread or synchronization object is created here.
void WebPWorkerInit(WebPWorker* const worker) {
  memset(worker, 0, sizeof *worker);
  worker->status_ = NOT_OK;
}
188
// Waits for the worker to finish its current job, if any.
// Returns non-zero if no error has been recorded in 'had_error', 0 otherwise.
int WebPWorkerSync(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  // Requesting the OK state only waits for the worker to become idle.
  WebPWorkerChangeState(worker, OK);
#endif
  assert(worker->status_ <= OK);
  return (worker->had_error == 0);
}
196
// Prepares 'worker' for a new job and clears its error flag.
// If the worker is not started yet (status below OK), the mutex, the condition
// and the thread are created (threaded builds) and the status becomes OK.
// If the worker is busy (status above OK), the call waits for it to finish.
// Returns non-zero on success and 0 on failure. On failure, any
// synchronization object created by this call is destroyed again, so nothing
// is leaked while the status remains NOT_OK.
int WebPWorkerReset(WebPWorker* const worker) {
  int ok = 1;
  worker->had_error = 0;
  if (worker->status_ < OK) {
#ifdef WEBP_USE_THREAD
    if (pthread_mutex_init(&worker->mutex_, NULL)) {
      return 0;
    }
    if (pthread_cond_init(&worker->condition_, NULL)) {
      // the mutex was created successfully: don't leak it
      pthread_mutex_destroy(&worker->mutex_);
      return 0;
    }
    pthread_mutex_lock(&worker->mutex_);
    ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
    if (ok) worker->status_ = OK;
    pthread_mutex_unlock(&worker->mutex_);
    if (!ok) {
      // The status stays NOT_OK, so WebPWorkerEnd() will not release these
      // objects: do it here.
      pthread_mutex_destroy(&worker->mutex_);
      pthread_cond_destroy(&worker->condition_);
    }
#else
    worker->status_ = OK;
#endif
  } else if (worker->status_ > OK) {
    ok = WebPWorkerSync(worker);
  }
  assert(!ok || (worker->status_ == OK));
  return ok;
}
219
// Starts the job described by 'worker->hook', 'data1' and 'data2'.
// In threaded builds the job is handed to the worker thread; otherwise the
// hook is called directly and a zero return value is recorded in 'had_error'.
void WebPWorkerLaunch(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  WebPWorkerChangeState(worker, WORK);
#else
  if (worker->hook != NULL) {
    const int hook_ok = worker->hook(worker->data1, worker->data2);
    worker->had_error |= !hook_ok;
  }
#endif
}
228
// Shuts the worker down. If it was started (status at least OK), the thread is
// asked to terminate, joined, and its mutex and condition are destroyed
// (threaded builds); otherwise the status is simply set back to NOT_OK.
// A worker that was never started is left untouched.
void WebPWorkerEnd(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  if (worker->status_ >= OK) {
    // waits for any pending job, then requests termination
    WebPWorkerChangeState(worker, NOT_OK);
    pthread_join(worker->thread_, NULL);
    pthread_mutex_destroy(&worker->mutex_);
    pthread_cond_destroy(&worker->condition_);
  }
#else
  if (worker->status_ >= OK) {
    worker->status_ = NOT_OK;
  }
#endif
  assert(worker->status_ == NOT_OK);
}
242
243 //------------------------------------------------------------------------------
244
245 #if defined(__cplusplus) || defined(c_plusplus)
246 } // extern "C"
247 #endif
248