1 // Copyright 2011 Google Inc. All Rights Reserved.
2 //
3 // This code is licensed under the same terms as WebM:
4 // Software License Agreement: http://www.webmproject.org/license/software/
5 // Additional IP Rights Grant: http://www.webmproject.org/license/additional/
6 // -----------------------------------------------------------------------------
7 //
8 // Multi-threaded worker
9 //
10 // Author: Skal (pascal.massimino@gmail.com)
11
12 #include <assert.h>
13 #include <string.h> // for memset()
14 #include "./thread.h"
15
16 #if defined(__cplusplus) || defined(c_plusplus)
17 extern "C" {
18 #endif
19
20 #ifdef WEBP_USE_THREAD
21
22 #if defined(_WIN32)
23
24 //------------------------------------------------------------------------------
25 // simplistic pthread emulation layer
26
27 #include <process.h>
28
29 // _beginthreadex requires __stdcall
// Return type plus calling convention of a thread entry point on Windows.
#define THREADFN unsigned int __stdcall
// Converts the value handed back by a thread function to THREADFN's integer
// return type. Both the argument and the whole expansion are parenthesized so
// that any expression can be passed and the result can be used in any context.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
32
// Minimal pthread_create() stand-in built on _beginthreadex().
//   thread: receives the handle of the new thread (NULL on failure).
//   attr:   ignored.
//   start:  entry point of the new thread.
//   arg:    opaque pointer forwarded to 'start'.
// Returns 0 on success, 1 if the thread could not be started.
static int pthread_create(pthread_t* const thread, const void* attr,
                          unsigned int (__stdcall *start)(void*), void* arg) {
  // Default security, default stack size, start running immediately, and the
  // thread identifier is not requested.
  const pthread_t handle =
      (pthread_t)_beginthreadex(NULL, 0, start, arg, 0, NULL);
  (void)attr;
  *thread = handle;
  if (handle == NULL) {
    return 1;
  }
  // Ask for a slightly raised priority; the outcome of the request is not
  // checked.
  SetThreadPriority(handle, THREAD_PRIORITY_ABOVE_NORMAL);
  return 0;
}
46
// Minimal pthread_join() stand-in: blocks until 'thread' terminates, then
// closes its handle. 'value_ptr' is ignored (no exit value is reported).
// Returns 0 on success, 1 if either the wait or the handle release failed.
// The handle is only closed when the wait succeeded.
static int pthread_join(pthread_t thread, void** value_ptr) {
  (void)value_ptr;
  if (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0) {
    return 1;
  }
  return (CloseHandle(thread) == 0);
}
52
53 // Mutex
// Sets up 'mutex' as a Windows critical section. 'mutexattr' is ignored.
// Always returns 0.
static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
  InitializeCriticalSection(mutex);
  (void)mutexattr;
  return 0;
}
59
// Acquires 'mutex' by entering its critical section. Always returns 0.
static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
  EnterCriticalSection(mutex);
  return 0;
}
64
// Releases 'mutex' by leaving its critical section. Always returns 0.
static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
  LeaveCriticalSection(mutex);
  return 0;
}
69
// Frees the critical section backing 'mutex'. Always returns 0.
static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
  DeleteCriticalSection(mutex);
  return 0;
}
74
75 // Condition
// Closes the three kernel objects that make up 'condition'. Every handle is
// closed regardless of whether an earlier close failed.
// Returns 0 if all three handles were closed, 1 otherwise.
static int pthread_cond_destroy(pthread_cond_t* const condition) {
  const int waiting_closed = (CloseHandle(condition->waiting_sem_) != 0);
  const int received_closed = (CloseHandle(condition->received_sem_) != 0);
  const int event_closed = (CloseHandle(condition->signal_event_) != 0);
  return !(waiting_closed && received_closed && event_closed);
}
83
// Creates the kernel objects that emulate a condition variable:
//   waiting_sem_:  semaphore (max count 1) raised by a thread that is about
//                  to wait, so a signal is only sent when someone listens.
//   received_sem_: semaphore (max count 1) raised by the waiter once it has
//                  consumed the signal.
//   signal_event_: auto-reset event used to wake the waiter.
// 'cond_attr' is ignored.
// Returns 0 on success. On failure returns 1, and every handle that had been
// created is closed and all three fields are left NULL.
static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
  (void)cond_attr;
  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
  if (condition->waiting_sem_ == NULL ||
      condition->received_sem_ == NULL ||
      condition->signal_event_ == NULL) {
    // Only release what was actually created: handing a NULL handle to
    // CloseHandle() is an error (and raises an exception under a debugger).
    if (condition->waiting_sem_ != NULL) {
      CloseHandle(condition->waiting_sem_);
    }
    if (condition->received_sem_ != NULL) {
      CloseHandle(condition->received_sem_);
    }
    if (condition->signal_event_ != NULL) {
      CloseHandle(condition->signal_event_);
    }
    condition->waiting_sem_ = NULL;
    condition->received_sem_ = NULL;
    condition->signal_event_ = NULL;
    return 1;
  }
  return 0;
}
97
// Wakes the thread blocked in pthread_cond_wait() on 'condition', if any.
// When no thread has announced itself through waiting_sem_, this is a no-op.
// Returns 0 on success (including the no-waiter case), 1 if setting the event
// or waiting for the waiter's acknowledgement failed.
static int pthread_cond_signal(pthread_cond_t* const condition) {
  int ok = 1;
  // Non-blocking probe: succeeds only if a waiter raised waiting_sem_.
  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    // a thread is waiting in pthread_cond_wait: allow it to be notified.
    // SetEvent() returns a BOOL; normalize it to 0/1 before combining.
    ok = (SetEvent(condition->signal_event_) != 0);
    // wait until the event is consumed so the signaler cannot consume
    // the event via its own pthread_cond_wait. The acknowledgement has
    // arrived when the wait returns WAIT_OBJECT_0.
    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) ==
           WAIT_OBJECT_0);
  }
  return !ok;
}
110
// Blocks the calling thread until 'condition' is signaled.
// 'mutex' must be held on entry; it is released while waiting and re-acquired
// before returning.
// Returns 0 on success, 1 on failure. If the waiter cannot even be announced,
// the function returns immediately with 'mutex' still held.
static int pthread_cond_wait(pthread_cond_t* const condition,
                             pthread_mutex_t* const mutex) {
  int ok;
  // note that there is a consumer available so the signal isn't dropped in
  // pthread_cond_signal
  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) {
    return 1;
  }
  // now unlock the mutex so pthread_cond_signal may be issued
  pthread_mutex_unlock(mutex);
  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
        WAIT_OBJECT_0);
  // Acknowledge reception to the signaler. ReleaseSemaphore() returns a BOOL
  // that is only guaranteed to be non-zero on success, so normalize it to 0/1
  // before the bitwise AND.
  ok &= (ReleaseSemaphore(condition->received_sem_, 1, NULL) != 0);
  pthread_mutex_lock(mutex);
  return !ok;
}
126
127 #else // _WIN32
// Return type of a thread entry point when native pthreads are used.
# define THREADFN void*
// Value returned by a thread function; parenthesized so that any expression
// can be passed without precedence surprises.
# define THREAD_RETURN(val) (val)
130 #endif
131
132 //------------------------------------------------------------------------------
133
WebPWorkerThreadLoop(void * ptr)134 static THREADFN WebPWorkerThreadLoop(void *ptr) { // thread loop
135 WebPWorker* const worker = (WebPWorker*)ptr;
136 int done = 0;
137 while (!done) {
138 pthread_mutex_lock(&worker->mutex_);
139 while (worker->status_ == OK) { // wait in idling mode
140 pthread_cond_wait(&worker->condition_, &worker->mutex_);
141 }
142 if (worker->status_ == WORK) {
143 if (worker->hook) {
144 worker->had_error |= !worker->hook(worker->data1, worker->data2);
145 }
146 worker->status_ = OK;
147 } else if (worker->status_ == NOT_OK) { // finish the worker
148 done = 1;
149 }
150 // signal to the main thread that we're done (for Sync())
151 pthread_cond_signal(&worker->condition_);
152 pthread_mutex_unlock(&worker->mutex_);
153 }
154 return THREAD_RETURN(NULL); // Thread is finished
155 }
156
157 // main thread state control
// Called from the controlling thread: waits until the worker's status is OK,
// then, unless 'new_status' is OK itself, stores 'new_status' and signals the
// condition so the worker thread picks it up. With new_status == OK this
// simply waits for the pending job to finish.
// Does nothing for a worker whose status is below OK (thread never started).
static void WebPWorkerChangeState(WebPWorker* const worker,
                                  WebPWorkerStatus new_status) {
  if (worker->status_ >= OK) {
    pthread_mutex_lock(&worker->mutex_);
    // Let the current job, if any, run to completion.
    while (worker->status_ != OK) {
      pthread_cond_wait(&worker->condition_, &worker->mutex_);
    }
    // Hand over the new status and wake the worker thread when required.
    if (new_status != OK) {
      worker->status_ = new_status;
      pthread_cond_signal(&worker->condition_);
    }
    pthread_mutex_unlock(&worker->mutex_);
  }
}
175
176 #endif
177
178 //------------------------------------------------------------------------------
179
// Puts 'worker' in its pristine state: every byte of the structure is cleared
// and the status is set to NOT_OK. No thread or synchronization object is
// created here.
void WebPWorkerInit(WebPWorker* const worker) {
  memset(worker, 0, sizeof *worker);
  worker->status_ = NOT_OK;
}
184
// Waits for the job in progress, if any, to complete (threaded builds only;
// otherwise there is nothing to wait for).
// Returns 1 when no error has been recorded in had_error, 0 otherwise.
int WebPWorkerSync(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  // Requesting the OK state only blocks until the worker is idle again.
  WebPWorkerChangeState(worker, OK);
#endif
  assert(worker->status_ <= OK);
  return (worker->had_error == 0);
}
192
// Prepares 'worker' for a new job and clears its error flag.
//   - status below OK (not started): in threaded builds the mutex, the
//     condition and the thread are created; otherwise the status is simply
//     set to OK.
//   - status above OK (a job is pending): waits for it via WebPWorkerSync().
//   - status OK: nothing else to do.
// Returns 1 on success, 0 on failure. When start-up fails, every
// synchronization object created along the way is destroyed again and the
// status is left untouched, so the call can be retried and WebPWorkerEnd()
// has nothing left to release.
int WebPWorkerReset(WebPWorker* const worker) {
  int ok = 1;
  worker->had_error = 0;
  if (worker->status_ < OK) {
#ifdef WEBP_USE_THREAD
    if (pthread_mutex_init(&worker->mutex_, NULL)) {
      return 0;
    }
    if (pthread_cond_init(&worker->condition_, NULL)) {
      // Don't leak the mutex that was just created.
      pthread_mutex_destroy(&worker->mutex_);
      return 0;
    }
    // Hold the mutex while the thread is spawned: the thread loop starts by
    // locking it, so it cannot look at status_ before it has been set to OK.
    pthread_mutex_lock(&worker->mutex_);
    ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
    if (ok) worker->status_ = OK;
    pthread_mutex_unlock(&worker->mutex_);
    if (!ok) {
      // No thread is running: release both objects here, since the status
      // still reads as "not started" and nobody else will do it.
      pthread_mutex_destroy(&worker->mutex_);
      pthread_cond_destroy(&worker->condition_);
    }
#else
    worker->status_ = OK;
#endif
  } else if (worker->status_ > OK) {
    ok = WebPWorkerSync(worker);
  }
  assert(!ok || (worker->status_ == OK));
  return ok;
}
215
// Starts the job described by worker->hook / data1 / data2.
// Threaded builds hand the job to the worker thread by switching it to the
// WORK state. Non-threaded builds call the hook right away, in the caller's
// thread, and record a failure in had_error.
void WebPWorkerLaunch(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  WebPWorkerChangeState(worker, WORK);
#else
  if (worker->hook != NULL) {
    worker->had_error |= !worker->hook(worker->data1, worker->data2);
  }
#endif
}
224
// Shuts 'worker' down. A worker that was never started (status below OK) is
// left untouched. Otherwise, in threaded builds, the thread is asked to
// terminate, joined, and the mutex and condition are destroyed; non-threaded
// builds only reset the status. On return the status is NOT_OK.
void WebPWorkerEnd(WebPWorker* const worker) {
  const int was_started = (worker->status_ >= OK);
  if (was_started) {
#ifdef WEBP_USE_THREAD
    // Waits for any pending job, then tells the thread loop to exit.
    WebPWorkerChangeState(worker, NOT_OK);
    pthread_join(worker->thread_, NULL);
    pthread_mutex_destroy(&worker->mutex_);
    pthread_cond_destroy(&worker->condition_);
#else
    worker->status_ = NOT_OK;
#endif
  }
  assert(worker->status_ == NOT_OK);
}
238
239 //------------------------------------------------------------------------------
240
241 #if defined(__cplusplus) || defined(c_plusplus)
242 } // extern "C"
243 #endif
244