/* MtCoder.c -- Multi-thread Coder
2021-12-21 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #include "MtCoder.h"
7 
8 #ifndef _7ZIP_ST
9 
MtProgressThunk_Progress(const ICompressProgress * pp,UInt64 inSize,UInt64 outSize)10 static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
11 {
12   CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
13   UInt64 inSize2 = 0;
14   UInt64 outSize2 = 0;
15   if (inSize != (UInt64)(Int64)-1)
16   {
17     inSize2 = inSize - thunk->inSize;
18     thunk->inSize = inSize;
19   }
20   if (outSize != (UInt64)(Int64)-1)
21   {
22     outSize2 = outSize - thunk->outSize;
23     thunk->outSize = outSize;
24   }
25   return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
26 }
27 
28 
/* Installs MtProgressThunk_Progress as the Progress handler of p->vt. */
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  p->vt.Progress = &MtProgressThunk_Progress;
}
33 
34 
35 
/*
  Evaluates x once; if the result is non-zero, returns SZ_ERROR_THREAD from
  the enclosing function. The expansion is a brace block, so call sites in
  this file may omit the trailing semicolon.
*/
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37 
38 
ArEvent_OptCreate_And_Reset(CEvent * p)39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
40 {
41   if (Event_IsCreated(p))
42     return Event_Reset(p);
43   return AutoResetEvent_CreateNotSignaled(p);
44 }
45 
46 
47 static THREAD_FUNC_DECL ThreadFunc(void *pp);
48 
49 
MtCoderThread_CreateAndStart(CMtCoderThread * t)50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
51 {
52   WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
53   if (wres == 0)
54   {
55     t->stop = False;
56     if (!Thread_WasCreated(&t->thread))
57       wres = Thread_Create(&t->thread, ThreadFunc, t);
58     if (wres == 0)
59       wres = Event_Set(&t->startEvent);
60   }
61   if (wres == 0)
62     return SZ_OK;
63   return MY_SRes_HRESULT_FROM_WRes(wres);
64 }
65 
66 
/*
  Shuts down worker t and releases what it owns:
    - if the thread exists: raise t->stop, signal startEvent so ThreadFunc
      sees the flag, then wait for the thread and close it;
    - close startEvent;
    - free t->inBuf through the coder's allocBig allocator and clear it.
  Return values of the event / thread calls are not checked.
*/
static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  Byte *buf;

  if (Thread_WasCreated(&t->thread))
  {
    t->stop = True;
    Event_Set(&t->startEvent);
    Thread_Wait_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  buf = t->inBuf;
  if (buf)
  {
    ISzAlloc_Free(t->mtCoder->allocBig, buf);
    t->inBuf = NULL;
  }
}
84 
85 
86 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)87 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
88 {
89   size_t size = *processedSize;
90   *processedSize = 0;
91   while (size != 0)
92   {
93     size_t cur = size;
94     SRes res = ISeqInStream_Read(stream, data, &cur);
95     *processedSize += cur;
96     data += cur;
97     size -= cur;
98     RINOK(res);
99     if (cur == 0)
100       return SZ_OK;
101   }
102   return SZ_OK;
103 }
104 
105 
106 /*
107   ThreadFunc2() returns:
108   SZ_OK           - in all normal cases (even for stream error or memory allocation error)
109   SZ_ERROR_THREAD - in case of failure in system synch function
110 */
111 
ThreadFunc2(CMtCoderThread * t)112 static SRes ThreadFunc2(CMtCoderThread *t)
113 {
114   CMtCoder *mtc = t->mtCoder;
115 
116   for (;;)
117   {
118     unsigned bi;
119     SRes res;
120     SRes res2;
121     BoolInt finished;
122     unsigned bufIndex;
123     size_t size;
124     const Byte *inData;
125     UInt64 readProcessed = 0;
126 
127     RINOK_THREAD(Event_Wait(&mtc->readEvent))
128 
129     /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
130 
131     if (mtc->stopReading)
132     {
133       return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
134     }
135 
136     res = MtProgress_GetError(&mtc->mtProgress);
137 
138     size = 0;
139     inData = NULL;
140     finished = True;
141 
142     if (res == SZ_OK)
143     {
144       size = mtc->blockSize;
145       if (mtc->inStream)
146       {
147         if (!t->inBuf)
148         {
149           t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
150           if (!t->inBuf)
151             res = SZ_ERROR_MEM;
152         }
153         if (res == SZ_OK)
154         {
155           res = FullRead(mtc->inStream, t->inBuf, &size);
156           readProcessed = mtc->readProcessed + size;
157           mtc->readProcessed = readProcessed;
158         }
159         if (res != SZ_OK)
160         {
161           mtc->readRes = res;
162           /* after reading error - we can stop encoding of previous blocks */
163           MtProgress_SetError(&mtc->mtProgress, res);
164         }
165         else
166           finished = (size != mtc->blockSize);
167       }
168       else
169       {
170         size_t rem;
171         readProcessed = mtc->readProcessed;
172         rem = mtc->inDataSize - (size_t)readProcessed;
173         if (size > rem)
174           size = rem;
175         inData = mtc->inData + (size_t)readProcessed;
176         readProcessed += size;
177         mtc->readProcessed = readProcessed;
178         finished = (mtc->inDataSize == (size_t)readProcessed);
179       }
180     }
181 
182     /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
183 
184     res2 = SZ_OK;
185 
186     if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
187     {
188       res2 = SZ_ERROR_THREAD;
189       if (res == SZ_OK)
190       {
191         res = res2;
192         // MtProgress_SetError(&mtc->mtProgress, res);
193       }
194     }
195 
196     bi = mtc->blockIndex;
197 
198     if (++mtc->blockIndex >= mtc->numBlocksMax)
199       mtc->blockIndex = 0;
200 
201     bufIndex = (unsigned)(int)-1;
202 
203     if (res == SZ_OK)
204       res = MtProgress_GetError(&mtc->mtProgress);
205 
206     if (res != SZ_OK)
207       finished = True;
208 
209     if (!finished)
210     {
211       if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
212           && mtc->expectedDataSize != readProcessed)
213       {
214         res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
215         if (res == SZ_OK)
216           mtc->numStartedThreads++;
217         else
218         {
219           MtProgress_SetError(&mtc->mtProgress, res);
220           finished = True;
221         }
222       }
223     }
224 
225     if (finished)
226       mtc->stopReading = True;
227 
228     RINOK_THREAD(Event_Set(&mtc->readEvent))
229 
230     if (res2 != SZ_OK)
231       return res2;
232 
233     if (res == SZ_OK)
234     {
235       CriticalSection_Enter(&mtc->cs);
236       bufIndex = mtc->freeBlockHead;
237       mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
238       CriticalSection_Leave(&mtc->cs);
239 
240       res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
241           mtc->inStream ? t->inBuf : inData, size, finished);
242 
243       // MtProgress_Reinit(&mtc->mtProgress, t->index);
244 
245       if (res != SZ_OK)
246         MtProgress_SetError(&mtc->mtProgress, res);
247     }
248 
249     {
250       CMtCoderBlock *block = &mtc->blocks[bi];
251       block->res = res;
252       block->bufIndex = bufIndex;
253       block->finished = finished;
254     }
255 
256     #ifdef MTCODER__USE_WRITE_THREAD
257       RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
258     #else
259     {
260       unsigned wi;
261       {
262         CriticalSection_Enter(&mtc->cs);
263         wi = mtc->writeIndex;
264         if (wi == bi)
265           mtc->writeIndex = (unsigned)(int)-1;
266         else
267           mtc->ReadyBlocks[bi] = True;
268         CriticalSection_Leave(&mtc->cs);
269       }
270 
271       if (wi != bi)
272       {
273         if (res != SZ_OK || finished)
274           return 0;
275         continue;
276       }
277 
278       if (mtc->writeRes != SZ_OK)
279         res = mtc->writeRes;
280 
281       for (;;)
282       {
283         if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
284         {
285           res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
286           if (res != SZ_OK)
287           {
288             mtc->writeRes = res;
289             MtProgress_SetError(&mtc->mtProgress, res);
290           }
291         }
292 
293         if (++wi >= mtc->numBlocksMax)
294           wi = 0;
295         {
296           BoolInt isReady;
297 
298           CriticalSection_Enter(&mtc->cs);
299 
300           if (bufIndex != (unsigned)(int)-1)
301           {
302             mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
303             mtc->freeBlockHead = bufIndex;
304           }
305 
306           isReady = mtc->ReadyBlocks[wi];
307 
308           if (isReady)
309             mtc->ReadyBlocks[wi] = False;
310           else
311             mtc->writeIndex = wi;
312 
313           CriticalSection_Leave(&mtc->cs);
314 
315           RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
316 
317           if (!isReady)
318             break;
319         }
320 
321         {
322           CMtCoderBlock *block = &mtc->blocks[wi];
323           if (res == SZ_OK && block->res != SZ_OK)
324             res = block->res;
325           bufIndex = block->bufIndex;
326           finished = block->finished;
327         }
328       }
329     }
330     #endif
331 
332     if (finished || res != SZ_OK)
333       return 0;
334   }
335 }
336 
337 
ThreadFunc(void * pp)338 static THREAD_FUNC_DECL ThreadFunc(void *pp)
339 {
340   CMtCoderThread *t = (CMtCoderThread *)pp;
341   for (;;)
342   {
343     if (Event_Wait(&t->startEvent) != 0)
344       return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
345     if (t->stop)
346       return 0;
347     {
348       SRes res = ThreadFunc2(t);
349       CMtCoder *mtc = t->mtCoder;
350       if (res != SZ_OK)
351       {
352         MtProgress_SetError(&mtc->mtProgress, res);
353       }
354 
355       #ifndef MTCODER__USE_WRITE_THREAD
356       {
357         unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
358         if (numFinished == mtc->numStartedThreads)
359           if (Event_Set(&mtc->finishedEvent) != 0)
360             return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
361       }
362       #endif
363     }
364   }
365 }
366 
367 
368 
/*
  Puts *p into its initial state: settings and pointers are zeroed / NULL,
  expectedDataSize is set to (UInt64)(Int64)-1, all synchronization objects
  and threads are constructed (not created), and both critical sections
  (p->cs and p->mtProgress.cs) are initialized.
*/
void MtCoder_Construct(CMtCoder *p)
{
  unsigned k;

  /* settings read later by MtCoder_Code() */
  p->blockSize = 0;
  p->numThreadsMax = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  /* input source */
  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  /* services and callbacks */
  p->progress = NULL;
  p->allocBig = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  /* block size that the per-thread inBuf buffers were allocated for */
  p->allocatedBufsSize = 0;

  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  for (k = 0; k < MTCODER__THREADS_MAX; k++)
  {
    CMtCoderThread *th = p->threads + k;
    th->mtCoder = p;
    th->index = k;
    th->inBuf = NULL;
    th->stop = False;
    Event_Construct(&th->startEvent);
    Thread_Construct(&th->thread);
  }

  #ifdef MTCODER__USE_WRITE_THREAD
    for (k = 0; k < MTCODER__BLOCKS_MAX; k++)
      Event_Construct(&p->writeEvents[k]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}
413 
414 
415 
416 
/*
  Destructs every worker slot (stops and joins its thread, closes its start
  event, frees its input buffer) and then closes the coder-level events and
  the blocks semaphore. The critical sections are not touched here.

  Note: the source keeps a disabled snippet that would set p->stopReading
  and signal p->readEvent before stopping the threads; it is not executed.
*/
static void MtCoder_Free(CMtCoder *p)
{
  unsigned k;

  /*
  p->stopReading = True;
  if (Event_IsCreated(&p->readEvent))
    Event_Set(&p->readEvent);
  */

  for (k = 0; k < MTCODER__THREADS_MAX; k++)
  {
    CMtCoderThread *th = p->threads + k;
    MtCoderThread_Destruct(th);
  }

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (k = 0; k < MTCODER__BLOCKS_MAX; k++)
    {
      Event_Close(&p->writeEvents[k]);
    }
  #else
    Event_Close(&p->finishedEvent);
  #endif
}
440 
441 
/*
  Full teardown of *p: releases threads, events, semaphore and buffers via
  MtCoder_Free(), then deletes the two critical sections that
  MtCoder_Construct() initialized (p->cs and p->mtProgress.cs).
*/
void MtCoder_Destruct(CMtCoder *p)
{
  CMtProgress *progress = &p->mtProgress;

  MtCoder_Free(p);

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&progress->cs);
}
449 
450 
MtCoder_Code(CMtCoder * p)451 SRes MtCoder_Code(CMtCoder *p)
452 {
453   unsigned numThreads = p->numThreadsMax;
454   unsigned numBlocksMax;
455   unsigned i;
456   SRes res = SZ_OK;
457 
458   if (numThreads > MTCODER__THREADS_MAX)
459     numThreads = MTCODER__THREADS_MAX;
460   numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
461 
462   if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
463   if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
464   if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
465 
466   if (numBlocksMax > MTCODER__BLOCKS_MAX)
467     numBlocksMax = MTCODER__BLOCKS_MAX;
468 
469   if (p->blockSize != p->allocatedBufsSize)
470   {
471     for (i = 0; i < MTCODER__THREADS_MAX; i++)
472     {
473       CMtCoderThread *t = &p->threads[i];
474       if (t->inBuf)
475       {
476         ISzAlloc_Free(p->allocBig, t->inBuf);
477         t->inBuf = NULL;
478       }
479     }
480     p->allocatedBufsSize = p->blockSize;
481   }
482 
483   p->readRes = SZ_OK;
484 
485   MtProgress_Init(&p->mtProgress, p->progress);
486 
487   #ifdef MTCODER__USE_WRITE_THREAD
488     for (i = 0; i < numBlocksMax; i++)
489     {
490       RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
491     }
492   #else
493     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
494   #endif
495 
496   {
497     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
498     RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
499   }
500 
501   for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
502     p->freeBlockList[i] = i + 1;
503   p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
504   p->freeBlockHead = 0;
505 
506   p->readProcessed = 0;
507   p->blockIndex = 0;
508   p->numBlocksMax = numBlocksMax;
509   p->stopReading = False;
510 
511   #ifndef MTCODER__USE_WRITE_THREAD
512     p->writeIndex = 0;
513     p->writeRes = SZ_OK;
514     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
515       p->ReadyBlocks[i] = False;
516     p->numFinishedThreads = 0;
517   #endif
518 
519   p->numStartedThreadsLimit = numThreads;
520   p->numStartedThreads = 0;
521 
522   // for (i = 0; i < numThreads; i++)
523   {
524     CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
525     RINOK(MtCoderThread_CreateAndStart(nextThread));
526   }
527 
528   RINOK_THREAD(Event_Set(&p->readEvent))
529 
530   #ifdef MTCODER__USE_WRITE_THREAD
531   {
532     unsigned bi = 0;
533 
534     for (;; bi++)
535     {
536       if (bi >= numBlocksMax)
537         bi = 0;
538 
539       RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
540 
541       {
542         const CMtCoderBlock *block = &p->blocks[bi];
543         unsigned bufIndex = block->bufIndex;
544         BoolInt finished = block->finished;
545         if (res == SZ_OK && block->res != SZ_OK)
546           res = block->res;
547 
548         if (bufIndex != (unsigned)(int)-1)
549         {
550           if (res == SZ_OK)
551           {
552             res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
553             if (res != SZ_OK)
554               MtProgress_SetError(&p->mtProgress, res);
555           }
556 
557           CriticalSection_Enter(&p->cs);
558           {
559             p->freeBlockList[bufIndex] = p->freeBlockHead;
560             p->freeBlockHead = bufIndex;
561           }
562           CriticalSection_Leave(&p->cs);
563         }
564 
565         RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
566 
567         if (finished)
568           break;
569       }
570     }
571   }
572   #else
573   {
574     WRes wres = Event_Wait(&p->finishedEvent);
575     res = MY_SRes_HRESULT_FROM_WRes(wres);
576   }
577   #endif
578 
579   if (res == SZ_OK)
580     res = p->readRes;
581 
582   if (res == SZ_OK)
583     res = p->mtProgress.res;
584 
585   #ifndef MTCODER__USE_WRITE_THREAD
586     if (res == SZ_OK)
587       res = p->writeRes;
588   #endif
589 
590   if (res != SZ_OK)
591     MtCoder_Free(p);
592   return res;
593 }
594 
595 #endif
596