• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* MtDec.c -- Multi-thread Decoder
2 2023-04-02 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 // #define SHOW_DEBUG_INFO
7 
8 // #include <stdio.h>
9 #include <string.h>
10 
11 #ifdef SHOW_DEBUG_INFO
12 #include <stdio.h>
13 #endif
14 
15 #include "MtDec.h"
16 
17 #ifndef Z7_ST
18 
/* Debug tracing: define SHOW_DEBUG_INFO to enable it. Otherwise PRF(x)
   expands to nothing and its argument is not evaluated. */
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

/* Prints a label and an unsigned value.
   The terminating ';' is part of the expansion because every call site uses
   PRF_STR_INT(...) as a complete statement without a semicolon; without it a
   SHOW_DEBUG_INFO build does not compile. The format uses %u to match the
   (unsigned) argument, and the argument is parenthesized for macro hygiene. */
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d));)
26 
/*
  Resets the shared progress state.
  Zeroes both byte counters, clears the sticky error code and stores the
  progress callback; callers check it for NULL before use. The critical
  section p->cs is not touched here.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress)
{
  p->totalOutSize = 0;
  p->totalInSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
34 
35 
MtProgress_Progress_ST(CMtProgress * p)36 SRes MtProgress_Progress_ST(CMtProgress *p)
37 {
38   if (p->res == SZ_OK && p->progress)
39     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
40       p->res = SZ_ERROR_PROGRESS;
41   return p->res;
42 }
43 
44 
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)45 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
46 {
47   SRes res;
48   CriticalSection_Enter(&p->cs);
49 
50   p->totalInSize += inSize;
51   p->totalOutSize += outSize;
52   if (p->res == SZ_OK && p->progress)
53     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
54       p->res = SZ_ERROR_PROGRESS;
55   res = p->res;
56 
57   CriticalSection_Leave(&p->cs);
58   return res;
59 }
60 
61 
MtProgress_GetError(CMtProgress * p)62 SRes MtProgress_GetError(CMtProgress *p)
63 {
64   SRes res;
65   CriticalSection_Enter(&p->cs);
66   res = p->res;
67   CriticalSection_Leave(&p->cs);
68   return res;
69 }
70 
71 
/* Records res under p->cs unless an error is already stored (the first error wins). */
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  p->res = (p->res == SZ_OK) ? res : p->res;
  CriticalSection_Leave(&p->cs);
}
79 
80 
/* Wrapper used around threading-primitive calls (which return WRes); delegates to RINOK_WRes. */
#define RINOK_THREAD(x) RINOK_WRes(x)


/*
  Header placed at the start of every input-buffer allocation.
  The buffers of one worker slot are chained through `next`. `pad` only
  enlarges the header to four pointers, so the data area that follows
  begins 4 * sizeof(void *) bytes into the allocation.
*/
typedef struct CMtDecBufLink_ CMtDecBufLink;

struct CMtDecBufLink_
{
  CMtDecBufLink *next;
  void *pad[3];
};

/* The data area of a buffer starts right after its link header. */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
94 
95 
96 
97 static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp);
98 
99 
MtDecThread_CreateEvents(CMtDecThread * t)100 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
101 {
102   WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite);
103   if (wres == 0)
104   {
105     wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead);
106     if (wres == 0)
107       return SZ_OK;
108   }
109   return wres;
110 }
111 
112 
MtDecThread_CreateAndStart(CMtDecThread * t)113 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
114 {
115   WRes wres = MtDecThread_CreateEvents(t);
116   // wres = 17; // for test
117   if (wres == 0)
118   {
119     if (Thread_WasCreated(&t->thread))
120       return SZ_OK;
121     wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t);
122     if (wres == 0)
123       return SZ_OK;
124   }
125   return MY_SRes_HRESULT_FROM_WRes(wres);
126 }
127 
128 
/*
  Frees the whole input-buffer chain of slot t through the decoder's
  allocator and leaves t->inBuf == NULL. An empty chain is a no-op.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
  t->inBuf = NULL;
  while (link)
  {
    CMtDecBufLink *following = link->next;
    ISzAlloc_Free(t->mtDec->alloc, link);
    link = following;
  }
}
144 
145 
/*
  Shuts down one worker slot.
  If its thread exists, both events are signalled so that a worker blocked
  in Event_Wait() wakes up. MtDec_ThreadFunc2() returns when it then sees
  p->exitThread set. The thread is then joined and closed.
  Both events are closed in every case.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
    Event_Set(&t->canRead);
    Thread_Wait_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
158 
/* Stops and releases every worker slot, in index order. Input buffers are kept. */
static void MtDec_CloseThreads(CMtDec *p)
{
  CMtDecThread *t = p->threads;
  CMtDecThread *const end = p->threads + MTDEC_THREADS_MAX;
  while (t != end)
    MtDecThread_CloseThread(t++);
}
165 
/*
  Releases everything owned by one worker slot: first its thread and
  events, then its input-buffer chain.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}
171 
172 
173 
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)174 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
175 {
176   SRes res;
177   CriticalSection_Enter(&p->mtProgress.cs);
178   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
179   res = p->mtProgress.res;
180   CriticalSection_Leave(&p->mtProgress.cs);
181   return res;
182 }
183 
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)184 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
185 {
186   SRes res;
187   CriticalSection_Enter(&p->mtProgress.cs);
188 
189   p->mtProgress.totalInSize += inSize;
190   p->mtProgress.totalOutSize += outSize;
191   if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
192     if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
193       p->mtProgress.res = SZ_ERROR_PROGRESS;
194 
195   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
196   res = p->mtProgress.res;
197 
198   CriticalSection_Leave(&p->mtProgress.cs);
199 
200   return res;
201 }
202 
/*
  Requests interruption of every block after interruptIndex (checked by the
  *_GetError_Spec helpers). If an interrupt is already pending, the lower of
  the two indices is kept.
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  BoolInt keepPending;

  CriticalSection_Enter(&p->mtProgress.cs);
  keepPending = (p->needInterrupt && interruptIndex >= p->interruptIndex);
  if (!keepPending)
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }
  CriticalSection_Leave(&p->mtProgress.cs);
}
213 
MtDec_GetCrossBuff(CMtDec * p)214 Byte *MtDec_GetCrossBuff(CMtDec *p)
215 {
216   Byte *cr = p->crossBlock;
217   if (!cr)
218   {
219     cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
220     if (!cr)
221       return NULL;
222     p->crossBlock = cr;
223   }
224   return MTDEC__DATA_PTR_FROM_LINK(cr);
225 }
226 
227 
/*
  MtDec_ThreadFunc2() returns:
  0      - in all normal cases (even for stream error or memory allocation error)
  (!= 0) - WRes error returned by a system threading function
*/

/*
  Granularity, in bytes, of the progress/error/interrupt checks that
  MtDec_ThreadFunc2() makes while reading and coding a block.
  (1 << 0) checks after every step that advances the input or output
  position by at least one byte. The commented-out (1 << 22) would check
  about every 4 MiB.
  The misspelled name ("Progess") is kept because the code refers to it.
*/
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)
236 
/*
  Worker loop run by every decoder slot.
  Slot 0 runs on the caller's thread, because MtDec_Code() calls
  MtDec_ThreadFunc() directly. The other slots run on threads started from
  inside this loop.

  Each iteration handles one block (blockIndex):
    1. wait for t->canRead, then READ and PARSE input into the t->inBuf
       chain, starting with any bytes left in the shared cross block;
    2. if decoding goes on, possibly start one more thread, then signal the
       next slot's canRead so that it reads the following block;
    3. CODE the block from the buffer chain;
    4. wait for t->canWrite, then WRITE the result, update the shared
       state, and pass canWrite (and canRead on finish) on.
  Reading and writing are handed from slot to slot in order; coding of
  different blocks can overlap.

  Returns 0 in all normal cases. Stream and allocation errors are reported
  through CMtDec fields, not through the return value. A non-zero WRes is
  returned only when a threading primitive fails (RINOK_THREAD).
*/
static WRes MtDec_ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("MtDec_ThreadFunc2", t->index)

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    BoolInt wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    BoolInt needCode, needWrite, needContinue;

    size_t inDataSize_Start;
    UInt64 inDataSize;
    // UInt64 inDataSize_Full;

    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;

    Byte *afterEndData = NULL;
    size_t afterEndData_Size = 0;
    BoolInt afterEndData_IsCross = False;

    BoolInt canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index)

    // ---------- READ ----------
    // Wait for our turn to read. p->exitThread is set when decoding shuts down.
    RINOK_THREAD(Event_Wait(&t->canRead))
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index)

    // if (t->index == 3) return 19; // for test

    // Only the slot that currently owns canRead gets here, so this increment
    // is presumably not racy (NOTE(review): confirm).
    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        // Reuse this slot's buffer chain and extend it on demand.
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        // Bytes the previous block's parser left in the cross block; they are consumed first.
        size_t crossSize = p->crossEnd - p->crossStart;

        // NOTE(review): %d with a size_t argument is a format mismatch when SHOW_DEBUG_INFO is enabled.
        PRF(printf("\ncrossSize = %d\n", crossSize));

        for (;;)
        {
          if (!link)
          {
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              // Parse the leftover bytes in place in the cross block; they are copied into `data` below.
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;

              res = SeqInStream_ReadMax(p->inStream, data, &size);

              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              // A short read means the input stream is exhausted.
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;

              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              // Ask the callback how much of this chunk belongs to the current block.
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              PRF(printf("\nParse size = %d\n", (unsigned)size));

              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              PRF(printf("   Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);

              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }

                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  // Bytes after the end of the stream are passed to Write() as afterEndData.
                  afterEndData = parseData + parse.srcSize;
                  afterEndData_Size = size - parse.srcSize;
                  if (crossSize != 0)
                    afterEndData_IsCross = True;
                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= afterEndData_Size;
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      // The unparsed tail goes to the cross block for the next slot.
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  finish = False;
                  break;
                }
              }

              // Parse returned CONTINUE without consuming the whole chunk: inconsistent callback.
              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }

          prev = link;
          link = link->next;

          if (crossSize != 0)
          {
            // The cross bytes have been copied into the first buffer; read fresh input from now on.
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }

    if (res != SZ_OK || wasInterrupted)
      finish = True;

    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    // Hand reading over to the next slot, starting a new thread first if allowed.
    if (!finish)
    {
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave muti-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }

      if (!finish)
      {
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from MtDec_ThreadFunc2()

    // Don't change (finish) variable in the further code


    // ---------- CODE ----------

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      BoolInt isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      for (;;)
      {
        size_t inSize;
        int stop;

        // The first buffer holds inDataSize_Start bytes; the others are full except the last.
        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);

        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;

        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }


    // ---------- WRITE ----------

    // Writes are serialized: canWrite is passed from slot to slot in block order.
    RINOK_THREAD(Event_Wait(&t->canWrite))

  {
    BoolInt isErrorMode = False;
    BoolInt canRecode = True;
    BoolInt needWriteToStream = needWrite;

    if (p->exitThread) return 0; // it's never executed in normal cases

    // Record this block's failure reasons in the shared state, unless an
    // earlier block already interrupted decoding.
    if (p->wasInterrupted)
      wasInterrupted = True;
    else
    {
      if (codeRes != SZ_OK) // || !needCode // check it !!!
      {
        p->wasInterrupted = True;
        p->codeRes = codeRes;
        if (codeRes == SZ_ERROR_MEM)
          isAllocError = True;
      }

      if (threadingErrorSRes)
      {
        p->wasInterrupted = True;
        p->threadingErrorSRes = threadingErrorSRes;
        needWriteToStream = False;
      }
      if (isAllocError)
      {
        p->wasInterrupted = True;
        p->isAllocError = True;
        needWriteToStream = False;
      }
      if (overflow)
      {
        p->wasInterrupted = True;
        p->overflow = True;
        needWriteToStream = False;
      }
    }

    if (needCode)
    {
      // For an interrupted block, the progress already reported for it is
      // rolled back: with inCodePos == 0, the unsigned 0 - inPrev wraps, so
      // adding it to the UInt64 totals subtracts inPrev (same for out).
      if (wasInterrupted)
      {
        inCodePos = 0;
        outCodePos = 0;
      }
      {
        const UInt64 inDelta = inCodePos - inPrev;
        const UInt64 outDelta = outCodePos - outPrev;
        // if (inDelta != 0 || outDelta != 0)
        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
      }
    }

    needContinue = (!finish);

    // if (res == SZ_OK && needWrite && !wasInterrupted)
    if (needWrite)
    {
      // p->inProcessed += inCodePos;

      PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));

      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
          afterEndData, afterEndData_Size, afterEndData_IsCross,
          &needContinue,
          &canRecode);

      // res = SZ_ERROR_FAIL; // for test

      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

      if (res != SZ_OK)
      {
        PRF(printf("\nWrite error = %d\n", res));
        isErrorMode = True;
        p->wasInterrupted = True;
      }
      if (res != SZ_OK
          || (!needContinue && !finish))
      {
        PRF(printf("\nWrite Interrupt error = %x\n", res));
        MtDec_Interrupt(p, blockIndex);
      }
    }

    // Keep this slot's input buffers in the "filled" ring, so that
    // MtDec_Read() can hand the data back later (presumably for
    // single-thread re-decoding). Once one slot is recorded, every following
    // slot is recorded too (p->numFilledThreads != 0).
    if (canRecode)
    if (!needCode
        || res != SZ_OK
        || p->wasInterrupted
        || codeRes != SZ_OK
        || wasInterrupted
        || p->numFilledThreads != 0
        || isErrorMode)
    {
      if (p->numFilledThreads == 0)
        p->filledThreadStart = t->index;
      if (inDataSize != 0 || !finish)
      {
        t->inDataSize_Start = inDataSize_Start;
        t->inDataSize = inDataSize;
        p->numFilledThreads++;
      }
      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    }

    // Pass the write turn on, or end / restart the multi-threaded pass through slot 0.
    if (!finish)
    {
      RINOK_THREAD(Event_Set(&nextThread->canWrite))
    }
    else
    {
      if (needContinue)
      {
        // we restore decoding with new iteration
        RINOK_THREAD(Event_Set(&p->threads[0].canWrite))
      }
      else
      {
        // we exit from decoding
        if (t->index == 0)
          return SZ_OK;
        p->exitThread = True;
      }
      RINOK_THREAD(Event_Set(&p->threads[0].canRead))
    }
  }
  }
}
798 
/* alloca() is used only in Windows builds (see MtDec_ThreadFunc). */
#ifdef _WIN32
#define USE_ALLOCA
#endif

#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
/* NOTE(review): unreachable with the definition above. On POSIX systems
   alloca() is usually declared in <alloca.h> rather than <stdlib.h>. */
#include <stdlib.h>
#endif
#endif
810 
811 
MtDec_ThreadFunc1(void * pp)812 static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp)
813 {
814   WRes res;
815 
816   CMtDecThread *t = (CMtDecThread *)pp;
817   CMtDec *p;
818 
819   // fprintf(stdout, "\n%d = %p\n", t->index, &t);
820 
821   res = MtDec_ThreadFunc2(t);
822   p = t->mtDec;
823   if (res == 0)
824     return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
825   {
826     // it's unexpected situation for some threading function error
827     if (p->exitThreadWRes == 0)
828       p->exitThreadWRes = res;
829     PRF(printf("\nthread exit error = %d\n", res));
830     p->exitThread = True;
831     Event_Set(&p->threads[0].canRead);
832     Event_Set(&p->threads[0].canWrite);
833     MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
834   }
835   return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
836 }
837 
/*
  Thread entry point. MtDec_Code() also calls it directly for slot 0.
  With USE_ALLOCA (Windows builds) it first reserves index * 128 bytes of
  stack, so each slot runs MtDec_ThreadFunc1() at a different stack offset.
  Presumably this is meant to reduce cache aliasing between the threads'
  stack frames (NOTE(review): confirm). Storing the result in t->allocaPtr
  presumably keeps the compiler from discarding the alloca, and Z7_NO_INLINE
  keeps the reservation in this frame.
*/
static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp)
{
  #ifdef USE_ALLOCA
  CMtDecThread *t = (CMtDecThread *)pp;
  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  t->allocaPtr = alloca(t->index * 128);
  #endif
  return MtDec_ThreadFunc1(pp);
}
847 
848 
/*
  Prepares the data kept from the multi-threaded pass for reading with
  MtDec_Read().
  - Frees the cross block if it holds no pending bytes.
  - Frees the input buffers of every slot outside the "filled" ring. The
    ring is numFilledThreads slots starting at filledThreadStart and
    wrapping at numStartedThreads. Slots at index numStartedThreads or
    higher were not used in the last pass, so any buffers they still hold
    are stale. (The old test `i > numStartedThreads` missed slot
    numStartedThreads itself.)
  Returns non-zero if MtDec_Read() has data to hand out.
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC_THREADS_MAX; i++)
  {
    /* The ring distance from filledThreadStart is meaningful only for
       i < numStartedThreads; short-circuiting skips it otherwise. */
    if (i >= p->numStartedThreads
        || p->numFilledThreads <=
          (i >= p->filledThreadStart ?
            i - p->filledThreadStart :
            i + p->numStartedThreads - p->filledThreadStart))
      MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
870 
871 
/*
  Hands out the input data kept from the multi-threaded pass, one buffer per
  call (use after MtDec_PrepareRead()).
  Order: first the buffer chains of the "filled" slots (numFilledThreads
  slots starting at filledThreadStart, wrapping at numStartedThreads), then
  any bytes still pending in the cross block.
  *inLim is in/out. On return it holds the size of the returned data (0,
  with a NULL result, once everything has been handed out). On entry, a
  non-zero *inLim makes the function free the head buffer of the current
  slot, presumably the buffer returned by the previous call. So the caller
  should pass 0 on the first call and otherwise pass back the value stored
  here (NOTE(review): confirm with callers).
  Buffers already handed out are freed on the following call. The cross
  block is freed by the call that finds no data left.
*/
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  // The loop body always returns or breaks, so it runs at most once per call.
  while (p->numFilledThreads != 0)
  {
    CMtDecThread *t = &p->threads[p->filledThreadStart];

    if (*inLim != 0)
    {
      // Release the buffer handed out by the previous call.
      {
        void *link = t->inBuf;
        void *next = ((CMtDecBufLink *)link)->next;
        ISzAlloc_Free(p->alloc, link);
        t->inBuf = next;
      }

      // This slot is exhausted: drop the rest of its chain and move to the next filled slot.
      if (t->inDataSize == 0)
      {
        MtDecThread_FreeInBufs(t);
        if (--p->numFilledThreads == 0)
          break;
        if (++p->filledThreadStart == p->numStartedThreads)
          p->filledThreadStart = 0;
        t = &p->threads[p->filledThreadStart];
      }
    }

    {
      // A slot's first buffer holds inDataSize_Start bytes; later buffers hold
      // up to inBufSize bytes of the remaining inDataSize.
      size_t lim = t->inDataSize_Start;
      if (lim != 0)
        t->inDataSize_Start = 0;
      else
      {
        UInt64 rem = t->inDataSize;
        lim = p->inBufSize;
        if (lim > rem)
          lim = (size_t)rem;
      }
      t->inDataSize -= lim;
      *inLim = lim;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    }
  }

  {
    // No filled slots left: return the pending cross-block bytes once, then free the block.
    size_t crossSize = p->crossEnd - p->crossStart;
    if (crossSize != 0)
    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = crossSize;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
    *inLim = 0;
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }
    return NULL;
  }
}
934 
935 
/*
  One-time initialization of a decoder object.
  - Sets defaults: 256 KiB input buffers, numThreadsMax = 0, and no stream,
    progress callback, allocator or decode callbacks.
  - Clears the bookkeeping that MtDec_PrepareRead(), MtDec_Read() and
    MtDec_Destruct() may read before any MtDec_Code() call.
  - Prepares every worker slot: owner, index, empty buffer chain,
    constructed events and thread handle.
  - Initializes the progress critical section.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;

  p->numThreadsMax = 0;

  p->inStream = NULL;

  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  /* Thread bookkeeping. MtDec_Code() re-initializes these fields, but
     MtDec_PrepareRead() reads numStartedThreads and filledThreadStart, so
     they must not be left indeterminate if it runs before MtDec_Code(). */
  p->numFilledThreads = 0;
  p->filledThreadStart = 0;
  p->numStartedThreads = 0;
  p->numStartedThreads_Limit = 0;

  p->exitThread = False;
  p->exitThreadWRes = 0;

  p->progress = NULL;
  p->alloc = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC_THREADS_MAX; i++)
  {
    CMtDecThread *t = &p->threads[i];
    t->mtDec = p;
    t->index = i;
    t->inBuf = NULL;
    t->inDataSize_Start = 0;
    t->inDataSize = 0;
    Event_Construct(&t->canRead);
    Event_Construct(&t->canWrite);
    Thread_CONSTRUCT(&t->thread)
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}
978 
979 
/*
  Releases everything the decoder owns except the progress critical section.
  exitThread is raised first, so that workers woken during shutdown return
  from MtDec_ThreadFunc2() instead of decoding another block.
*/
static void MtDec_Free(CMtDec *p)
{
  CMtDecThread *t;

  p->exitThread = True;

  for (t = p->threads; t != p->threads + MTDEC_THREADS_MAX; t++)
    MtDecThread_Destruct(t);

  if (p->crossBlock != NULL)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
997 
998 
/*
  Destroys the decoder. MtDec_Free() stops the worker threads and frees all
  buffers; then the progress critical section initialized by
  MtDec_Construct() is deleted.
*/
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
1005 
1006 
MtDec_Code(CMtDec * p)1007 SRes MtDec_Code(CMtDec *p)
1008 {
1009   unsigned i;
1010 
1011   p->inProcessed = 0;
1012 
1013   p->blockIndex = 1; // it must be larger than not_defined index (0)
1014   p->isAllocError = False;
1015   p->overflow = False;
1016   p->threadingErrorSRes = SZ_OK;
1017 
1018   p->needContinue = True;
1019 
1020   p->readWasFinished = False;
1021   p->needInterrupt = False;
1022   p->interruptIndex = (UInt64)(Int64)-1;
1023 
1024   p->readProcessed = 0;
1025   p->readRes = SZ_OK;
1026   p->codeRes = SZ_OK;
1027   p->wasInterrupted = False;
1028 
1029   p->crossStart = 0;
1030   p->crossEnd = 0;
1031 
1032   p->filledThreadStart = 0;
1033   p->numFilledThreads = 0;
1034 
1035   {
1036     unsigned numThreads = p->numThreadsMax;
1037     if (numThreads > MTDEC_THREADS_MAX)
1038       numThreads = MTDEC_THREADS_MAX;
1039     p->numStartedThreads_Limit = numThreads;
1040     p->numStartedThreads = 0;
1041   }
1042 
1043   if (p->inBufSize != p->allocatedBufsSize)
1044   {
1045     for (i = 0; i < MTDEC_THREADS_MAX; i++)
1046     {
1047       CMtDecThread *t = &p->threads[i];
1048       if (t->inBuf)
1049         MtDecThread_FreeInBufs(t);
1050     }
1051     if (p->crossBlock)
1052     {
1053       ISzAlloc_Free(p->alloc, p->crossBlock);
1054       p->crossBlock = NULL;
1055     }
1056 
1057     p->allocatedBufsSize = p->inBufSize;
1058   }
1059 
1060   MtProgress_Init(&p->mtProgress, p->progress);
1061 
1062   // RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
1063   p->exitThread = False;
1064   p->exitThreadWRes = 0;
1065 
1066   {
1067     WRes wres;
1068     SRes sres;
1069     CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1070     // wres = MtDecThread_CreateAndStart(nextThread);
1071     wres = MtDecThread_CreateEvents(nextThread);
1072     if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1073     if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1074     if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread);
1075     wres = (WRes)(UINT_PTR)res;
1076     if (wres != 0)
1077     {
1078       p->needContinue = False;
1079       MtDec_CloseThreads(p);
1080     }}}}
1081 
1082     // wres = 17; // for test
1083     // wres = Event_Wait(&p->finishedEvent);
1084 
1085     sres = MY_SRes_HRESULT_FROM_WRes(wres);
1086 
1087     if (sres != 0)
1088       p->threadingErrorSRes = sres;
1089 
1090     if (
1091         // wres == 0
1092         // wres != 0
1093         // || p->mtc.codeRes == SZ_ERROR_MEM
1094         p->isAllocError
1095         || p->threadingErrorSRes != SZ_OK
1096         || p->overflow)
1097     {
1098       // p->needContinue = True;
1099     }
1100     else
1101       p->needContinue = False;
1102 
1103     if (p->needContinue)
1104       return SZ_OK;
1105 
1106     // if (sres != SZ_OK)
1107     return sres;
1108     // return SZ_ERROR_FAIL;
1109   }
1110 }
1111 
1112 #endif
1113 
1114 #undef PRF
1115