• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2011 Google Inc. All Rights Reserved.
2 //
3 // This code is licensed under the same terms as WebM:
4 //  Software License Agreement:  http://www.webmproject.org/license/software/
5 //  Additional IP Rights Grant:  http://www.webmproject.org/license/additional/
6 // -----------------------------------------------------------------------------
7 //
8 // Multi-threaded worker
9 //
10 // Author: Skal (pascal.massimino@gmail.com)
11 
12 #include <assert.h>
13 #include <string.h>   // for memset()
14 #include "./thread.h"
15 
16 #if defined(__cplusplus) || defined(c_plusplus)
17 extern "C" {
18 #endif
19 
20 #ifdef WEBP_USE_THREAD
21 
22 #if defined(_WIN32)
23 
24 //------------------------------------------------------------------------------
25 // simplistic pthread emulation layer
26 
27 #include <process.h>
28 
// _beginthreadex requires a __stdcall entry point returning unsigned int.
#define THREADFN unsigned int __stdcall
// Converts a pointer-like thread result into the unsigned int expected from a
// THREADFN. The argument and the whole expansion are parenthesized so that
// expression arguments (e.g. 'p + 1') are converted as a whole.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
32 
// Minimal pthread_create() stand-in built on _beginthreadex().
//   thread: receives the handle of the newly started thread.
//   attr:   ignored, present only for signature compatibility.
//   start:  __stdcall thread entry point.
//   arg:    opaque pointer forwarded to 'start'.
// Returns 0 on success and 1 when the thread could not be started.
static int pthread_create(pthread_t* const thread, const void* attr,
                          unsigned int (__stdcall *start)(void*), void* arg) {
  const pthread_t handle = (pthread_t)_beginthreadex(
      NULL,    // security descriptor
      0,       // stack size (0 = default)
      start,
      arg,
      0,       // initial state flag
      NULL);   // thread address (unused)
  (void)attr;
  *thread = handle;
  if (handle == NULL) {
    return 1;
  }
  // The result of the priority change is deliberately not checked.
  SetThreadPriority(handle, THREAD_PRIORITY_ABOVE_NORMAL);
  return 0;
}
46 
// Minimal pthread_join() stand-in: blocks until 'thread' terminates, then
// closes its handle. 'value_ptr' is ignored.
// Returns 0 on success, 1 if either the wait or the handle close failed. The
// handle is only closed when the wait succeeded.
static int pthread_join(pthread_t thread, void** value_ptr) {
  (void)value_ptr;
  if (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0) {
    return 1;
  }
  return (CloseHandle(thread) == 0);
}
52 
53 // Mutex
// Initializes 'mutex' through InitializeCriticalSection().
// 'mutexattr' is ignored. Always returns 0.
static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
  InitializeCriticalSection(mutex);
  (void)mutexattr;
  return 0;
}
59 
// Acquires 'mutex' through EnterCriticalSection(). Always returns 0.
static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
  EnterCriticalSection(mutex);
  return 0;
}
64 
// Releases 'mutex' through LeaveCriticalSection(). Always returns 0.
static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
  LeaveCriticalSection(mutex);
  return 0;
}
69 
// Destroys 'mutex' through DeleteCriticalSection(). Always returns 0.
static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
  DeleteCriticalSection(mutex);
  return 0;
}
74 
75 // Condition
// Closes the three kernel handles backing 'condition'. All three handles are
// closed even if an earlier close fails.
// Returns 0 when every CloseHandle() call succeeded, 1 otherwise.
static int pthread_cond_destroy(pthread_cond_t* const condition) {
  const int waiting_closed = (CloseHandle(condition->waiting_sem_) != 0);
  const int received_closed = (CloseHandle(condition->received_sem_) != 0);
  const int event_closed = (CloseHandle(condition->signal_event_) != 0);
  return !(waiting_closed && received_closed && event_closed);
}
83 
// Creates the two semaphores and the auto-reset event that back 'condition'.
// 'cond_attr' is ignored.
// Returns 0 on success. On failure returns 1; any handle that was created is
// closed and all three handle fields are reset to NULL, so no invalid handle
// is ever passed to CloseHandle() and no stale handle value is left behind.
static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
  (void)cond_attr;
  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
  if (condition->waiting_sem_ == NULL ||
      condition->received_sem_ == NULL ||
      condition->signal_event_ == NULL) {
    // Partial failure: release only what was actually created.
    if (condition->waiting_sem_ != NULL) {
      CloseHandle(condition->waiting_sem_);
    }
    if (condition->received_sem_ != NULL) {
      CloseHandle(condition->received_sem_);
    }
    if (condition->signal_event_ != NULL) {
      CloseHandle(condition->signal_event_);
    }
    condition->waiting_sem_ = NULL;
    condition->received_sem_ = NULL;
    condition->signal_event_ = NULL;
    return 1;
  }
  return 0;
}
97 
// Wakes a thread blocked in pthread_cond_wait() on 'condition', if any.
// When no thread has announced itself through 'waiting_sem_' this is a no-op.
// Returns 0 on success (including the no-waiter case), 1 if setting the event
// or waiting for the acknowledgement failed.
static int pthread_cond_signal(pthread_cond_t* const condition) {
  int ok = 1;
  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
    // a thread is waiting in pthread_cond_wait: allow it to be notified.
    // SetEvent() only promises a non-zero value on success, so normalize it
    // to 0/1 before combining it with a bitwise AND.
    ok = (SetEvent(condition->signal_event_) != 0);
    // wait until the event is consumed so the signaler cannot consume
    // the event via its own pthread_cond_wait. WAIT_OBJECT_0 is the success
    // value here: the waiter has released 'received_sem_'.
    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) ==
           WAIT_OBJECT_0);
  }
  return !ok;
}
110 
// Blocks the caller on 'condition' until pthread_cond_signal() sets the event.
// 'mutex' is released while waiting and re-acquired before returning.
// Returns 0 on success, 1 on failure. If the initial ReleaseSemaphore() fails
// the function returns immediately without releasing 'mutex'.
static int pthread_cond_wait(pthread_cond_t* const condition,
                             pthread_mutex_t* const mutex) {
  int signaled;
  int acknowledged;
  // Announce a consumer first, so that pthread_cond_signal() does not drop
  // the signal.
  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) {
    return 1;
  }
  // Give up the mutex so that the signaling thread can make progress.
  pthread_mutex_unlock(mutex);
  signaled = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
              WAIT_OBJECT_0);
  // Tell the signaler that the event has been consumed.
  acknowledged = ReleaseSemaphore(condition->received_sem_, 1, NULL);
  pthread_mutex_lock(mutex);
  return !(signaled & acknowledged);
}
126 
127 #else  // _WIN32
// With native pthreads the thread entry point returns void*.
# define THREADFN void*
// The expansion is parenthesized so an expression argument stays one operand.
# define THREAD_RETURN(val) (val)
130 #endif
131 
132 //------------------------------------------------------------------------------
133 
WebPWorkerThreadLoop(void * ptr)134 static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
135   WebPWorker* const worker = (WebPWorker*)ptr;
136   int done = 0;
137   while (!done) {
138     pthread_mutex_lock(&worker->mutex_);
139     while (worker->status_ == OK) {   // wait in idling mode
140       pthread_cond_wait(&worker->condition_, &worker->mutex_);
141     }
142     if (worker->status_ == WORK) {
143       if (worker->hook) {
144         worker->had_error |= !worker->hook(worker->data1, worker->data2);
145       }
146       worker->status_ = OK;
147     } else if (worker->status_ == NOT_OK) {   // finish the worker
148       done = 1;
149     }
150     // signal to the main thread that we're done (for Sync())
151     pthread_cond_signal(&worker->condition_);
152     pthread_mutex_unlock(&worker->mutex_);
153   }
154   return THREAD_RETURN(NULL);    // Thread is finished
155 }
156 
157 // main thread state control
// Main-thread side of the state machine: waits until 'worker' is back in the
// OK state and then, unless 'new_status' is OK itself, stores the new status
// and signals the worker's condition. Does nothing when status_ is below OK,
// i.e. when the thread never came up.
static void WebPWorkerChangeState(WebPWorker* const worker,
                                  WebPWorkerStatus new_status) {
  if (worker->status_ >= OK) {
    pthread_mutex_lock(&worker->mutex_);
    // Block until the worker has finished its current job.
    while (worker->status_ != OK) {
      pthread_cond_wait(&worker->condition_, &worker->mutex_);
    }
    // A plain synchronization (new_status == OK) needs no state change.
    if (new_status != OK) {
      worker->status_ = new_status;
      pthread_cond_signal(&worker->condition_);
    }
    pthread_mutex_unlock(&worker->mutex_);
  }
}
175 
176 #endif
177 
178 //------------------------------------------------------------------------------
179 
// Zero-fills '*worker' and then explicitly sets its status to NOT_OK.
void WebPWorkerInit(WebPWorker* const worker) {
  memset(worker, 0, sizeof *worker);
  worker->status_ = NOT_OK;
}
184 
// Waits for 'worker' to return to the OK state (threaded builds only).
// Returns 1 if no error has been recorded in had_error, 0 otherwise.
int WebPWorkerSync(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  WebPWorkerChangeState(worker, OK);
#endif
  assert(worker->status_ <= OK);
  return (worker->had_error == 0);
}
192 
// Prepares 'worker' for a new job and clears its error flag.
//  - status_ below OK: the worker is brought up. In threaded builds this
//    creates the mutex, the condition and the thread; otherwise the status is
//    simply set to OK.
//  - status_ above OK: waits for the current job through WebPWorkerSync().
// Returns 1 on success, 0 on failure. On failure every synchronization object
// created by this call is destroyed again, so nothing is leaked.
int WebPWorkerReset(WebPWorker* const worker) {
  int ok = 1;
  worker->had_error = 0;
  if (worker->status_ < OK) {
#ifdef WEBP_USE_THREAD
    if (pthread_mutex_init(&worker->mutex_, NULL)) {
      return 0;
    }
    if (pthread_cond_init(&worker->condition_, NULL)) {
      // The condition was not created: only the mutex needs releasing.
      pthread_mutex_destroy(&worker->mutex_);
      return 0;
    }
    pthread_mutex_lock(&worker->mutex_);
    ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
    if (ok) worker->status_ = OK;
    pthread_mutex_unlock(&worker->mutex_);
    if (!ok) {
      // No thread was started, so nothing else uses these objects. status_
      // stays below OK, so WebPWorkerEnd() will not destroy them a second
      // time.
      pthread_mutex_destroy(&worker->mutex_);
      pthread_cond_destroy(&worker->condition_);
    }
#else
    worker->status_ = OK;
#endif
  } else if (worker->status_ > OK) {
    ok = WebPWorkerSync(worker);
  }
  assert(!ok || (worker->status_ == OK));
  return ok;
}
215 
// Starts the job attached to 'worker'. Threaded builds hand it to the worker
// thread by switching the state to WORK; other builds run the hook directly
// in the calling thread and record a zero return in had_error.
void WebPWorkerLaunch(WebPWorker* const worker) {
#ifdef WEBP_USE_THREAD
  WebPWorkerChangeState(worker, WORK);
#else
  if (worker->hook) {
    worker->had_error |= !worker->hook(worker->data1, worker->data2);
  }
#endif
}
224 
// Shuts 'worker' down. If its status is OK or above, threaded builds switch
// the state to NOT_OK, join the thread and destroy the mutex and condition;
// other builds only set the status to NOT_OK. A worker whose status is
// already below OK is left untouched.
void WebPWorkerEnd(WebPWorker* const worker) {
  if (worker->status_ >= OK) {
#ifdef WEBP_USE_THREAD
    // Ask the thread loop to exit, then wait for it before tearing down the
    // objects it uses.
    WebPWorkerChangeState(worker, NOT_OK);
    pthread_join(worker->thread_, NULL);
    pthread_mutex_destroy(&worker->mutex_);
    pthread_cond_destroy(&worker->condition_);
#else
    worker->status_ = NOT_OK;
#endif
  }
  assert(worker->status_ == NOT_OK);
}
238 
239 //------------------------------------------------------------------------------
240 
241 #if defined(__cplusplus) || defined(c_plusplus)
242 }    // extern "C"
243 #endif
244