• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* MtDec.c -- Multi-thread Decoder
2 2021-12-21 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 // #define SHOW_DEBUG_INFO
7 
8 // #include <stdio.h>
9 #include <string.h>
10 
11 #ifdef SHOW_DEBUG_INFO
12 #include <stdio.h>
13 #endif
14 
15 #include "MtDec.h"
16 
17 #ifndef _7ZIP_ST
18 
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

/* Debug trace: prints the label (must be a string literal) followed by a value.
   The value is converted to unsigned, so it is printed with %u (printing an
   unsigned argument with %d was a format/argument mismatch), and (d) is
   parenthesized so that an expression argument is converted as a whole. */
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d)))
26 
/* Reset a progress aggregator: clear the accumulated totals and the sticky
   result, and attach the (possibly NULL) progress callback.
   The critical section p->cs is not touched here. */
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalInSize = 0;
  p->totalOutSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
34 
35 
MtProgress_Progress_ST(CMtProgress * p)36 SRes MtProgress_Progress_ST(CMtProgress *p)
37 {
38   if (p->res == SZ_OK && p->progress)
39     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
40       p->res = SZ_ERROR_PROGRESS;
41   return p->res;
42 }
43 
44 
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)45 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
46 {
47   SRes res;
48   CriticalSection_Enter(&p->cs);
49 
50   p->totalInSize += inSize;
51   p->totalOutSize += outSize;
52   if (p->res == SZ_OK && p->progress)
53     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
54       p->res = SZ_ERROR_PROGRESS;
55   res = p->res;
56 
57   CriticalSection_Leave(&p->cs);
58   return res;
59 }
60 
61 
MtProgress_GetError(CMtProgress * p)62 SRes MtProgress_GetError(CMtProgress *p)
63 {
64   SRes res;
65   CriticalSection_Enter(&p->cs);
66   res = p->res;
67   CriticalSection_Leave(&p->cs);
68   return res;
69 }
70 
71 
/* Record an error under the progress lock.
   Only the first error is kept: once p->res is non-OK it is not overwritten. */
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  {
    const BoolInt isFirstError = (p->res == SZ_OK);
    if (isFirstError)
      p->res = res;
  }
  CriticalSection_Leave(&p->cs);
}
79 
80 
/* Error check for threading primitives (Event_Wait / Event_Set) in this file.
   Expands to RINOK_WRes(x); presumably that returns a non-zero WRes from the
   enclosing function (ThreadFunc2 documents "(!= 0) - WRes error" as its
   result) - TODO confirm against the RINOK_WRes definition. */
#define RINOK_THREAD(x) RINOK_WRes(x)
82 
83 
ArEvent_OptCreate_And_Reset(CEvent * p)84 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
85 {
86   if (Event_IsCreated(p))
87     return Event_Reset(p);
88   return AutoResetEvent_CreateNotSignaled(p);
89 }
90 
91 
/* Header placed at the start of every input-buffer allocation.
   The buffers owned by one thread form a singly linked list through (next).
   (pad) only enlarges the header (presumably to keep the payload suitably
   aligned). The payload starts right after the header, at
   MTDEC__LINK_DATA_OFFSET bytes from the start of the allocation.
   The struct tag avoids a leading double underscore, which C reserves for the
   implementation. */
struct CMtDecBufLink_
{
  struct CMtDecBufLink_ *next;
  void *pad[3];
};

typedef struct CMtDecBufLink_ CMtDecBufLink;

#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
102 
103 
104 
/* Worker-thread entry point (defined further below). Declared here because
   MtDecThread_CreateAndStart() passes it to Thread_Create(). */
static THREAD_FUNC_DECL ThreadFunc(void *pp);
106 
107 
MtDecThread_CreateEvents(CMtDecThread * t)108 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
109 {
110   WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
111   if (wres == 0)
112   {
113     wres = ArEvent_OptCreate_And_Reset(&t->canRead);
114     if (wres == 0)
115       return SZ_OK;
116   }
117   return wres;
118 }
119 
120 
MtDecThread_CreateAndStart(CMtDecThread * t)121 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
122 {
123   WRes wres = MtDecThread_CreateEvents(t);
124   // wres = 17; // for test
125   if (wres == 0)
126   {
127     if (Thread_WasCreated(&t->thread))
128       return SZ_OK;
129     wres = Thread_Create(&t->thread, ThreadFunc, t);
130     if (wres == 0)
131       return SZ_OK;
132   }
133   return MY_SRes_HRESULT_FROM_WRes(wres);
134 }
135 
136 
/* Release the whole chain of input buffers owned by thread (t) and leave
   t->inBuf empty. Does nothing (apart from the NULL store) if the chain is
   already empty. */
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  void *cur = t->inBuf;
  t->inBuf = NULL;
  while (cur)
  {
    void *following = ((CMtDecBufLink *)cur)->next;
    ISzAlloc_Free(t->mtDec->alloc, cur);
    cur = following;
  }
}
152 
153 
/* Join the worker thread of (t) if one was created, then close both events.
   Both events are signaled before the join so that a thread blocked waiting
   on either of them can wake up. */
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* no thread normally waits on canWrite here; signaling it is a safety measure */
    Event_Set(&t->canWrite);
    Event_Set(&t->canRead);
    Thread_Wait_Close(&t->thread);
  }

  /* the events are closed whether or not a thread existed */
  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
166 
/* Close every thread slot of (p): join created threads and close their events. */
static void MtDec_CloseThreads(CMtDec *p)
{
  CMtDecThread *t = p->threads;
  CMtDecThread *const end = p->threads + MTDEC__THREADS_MAX;
  for (; t != end; t++)
    MtDecThread_CloseThread(t);
}
173 
/* Tear down one thread slot completely. */
static void MtDecThread_Destruct(CMtDecThread *t)
{
  /* stop/join the thread first: it may still be using its input buffers */
  MtDecThread_CloseThread(t);
  /* now the buffer chain can be released safely */
  MtDecThread_FreeInBufs(t);
}
179 
180 
181 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)182 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
183 {
184   size_t size = *processedSize;
185   *processedSize = 0;
186   while (size != 0)
187   {
188     size_t cur = size;
189     SRes res = ISeqInStream_Read(stream, data, &cur);
190     *processedSize += cur;
191     data += cur;
192     size -= cur;
193     RINOK(res);
194     if (cur == 0)
195       return SZ_OK;
196   }
197   return SZ_OK;
198 }
199 
200 
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)201 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
202 {
203   SRes res;
204   CriticalSection_Enter(&p->mtProgress.cs);
205   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
206   res = p->mtProgress.res;
207   CriticalSection_Leave(&p->mtProgress.cs);
208   return res;
209 }
210 
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)211 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
212 {
213   SRes res;
214   CriticalSection_Enter(&p->mtProgress.cs);
215 
216   p->mtProgress.totalInSize += inSize;
217   p->mtProgress.totalOutSize += outSize;
218   if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
219     if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
220       p->mtProgress.res = SZ_ERROR_PROGRESS;
221 
222   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
223   res = p->mtProgress.res;
224 
225   CriticalSection_Leave(&p->mtProgress.cs);
226 
227   return res;
228 }
229 
/* Request interruption of all blocks whose index is greater than
   (interruptIndex). If a request is already pending with a lower or equal
   index, it is kept unchanged. */
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);
  {
    const BoolInt keepExisting = p->needInterrupt && interruptIndex >= p->interruptIndex;
    if (!keepExisting)
    {
      p->interruptIndex = interruptIndex;
      p->needInterrupt = True;
    }
  }
  CriticalSection_Leave(&p->mtProgress.cs);
}
240 
MtDec_GetCrossBuff(CMtDec * p)241 Byte *MtDec_GetCrossBuff(CMtDec *p)
242 {
243   Byte *cr = p->crossBlock;
244   if (!cr)
245   {
246     cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
247     if (!cr)
248       return NULL;
249     p->crossBlock = cr;
250   }
251   return MTDEC__DATA_PTR_FROM_LINK(cr);
252 }
253 
254 
255 /*
256   ThreadFunc2() returns:
257   0      - in all normal cases (even for stream error or memory allocation error)
258   (!= 0) - WRes error return by system threading function
259 */
260 
261 // #define MTDEC_ProgessStep (1 << 22)
262 #define MTDEC_ProgessStep (1 << 0)
263 
ThreadFunc2(CMtDecThread * t)264 static WRes ThreadFunc2(CMtDecThread *t)
265 {
266   CMtDec *p = t->mtDec;
267 
268   PRF_STR_INT("ThreadFunc2", t->index);
269 
270   // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
271 
272   for (;;)
273   {
274     SRes res, codeRes;
275     BoolInt wasInterrupted, isAllocError, overflow, finish;
276     SRes threadingErrorSRes;
277     BoolInt needCode, needWrite, needContinue;
278 
279     size_t inDataSize_Start;
280     UInt64 inDataSize;
281     // UInt64 inDataSize_Full;
282 
283     UInt64 blockIndex;
284 
285     UInt64 inPrev = 0;
286     UInt64 outPrev = 0;
287     UInt64 inCodePos;
288     UInt64 outCodePos;
289 
290     Byte *afterEndData = NULL;
291     size_t afterEndData_Size = 0;
292     BoolInt afterEndData_IsCross = False;
293 
294     BoolInt canCreateNewThread = False;
295     // CMtDecCallbackInfo parse;
296     CMtDecThread *nextThread;
297 
298     PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
299 
300     RINOK_THREAD(Event_Wait(&t->canRead));
301     if (p->exitThread)
302       return 0;
303 
304     PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
305 
306     // if (t->index == 3) return 19; // for test
307 
308     blockIndex = p->blockIndex++;
309 
310     // PRF(printf("\ncanRead\n"))
311 
312     res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
313 
314     finish = p->readWasFinished;
315     needCode = False;
316     needWrite = False;
317     isAllocError = False;
318     overflow = False;
319 
320     inDataSize_Start = 0;
321     inDataSize = 0;
322     // inDataSize_Full = 0;
323 
324     if (res == SZ_OK && !wasInterrupted)
325     {
326       // if (p->inStream)
327       {
328         CMtDecBufLink *prev = NULL;
329         CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
330         size_t crossSize = p->crossEnd - p->crossStart;
331 
332         PRF(printf("\ncrossSize = %d\n", crossSize));
333 
334         for (;;)
335         {
336           if (!link)
337           {
338             link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
339             if (!link)
340             {
341               finish = True;
342               // p->allocError_for_Read_BlockIndex = blockIndex;
343               isAllocError = True;
344               break;
345             }
346             link->next = NULL;
347             if (prev)
348             {
349               // static unsigned g_num = 0;
350               // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
351               prev->next = link;
352             }
353             else
354               t->inBuf = (void *)link;
355           }
356 
357           {
358             Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
359             Byte *parseData = data;
360             size_t size;
361 
362             if (crossSize != 0)
363             {
364               inDataSize = crossSize;
365               // inDataSize_Full = inDataSize;
366               inDataSize_Start = crossSize;
367               size = crossSize;
368               parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
369               PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
370                   (int)p->crossStart, (int)p->crossEnd, (int)finish));
371             }
372             else
373             {
374               size = p->inBufSize;
375 
376               res = FullRead(p->inStream, data, &size);
377 
378               // size = 10; // test
379 
380               inDataSize += size;
381               // inDataSize_Full = inDataSize;
382               if (!prev)
383                 inDataSize_Start = size;
384 
385               p->readProcessed += size;
386               finish = (size != p->inBufSize);
387               if (finish)
388                 p->readWasFinished = True;
389 
390               // res = E_INVALIDARG; // test
391 
392               if (res != SZ_OK)
393               {
394                 // PRF(printf("\nRead error = %d\n", res))
395                 // we want to decode all data before error
396                 p->readRes = res;
397                 // p->readError_BlockIndex = blockIndex;
398                 p->readWasFinished = True;
399                 finish = True;
400                 res = SZ_OK;
401                 // break;
402               }
403 
404               if (inDataSize - inPrev >= MTDEC_ProgessStep)
405               {
406                 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
407                 if (res != SZ_OK || wasInterrupted)
408                   break;
409                 inPrev = inDataSize;
410               }
411             }
412 
413             {
414               CMtDecCallbackInfo parse;
415 
416               parse.startCall = (prev == NULL);
417               parse.src = parseData;
418               parse.srcSize = size;
419               parse.srcFinished = finish;
420               parse.canCreateNewThread = True;
421 
422               PRF(printf("\nParse size = %d\n", (unsigned)size));
423 
424               p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
425 
426               PRF(printf("   Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
427 
428               needWrite = True;
429               canCreateNewThread = parse.canCreateNewThread;
430 
431               // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
432 
433               if (
434                   // parseRes != SZ_OK ||
435                   // inDataSize - (size - parse.srcSize) > p->inBlockMax
436                   // ||
437                   parse.state == MTDEC_PARSE_OVERFLOW
438                   // || wasInterrupted
439                   )
440               {
441                 // Overflow or Parse error - switch from MT decoding to ST decoding
442                 finish = True;
443                 overflow = True;
444 
445                 {
446                   PRF(printf("\n Overflow"));
447                   // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
448                   PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
449                 }
450 
451                 if (crossSize != 0)
452                   memcpy(data, parseData, size);
453                 p->crossStart = 0;
454                 p->crossEnd = 0;
455                 break;
456               }
457 
458               if (crossSize != 0)
459               {
460                 memcpy(data, parseData, parse.srcSize);
461                 p->crossStart += parse.srcSize;
462               }
463 
464               if (parse.state != MTDEC_PARSE_CONTINUE || finish)
465               {
466                 // we don't need to parse in current thread anymore
467 
468                 if (parse.state == MTDEC_PARSE_END)
469                   finish = True;
470 
471                 needCode = True;
472                 // p->crossFinished = finish;
473 
474                 if (parse.srcSize == size)
475                 {
476                   // full parsed - no cross transfer
477                   p->crossStart = 0;
478                   p->crossEnd = 0;
479                   break;
480                 }
481 
482                 if (parse.state == MTDEC_PARSE_END)
483                 {
484                   afterEndData = parseData + parse.srcSize;
485                   afterEndData_Size = size - parse.srcSize;
486                   if (crossSize != 0)
487                     afterEndData_IsCross = True;
488                   // we reduce data size to required bytes (parsed only)
489                   inDataSize -= afterEndData_Size;
490                   if (!prev)
491                     inDataSize_Start = parse.srcSize;
492                   break;
493                 }
494 
495                 {
496                   // partial parsed - need cross transfer
497                   if (crossSize != 0)
498                     inDataSize = parse.srcSize; // it's only parsed now
499                   else
500                   {
501                     // partial parsed - is not in initial cross block - we need to copy new data to cross block
502                     Byte *cr = MtDec_GetCrossBuff(p);
503                     if (!cr)
504                     {
505                       {
506                         PRF(printf("\ncross alloc error error\n"));
507                         // res = SZ_ERROR_MEM;
508                         finish = True;
509                         // p->allocError_for_Read_BlockIndex = blockIndex;
510                         isAllocError = True;
511                         break;
512                       }
513                     }
514 
515                     {
516                       size_t crSize = size - parse.srcSize;
517                       inDataSize -= crSize;
518                       p->crossEnd = crSize;
519                       p->crossStart = 0;
520                       memcpy(cr, parseData + parse.srcSize, crSize);
521                     }
522                   }
523 
524                   // inDataSize_Full = inDataSize;
525                   if (!prev)
526                     inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
527 
528                   finish = False;
529                   break;
530                 }
531               }
532 
533               if (parse.srcSize != size)
534               {
535                 res = SZ_ERROR_FAIL;
536                 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
537                 break;
538               }
539             }
540           }
541 
542           prev = link;
543           link = link->next;
544 
545           if (crossSize != 0)
546           {
547             crossSize = 0;
548             p->crossStart = 0;
549             p->crossEnd = 0;
550           }
551         }
552       }
553 
554       if (res == SZ_OK)
555         res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
556     }
557 
558     codeRes = SZ_OK;
559 
560     if (res == SZ_OK && needCode && !wasInterrupted)
561     {
562       codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
563       if (codeRes != SZ_OK)
564       {
565         needCode = False;
566         finish = True;
567         // SZ_ERROR_MEM is expected error here.
568         //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
569         //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
570       }
571     }
572 
573     if (res != SZ_OK || wasInterrupted)
574       finish = True;
575 
576     nextThread = NULL;
577     threadingErrorSRes = SZ_OK;
578 
579     if (!finish)
580     {
581       if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
582       {
583         SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
584         if (res2 == SZ_OK)
585         {
586           // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
587           p->numStartedThreads++;
588         }
589         else
590         {
591           PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
592           if (p->numStartedThreads == 1)
593           {
594             // if only one thread is possible, we leave muti-threading code
595             finish = True;
596             needCode = False;
597             threadingErrorSRes = res2;
598           }
599           else
600             p->numStartedThreads_Limit = p->numStartedThreads;
601         }
602       }
603 
604       if (!finish)
605       {
606         unsigned nextIndex = t->index + 1;
607         nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
608         RINOK_THREAD(Event_Set(&nextThread->canRead))
609         // We have started executing for new iteration (with next thread)
610         // And that next thread now is responsible for possible exit from decoding (threading_code)
611       }
612     }
613 
614     // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
615     // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
616     // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
617     //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
618     //   - otherwise we stop decoding and exit from ThreadFunc2()
619 
620     // Don't change (finish) variable in the further code
621 
622 
623     // ---------- CODE ----------
624 
625     inPrev = 0;
626     outPrev = 0;
627     inCodePos = 0;
628     outCodePos = 0;
629 
630     if (res == SZ_OK && needCode && codeRes == SZ_OK)
631     {
632       BoolInt isStartBlock = True;
633       CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
634 
635       for (;;)
636       {
637         size_t inSize;
638         int stop;
639 
640         if (isStartBlock)
641           inSize = inDataSize_Start;
642         else
643         {
644           UInt64 rem = inDataSize - inCodePos;
645           inSize = p->inBufSize;
646           if (inSize > rem)
647             inSize = (size_t)rem;
648         }
649 
650         inCodePos += inSize;
651         stop = True;
652 
653         codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
654             (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
655             (inCodePos == inDataSize), // srcFinished
656             &inCodePos, &outCodePos, &stop);
657 
658         if (codeRes != SZ_OK)
659         {
660           PRF(printf("\nCode Interrupt error = %x\n", codeRes));
661           // we interrupt only later blocks
662           MtDec_Interrupt(p, blockIndex);
663           break;
664         }
665 
666         if (stop || inCodePos == inDataSize)
667           break;
668 
669         {
670           const UInt64 inDelta = inCodePos - inPrev;
671           const UInt64 outDelta = outCodePos - outPrev;
672           if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
673           {
674             // Sleep(1);
675             res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
676             if (res != SZ_OK || wasInterrupted)
677               break;
678             inPrev = inCodePos;
679             outPrev = outCodePos;
680           }
681         }
682 
683         link = link->next;
684         isStartBlock = False;
685       }
686     }
687 
688 
689     // ---------- WRITE ----------
690 
691     RINOK_THREAD(Event_Wait(&t->canWrite));
692 
693   {
694     BoolInt isErrorMode = False;
695     BoolInt canRecode = True;
696     BoolInt needWriteToStream = needWrite;
697 
698     if (p->exitThread) return 0; // it's never executed in normal cases
699 
700     if (p->wasInterrupted)
701       wasInterrupted = True;
702     else
703     {
704       if (codeRes != SZ_OK) // || !needCode // check it !!!
705       {
706         p->wasInterrupted = True;
707         p->codeRes = codeRes;
708         if (codeRes == SZ_ERROR_MEM)
709           isAllocError = True;
710       }
711 
712       if (threadingErrorSRes)
713       {
714         p->wasInterrupted = True;
715         p->threadingErrorSRes = threadingErrorSRes;
716         needWriteToStream = False;
717       }
718       if (isAllocError)
719       {
720         p->wasInterrupted = True;
721         p->isAllocError = True;
722         needWriteToStream = False;
723       }
724       if (overflow)
725       {
726         p->wasInterrupted = True;
727         p->overflow = True;
728         needWriteToStream = False;
729       }
730     }
731 
732     if (needCode)
733     {
734       if (wasInterrupted)
735       {
736         inCodePos = 0;
737         outCodePos = 0;
738       }
739       {
740         const UInt64 inDelta = inCodePos - inPrev;
741         const UInt64 outDelta = outCodePos - outPrev;
742         // if (inDelta != 0 || outDelta != 0)
743         res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
744       }
745     }
746 
747     needContinue = (!finish);
748 
749     // if (res == SZ_OK && needWrite && !wasInterrupted)
750     if (needWrite)
751     {
752       // p->inProcessed += inCodePos;
753 
754       PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
755 
756       res = p->mtCallback->Write(p->mtCallbackObject, t->index,
757           res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
758           afterEndData, afterEndData_Size, afterEndData_IsCross,
759           &needContinue,
760           &canRecode);
761 
762       // res = SZ_ERROR_FAIL; // for test
763 
764       PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
765       PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
766 
767       if (res != SZ_OK)
768       {
769         PRF(printf("\nWrite error = %d\n", res));
770         isErrorMode = True;
771         p->wasInterrupted = True;
772       }
773       if (res != SZ_OK
774           || (!needContinue && !finish))
775       {
776         PRF(printf("\nWrite Interrupt error = %x\n", res));
777         MtDec_Interrupt(p, blockIndex);
778       }
779     }
780 
781     if (canRecode)
782     if (!needCode
783         || res != SZ_OK
784         || p->wasInterrupted
785         || codeRes != SZ_OK
786         || wasInterrupted
787         || p->numFilledThreads != 0
788         || isErrorMode)
789     {
790       if (p->numFilledThreads == 0)
791         p->filledThreadStart = t->index;
792       if (inDataSize != 0 || !finish)
793       {
794         t->inDataSize_Start = inDataSize_Start;
795         t->inDataSize = inDataSize;
796         p->numFilledThreads++;
797       }
798       PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
799       PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
800     }
801 
802     if (!finish)
803     {
804       RINOK_THREAD(Event_Set(&nextThread->canWrite));
805     }
806     else
807     {
808       if (needContinue)
809       {
810         // we restore decoding with new iteration
811         RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
812       }
813       else
814       {
815         // we exit from decoding
816         if (t->index == 0)
817           return SZ_OK;
818         p->exitThread = True;
819       }
820       RINOK_THREAD(Event_Set(&p->threads[0].canRead));
821     }
822   }
823   }
824 }
825 
826 #ifdef _WIN32
827 #define USE_ALLOCA
828 #endif
829 
830 #ifdef USE_ALLOCA
831 #ifdef _WIN32
832 #include <malloc.h>
833 #else
834 #include <stdlib.h>
835 #endif
836 #endif
837 
838 
ThreadFunc1(void * pp)839 static THREAD_FUNC_DECL ThreadFunc1(void *pp)
840 {
841   WRes res;
842 
843   CMtDecThread *t = (CMtDecThread *)pp;
844   CMtDec *p;
845 
846   // fprintf(stdout, "\n%d = %p\n", t->index, &t);
847 
848   res = ThreadFunc2(t);
849   p = t->mtDec;
850   if (res == 0)
851     return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
852   {
853     // it's unexpected situation for some threading function error
854     if (p->exitThreadWRes == 0)
855       p->exitThreadWRes = res;
856     PRF(printf("\nthread exit error = %d\n", res));
857     p->exitThread = True;
858     Event_Set(&p->threads[0].canRead);
859     Event_Set(&p->threads[0].canWrite);
860     MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
861   }
862   return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
863 }
864 
ThreadFunc(void * pp)865 static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)
866 {
867   #ifdef USE_ALLOCA
868   CMtDecThread *t = (CMtDecThread *)pp;
869   // fprintf(stderr, "\n%d = %p - before", t->index, &t);
870   t->allocaPtr = alloca(t->index * 128);
871   #endif
872   return ThreadFunc1(pp);
873 }
874 
875 
/* Prepare to hand buffered MT input back through MtDec_Read().
   - Frees the cross block if it holds no unread data.
   - Frees the input buffers of every thread slot that is not part of the
     filled-thread ring (filledThreadStart .. +numFilledThreads, wrapping at
     numStartedThreads).
   Returns nonzero if buffered data is left to read. */
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    /* distance of slot i from the ring start, going forward with wrap-around */
    const unsigned ringPos = (i >= p->filledThreadStart)
        ? i - p->filledThreadStart
        : i + p->numStartedThreads - p->filledThreadStart;
    if (i > p->numStartedThreads || p->numFilledThreads <= ringPos)
      MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
897 
898 
MtDec_Read(CMtDec * p,size_t * inLim)899 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
900 {
901   while (p->numFilledThreads != 0)
902   {
903     CMtDecThread *t = &p->threads[p->filledThreadStart];
904 
905     if (*inLim != 0)
906     {
907       {
908         void *link = t->inBuf;
909         void *next = ((CMtDecBufLink *)link)->next;
910         ISzAlloc_Free(p->alloc, link);
911         t->inBuf = next;
912       }
913 
914       if (t->inDataSize == 0)
915       {
916         MtDecThread_FreeInBufs(t);
917         if (--p->numFilledThreads == 0)
918           break;
919         if (++p->filledThreadStart == p->numStartedThreads)
920           p->filledThreadStart = 0;
921         t = &p->threads[p->filledThreadStart];
922       }
923     }
924 
925     {
926       size_t lim = t->inDataSize_Start;
927       if (lim != 0)
928         t->inDataSize_Start = 0;
929       else
930       {
931         UInt64 rem = t->inDataSize;
932         lim = p->inBufSize;
933         if (lim > rem)
934           lim = (size_t)rem;
935       }
936       t->inDataSize -= lim;
937       *inLim = lim;
938       return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
939     }
940   }
941 
942   {
943     size_t crossSize = p->crossEnd - p->crossStart;
944     if (crossSize != 0)
945     {
946       const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
947       *inLim = crossSize;
948       p->crossStart = 0;
949       p->crossEnd = 0;
950       return data;
951     }
952     *inLim = 0;
953     if (p->crossBlock)
954     {
955       ISzAlloc_Free(p->alloc, p->crossBlock);
956       p->crossBlock = NULL;
957     }
958     return NULL;
959   }
960 }
961 
962 
/* Put (p) into its initial state.
   - Input buffers default to 256 KiB; no thread count is set.
   - No stream, progress callback, allocator or MT callback is attached.
   - No cross block and no filled threads.
   - Every thread slot gets its back pointer and index, and its event/thread
     handles are constructed.
   - The progress lock is initialized. */
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;
  p->numThreadsMax = 0;

  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  p->numFilledThreads = 0;
  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *const t = p->threads + i;
    t->mtDec = p;
    t->index = i;
    t->inBuf = NULL;
    Event_Construct(&t->canRead);
    Event_Construct(&t->canWrite);
    Thread_Construct(&t->thread);
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}
1005 
1006 
/* Release all run-time resources: join the worker threads, close their
   events, free their buffers, and free the cross block.
   exitThread is set first, so a worker woken from Event_Wait() returns from
   ThreadFunc2() instead of starting another iteration. */
static void MtDec_Free(CMtDec *p)
{
  CMtDecThread *t;

  p->exitThread = True;

  for (t = p->threads; t != p->threads + MTDEC__THREADS_MAX; t++)
    MtDecThread_Destruct(t);

  // Event_Close(&p->finishedEvent);

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
1024 
1025 
/* Counterpart of MtDec_Construct().
   MtDec_Free() joins all worker threads, so the progress lock can be deleted
   afterwards. */
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);
  CriticalSection_Delete(&p->mtProgress.cs);
}
1032 
1033 
MtDec_Code(CMtDec * p)1034 SRes MtDec_Code(CMtDec *p)
1035 {
1036   unsigned i;
1037 
1038   p->inProcessed = 0;
1039 
1040   p->blockIndex = 1; // it must be larger than not_defined index (0)
1041   p->isAllocError = False;
1042   p->overflow = False;
1043   p->threadingErrorSRes = SZ_OK;
1044 
1045   p->needContinue = True;
1046 
1047   p->readWasFinished = False;
1048   p->needInterrupt = False;
1049   p->interruptIndex = (UInt64)(Int64)-1;
1050 
1051   p->readProcessed = 0;
1052   p->readRes = SZ_OK;
1053   p->codeRes = SZ_OK;
1054   p->wasInterrupted = False;
1055 
1056   p->crossStart = 0;
1057   p->crossEnd = 0;
1058 
1059   p->filledThreadStart = 0;
1060   p->numFilledThreads = 0;
1061 
1062   {
1063     unsigned numThreads = p->numThreadsMax;
1064     if (numThreads > MTDEC__THREADS_MAX)
1065       numThreads = MTDEC__THREADS_MAX;
1066     p->numStartedThreads_Limit = numThreads;
1067     p->numStartedThreads = 0;
1068   }
1069 
1070   if (p->inBufSize != p->allocatedBufsSize)
1071   {
1072     for (i = 0; i < MTDEC__THREADS_MAX; i++)
1073     {
1074       CMtDecThread *t = &p->threads[i];
1075       if (t->inBuf)
1076         MtDecThread_FreeInBufs(t);
1077     }
1078     if (p->crossBlock)
1079     {
1080       ISzAlloc_Free(p->alloc, p->crossBlock);
1081       p->crossBlock = NULL;
1082     }
1083 
1084     p->allocatedBufsSize = p->inBufSize;
1085   }
1086 
1087   MtProgress_Init(&p->mtProgress, p->progress);
1088 
1089   // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1090   p->exitThread = False;
1091   p->exitThreadWRes = 0;
1092 
1093   {
1094     WRes wres;
1095     SRes sres;
1096     CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1097     // wres = MtDecThread_CreateAndStart(nextThread);
1098     wres = MtDecThread_CreateEvents(nextThread);
1099     if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1100     if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1101     if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);
1102     wres = (WRes)(UINT_PTR)res;
1103     if (wres != 0)
1104     {
1105       p->needContinue = False;
1106       MtDec_CloseThreads(p);
1107     }}}}
1108 
1109     // wres = 17; // for test
1110     // wres = Event_Wait(&p->finishedEvent);
1111 
1112     sres = MY_SRes_HRESULT_FROM_WRes(wres);
1113 
1114     if (sres != 0)
1115       p->threadingErrorSRes = sres;
1116 
1117     if (
1118         // wres == 0
1119         // wres != 0
1120         // || p->mtc.codeRes == SZ_ERROR_MEM
1121         p->isAllocError
1122         || p->threadingErrorSRes != SZ_OK
1123         || p->overflow)
1124     {
1125       // p->needContinue = True;
1126     }
1127     else
1128       p->needContinue = False;
1129 
1130     if (p->needContinue)
1131       return SZ_OK;
1132 
1133     // if (sres != SZ_OK)
1134     return sres;
1135     // return SZ_ERROR_FAIL;
1136   }
1137 }
1138 
1139 #endif
1140