• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/* MtDec.c -- Multi-thread Decoder
2019-02-02 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 // #define SHOW_DEBUG_INFO
7 
8 // #include <stdio.h>
9 
/*
  Debug tracing helpers.
  PRF(x) expands to (x) only when SHOW_DEBUG_INFO is defined; otherwise it
  expands to nothing, so its argument is not evaluated at all.
  PRF_STR_INT(s, d) prints the string literal (s) followed by (d) converted
  to unsigned.
*/
#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#define PRF(x) x
#else
#define PRF(x)
#endif

#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d)))
21 
22 #include "MtDec.h"
23 
24 #ifndef _7ZIP_ST
25 
/*
  MtProgress_Init() resets the progress state before a new run:
  it stores the progress callback (may be NULL), clears the sticky
  error code and zeroes both byte counters.
  The critical section p->cs is not touched here.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->progress = progress;
  p->res = SZ_OK;
  p->totalInSize = 0;
  p->totalOutSize = 0;
}
33 
34 
MtProgress_Progress_ST(CMtProgress * p)35 SRes MtProgress_Progress_ST(CMtProgress *p)
36 {
37   if (p->res == SZ_OK && p->progress)
38     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
39       p->res = SZ_ERROR_PROGRESS;
40   return p->res;
41 }
42 
43 
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
45 {
46   SRes res;
47   CriticalSection_Enter(&p->cs);
48 
49   p->totalInSize += inSize;
50   p->totalOutSize += outSize;
51   if (p->res == SZ_OK && p->progress)
52     if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
53       p->res = SZ_ERROR_PROGRESS;
54   res = p->res;
55 
56   CriticalSection_Leave(&p->cs);
57   return res;
58 }
59 
60 
MtProgress_GetError(CMtProgress * p)61 SRes MtProgress_GetError(CMtProgress *p)
62 {
63   SRes res;
64   CriticalSection_Enter(&p->cs);
65   res = p->res;
66   CriticalSection_Leave(&p->cs);
67   return res;
68 }
69 
70 
/*
  MtProgress_SetError() stores (res) in p->res under p->cs,
  but only if no error is stored yet: the first error wins.
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  if (p->res == SZ_OK)
    p->res = res;
  CriticalSection_Leave(&p->cs);
}
78 
79 
/*
  RINOK_THREAD(x) wraps calls of threading functions (events, threads).
  It forwards to RINOK, which is defined outside this file;
  presumably RINOK returns from the enclosing function when (x) is non-zero - confirm.
*/
#define RINOK_THREAD(x) RINOK(x)
81 
82 
ArEvent_OptCreate_And_Reset(CEvent * p)83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
84 {
85   if (Event_IsCreated(p))
86     return Event_Reset(p);
87   return AutoResetEvent_CreateNotSignaled(p);
88 }
89 
90 
/*
  Header placed at the start of every input buffer allocated in this file.
  Buffers form a singly linked list through (next).
  The payload starts right after the header, at MTDEC__LINK_DATA_OFFSET.
  (pad) makes the header 4 pointers long.
*/
struct __CMtDecBufLink
{
  struct __CMtDecBufLink *next;
  void *pad[3];
};

typedef struct __CMtDecBufLink CMtDecBufLink;

/* offset of the payload from the start of an allocated buffer */
#define MTDEC__LINK_DATA_OFFSET (sizeof(CMtDecBufLink))
/* payload address for a buffer whose header is at (link) */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
101 
102 
103 
104 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
105 
106 
MtDecThread_CreateEvents(CMtDecThread * t)107 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
108 {
109   WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
110   if (wres == 0)
111   {
112     wres = ArEvent_OptCreate_And_Reset(&t->canRead);
113     if (wres == 0)
114       return SZ_OK;
115   }
116   return wres;
117 }
118 
119 
MtDecThread_CreateAndStart(CMtDecThread * t)120 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
121 {
122   WRes wres = MtDecThread_CreateEvents(t);
123   // wres = 17; // for test
124   if (wres == 0)
125   {
126     if (Thread_WasCreated(&t->thread))
127       return SZ_OK;
128     wres = Thread_Create(&t->thread, ThreadFunc, t);
129     if (wres == 0)
130       return SZ_OK;
131   }
132   return MY_SRes_HRESULT_FROM_WRes(wres);
133 }
134 
135 
/*
  MtDecThread_FreeInBufs() frees the whole chain of input buffers of the
  thread with the allocator of the owning decoder and sets t->inBuf to NULL.
  It does nothing if the chain is already empty.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

  if (!link)
    return;

  /* detach the chain first, then release the links one by one */
  t->inBuf = NULL;
  do
  {
    CMtDecBufLink *next = link->next;
    ISzAlloc_Free(t->mtDec->alloc, link);
    link = next;
  }
  while (link);
}
151 
152 
/*
  MtDecThread_CloseThread() stops and releases the system thread (if it was
  created) and then closes both events.
  Both events are signaled before waiting, so that a thread blocked in
  Event_Wait() wakes up. The thread loop exits after wake-up only if
  p->exitThread is set; this function does not set it, so the callers
  are expected to do that - confirm for new call sites.
  Return codes of the threading calls are ignored (best-effort cleanup).
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
    Event_Set(&t->canRead);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
166 
/*
  MtDec_CloseThreads() closes the thread and the events of every slot
  of p->threads (all MTDEC__THREADS_MAX of them).
*/
static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned i;
  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    MtDecThread_CloseThread(&p->threads[i]);
}
173 
/*
  MtDecThread_Destruct() releases everything owned by one thread slot:
  the system thread and events first, then the input buffers.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}
179 
180 
181 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)182 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
183 {
184   size_t size = *processedSize;
185   *processedSize = 0;
186   while (size != 0)
187   {
188     size_t cur = size;
189     SRes res = ISeqInStream_Read(stream, data, &cur);
190     *processedSize += cur;
191     data += cur;
192     size -= cur;
193     RINOK(res);
194     if (cur == 0)
195       return SZ_OK;
196   }
197   return SZ_OK;
198 }
199 
200 
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)201 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
202 {
203   SRes res;
204   CriticalSection_Enter(&p->mtProgress.cs);
205   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
206   res = p->mtProgress.res;
207   CriticalSection_Leave(&p->mtProgress.cs);
208   return res;
209 }
210 
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)211 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
212 {
213   SRes res;
214   CriticalSection_Enter(&p->mtProgress.cs);
215 
216   p->mtProgress.totalInSize += inSize;
217   p->mtProgress.totalOutSize += outSize;
218   if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
219     if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
220       p->mtProgress.res = SZ_ERROR_PROGRESS;
221 
222   *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
223   res = p->mtProgress.res;
224 
225   CriticalSection_Leave(&p->mtProgress.cs);
226 
227   return res;
228 }
229 
/*
  MtDec_Interrupt() requests an interrupt for (interruptIndex).
  Under p->mtProgress.cs it keeps the smallest index requested so far,
  so that blocks with a larger index see themselves as interrupted
  (see the test in MtDec_GetError_Spec).
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);
  if (!p->needInterrupt || interruptIndex < p->interruptIndex)
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }
  CriticalSection_Leave(&p->mtProgress.cs);
}
240 
MtDec_GetCrossBuff(CMtDec * p)241 Byte *MtDec_GetCrossBuff(CMtDec *p)
242 {
243   Byte *cr = p->crossBlock;
244   if (!cr)
245   {
246     cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
247     if (!cr)
248       return NULL;
249     p->crossBlock = cr;
250   }
251   return MTDEC__DATA_PTR_FROM_LINK(cr);
252 }
253 
254 
/*
  ThreadFunc2() returns:
  0      - in all normal cases (even for a stream error or a memory allocation error)
  (!= 0) - WRes error returned by a system threading function
*/
260 
261 // #define MTDEC_ProgessStep (1 << 22)
262 #define MTDEC_ProgessStep (1 << 0)
263 
ThreadFunc2(CMtDecThread * t)264 static WRes ThreadFunc2(CMtDecThread *t)
265 {
266   CMtDec *p = t->mtDec;
267 
268   PRF_STR_INT("ThreadFunc2", t->index);
269 
270   // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
271 
272   for (;;)
273   {
274     SRes res, codeRes;
275     BoolInt wasInterrupted, isAllocError, overflow, finish;
276     SRes threadingErrorSRes;
277     BoolInt needCode, needWrite, needContinue;
278 
279     size_t inDataSize_Start;
280     UInt64 inDataSize;
281     // UInt64 inDataSize_Full;
282 
283     UInt64 blockIndex;
284 
285     UInt64 inPrev = 0;
286     UInt64 outPrev = 0;
287     UInt64 inCodePos;
288     UInt64 outCodePos;
289 
290     Byte *afterEndData = NULL;
291     size_t afterEndData_Size = 0;
292 
293     BoolInt canCreateNewThread = False;
294     // CMtDecCallbackInfo parse;
295     CMtDecThread *nextThread;
296 
297     PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
298 
299     RINOK_THREAD(Event_Wait(&t->canRead));
300     if (p->exitThread)
301       return 0;
302 
303     PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
304 
305     // if (t->index == 3) return 19; // for test
306 
307     blockIndex = p->blockIndex++;
308 
309     // PRF(printf("\ncanRead\n"))
310 
311     res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
312 
313     finish = p->readWasFinished;
314     needCode = False;
315     needWrite = False;
316     isAllocError = False;
317     overflow = False;
318 
319     inDataSize_Start = 0;
320     inDataSize = 0;
321     // inDataSize_Full = 0;
322 
323     if (res == SZ_OK && !wasInterrupted)
324     {
325       // if (p->inStream)
326       {
327         CMtDecBufLink *prev = NULL;
328         CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
329         size_t crossSize = p->crossEnd - p->crossStart;
330 
331         PRF(printf("\ncrossSize = %d\n", crossSize));
332 
333         for (;;)
334         {
335           if (!link)
336           {
337             link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
338             if (!link)
339             {
340               finish = True;
341               // p->allocError_for_Read_BlockIndex = blockIndex;
342               isAllocError = True;
343               break;
344             }
345             link->next = NULL;
346             if (prev)
347             {
348               // static unsigned g_num = 0;
349               // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
350               prev->next = link;
351             }
352             else
353               t->inBuf = (void *)link;
354           }
355 
356           {
357             Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
358             Byte *parseData = data;
359             size_t size;
360 
361             if (crossSize != 0)
362             {
363               inDataSize = crossSize;
364               // inDataSize_Full = inDataSize;
365               inDataSize_Start = crossSize;
366               size = crossSize;
367               parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
368               PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
369                   (int)p->crossStart, (int)p->crossEnd, (int)finish));
370             }
371             else
372             {
373               size = p->inBufSize;
374 
375               res = FullRead(p->inStream, data, &size);
376 
377               // size = 10; // test
378 
379               inDataSize += size;
380               // inDataSize_Full = inDataSize;
381               if (!prev)
382                 inDataSize_Start = size;
383 
384               p->readProcessed += size;
385               finish = (size != p->inBufSize);
386               if (finish)
387                 p->readWasFinished = True;
388 
389               // res = E_INVALIDARG; // test
390 
391               if (res != SZ_OK)
392               {
393                 // PRF(printf("\nRead error = %d\n", res))
394                 // we want to decode all data before error
395                 p->readRes = res;
396                 // p->readError_BlockIndex = blockIndex;
397                 p->readWasFinished = True;
398                 finish = True;
399                 res = SZ_OK;
400                 // break;
401               }
402 
403               if (inDataSize - inPrev >= MTDEC_ProgessStep)
404               {
405                 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
406                 if (res != SZ_OK || wasInterrupted)
407                   break;
408                 inPrev = inDataSize;
409               }
410             }
411 
412             {
413               CMtDecCallbackInfo parse;
414 
415               parse.startCall = (prev == NULL);
416               parse.src = parseData;
417               parse.srcSize = size;
418               parse.srcFinished = finish;
419               parse.canCreateNewThread = True;
420 
421               // PRF(printf("\nParse size = %d\n", (unsigned)size))
422 
423               p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
424 
425               needWrite = True;
426               canCreateNewThread = parse.canCreateNewThread;
427 
428               // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
429 
430               if (
431                   // parseRes != SZ_OK ||
432                   // inDataSize - (size - parse.srcSize) > p->inBlockMax
433                   // ||
434                   parse.state == MTDEC_PARSE_OVERFLOW
435                   // || wasInterrupted
436                   )
437               {
438                 // Overflow or Parse error - switch from MT decoding to ST decoding
439                 finish = True;
440                 overflow = True;
441 
442                 {
443                   PRF(printf("\n Overflow"));
444                   // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
445                   PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
446                 }
447 
448                 if (crossSize != 0)
449                   memcpy(data, parseData, size);
450                 p->crossStart = 0;
451                 p->crossEnd = 0;
452                 break;
453               }
454 
455               if (crossSize != 0)
456               {
457                 memcpy(data, parseData, parse.srcSize);
458                 p->crossStart += parse.srcSize;
459               }
460 
461               if (parse.state != MTDEC_PARSE_CONTINUE || finish)
462               {
463                 // we don't need to parse in current thread anymore
464 
465                 if (parse.state == MTDEC_PARSE_END)
466                   finish = True;
467 
468                 needCode = True;
469                 // p->crossFinished = finish;
470 
471                 if (parse.srcSize == size)
472                 {
473                   // full parsed - no cross transfer
474                   p->crossStart = 0;
475                   p->crossEnd = 0;
476                   break;
477                 }
478 
479                 if (parse.state == MTDEC_PARSE_END)
480                 {
481                   p->crossStart = 0;
482                   p->crossEnd = 0;
483 
484                   if (crossSize != 0)
485                     memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
486                   afterEndData_Size = size - parse.srcSize;
487                   afterEndData = parseData + parse.srcSize;
488 
489                   // we reduce data size to required bytes (parsed only)
490                   inDataSize -= (size - parse.srcSize);
491                   if (!prev)
492                     inDataSize_Start = parse.srcSize;
493                   break;
494                 }
495 
496                 {
497                   // partial parsed - need cross transfer
498                   if (crossSize != 0)
499                     inDataSize = parse.srcSize; // it's only parsed now
500                   else
501                   {
502                     // partial parsed - is not in initial cross block - we need to copy new data to cross block
503                     Byte *cr = MtDec_GetCrossBuff(p);
504                     if (!cr)
505                     {
506                       {
507                         PRF(printf("\ncross alloc error error\n"));
508                         // res = SZ_ERROR_MEM;
509                         finish = True;
510                         // p->allocError_for_Read_BlockIndex = blockIndex;
511                         isAllocError = True;
512                         break;
513                       }
514                     }
515 
516                     {
517                       size_t crSize = size - parse.srcSize;
518                       inDataSize -= crSize;
519                       p->crossEnd = crSize;
520                       p->crossStart = 0;
521                       memcpy(cr, parseData + parse.srcSize, crSize);
522                     }
523                   }
524 
525                   // inDataSize_Full = inDataSize;
526                   if (!prev)
527                     inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
528 
529                   finish = False;
530                   break;
531                 }
532               }
533 
534               if (parse.srcSize != size)
535               {
536                 res = SZ_ERROR_FAIL;
537                 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
538                 break;
539               }
540             }
541           }
542 
543           prev = link;
544           link = link->next;
545 
546           if (crossSize != 0)
547           {
548             crossSize = 0;
549             p->crossStart = 0;
550             p->crossEnd = 0;
551           }
552         }
553       }
554 
555       if (res == SZ_OK)
556         res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
557     }
558 
559     codeRes = SZ_OK;
560 
561     if (res == SZ_OK && needCode && !wasInterrupted)
562     {
563       codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
564       if (codeRes != SZ_OK)
565       {
566         needCode = False;
567         finish = True;
568         // SZ_ERROR_MEM is expected error here.
569         //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
570         //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
571       }
572     }
573 
574     if (res != SZ_OK || wasInterrupted)
575       finish = True;
576 
577     nextThread = NULL;
578     threadingErrorSRes = SZ_OK;
579 
580     if (!finish)
581     {
582       if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
583       {
584         SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
585         if (res2 == SZ_OK)
586         {
587           // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
588           p->numStartedThreads++;
589         }
590         else
591         {
592           PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
593           if (p->numStartedThreads == 1)
594           {
595             // if only one thread is possible, we leave muti-threading code
596             finish = True;
597             needCode = False;
598             threadingErrorSRes = res2;
599           }
600           else
601             p->numStartedThreads_Limit = p->numStartedThreads;
602         }
603       }
604 
605       if (!finish)
606       {
607         unsigned nextIndex = t->index + 1;
608         nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
609         RINOK_THREAD(Event_Set(&nextThread->canRead))
610         // We have started executing for new iteration (with next thread)
611         // And that next thread now is responsible for possible exit from decoding (threading_code)
612       }
613     }
614 
615     // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
616     // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
617     // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
618     //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
619     //   - otherwise we stop decoding and exit from ThreadFunc2()
620 
621     // Don't change (finish) variable in the further code
622 
623 
624     // ---------- CODE ----------
625 
626     inPrev = 0;
627     outPrev = 0;
628     inCodePos = 0;
629     outCodePos = 0;
630 
631     if (res == SZ_OK && needCode && codeRes == SZ_OK)
632     {
633       BoolInt isStartBlock = True;
634       CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
635 
636       for (;;)
637       {
638         size_t inSize;
639         int stop;
640 
641         if (isStartBlock)
642           inSize = inDataSize_Start;
643         else
644         {
645           UInt64 rem = inDataSize - inCodePos;
646           inSize = p->inBufSize;
647           if (inSize > rem)
648             inSize = (size_t)rem;
649         }
650 
651         inCodePos += inSize;
652         stop = True;
653 
654         codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
655             (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
656             (inCodePos == inDataSize), // srcFinished
657             &inCodePos, &outCodePos, &stop);
658 
659         if (codeRes != SZ_OK)
660         {
661           PRF(printf("\nCode Interrupt error = %x\n", codeRes));
662           // we interrupt only later blocks
663           MtDec_Interrupt(p, blockIndex);
664           break;
665         }
666 
667         if (stop || inCodePos == inDataSize)
668           break;
669 
670         {
671           const UInt64 inDelta = inCodePos - inPrev;
672           const UInt64 outDelta = outCodePos - outPrev;
673           if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
674           {
675             // Sleep(1);
676             res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
677             if (res != SZ_OK || wasInterrupted)
678               break;
679             inPrev = inCodePos;
680             outPrev = outCodePos;
681           }
682         }
683 
684         link = link->next;
685         isStartBlock = False;
686       }
687     }
688 
689 
690     // ---------- WRITE ----------
691 
692     RINOK_THREAD(Event_Wait(&t->canWrite));
693 
694   {
695     BoolInt isErrorMode = False;
696     BoolInt canRecode = True;
697     BoolInt needWriteToStream = needWrite;
698 
699     if (p->exitThread) return 0; // it's never executed in normal cases
700 
701     if (p->wasInterrupted)
702       wasInterrupted = True;
703     else
704     {
705       if (codeRes != SZ_OK) // || !needCode // check it !!!
706       {
707         p->wasInterrupted = True;
708         p->codeRes = codeRes;
709         if (codeRes == SZ_ERROR_MEM)
710           isAllocError = True;
711       }
712 
713       if (threadingErrorSRes)
714       {
715         p->wasInterrupted = True;
716         p->threadingErrorSRes = threadingErrorSRes;
717         needWriteToStream = False;
718       }
719       if (isAllocError)
720       {
721         p->wasInterrupted = True;
722         p->isAllocError = True;
723         needWriteToStream = False;
724       }
725       if (overflow)
726       {
727         p->wasInterrupted = True;
728         p->overflow = True;
729         needWriteToStream = False;
730       }
731     }
732 
733     if (needCode)
734     {
735       if (wasInterrupted)
736       {
737         inCodePos = 0;
738         outCodePos = 0;
739       }
740       {
741         const UInt64 inDelta = inCodePos - inPrev;
742         const UInt64 outDelta = outCodePos - outPrev;
743         // if (inDelta != 0 || outDelta != 0)
744         res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
745       }
746     }
747 
748     needContinue = (!finish);
749 
750     // if (res == SZ_OK && needWrite && !wasInterrupted)
751     if (needWrite)
752     {
753       // p->inProcessed += inCodePos;
754 
755       res = p->mtCallback->Write(p->mtCallbackObject, t->index,
756           res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
757           afterEndData, afterEndData_Size,
758           &needContinue,
759           &canRecode);
760 
761       // res= E_INVALIDARG; // for test
762 
763       PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
764       PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
765 
766       if (res != SZ_OK)
767       {
768         PRF(printf("\nWrite error = %d\n", res));
769         isErrorMode = True;
770         p->wasInterrupted = True;
771       }
772       if (res != SZ_OK
773           || (!needContinue && !finish))
774       {
775         PRF(printf("\nWrite Interrupt error = %x\n", res));
776         MtDec_Interrupt(p, blockIndex);
777       }
778     }
779 
780     if (canRecode)
781     if (!needCode
782         || res != SZ_OK
783         || p->wasInterrupted
784         || codeRes != SZ_OK
785         || wasInterrupted
786         || p->numFilledThreads != 0
787         || isErrorMode)
788     {
789       if (p->numFilledThreads == 0)
790         p->filledThreadStart = t->index;
791       if (inDataSize != 0 || !finish)
792       {
793         t->inDataSize_Start = inDataSize_Start;
794         t->inDataSize = inDataSize;
795         p->numFilledThreads++;
796       }
797       PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
798       PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
799     }
800 
801     if (!finish)
802     {
803       RINOK_THREAD(Event_Set(&nextThread->canWrite));
804     }
805     else
806     {
807       if (needContinue)
808       {
809         // we restore decoding with new iteration
810         RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
811       }
812       else
813       {
814         // we exit from decoding
815         if (t->index == 0)
816           return SZ_OK;
817         p->exitThread = True;
818       }
819       RINOK_THREAD(Event_Set(&p->threads[0].canRead));
820     }
821   }
822   }
823 }
824 
825 #ifdef _WIN32
826 #define USE_ALLOCA
827 #endif
828 
829 #ifdef USE_ALLOCA
830 #ifdef _WIN32
831 #include <malloc.h>
832 #else
833 #include <stdlib.h>
834 #endif
835 #endif
836 
837 
ThreadFunc1(void * pp)838 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
839 {
840   WRes res;
841 
842   CMtDecThread *t = (CMtDecThread *)pp;
843   CMtDec *p;
844 
845   // fprintf(stdout, "\n%d = %p\n", t->index, &t);
846 
847   res = ThreadFunc2(t);
848   p = t->mtDec;
849   if (res == 0)
850     return p->exitThreadWRes;
851   {
852     // it's unexpected situation for some threading function error
853     if (p->exitThreadWRes == 0)
854       p->exitThreadWRes = res;
855     PRF(printf("\nthread exit error = %d\n", res));
856     p->exitThread = True;
857     Event_Set(&p->threads[0].canRead);
858     Event_Set(&p->threads[0].canWrite);
859     MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
860   }
861   return res;
862 }
863 
ThreadFunc(void * pp)864 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
865 {
866   CMtDecThread *t = (CMtDecThread *)pp;
867 
868   // fprintf(stderr, "\n%d = %p - before", t->index, &t);
869   #ifdef USE_ALLOCA
870   t->allocaPtr = alloca(t->index * 128);
871   #endif
872   return ThreadFunc1(pp);
873 }
874 
875 
/*
  MtDec_PrepareRead() releases the memory that is not required for the
  following MtDec_Read() calls:
    - the cross block, if it contains no data;
    - input buffers of the threads that are outside of the "filled" range,
      that starts at p->filledThreadStart and contains p->numFilledThreads
      threads (with wrap-around at p->numStartedThreads).
  Returns non-zero if there is data to read
  (some filled threads or some data in the cross block).
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    if (i > p->numStartedThreads
        || p->numFilledThreads <=
          (i >= p->filledThreadStart ?
            i - p->filledThreadStart :
            i + p->numStartedThreads - p->filledThreadStart))
      MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
897 
898 
MtDec_Read(CMtDec * p,size_t * inLim)899 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
900 {
901   while (p->numFilledThreads != 0)
902   {
903     CMtDecThread *t = &p->threads[p->filledThreadStart];
904 
905     if (*inLim != 0)
906     {
907       {
908         void *link = t->inBuf;
909         void *next = ((CMtDecBufLink *)link)->next;
910         ISzAlloc_Free(p->alloc, link);
911         t->inBuf = next;
912       }
913 
914       if (t->inDataSize == 0)
915       {
916         MtDecThread_FreeInBufs(t);
917         if (--p->numFilledThreads == 0)
918           break;
919         if (++p->filledThreadStart == p->numStartedThreads)
920           p->filledThreadStart = 0;
921         t = &p->threads[p->filledThreadStart];
922       }
923     }
924 
925     {
926       size_t lim = t->inDataSize_Start;
927       if (lim != 0)
928         t->inDataSize_Start = 0;
929       else
930       {
931         UInt64 rem = t->inDataSize;
932         lim = p->inBufSize;
933         if (lim > rem)
934           lim = (size_t)rem;
935       }
936       t->inDataSize -= lim;
937       *inLim = lim;
938       return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
939     }
940   }
941 
942   {
943     size_t crossSize = p->crossEnd - p->crossStart;
944     if (crossSize != 0)
945     {
946       const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
947       *inLim = crossSize;
948       p->crossStart = 0;
949       p->crossEnd = 0;
950       return data;
951     }
952     *inLim = 0;
953     if (p->crossBlock)
954     {
955       ISzAlloc_Free(p->alloc, p->crossBlock);
956       p->crossBlock = NULL;
957     }
958     return NULL;
959   }
960 }
961 
962 
/*
  MtDec_Construct() sets the initial state of the decoder object:
  default input buffer size (1 << 18), no streams / callbacks / allocator,
  no allocated buffers. Each thread slot gets its index and a back pointer to (p),
  and its events and thread objects are constructed (not created).
  The critical section of p->mtProgress is initialized here;
  MtDec_Destruct() deletes it.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;

  p->numThreadsMax = 0;

  p->inStream = NULL;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  p->numFilledThreads = 0;

  p->progress = NULL;
  p->alloc = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *t = &p->threads[i];
    t->mtDec = p;
    t->index = i;
    t->inBuf = NULL;
    Event_Construct(&t->canRead);
    Event_Construct(&t->canWrite);
    Thread_Construct(&t->thread);
  }

  CriticalSection_Init(&p->mtProgress.cs);
}
1005 
1006 
/*
  MtDec_Free() stops all threads and frees all buffers of the decoder.
  p->exitThread is set before the threads are woken up, so that each
  thread loop exits after its wait returns.
*/
static void MtDec_Free(CMtDec *p)
{
  unsigned i;

  p->exitThread = True;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    MtDecThread_Destruct(&p->threads[i]);

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
1024 
1025 
/*
  MtDec_Destruct() releases all resources of the decoder object:
  threads, events and buffers (MtDec_Free), then the critical section
  that was initialized in MtDec_Construct().
*/
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
1032 
1033 
MtDec_Code(CMtDec * p)1034 SRes MtDec_Code(CMtDec *p)
1035 {
1036   unsigned i;
1037 
1038   p->inProcessed = 0;
1039 
1040   p->blockIndex = 1; // it must be larger than not_defined index (0)
1041   p->isAllocError = False;
1042   p->overflow = False;
1043   p->threadingErrorSRes = SZ_OK;
1044 
1045   p->needContinue = True;
1046 
1047   p->readWasFinished = False;
1048   p->needInterrupt = False;
1049   p->interruptIndex = (UInt64)(Int64)-1;
1050 
1051   p->readProcessed = 0;
1052   p->readRes = SZ_OK;
1053   p->codeRes = SZ_OK;
1054   p->wasInterrupted = False;
1055 
1056   p->crossStart = 0;
1057   p->crossEnd = 0;
1058 
1059   p->filledThreadStart = 0;
1060   p->numFilledThreads = 0;
1061 
1062   {
1063     unsigned numThreads = p->numThreadsMax;
1064     if (numThreads > MTDEC__THREADS_MAX)
1065       numThreads = MTDEC__THREADS_MAX;
1066     p->numStartedThreads_Limit = numThreads;
1067     p->numStartedThreads = 0;
1068   }
1069 
1070   if (p->inBufSize != p->allocatedBufsSize)
1071   {
1072     for (i = 0; i < MTDEC__THREADS_MAX; i++)
1073     {
1074       CMtDecThread *t = &p->threads[i];
1075       if (t->inBuf)
1076         MtDecThread_FreeInBufs(t);
1077     }
1078     if (p->crossBlock)
1079     {
1080       ISzAlloc_Free(p->alloc, p->crossBlock);
1081       p->crossBlock = NULL;
1082     }
1083 
1084     p->allocatedBufsSize = p->inBufSize;
1085   }
1086 
1087   MtProgress_Init(&p->mtProgress, p->progress);
1088 
1089   // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1090   p->exitThread = False;
1091   p->exitThreadWRes = 0;
1092 
1093   {
1094     WRes wres;
1095     WRes sres;
1096     CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1097     // wres = MtDecThread_CreateAndStart(nextThread);
1098     wres = MtDecThread_CreateEvents(nextThread);
1099     if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1100     if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1101     if (wres == 0) { wres = ThreadFunc(nextThread);
1102     if (wres != 0)
1103     {
1104       p->needContinue = False;
1105       MtDec_CloseThreads(p);
1106     }}}}
1107 
1108     // wres = 17; // for test
1109     // wres = Event_Wait(&p->finishedEvent);
1110 
1111     sres = MY_SRes_HRESULT_FROM_WRes(wres);
1112 
1113     if (sres != 0)
1114       p->threadingErrorSRes = sres;
1115 
1116     if (
1117         // wres == 0
1118         // wres != 0
1119         // || p->mtc.codeRes == SZ_ERROR_MEM
1120         p->isAllocError
1121         || p->threadingErrorSRes != SZ_OK
1122         || p->overflow)
1123     {
1124       // p->needContinue = True;
1125     }
1126     else
1127       p->needContinue = False;
1128 
1129     if (p->needContinue)
1130       return SZ_OK;
1131 
1132     // if (sres != SZ_OK)
1133       return sres;
1134     // return E_FAIL;
1135   }
1136 }
1137 
1138 #endif
1139