• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2011 Google Inc. All Rights Reserved.
2 //
3 // This code is licensed under the same terms as WebM:
4 //  Software License Agreement:  http://www.webmproject.org/license/software/
5 //  Additional IP Rights Grant:  http://www.webmproject.org/license/additional/
6 // -----------------------------------------------------------------------------
7 //
8 // Multi-threaded worker
9 //
10 // Author: Skal (pascal.massimino@gmail.com)
11 
12 #ifdef HAVE_CONFIG_H
13 #include "config.h"
14 #endif
15 
16 #include <assert.h>
17 #include <string.h>   // for memset()
18 #include "./thread.h"
19 
20 #if defined(__cplusplus) || defined(c_plusplus)
21 extern "C" {
22 #endif
23 
24 #ifdef WEBP_USE_THREAD
25 
26 #if defined(_WIN32)
27 
28 //------------------------------------------------------------------------------
29 // simplistic pthread emulation layer
30 
31 #include <process.h>
32 
33 // _beginthreadex requires __stdcall
34 #define THREADFN unsigned int __stdcall
35 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
36 
pthread_create(pthread_t * const thread,const void * attr,unsigned int (__stdcall * start)(void *),void * arg)37 static int pthread_create(pthread_t* const thread, const void* attr,
38                           unsigned int (__stdcall *start)(void*), void* arg) {
39   (void)attr;
40   *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
41                                       0,      /* unsigned stack_size */
42                                       start,
43                                       arg,
44                                       0,      /* unsigned initflag */
45                                       NULL);  /* unsigned *thrdaddr */
46   if (*thread == NULL) return 1;
47   SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
48   return 0;
49 }
50 
pthread_join(pthread_t thread,void ** value_ptr)51 static int pthread_join(pthread_t thread, void** value_ptr) {
52   (void)value_ptr;
53   return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
54           CloseHandle(thread) == 0);
55 }
56 
57 // Mutex
pthread_mutex_init(pthread_mutex_t * const mutex,void * mutexattr)58 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
59   (void)mutexattr;
60   InitializeCriticalSection(mutex);
61   return 0;
62 }
63 
pthread_mutex_lock(pthread_mutex_t * const mutex)64 static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
65   EnterCriticalSection(mutex);
66   return 0;
67 }
68 
pthread_mutex_unlock(pthread_mutex_t * const mutex)69 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
70   LeaveCriticalSection(mutex);
71   return 0;
72 }
73 
pthread_mutex_destroy(pthread_mutex_t * const mutex)74 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
75   DeleteCriticalSection(mutex);
76   return 0;
77 }
78 
79 // Condition
pthread_cond_destroy(pthread_cond_t * const condition)80 static int pthread_cond_destroy(pthread_cond_t* const condition) {
81   int ok = 1;
82   ok &= (CloseHandle(condition->waiting_sem_) != 0);
83   ok &= (CloseHandle(condition->received_sem_) != 0);
84   ok &= (CloseHandle(condition->signal_event_) != 0);
85   return !ok;
86 }
87 
pthread_cond_init(pthread_cond_t * const condition,void * cond_attr)88 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
89   (void)cond_attr;
90   condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
91   condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
92   condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
93   if (condition->waiting_sem_ == NULL ||
94       condition->received_sem_ == NULL ||
95       condition->signal_event_ == NULL) {
96     pthread_cond_destroy(condition);
97     return 1;
98   }
99   return 0;
100 }
101 
pthread_cond_signal(pthread_cond_t * const condition)102 static int pthread_cond_signal(pthread_cond_t* const condition) {
103   int ok = 1;
104   if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
105     // a thread is waiting in pthread_cond_wait: allow it to be notified
106     ok = SetEvent(condition->signal_event_);
107     // wait until the event is consumed so the signaler cannot consume
108     // the event via its own pthread_cond_wait.
109     ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
110            WAIT_OBJECT_0);
111   }
112   return !ok;
113 }
114 
pthread_cond_wait(pthread_cond_t * const condition,pthread_mutex_t * const mutex)115 static int pthread_cond_wait(pthread_cond_t* const condition,
116                              pthread_mutex_t* const mutex) {
117   int ok;
118   // note that there is a consumer available so the signal isn't dropped in
119   // pthread_cond_signal
120   if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
121     return 1;
122   // now unlock the mutex so pthread_cond_signal may be issued
123   pthread_mutex_unlock(mutex);
124   ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
125         WAIT_OBJECT_0);
126   ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
127   pthread_mutex_lock(mutex);
128   return !ok;
129 }
130 
131 #else  // _WIN32
132 # define THREADFN void*
133 # define THREAD_RETURN(val) val
134 #endif
135 
136 //------------------------------------------------------------------------------
137 
WebPWorkerThreadLoop(void * ptr)138 static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
139   WebPWorker* const worker = (WebPWorker*)ptr;
140   int done = 0;
141   while (!done) {
142     pthread_mutex_lock(&worker->mutex_);
143     while (worker->status_ == OK) {   // wait in idling mode
144       pthread_cond_wait(&worker->condition_, &worker->mutex_);
145     }
146     if (worker->status_ == WORK) {
147       if (worker->hook) {
148         worker->had_error |= !worker->hook(worker->data1, worker->data2);
149       }
150       worker->status_ = OK;
151     } else if (worker->status_ == NOT_OK) {   // finish the worker
152       done = 1;
153     }
154     // signal to the main thread that we're done (for Sync())
155     pthread_cond_signal(&worker->condition_);
156     pthread_mutex_unlock(&worker->mutex_);
157   }
158   return THREAD_RETURN(NULL);    // Thread is finished
159 }
160 
161 // main thread state control
WebPWorkerChangeState(WebPWorker * const worker,WebPWorkerStatus new_status)162 static void WebPWorkerChangeState(WebPWorker* const worker,
163                                   WebPWorkerStatus new_status) {
164   // no-op when attempting to change state on a thread that didn't come up
165   if (worker->status_ < OK) return;
166 
167   pthread_mutex_lock(&worker->mutex_);
168   // wait for the worker to finish
169   while (worker->status_ != OK) {
170     pthread_cond_wait(&worker->condition_, &worker->mutex_);
171   }
172   // assign new status and release the working thread if needed
173   if (new_status != OK) {
174     worker->status_ = new_status;
175     pthread_cond_signal(&worker->condition_);
176   }
177   pthread_mutex_unlock(&worker->mutex_);
178 }
179 
180 #endif
181 
182 //------------------------------------------------------------------------------
183 
WebPWorkerInit(WebPWorker * const worker)184 void WebPWorkerInit(WebPWorker* const worker) {
185   memset(worker, 0, sizeof(*worker));
186   worker->status_ = NOT_OK;
187 }
188 
WebPWorkerSync(WebPWorker * const worker)189 int WebPWorkerSync(WebPWorker* const worker) {
190 #ifdef WEBP_USE_THREAD
191   WebPWorkerChangeState(worker, OK);
192 #endif
193   assert(worker->status_ <= OK);
194   return !worker->had_error;
195 }
196 
WebPWorkerReset(WebPWorker * const worker)197 int WebPWorkerReset(WebPWorker* const worker) {
198   int ok = 1;
199   worker->had_error = 0;
200   if (worker->status_ < OK) {
201 #ifdef WEBP_USE_THREAD
202     if (pthread_mutex_init(&worker->mutex_, NULL) ||
203         pthread_cond_init(&worker->condition_, NULL)) {
204       return 0;
205     }
206     pthread_mutex_lock(&worker->mutex_);
207     ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
208     if (ok) worker->status_ = OK;
209     pthread_mutex_unlock(&worker->mutex_);
210 #else
211     worker->status_ = OK;
212 #endif
213   } else if (worker->status_ > OK) {
214     ok = WebPWorkerSync(worker);
215   }
216   assert(!ok || (worker->status_ == OK));
217   return ok;
218 }
219 
WebPWorkerLaunch(WebPWorker * const worker)220 void WebPWorkerLaunch(WebPWorker* const worker) {
221 #ifdef WEBP_USE_THREAD
222   WebPWorkerChangeState(worker, WORK);
223 #else
224   if (worker->hook)
225     worker->had_error |= !worker->hook(worker->data1, worker->data2);
226 #endif
227 }
228 
WebPWorkerEnd(WebPWorker * const worker)229 void WebPWorkerEnd(WebPWorker* const worker) {
230   if (worker->status_ >= OK) {
231 #ifdef WEBP_USE_THREAD
232     WebPWorkerChangeState(worker, NOT_OK);
233     pthread_join(worker->thread_, NULL);
234     pthread_mutex_destroy(&worker->mutex_);
235     pthread_cond_destroy(&worker->condition_);
236 #else
237     worker->status_ = NOT_OK;
238 #endif
239   }
240   assert(worker->status_ == NOT_OK);
241 }
242 
243 //------------------------------------------------------------------------------
244 
245 #if defined(__cplusplus) || defined(c_plusplus)
246 }    // extern "C"
247 #endif
248