• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* MtDec.c -- Multi-thread Decoder
2 2018-07-04 : Igor Pavlov : Public domain */
3 
#include "Precomp.h"

// #define SHOW_DEBUG_INFO

// #include <stdio.h>
#include <string.h>

#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif
13 
14 #ifdef SHOW_DEBUG_INFO
15 #define PRF(x) x
16 #else
17 #define PRF(x)
18 #endif
19 
20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
21 
22 #include "MtDec.h"
23 
24 #ifndef _7ZIP_ST
25 
/*
  MtProgress_Init() resets the progress aggregator before a new session.
    p        - aggregator to reset
    progress - progress callback object stored for later calls
  Both byte counters are cleared and the sticky result is set to SZ_OK.
  The critical section (p->cs) is not touched here.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalOutSize = 0;
  p->totalInSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
33 
34 
MtProgress_Progress_ST(CMtProgress * p)35 SRes MtProgress_Progress_ST(CMtProgress *p)
36 {
37   if (p->res == SZ_OK && p->progress)
38     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
39       p->res = SZ_ERROR_PROGRESS;
40   return p->res;
41 }
42 
43 
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
45 {
46   SRes res;
47   CriticalSection_Enter(&p->cs);
48 
49   p->totalInSize += inSize;
50   p->totalOutSize += outSize;
51   if (p->res == SZ_OK && p->progress)
52     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
53       p->res = SZ_ERROR_PROGRESS;
54   res = p->res;
55 
56   CriticalSection_Leave(&p->cs);
57   return res;
58 }
59 
60 
MtProgress_GetError(CMtProgress * p)61 SRes MtProgress_GetError(CMtProgress *p)
62 {
63   SRes res;
64   CriticalSection_Enter(&p->cs);
65   res = p->res;
66   CriticalSection_Leave(&p->cs);
67   return res;
68 }
69 
70 
/*
  MtProgress_SetError() records (res) as the sticky result under p->cs.
  Only the first error is kept: if p->res is already different from SZ_OK,
  the call changes nothing.
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);

  if (p->res == SZ_OK)
  {
    // first reported error wins
    p->res = res;
  }

  CriticalSection_Leave(&p->cs);
}
78 
79 
80 #define RINOK_THREAD(x) RINOK(x)
81 
82 
ArEvent_OptCreate_And_Reset(CEvent * p)83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
84 {
85   if (Event_IsCreated(p))
86     return Event_Reset(p);
87   return AutoResetEvent_CreateNotSignaled(p);
88 }
89 
90 
91 
/*
  CMtDecBufLink is the header stored at the start of every input buffer
  allocation. The buffers of one thread form a singly linked list.
    next - following buffer in the list, or NULL for the last one
    pad  - unused filler; makes the header 4 pointers long
           (presumably to keep the data that follows it aligned)
*/
typedef struct
{
  void *next;
  void *pad[3];
} CMtDecBufLink;

/* size of the header == offset of the payload inside one allocation */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)

/* address of the payload that follows the header at (link) */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
100 
101 
102 
103 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
104 
105 
MtDecThread_CreateEvents(CMtDecThread * t)106 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
107 {
108   WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
109   if (wres == 0)
110   {
111     wres = ArEvent_OptCreate_And_Reset(&t->canRead);
112     if (wres == 0)
113       return SZ_OK;
114   }
115   return wres;
116 }
117 
118 
MtDecThread_CreateAndStart(CMtDecThread * t)119 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
120 {
121   WRes wres = MtDecThread_CreateEvents(t);
122   // wres = 17; // for test
123   if (wres == 0)
124   {
125     if (Thread_WasCreated(&t->thread))
126       return SZ_OK;
127     wres = Thread_Create(&t->thread, ThreadFunc, t);
128     if (wres == 0)
129       return SZ_OK;
130   }
131   return MY_SRes_HRESULT_FROM_WRes(wres);
132 }
133 
134 
/*
  MtDecThread_FreeInBufs() releases the whole linked list of input buffers
  owned by thread (t) through t->mtDec->alloc and leaves t->inBuf == NULL.
  Nothing is done when the list is already empty.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *cur = (CMtDecBufLink *)t->inBuf;

  if (!cur)
    return;

  // detach the list first, then walk it
  t->inBuf = NULL;

  while (cur)
  {
    CMtDecBufLink *following = (CMtDecBufLink *)cur->next;
    ISzAlloc_Free(t->mtDec->alloc, cur);
    cur = following;
  }
}
150 
151 
/*
  MtDecThread_CloseThread() stops and releases the system objects of (t).
  If the thread was created, both of its events are signaled so that it
  cannot stay blocked in a wait, then the thread is joined and closed.
  The two events are closed in every case.
  The caller is expected to have requested thread exit beforehand
  (MtDec_Free() sets p->exitThread before calling down to here).
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* canWrite: we can disable it. There are no threads waiting canWrite in normal cases */
    Event_Set(&t->canWrite);
    Event_Set(&t->canRead);

    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
165 
/*
  MtDec_CloseThreads() calls MtDecThread_CloseThread() for every one of the
  MTDEC__THREADS_MAX thread slots of (p), whether or not it was started.
*/
static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned slot = 0;
  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_CloseThread(&p->threads[slot]);
    slot++;
  }
}
172 
/*
  MtDecThread_Destruct() fully releases thread slot (t).
  The thread is closed first and its input buffers are freed afterwards,
  so the buffers are released only after the thread has been joined.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);   // join thread, close events
  MtDecThread_FreeInBufs(t);    // then drop its buffer list
}
178 
179 
180 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)181 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
182 {
183   size_t size = *processedSize;
184   *processedSize = 0;
185   while (size != 0)
186   {
187     size_t cur = size;
188     SRes res = ISeqInStream_Read(stream, data, &cur);
189     *processedSize += cur;
190     data += cur;
191     size -= cur;
192     RINOK(res);
193     if (cur == 0)
194       return SZ_OK;
195   }
196   return SZ_OK;
197 }
198 
199 
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)200 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
201 {
202   SRes res;
203   CriticalSection_Enter(&p->mtProgress.cs);
204   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
205   res = p->mtProgress.res;
206   CriticalSection_Leave(&p->mtProgress.cs);
207   return res;
208 }
209 
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)210 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
211 {
212   SRes res;
213   CriticalSection_Enter(&p->mtProgress.cs);
214 
215   p->mtProgress.totalInSize += inSize;
216   p->mtProgress.totalOutSize += outSize;
217   if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
218     if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
219       p->mtProgress.res = SZ_ERROR_PROGRESS;
220 
221   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
222   res = p->mtProgress.res;
223 
224   CriticalSection_Leave(&p->mtProgress.cs);
225 
226   return res;
227 }
228 
/*
  MtDec_Interrupt() requests interruption of all blocks whose index is
  larger than (interruptIndex). Done under the progress critical section.
  The smallest index requested so far is the one that is kept.
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  // keep an existing request if it already covers this index
  if (!(p->needInterrupt && interruptIndex >= p->interruptIndex))
  {
    p->needInterrupt = True;
    p->interruptIndex = interruptIndex;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}
239 
MtDec_GetCrossBuff(CMtDec * p)240 Byte *MtDec_GetCrossBuff(CMtDec *p)
241 {
242   Byte *cr = p->crossBlock;
243   if (!cr)
244   {
245     cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
246     if (!cr)
247       return NULL;
248     p->crossBlock = cr;
249   }
250   return MTDEC__DATA_PTR_FROM_LINK(cr);
251 }
252 
253 
254 /*
255   ThreadFunc2() returns:
256   0      - in all normal cases (even for stream error or memory allocation error)
257   (!= 0) - WRes error return by system threading function
258 */
259 
260 // #define MTDEC_ProgessStep (1 << 22)
261 #define MTDEC_ProgessStep (1 << 0)
262 
ThreadFunc2(CMtDecThread * t)263 static WRes ThreadFunc2(CMtDecThread *t)
264 {
265   CMtDec *p = t->mtDec;
266 
267   PRF_STR_INT("ThreadFunc2", t->index);
268 
269   // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
270 
271   for (;;)
272   {
273     SRes res, codeRes;
274     BoolInt wasInterrupted, isAllocError, overflow, finish;
275     SRes threadingErrorSRes;
276     BoolInt needCode, needWrite, needContinue;
277 
278     size_t inDataSize_Start;
279     UInt64 inDataSize;
280     // UInt64 inDataSize_Full;
281 
282     UInt64 blockIndex;
283 
284     UInt64 inPrev = 0;
285     UInt64 outPrev = 0;
286     UInt64 inCodePos;
287     UInt64 outCodePos;
288 
289     Byte *afterEndData = NULL;
290     size_t afterEndData_Size = 0;
291 
292     BoolInt canCreateNewThread = False;
293     // CMtDecCallbackInfo parse;
294     CMtDecThread *nextThread;
295 
296     PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
297 
298     RINOK_THREAD(Event_Wait(&t->canRead));
299     if (p->exitThread)
300       return 0;
301 
302     PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
303 
304     // if (t->index == 3) return 19; // for test
305 
306     blockIndex = p->blockIndex++;
307 
308     // PRF(printf("\ncanRead\n"))
309 
310     res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
311 
312     finish = p->readWasFinished;
313     needCode = False;
314     needWrite = False;
315     isAllocError = False;
316     overflow = False;
317 
318     inDataSize_Start = 0;
319     inDataSize = 0;
320     // inDataSize_Full = 0;
321 
322     if (res == SZ_OK && !wasInterrupted)
323     {
324       // if (p->inStream)
325       {
326         CMtDecBufLink *prev = NULL;
327         CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
328         size_t crossSize = p->crossEnd - p->crossStart;
329 
330         PRF(printf("\ncrossSize = %d\n", crossSize));
331 
332         for (;;)
333         {
334           if (!link)
335           {
336             link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
337             if (!link)
338             {
339               finish = True;
340               // p->allocError_for_Read_BlockIndex = blockIndex;
341               isAllocError = True;
342               break;
343             }
344             link->next = NULL;
345             if (prev)
346             {
347               // static unsigned g_num = 0;
348               // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
349               prev->next = link;
350             }
351             else
352               t->inBuf = (void *)link;
353           }
354 
355           {
356             Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
357             Byte *parseData = data;
358             size_t size;
359 
360             if (crossSize != 0)
361             {
362               inDataSize = crossSize;
363               // inDataSize_Full = inDataSize;
364               inDataSize_Start = crossSize;
365               size = crossSize;
366               parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
367               PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
368                   (int)p->crossStart, (int)p->crossEnd, (int)finish));
369             }
370             else
371             {
372               size = p->inBufSize;
373 
374               res = FullRead(p->inStream, data, &size);
375 
376               // size = 10; // test
377 
378               inDataSize += size;
379               // inDataSize_Full = inDataSize;
380               if (!prev)
381                 inDataSize_Start = size;
382 
383               p->readProcessed += size;
384               finish = (size != p->inBufSize);
385               if (finish)
386                 p->readWasFinished = True;
387 
388               // res = E_INVALIDARG; // test
389 
390               if (res != SZ_OK)
391               {
392                 // PRF(printf("\nRead error = %d\n", res))
393                 // we want to decode all data before error
394                 p->readRes = res;
395                 // p->readError_BlockIndex = blockIndex;
396                 p->readWasFinished = True;
397                 finish = True;
398                 res = SZ_OK;
399                 // break;
400               }
401 
402               if (inDataSize - inPrev >= MTDEC_ProgessStep)
403               {
404                 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
405                 if (res != SZ_OK || wasInterrupted)
406                   break;
407                 inPrev = inDataSize;
408               }
409             }
410 
411             {
412               CMtDecCallbackInfo parse;
413 
414               parse.startCall = (prev == NULL);
415               parse.src = parseData;
416               parse.srcSize = size;
417               parse.srcFinished = finish;
418               parse.canCreateNewThread = True;
419 
420               // PRF(printf("\nParse size = %d\n", (unsigned)size))
421 
422               p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
423 
424               needWrite = True;
425               canCreateNewThread = parse.canCreateNewThread;
426 
427               // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
428 
429               if (
430                   // parseRes != SZ_OK ||
431                   // inDataSize - (size - parse.srcSize) > p->inBlockMax
432                   // ||
433                   parse.state == MTDEC_PARSE_OVERFLOW
434                   // || wasInterrupted
435                   )
436               {
437                 // Overflow or Parse error - switch from MT decoding to ST decoding
438                 finish = True;
439                 overflow = True;
440 
441                 {
442                   PRF(printf("\n Overflow"));
443                   // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
444                   PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
445                 }
446 
447                 if (crossSize != 0)
448                   memcpy(data, parseData, size);
449                 p->crossStart = 0;
450                 p->crossEnd = 0;
451                 break;
452               }
453 
454               if (crossSize != 0)
455               {
456                 memcpy(data, parseData, parse.srcSize);
457                 p->crossStart += parse.srcSize;
458               }
459 
460               if (parse.state != MTDEC_PARSE_CONTINUE || finish)
461               {
462                 // we don't need to parse in current thread anymore
463 
464                 if (parse.state == MTDEC_PARSE_END)
465                   finish = True;
466 
467                 needCode = True;
468                 // p->crossFinished = finish;
469 
470                 if (parse.srcSize == size)
471                 {
472                   // full parsed - no cross transfer
473                   p->crossStart = 0;
474                   p->crossEnd = 0;
475                   break;
476                 }
477 
478                 if (parse.state == MTDEC_PARSE_END)
479                 {
480                   p->crossStart = 0;
481                   p->crossEnd = 0;
482 
483                   if (crossSize != 0)
484                     memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
485                   afterEndData_Size = size - parse.srcSize;
486                   afterEndData = parseData + parse.srcSize;
487 
488                   // we reduce data size to required bytes (parsed only)
489                   inDataSize -= (size - parse.srcSize);
490                   if (!prev)
491                     inDataSize_Start = parse.srcSize;
492                   break;
493                 }
494 
495                 {
496                   // partial parsed - need cross transfer
497                   if (crossSize != 0)
498                     inDataSize = parse.srcSize; // it's only parsed now
499                   else
500                   {
501                     // partial parsed - is not in initial cross block - we need to copy new data to cross block
502                     Byte *cr = MtDec_GetCrossBuff(p);
503                     if (!cr)
504                     {
505                       {
506                         PRF(printf("\ncross alloc error error\n"));
507                         // res = SZ_ERROR_MEM;
508                         finish = True;
509                         // p->allocError_for_Read_BlockIndex = blockIndex;
510                         isAllocError = True;
511                         break;
512                       }
513                     }
514 
515                     {
516                       size_t crSize = size - parse.srcSize;
517                       inDataSize -= crSize;
518                       p->crossEnd = crSize;
519                       p->crossStart = 0;
520                       memcpy(cr, parseData + parse.srcSize, crSize);
521                     }
522                   }
523 
524                   // inDataSize_Full = inDataSize;
525                   if (!prev)
526                     inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
527 
528                   finish = False;
529                   break;
530                 }
531               }
532 
533               if (parse.srcSize != size)
534               {
535                 res = SZ_ERROR_FAIL;
536                 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
537                 break;
538               }
539             }
540           }
541 
542           prev = link;
543           link = link->next;
544 
545           if (crossSize != 0)
546           {
547             crossSize = 0;
548             p->crossStart = 0;
549             p->crossEnd = 0;
550           }
551         }
552       }
553 
554       if (res == SZ_OK)
555         res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
556     }
557 
558     codeRes = SZ_OK;
559 
560     if (res == SZ_OK && needCode && !wasInterrupted)
561     {
562       codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
563       if (codeRes != SZ_OK)
564       {
565         needCode = False;
566         finish = True;
567         // SZ_ERROR_MEM is expected error here.
568         //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
569         //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
570       }
571     }
572 
573     if (res != SZ_OK || wasInterrupted)
574       finish = True;
575 
576     nextThread = NULL;
577     threadingErrorSRes = SZ_OK;
578 
579     if (!finish)
580     {
581       if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
582       {
583         SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
584         if (res2 == SZ_OK)
585         {
586           // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
587           p->numStartedThreads++;
588         }
589         else
590         {
591           PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
592           if (p->numStartedThreads == 1)
593           {
594             // if only one thread is possible, we leave muti-threading code
595             finish = True;
596             needCode = False;
597             threadingErrorSRes = res2;
598           }
599           else
600             p->numStartedThreads_Limit = p->numStartedThreads;
601         }
602       }
603 
604       if (!finish)
605       {
606         unsigned nextIndex = t->index + 1;
607         nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
608         RINOK_THREAD(Event_Set(&nextThread->canRead))
609         // We have started executing for new iteration (with next thread)
610         // And that next thread now is responsible for possible exit from decoding (threading_code)
611       }
612     }
613 
614     // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
615     // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
616     // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
617     //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
618     //   - otherwise we stop decoding and exit from ThreadFunc2()
619 
620     // Don't change (finish) variable in the further code
621 
622 
623     // ---------- CODE ----------
624 
625     inPrev = 0;
626     outPrev = 0;
627     inCodePos = 0;
628     outCodePos = 0;
629 
630     if (res == SZ_OK && needCode && codeRes == SZ_OK)
631     {
632       BoolInt isStartBlock = True;
633       CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
634 
635       for (;;)
636       {
637         size_t inSize;
638         int stop;
639 
640         if (isStartBlock)
641           inSize = inDataSize_Start;
642         else
643         {
644           UInt64 rem = inDataSize - inCodePos;
645           inSize = p->inBufSize;
646           if (inSize > rem)
647             inSize = (size_t)rem;
648         }
649 
650         inCodePos += inSize;
651         stop = True;
652 
653         codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
654             (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
655             (inCodePos == inDataSize), // srcFinished
656             &inCodePos, &outCodePos, &stop);
657 
658         if (codeRes != SZ_OK)
659         {
660           PRF(printf("\nCode Interrupt error = %x\n", codeRes));
661           // we interrupt only later blocks
662           MtDec_Interrupt(p, blockIndex);
663           break;
664         }
665 
666         if (stop || inCodePos == inDataSize)
667           break;
668 
669         {
670           const UInt64 inDelta = inCodePos - inPrev;
671           const UInt64 outDelta = outCodePos - outPrev;
672           if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
673           {
674             // Sleep(1);
675             res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
676             if (res != SZ_OK || wasInterrupted)
677               break;
678             inPrev = inCodePos;
679             outPrev = outCodePos;
680           }
681         }
682 
683         link = link->next;
684         isStartBlock = False;
685       }
686     }
687 
688 
689     // ---------- WRITE ----------
690 
691     RINOK_THREAD(Event_Wait(&t->canWrite));
692 
693   {
694     BoolInt isErrorMode = False;
695     BoolInt canRecode = True;
696     BoolInt needWriteToStream = needWrite;
697 
698     if (p->exitThread) return 0; // it's never executed in normal cases
699 
700     if (p->wasInterrupted)
701       wasInterrupted = True;
702     else
703     {
704       if (codeRes != SZ_OK) // || !needCode // check it !!!
705       {
706         p->wasInterrupted = True;
707         p->codeRes = codeRes;
708         if (codeRes == SZ_ERROR_MEM)
709           isAllocError = True;
710       }
711 
712       if (threadingErrorSRes)
713       {
714         p->wasInterrupted = True;
715         p->threadingErrorSRes = threadingErrorSRes;
716         needWriteToStream = False;
717       }
718       if (isAllocError)
719       {
720         p->wasInterrupted = True;
721         p->isAllocError = True;
722         needWriteToStream = False;
723       }
724       if (overflow)
725       {
726         p->wasInterrupted = True;
727         p->overflow = True;
728         needWriteToStream = False;
729       }
730     }
731 
732     if (needCode)
733     {
734       if (wasInterrupted)
735       {
736         inCodePos = 0;
737         outCodePos = 0;
738       }
739       {
740         const UInt64 inDelta = inCodePos - inPrev;
741         const UInt64 outDelta = outCodePos - outPrev;
742         // if (inDelta != 0 || outDelta != 0)
743         res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
744       }
745     }
746 
747     needContinue = (!finish);
748 
749     // if (res == SZ_OK && needWrite && !wasInterrupted)
750     if (needWrite)
751     {
752       // p->inProcessed += inCodePos;
753 
754       res = p->mtCallback->Write(p->mtCallbackObject, t->index,
755           res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
756           afterEndData, afterEndData_Size,
757           &needContinue,
758           &canRecode);
759 
760       // res= E_INVALIDARG; // for test
761 
762       PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
763       PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
764 
765       if (res != SZ_OK)
766       {
767         PRF(printf("\nWrite error = %d\n", res));
768         isErrorMode = True;
769         p->wasInterrupted = True;
770       }
771       if (res != SZ_OK
772           || (!needContinue && !finish))
773       {
774         PRF(printf("\nWrite Interrupt error = %x\n", res));
775         MtDec_Interrupt(p, blockIndex);
776       }
777     }
778 
779     if (canRecode)
780     if (!needCode
781         || res != SZ_OK
782         || p->wasInterrupted
783         || codeRes != SZ_OK
784         || wasInterrupted
785         || p->numFilledThreads != 0
786         || isErrorMode)
787     {
788       if (p->numFilledThreads == 0)
789         p->filledThreadStart = t->index;
790       if (inDataSize != 0 || !finish)
791       {
792         t->inDataSize_Start = inDataSize_Start;
793         t->inDataSize = inDataSize;
794         p->numFilledThreads++;
795       }
796       PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
797       PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
798     }
799 
800     if (!finish)
801     {
802       RINOK_THREAD(Event_Set(&nextThread->canWrite));
803     }
804     else
805     {
806       if (needContinue)
807       {
808         // we restore decoding with new iteration
809         RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
810       }
811       else
812       {
813         // we exit from decoding
814         if (t->index == 0)
815           return SZ_OK;
816         p->exitThread = True;
817       }
818       RINOK_THREAD(Event_Set(&p->threads[0].canRead));
819     }
820   }
821   }
822 }
823 
824 #ifdef _WIN32
825 #define USE_ALLOCA
826 #endif
827 
828 #ifdef USE_ALLOCA
829 #ifdef _WIN32
830 #include <malloc.h>
831 #else
832 #include <stdlib.h>
833 #endif
834 #endif
835 
836 
ThreadFunc1(void * pp)837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
838 {
839   WRes res;
840 
841   CMtDecThread *t = (CMtDecThread *)pp;
842   CMtDec *p;
843 
844   // fprintf(stdout, "\n%d = %p\n", t->index, &t);
845 
846   res = ThreadFunc2(t);
847   p = t->mtDec;
848   if (res == 0)
849     return p->exitThreadWRes;
850   {
851     // it's unexpected situation for some threading function error
852     if (p->exitThreadWRes == 0)
853       p->exitThreadWRes = res;
854     PRF(printf("\nthread exit error = %d\n", res));
855     p->exitThread = True;
856     Event_Set(&p->threads[0].canRead);
857     Event_Set(&p->threads[0].canWrite);
858     MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
859   }
860   return res;
861 }
862 
ThreadFunc(void * pp)863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
864 {
865   CMtDecThread *t = (CMtDecThread *)pp;
866 
867   // fprintf(stderr, "\n%d = %p - before", t->index, &t);
868   #ifdef USE_ALLOCA
869   t->allocaPtr = alloca(t->index * 128);
870   #endif
871   return ThreadFunc1(pp);
872 }
873 
874 
/*
  MtDec_PrepareRead() releases memory that holds no pending input:
    - the cross block, if it contains no unread bytes;
    - the buffers of every thread slot that is outside the range of filled
      threads (the range starts at p->filledThreadStart, has
      p->numFilledThreads entries and wraps at p->numStartedThreads).
  Returns nonzero if there is still input for MtDec_Read(): filled thread
  buffers or unread bytes in the cross block.
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    if (i <= p->numStartedThreads)
    {
      // distance of slot (i) from the first filled slot, with wrap-around;
      // a slot inside the filled range keeps its buffers
      if (i >= p->filledThreadStart)
      {
        if (p->numFilledThreads > i - p->filledThreadStart)
          continue;
      }
      else
      {
        if (p->numFilledThreads > i + p->numStartedThreads - p->filledThreadStart)
          continue;
      }
    }
    MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
896 
897 
MtDec_Read(CMtDec * p,size_t * inLim)898 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
899 {
900   while (p->numFilledThreads != 0)
901   {
902     CMtDecThread *t = &p->threads[p->filledThreadStart];
903 
904     if (*inLim != 0)
905     {
906       {
907         void *link = t->inBuf;
908         void *next = ((CMtDecBufLink *)link)->next;
909         ISzAlloc_Free(p->alloc, link);
910         t->inBuf = next;
911       }
912 
913       if (t->inDataSize == 0)
914       {
915         MtDecThread_FreeInBufs(t);
916         if (--p->numFilledThreads == 0)
917           break;
918         if (++p->filledThreadStart == p->numStartedThreads)
919           p->filledThreadStart = 0;
920         t = &p->threads[p->filledThreadStart];
921       }
922     }
923 
924     {
925       size_t lim = t->inDataSize_Start;
926       if (lim != 0)
927         t->inDataSize_Start = 0;
928       else
929       {
930         UInt64 rem = t->inDataSize;
931         lim = p->inBufSize;
932         if (lim > rem)
933           lim = (size_t)rem;
934       }
935       t->inDataSize -= lim;
936       *inLim = lim;
937       return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
938     }
939   }
940 
941   {
942     size_t crossSize = p->crossEnd - p->crossStart;
943     if (crossSize != 0)
944     {
945       const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
946       *inLim = crossSize;
947       p->crossStart = 0;
948       p->crossEnd = 0;
949       return data;
950     }
951     *inLim = 0;
952     if (p->crossBlock)
953     {
954       ISzAlloc_Free(p->alloc, p->crossBlock);
955       p->crossBlock = NULL;
956     }
957     return NULL;
958   }
959 }
960 
961 
/*
  MtDec_Construct() puts (p) into the initial, empty state:
    - default input buffer size of (1 << 18) bytes, numThreadsMax = 0;
    - no stream, allocator, progress or callback objects;
    - no cross block, no filled threads, no allocated buffers;
    - every thread slot gets its back pointer and index, an empty buffer
      list and constructed (not created) events / thread object;
    - the progress critical section is initialized.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;
  p->allocatedBufsSize = 0;

  p->numThreadsMax = 0;
  p->numFilledThreads = 0;

  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *slot = &p->threads[i];

    slot->mtDec = p;
    slot->index = i;
    slot->inBuf = NULL;

    Event_Construct(&slot->canRead);
    Event_Construct(&slot->canWrite);
    Thread_Construct(&slot->thread);
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}
1004 
1005 
/*
  MtDec_Free() stops all threads and releases all buffers of (p).
  p->exitThread is set first, so that a thread woken up by
  MtDecThread_CloseThread() leaves its loop; then every thread slot is
  destructed and the cross block is freed.
  The progress critical section is not deleted here.
*/
static void MtDec_Free(CMtDec *p)
{
  unsigned slot = 0;

  p->exitThread = True;

  while (slot < MTDEC__THREADS_MAX)
  {
    MtDecThread_Destruct(&p->threads[slot]);
    slot++;
  }

  // Event_Close(&p->finishedEvent);

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
1023 
1024 
/*
  MtDec_Destruct() is the counterpart of MtDec_Construct():
  it frees threads and buffers with MtDec_Free() and then deletes the
  progress critical section.
*/
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);                              // threads, events, buffers
  CriticalSection_Delete(&p->mtProgress.cs);  // created in MtDec_Construct()
}
1031 
1032 
MtDec_Code(CMtDec * p)1033 SRes MtDec_Code(CMtDec *p)
1034 {
1035   unsigned i;
1036 
1037   p->inProcessed = 0;
1038 
1039   p->blockIndex = 1; // it must be larger than not_defined index (0)
1040   p->isAllocError = False;
1041   p->overflow = False;
1042   p->threadingErrorSRes = SZ_OK;
1043 
1044   p->needContinue = True;
1045 
1046   p->readWasFinished = False;
1047   p->needInterrupt = False;
1048   p->interruptIndex = (UInt64)(Int64)-1;
1049 
1050   p->readProcessed = 0;
1051   p->readRes = SZ_OK;
1052   p->codeRes = SZ_OK;
1053   p->wasInterrupted = False;
1054 
1055   p->crossStart = 0;
1056   p->crossEnd = 0;
1057 
1058   p->filledThreadStart = 0;
1059   p->numFilledThreads = 0;
1060 
1061   {
1062     unsigned numThreads = p->numThreadsMax;
1063     if (numThreads > MTDEC__THREADS_MAX)
1064       numThreads = MTDEC__THREADS_MAX;
1065     p->numStartedThreads_Limit = numThreads;
1066     p->numStartedThreads = 0;
1067   }
1068 
1069   if (p->inBufSize != p->allocatedBufsSize)
1070   {
1071     for (i = 0; i < MTDEC__THREADS_MAX; i++)
1072     {
1073       CMtDecThread *t = &p->threads[i];
1074       if (t->inBuf)
1075         MtDecThread_FreeInBufs(t);
1076     }
1077     if (p->crossBlock)
1078     {
1079       ISzAlloc_Free(p->alloc, p->crossBlock);
1080       p->crossBlock = NULL;
1081     }
1082 
1083     p->allocatedBufsSize = p->inBufSize;
1084   }
1085 
1086   MtProgress_Init(&p->mtProgress, p->progress);
1087 
1088   // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1089   p->exitThread = False;
1090   p->exitThreadWRes = 0;
1091 
1092   {
1093     WRes wres;
1094     WRes sres;
1095     CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1096     // wres = MtDecThread_CreateAndStart(nextThread);
1097     wres = MtDecThread_CreateEvents(nextThread);
1098     if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1099     if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1100     if (wres == 0) { wres = ThreadFunc(nextThread);
1101     if (wres != 0)
1102     {
1103       p->needContinue = False;
1104       MtDec_CloseThreads(p);
1105     }}}}
1106 
1107     // wres = 17; // for test
1108     // wres = Event_Wait(&p->finishedEvent);
1109 
1110     sres = MY_SRes_HRESULT_FROM_WRes(wres);
1111 
1112     if (sres != 0)
1113       p->threadingErrorSRes = sres;
1114 
1115     if (
1116         // wres == 0
1117         // wres != 0
1118         // || p->mtc.codeRes == SZ_ERROR_MEM
1119         p->isAllocError
1120         || p->threadingErrorSRes != SZ_OK
1121         || p->overflow)
1122     {
1123       // p->needContinue = True;
1124     }
1125     else
1126       p->needContinue = False;
1127 
1128     if (p->needContinue)
1129       return SZ_OK;
1130 
1131     // if (sres != SZ_OK)
1132       return sres;
1133     // return E_FAIL;
1134   }
1135 }
1136 
1137 #endif
1138