• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* MtCoder.c -- Multi-thread Coder
2 2018-07-04 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #include "MtCoder.h"
7 
8 #ifndef _7ZIP_ST
9 
MtProgressThunk_Progress(const ICompressProgress * pp,UInt64 inSize,UInt64 outSize)10 SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
11 {
12   CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
13   UInt64 inSize2 = 0;
14   UInt64 outSize2 = 0;
15   if (inSize != (UInt64)(Int64)-1)
16   {
17     inSize2 = inSize - thunk->inSize;
18     thunk->inSize = inSize;
19   }
20   if (outSize != (UInt64)(Int64)-1)
21   {
22     outSize2 = outSize - thunk->outSize;
23     thunk->outSize = outSize;
24   }
25   return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
26 }
27 
28 
MtProgressThunk_CreateVTable(CMtProgressThunk * p)29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
30 {
31   p->vt.Progress = MtProgressThunk_Progress;
32 }
33 
34 
35 
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37 
38 
ArEvent_OptCreate_And_Reset(CEvent * p)39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
40 {
41   if (Event_IsCreated(p))
42     return Event_Reset(p);
43   return AutoResetEvent_CreateNotSignaled(p);
44 }
45 
46 
47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
48 
49 
MtCoderThread_CreateAndStart(CMtCoderThread * t)50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
51 {
52   WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
53   if (wres == 0)
54   {
55     t->stop = False;
56     if (!Thread_WasCreated(&t->thread))
57       wres = Thread_Create(&t->thread, ThreadFunc, t);
58     if (wres == 0)
59       wres = Event_Set(&t->startEvent);
60   }
61   if (wres == 0)
62     return SZ_OK;
63   return MY_SRes_HRESULT_FROM_WRes(wres);
64 }
65 
66 
MtCoderThread_Destruct(CMtCoderThread * t)67 static void MtCoderThread_Destruct(CMtCoderThread *t)
68 {
69   if (Thread_WasCreated(&t->thread))
70   {
71     t->stop = 1;
72     Event_Set(&t->startEvent);
73     Thread_Wait(&t->thread);
74     Thread_Close(&t->thread);
75   }
76 
77   Event_Close(&t->startEvent);
78 
79   if (t->inBuf)
80   {
81     ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
82     t->inBuf = NULL;
83   }
84 }
85 
86 
87 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)88 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
89 {
90   size_t size = *processedSize;
91   *processedSize = 0;
92   while (size != 0)
93   {
94     size_t cur = size;
95     SRes res = ISeqInStream_Read(stream, data, &cur);
96     *processedSize += cur;
97     data += cur;
98     size -= cur;
99     RINOK(res);
100     if (cur == 0)
101       return SZ_OK;
102   }
103   return SZ_OK;
104 }
105 
106 
107 /*
108   ThreadFunc2() returns:
109   SZ_OK           - in all normal cases (even for stream error or memory allocation error)
110   SZ_ERROR_THREAD - in case of failure in system synch function
111 */
112 
ThreadFunc2(CMtCoderThread * t)113 static SRes ThreadFunc2(CMtCoderThread *t)
114 {
115   CMtCoder *mtc = t->mtCoder;
116 
117   for (;;)
118   {
119     unsigned bi;
120     SRes res;
121     SRes res2;
122     BoolInt finished;
123     unsigned bufIndex;
124     size_t size;
125     const Byte *inData;
126     UInt64 readProcessed = 0;
127 
128     RINOK_THREAD(Event_Wait(&mtc->readEvent))
129 
130     /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
131 
132     if (mtc->stopReading)
133     {
134       return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
135     }
136 
137     res = MtProgress_GetError(&mtc->mtProgress);
138 
139     size = 0;
140     inData = NULL;
141     finished = True;
142 
143     if (res == SZ_OK)
144     {
145       size = mtc->blockSize;
146       if (mtc->inStream)
147       {
148         if (!t->inBuf)
149         {
150           t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
151           if (!t->inBuf)
152             res = SZ_ERROR_MEM;
153         }
154         if (res == SZ_OK)
155         {
156           res = FullRead(mtc->inStream, t->inBuf, &size);
157           readProcessed = mtc->readProcessed + size;
158           mtc->readProcessed = readProcessed;
159         }
160         if (res != SZ_OK)
161         {
162           mtc->readRes = res;
163           /* after reading error - we can stop encoding of previous blocks */
164           MtProgress_SetError(&mtc->mtProgress, res);
165         }
166         else
167           finished = (size != mtc->blockSize);
168       }
169       else
170       {
171         size_t rem;
172         readProcessed = mtc->readProcessed;
173         rem = mtc->inDataSize - (size_t)readProcessed;
174         if (size > rem)
175           size = rem;
176         inData = mtc->inData + (size_t)readProcessed;
177         readProcessed += size;
178         mtc->readProcessed = readProcessed;
179         finished = (mtc->inDataSize == (size_t)readProcessed);
180       }
181     }
182 
183     /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
184 
185     res2 = SZ_OK;
186 
187     if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
188     {
189       res2 = SZ_ERROR_THREAD;
190       if (res == SZ_OK)
191       {
192         res = res2;
193         // MtProgress_SetError(&mtc->mtProgress, res);
194       }
195     }
196 
197     bi = mtc->blockIndex;
198 
199     if (++mtc->blockIndex >= mtc->numBlocksMax)
200       mtc->blockIndex = 0;
201 
202     bufIndex = (unsigned)(int)-1;
203 
204     if (res == SZ_OK)
205       res = MtProgress_GetError(&mtc->mtProgress);
206 
207     if (res != SZ_OK)
208       finished = True;
209 
210     if (!finished)
211     {
212       if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
213           && mtc->expectedDataSize != readProcessed)
214       {
215         res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
216         if (res == SZ_OK)
217           mtc->numStartedThreads++;
218         else
219         {
220           MtProgress_SetError(&mtc->mtProgress, res);
221           finished = True;
222         }
223       }
224     }
225 
226     if (finished)
227       mtc->stopReading = True;
228 
229     RINOK_THREAD(Event_Set(&mtc->readEvent))
230 
231     if (res2 != SZ_OK)
232       return res2;
233 
234     if (res == SZ_OK)
235     {
236       CriticalSection_Enter(&mtc->cs);
237       bufIndex = mtc->freeBlockHead;
238       mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
239       CriticalSection_Leave(&mtc->cs);
240 
241       res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
242           mtc->inStream ? t->inBuf : inData, size, finished);
243 
244       // MtProgress_Reinit(&mtc->mtProgress, t->index);
245 
246       if (res != SZ_OK)
247         MtProgress_SetError(&mtc->mtProgress, res);
248     }
249 
250     {
251       CMtCoderBlock *block = &mtc->blocks[bi];
252       block->res = res;
253       block->bufIndex = bufIndex;
254       block->finished = finished;
255     }
256 
257     #ifdef MTCODER__USE_WRITE_THREAD
258       RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
259     #else
260     {
261       unsigned wi;
262       {
263         CriticalSection_Enter(&mtc->cs);
264         wi = mtc->writeIndex;
265         if (wi == bi)
266           mtc->writeIndex = (unsigned)(int)-1;
267         else
268           mtc->ReadyBlocks[bi] = True;
269         CriticalSection_Leave(&mtc->cs);
270       }
271 
272       if (wi != bi)
273       {
274         if (res != SZ_OK || finished)
275           return 0;
276         continue;
277       }
278 
279       if (mtc->writeRes != SZ_OK)
280         res = mtc->writeRes;
281 
282       for (;;)
283       {
284         if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
285         {
286           res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
287           if (res != SZ_OK)
288           {
289             mtc->writeRes = res;
290             MtProgress_SetError(&mtc->mtProgress, res);
291           }
292         }
293 
294         if (++wi >= mtc->numBlocksMax)
295           wi = 0;
296         {
297           BoolInt isReady;
298 
299           CriticalSection_Enter(&mtc->cs);
300 
301           if (bufIndex != (unsigned)(int)-1)
302           {
303             mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
304             mtc->freeBlockHead = bufIndex;
305           }
306 
307           isReady = mtc->ReadyBlocks[wi];
308 
309           if (isReady)
310             mtc->ReadyBlocks[wi] = False;
311           else
312             mtc->writeIndex = wi;
313 
314           CriticalSection_Leave(&mtc->cs);
315 
316           RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
317 
318           if (!isReady)
319             break;
320         }
321 
322         {
323           CMtCoderBlock *block = &mtc->blocks[wi];
324           if (res == SZ_OK && block->res != SZ_OK)
325             res = block->res;
326           bufIndex = block->bufIndex;
327           finished = block->finished;
328         }
329       }
330     }
331     #endif
332 
333     if (finished || res != SZ_OK)
334       return 0;
335   }
336 }
337 
338 
ThreadFunc(void * pp)339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
340 {
341   CMtCoderThread *t = (CMtCoderThread *)pp;
342   for (;;)
343   {
344     if (Event_Wait(&t->startEvent) != 0)
345       return SZ_ERROR_THREAD;
346     if (t->stop)
347       return 0;
348     {
349       SRes res = ThreadFunc2(t);
350       CMtCoder *mtc = t->mtCoder;
351       if (res != SZ_OK)
352       {
353         MtProgress_SetError(&mtc->mtProgress, res);
354       }
355 
356       #ifndef MTCODER__USE_WRITE_THREAD
357       {
358         unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
359         if (numFinished == mtc->numStartedThreads)
360           if (Event_Set(&mtc->finishedEvent) != 0)
361             return SZ_ERROR_THREAD;
362       }
363       #endif
364     }
365   }
366 }
367 
368 
369 
MtCoder_Construct(CMtCoder * p)370 void MtCoder_Construct(CMtCoder *p)
371 {
372   unsigned i;
373 
374   p->blockSize = 0;
375   p->numThreadsMax = 0;
376   p->expectedDataSize = (UInt64)(Int64)-1;
377 
378   p->inStream = NULL;
379   p->inData = NULL;
380   p->inDataSize = 0;
381 
382   p->progress = NULL;
383   p->allocBig = NULL;
384 
385   p->mtCallback = NULL;
386   p->mtCallbackObject = NULL;
387 
388   p->allocatedBufsSize = 0;
389 
390   Event_Construct(&p->readEvent);
391   Semaphore_Construct(&p->blocksSemaphore);
392 
393   for (i = 0; i < MTCODER__THREADS_MAX; i++)
394   {
395     CMtCoderThread *t = &p->threads[i];
396     t->mtCoder = p;
397     t->index = i;
398     t->inBuf = NULL;
399     t->stop = False;
400     Event_Construct(&t->startEvent);
401     Thread_Construct(&t->thread);
402   }
403 
404   #ifdef MTCODER__USE_WRITE_THREAD
405     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
406       Event_Construct(&p->writeEvents[i]);
407   #else
408     Event_Construct(&p->finishedEvent);
409   #endif
410 
411   CriticalSection_Init(&p->cs);
412   CriticalSection_Init(&p->mtProgress.cs);
413 }
414 
415 
416 
417 
MtCoder_Free(CMtCoder * p)418 static void MtCoder_Free(CMtCoder *p)
419 {
420   unsigned i;
421 
422   /*
423   p->stopReading = True;
424   if (Event_IsCreated(&p->readEvent))
425     Event_Set(&p->readEvent);
426   */
427 
428   for (i = 0; i < MTCODER__THREADS_MAX; i++)
429     MtCoderThread_Destruct(&p->threads[i]);
430 
431   Event_Close(&p->readEvent);
432   Semaphore_Close(&p->blocksSemaphore);
433 
434   #ifdef MTCODER__USE_WRITE_THREAD
435     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
436       Event_Close(&p->writeEvents[i]);
437   #else
438     Event_Close(&p->finishedEvent);
439   #endif
440 }
441 
442 
MtCoder_Destruct(CMtCoder * p)443 void MtCoder_Destruct(CMtCoder *p)
444 {
445   MtCoder_Free(p);
446 
447   CriticalSection_Delete(&p->cs);
448   CriticalSection_Delete(&p->mtProgress.cs);
449 }
450 
451 
MtCoder_Code(CMtCoder * p)452 SRes MtCoder_Code(CMtCoder *p)
453 {
454   unsigned numThreads = p->numThreadsMax;
455   unsigned numBlocksMax;
456   unsigned i;
457   SRes res = SZ_OK;
458 
459   if (numThreads > MTCODER__THREADS_MAX)
460     numThreads = MTCODER__THREADS_MAX;
461   numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
462 
463   if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
464   if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
465   if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
466 
467   if (numBlocksMax > MTCODER__BLOCKS_MAX)
468     numBlocksMax = MTCODER__BLOCKS_MAX;
469 
470   if (p->blockSize != p->allocatedBufsSize)
471   {
472     for (i = 0; i < MTCODER__THREADS_MAX; i++)
473     {
474       CMtCoderThread *t = &p->threads[i];
475       if (t->inBuf)
476       {
477         ISzAlloc_Free(p->allocBig, t->inBuf);
478         t->inBuf = NULL;
479       }
480     }
481     p->allocatedBufsSize = p->blockSize;
482   }
483 
484   p->readRes = SZ_OK;
485 
486   MtProgress_Init(&p->mtProgress, p->progress);
487 
488   #ifdef MTCODER__USE_WRITE_THREAD
489     for (i = 0; i < numBlocksMax; i++)
490     {
491       RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
492     }
493   #else
494     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
495   #endif
496 
497   {
498     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
499 
500     if (Semaphore_IsCreated(&p->blocksSemaphore))
501     {
502       RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
503     }
504     RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
505   }
506 
507   for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
508     p->freeBlockList[i] = i + 1;
509   p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
510   p->freeBlockHead = 0;
511 
512   p->readProcessed = 0;
513   p->blockIndex = 0;
514   p->numBlocksMax = numBlocksMax;
515   p->stopReading = False;
516 
517   #ifndef MTCODER__USE_WRITE_THREAD
518     p->writeIndex = 0;
519     p->writeRes = SZ_OK;
520     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
521       p->ReadyBlocks[i] = False;
522     p->numFinishedThreads = 0;
523   #endif
524 
525   p->numStartedThreadsLimit = numThreads;
526   p->numStartedThreads = 0;
527 
528   // for (i = 0; i < numThreads; i++)
529   {
530     CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
531     RINOK(MtCoderThread_CreateAndStart(nextThread));
532   }
533 
534   RINOK_THREAD(Event_Set(&p->readEvent))
535 
536   #ifdef MTCODER__USE_WRITE_THREAD
537   {
538     unsigned bi = 0;
539 
540     for (;; bi++)
541     {
542       if (bi >= numBlocksMax)
543         bi = 0;
544 
545       RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
546 
547       {
548         const CMtCoderBlock *block = &p->blocks[bi];
549         unsigned bufIndex = block->bufIndex;
550         BoolInt finished = block->finished;
551         if (res == SZ_OK && block->res != SZ_OK)
552           res = block->res;
553 
554         if (bufIndex != (unsigned)(int)-1)
555         {
556           if (res == SZ_OK)
557           {
558             res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
559             if (res != SZ_OK)
560               MtProgress_SetError(&p->mtProgress, res);
561           }
562 
563           CriticalSection_Enter(&p->cs);
564           {
565             p->freeBlockList[bufIndex] = p->freeBlockHead;
566             p->freeBlockHead = bufIndex;
567           }
568           CriticalSection_Leave(&p->cs);
569         }
570 
571         RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
572 
573         if (finished)
574           break;
575       }
576     }
577   }
578   #else
579   {
580     WRes wres = Event_Wait(&p->finishedEvent);
581     res = MY_SRes_HRESULT_FROM_WRes(wres);
582   }
583   #endif
584 
585   if (res == SZ_OK)
586     res = p->readRes;
587 
588   if (res == SZ_OK)
589     res = p->mtProgress.res;
590 
591   #ifndef MTCODER__USE_WRITE_THREAD
592     if (res == SZ_OK)
593       res = p->writeRes;
594   #endif
595 
596   if (res != SZ_OK)
597     MtCoder_Free(p);
598   return res;
599 }
600 
601 #endif
602