1 /* MtDec.c -- Multi-thread Decoder
2 2023-04-02 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 // #define SHOW_DEBUG_INFO
7
8 // #include <stdio.h>
9 #include <string.h>
10
11 #ifdef SHOW_DEBUG_INFO
12 #include <stdio.h>
13 #endif
14
15 #include "MtDec.h"
16
17 #ifndef Z7_ST
18
/*
  Debug tracing helpers.
  PRF(x) expands to (x) only when SHOW_DEBUG_INFO is defined;
  otherwise it expands to nothing, so the traced code is not compiled.
*/
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

/*
  PRF_STR_INT(s, d) prints the string literal (s) followed by the value (d).
  (d) is parenthesized so that the cast applies to the whole argument expression,
  and "%u" matches the (unsigned) type of the converted argument.
*/
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d)))
26
/*
  MtProgress_Init() stores the progress callback and resets the accumulated state:
  the result code becomes SZ_OK and both byte counters become 0.
  The critical section (p->cs) is not touched by this function.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress)
{
  p->totalOutSize = 0;
  p->totalInSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
34
35
MtProgress_Progress_ST(CMtProgress * p)36 SRes MtProgress_Progress_ST(CMtProgress *p)
37 {
38 if (p->res == SZ_OK && p->progress)
39 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
40 p->res = SZ_ERROR_PROGRESS;
41 return p->res;
42 }
43
44
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)45 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
46 {
47 SRes res;
48 CriticalSection_Enter(&p->cs);
49
50 p->totalInSize += inSize;
51 p->totalOutSize += outSize;
52 if (p->res == SZ_OK && p->progress)
53 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
54 p->res = SZ_ERROR_PROGRESS;
55 res = p->res;
56
57 CriticalSection_Leave(&p->cs);
58 return res;
59 }
60
61
MtProgress_GetError(CMtProgress * p)62 SRes MtProgress_GetError(CMtProgress *p)
63 {
64 SRes res;
65 CriticalSection_Enter(&p->cs);
66 res = p->res;
67 CriticalSection_Leave(&p->cs);
68 return res;
69 }
70
71
/*
  MtProgress_SetError() stores (res) as the result code,
  but only if no error was stored before: the first error wins.
  The check and the store are done inside (p->cs).
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  {
    const BoolInt noErrorYet = (p->res == SZ_OK);
    if (noErrorYet)
      p->res = res;
  }
  CriticalSection_Leave(&p->cs);
}
79
80
81 #define RINOK_THREAD(x) RINOK_WRes(x)
82
83
/*
  CMtDecBufLink is the header that is placed at the start of each allocated
  input buffer. The buffers of one thread form a singly linked list via (next).
  The data of a buffer starts right after the header.
  NOTE(review): (pad) presumably keeps the data offset equal to the size
  of 4 pointers for alignment of the data - confirm.
*/
typedef struct CMtDecBufLink_
{
  struct CMtDecBufLink_ *next;
  void *pad[3];
} CMtDecBufLink;

/* offset of the data from the start of the allocated buffer */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)

/* pointer to the data of the buffer that starts at (link) */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
94
95
96
97 static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp);
98
99
MtDecThread_CreateEvents(CMtDecThread * t)100 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
101 {
102 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite);
103 if (wres == 0)
104 {
105 wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead);
106 if (wres == 0)
107 return SZ_OK;
108 }
109 return wres;
110 }
111
112
MtDecThread_CreateAndStart(CMtDecThread * t)113 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
114 {
115 WRes wres = MtDecThread_CreateEvents(t);
116 // wres = 17; // for test
117 if (wres == 0)
118 {
119 if (Thread_WasCreated(&t->thread))
120 return SZ_OK;
121 wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t);
122 if (wres == 0)
123 return SZ_OK;
124 }
125 return MY_SRes_HRESULT_FROM_WRes(wres);
126 }
127
128
/*
  MtDecThread_FreeInBufs() frees all input buffers in the linked list
  of the thread with the allocator (t->mtDec->alloc).
  (t->inBuf) is set to NULL before the buffers are freed.
  The function does nothing if the list is empty.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *cur = (CMtDecBufLink *)t->inBuf;
  if (!cur)
    return;

  t->inBuf = NULL;

  while (cur)
  {
    // the link must be read before the buffer is freed
    CMtDecBufLink *following = cur->next;
    ISzAlloc_Free(t->mtDec->alloc, cur);
    cur = following;
  }
}
144
145
/*
  MtDecThread_CloseThread() stops the thread (if it was created)
  and closes both events of the thread.
  Both events are set before Thread_Wait_Close(), so a thread that waits
  for one of them can wake up.
  NOTE(review): the thread is expected to exit after wake-up only
  if (exitThread) is set by the caller - confirm for each caller.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* Setting of (canWrite) can be disabled:
       there are no threads waiting canWrite in normal cases */
    Event_Set(&t->canWrite);
    Event_Set(&t->canRead);
    Thread_Wait_Close(&t->thread);
  }

  // the events are closed even if the thread was not created
  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
158
/*
  MtDec_CloseThreads() calls MtDecThread_CloseThread()
  for each of MTDEC_THREADS_MAX thread slots.
*/
static void MtDec_CloseThreads(CMtDec *p)
{
  CMtDecThread *t = &p->threads[0];
  unsigned rem = MTDEC_THREADS_MAX;

  for (; rem != 0; rem--, t++)
    MtDecThread_CloseThread(t);
}
165
/*
  MtDecThread_Destruct() releases all resources of one thread slot:
  the thread and its events are closed first,
  and then the input buffers of the thread are freed.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  // the thread is stopped before its buffers are freed
  MtDecThread_CloseThread(t);

  MtDecThread_FreeInBufs(t);
}
171
172
173
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)174 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
175 {
176 SRes res;
177 CriticalSection_Enter(&p->mtProgress.cs);
178 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
179 res = p->mtProgress.res;
180 CriticalSection_Leave(&p->mtProgress.cs);
181 return res;
182 }
183
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)184 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
185 {
186 SRes res;
187 CriticalSection_Enter(&p->mtProgress.cs);
188
189 p->mtProgress.totalInSize += inSize;
190 p->mtProgress.totalOutSize += outSize;
191 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
192 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
193 p->mtProgress.res = SZ_ERROR_PROGRESS;
194
195 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
196 res = p->mtProgress.res;
197
198 CriticalSection_Leave(&p->mtProgress.cs);
199
200 return res;
201 }
202
/*
  MtDec_Interrupt() requests an interrupt for the index (interruptIndex).
  The stored interrupt index is changed only if there was no request before,
  or if the new index is smaller than the stored index:
  so the smallest requested index is kept.
  The state is changed inside the critical section of (p->mtProgress).
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  if (!(p->needInterrupt && p->interruptIndex <= interruptIndex))
  {
    p->needInterrupt = True;
    p->interruptIndex = interruptIndex;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}
213
MtDec_GetCrossBuff(CMtDec * p)214 Byte *MtDec_GetCrossBuff(CMtDec *p)
215 {
216 Byte *cr = p->crossBlock;
217 if (!cr)
218 {
219 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
220 if (!cr)
221 return NULL;
222 p->crossBlock = cr;
223 }
224 return MTDEC__DATA_PTR_FROM_LINK(cr);
225 }
226
227
228 /*
229 MtDec_ThreadFunc2() returns:
230 0 - in all normal cases (even for stream error or memory allocation error)
231 (!= 0) - WRes error returned by a system threading function
232 */
233
234 // #define MTDEC_ProgessStep (1 << 22)
235 #define MTDEC_ProgessStep (1 << 0)
236
MtDec_ThreadFunc2(CMtDecThread * t)237 static WRes MtDec_ThreadFunc2(CMtDecThread *t)
238 {
239 CMtDec *p = t->mtDec;
240
241 PRF_STR_INT("MtDec_ThreadFunc2", t->index)
242
243 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
244
245 for (;;)
246 {
247 SRes res, codeRes;
248 BoolInt wasInterrupted, isAllocError, overflow, finish;
249 SRes threadingErrorSRes;
250 BoolInt needCode, needWrite, needContinue;
251
252 size_t inDataSize_Start;
253 UInt64 inDataSize;
254 // UInt64 inDataSize_Full;
255
256 UInt64 blockIndex;
257
258 UInt64 inPrev = 0;
259 UInt64 outPrev = 0;
260 UInt64 inCodePos;
261 UInt64 outCodePos;
262
263 Byte *afterEndData = NULL;
264 size_t afterEndData_Size = 0;
265 BoolInt afterEndData_IsCross = False;
266
267 BoolInt canCreateNewThread = False;
268 // CMtDecCallbackInfo parse;
269 CMtDecThread *nextThread;
270
271 PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index)
272
273 RINOK_THREAD(Event_Wait(&t->canRead))
274 if (p->exitThread)
275 return 0;
276
277 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index)
278
279 // if (t->index == 3) return 19; // for test
280
281 blockIndex = p->blockIndex++;
282
283 // PRF(printf("\ncanRead\n"))
284
285 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
286
287 finish = p->readWasFinished;
288 needCode = False;
289 needWrite = False;
290 isAllocError = False;
291 overflow = False;
292
293 inDataSize_Start = 0;
294 inDataSize = 0;
295 // inDataSize_Full = 0;
296
297 if (res == SZ_OK && !wasInterrupted)
298 {
299 // if (p->inStream)
300 {
301 CMtDecBufLink *prev = NULL;
302 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
303 size_t crossSize = p->crossEnd - p->crossStart;
304
305 PRF(printf("\ncrossSize = %d\n", crossSize));
306
307 for (;;)
308 {
309 if (!link)
310 {
311 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
312 if (!link)
313 {
314 finish = True;
315 // p->allocError_for_Read_BlockIndex = blockIndex;
316 isAllocError = True;
317 break;
318 }
319 link->next = NULL;
320 if (prev)
321 {
322 // static unsigned g_num = 0;
323 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
324 prev->next = link;
325 }
326 else
327 t->inBuf = (void *)link;
328 }
329
330 {
331 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
332 Byte *parseData = data;
333 size_t size;
334
335 if (crossSize != 0)
336 {
337 inDataSize = crossSize;
338 // inDataSize_Full = inDataSize;
339 inDataSize_Start = crossSize;
340 size = crossSize;
341 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
342 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
343 (int)p->crossStart, (int)p->crossEnd, (int)finish));
344 }
345 else
346 {
347 size = p->inBufSize;
348
349 res = SeqInStream_ReadMax(p->inStream, data, &size);
350
351 // size = 10; // test
352
353 inDataSize += size;
354 // inDataSize_Full = inDataSize;
355 if (!prev)
356 inDataSize_Start = size;
357
358 p->readProcessed += size;
359 finish = (size != p->inBufSize);
360 if (finish)
361 p->readWasFinished = True;
362
363 // res = E_INVALIDARG; // test
364
365 if (res != SZ_OK)
366 {
367 // PRF(printf("\nRead error = %d\n", res))
368 // we want to decode all data before error
369 p->readRes = res;
370 // p->readError_BlockIndex = blockIndex;
371 p->readWasFinished = True;
372 finish = True;
373 res = SZ_OK;
374 // break;
375 }
376
377 if (inDataSize - inPrev >= MTDEC_ProgessStep)
378 {
379 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
380 if (res != SZ_OK || wasInterrupted)
381 break;
382 inPrev = inDataSize;
383 }
384 }
385
386 {
387 CMtDecCallbackInfo parse;
388
389 parse.startCall = (prev == NULL);
390 parse.src = parseData;
391 parse.srcSize = size;
392 parse.srcFinished = finish;
393 parse.canCreateNewThread = True;
394
395 PRF(printf("\nParse size = %d\n", (unsigned)size));
396
397 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
398
399 PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
400
401 needWrite = True;
402 canCreateNewThread = parse.canCreateNewThread;
403
404 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
405
406 if (
407 // parseRes != SZ_OK ||
408 // inDataSize - (size - parse.srcSize) > p->inBlockMax
409 // ||
410 parse.state == MTDEC_PARSE_OVERFLOW
411 // || wasInterrupted
412 )
413 {
414 // Overflow or Parse error - switch from MT decoding to ST decoding
415 finish = True;
416 overflow = True;
417
418 {
419 PRF(printf("\n Overflow"));
420 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
421 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
422 }
423
424 if (crossSize != 0)
425 memcpy(data, parseData, size);
426 p->crossStart = 0;
427 p->crossEnd = 0;
428 break;
429 }
430
431 if (crossSize != 0)
432 {
433 memcpy(data, parseData, parse.srcSize);
434 p->crossStart += parse.srcSize;
435 }
436
437 if (parse.state != MTDEC_PARSE_CONTINUE || finish)
438 {
439 // we don't need to parse in current thread anymore
440
441 if (parse.state == MTDEC_PARSE_END)
442 finish = True;
443
444 needCode = True;
445 // p->crossFinished = finish;
446
447 if (parse.srcSize == size)
448 {
449 // full parsed - no cross transfer
450 p->crossStart = 0;
451 p->crossEnd = 0;
452 break;
453 }
454
455 if (parse.state == MTDEC_PARSE_END)
456 {
457 afterEndData = parseData + parse.srcSize;
458 afterEndData_Size = size - parse.srcSize;
459 if (crossSize != 0)
460 afterEndData_IsCross = True;
461 // we reduce data size to required bytes (parsed only)
462 inDataSize -= afterEndData_Size;
463 if (!prev)
464 inDataSize_Start = parse.srcSize;
465 break;
466 }
467
468 {
469 // partial parsed - need cross transfer
470 if (crossSize != 0)
471 inDataSize = parse.srcSize; // it's only parsed now
472 else
473 {
474 // partial parsed - is not in initial cross block - we need to copy new data to cross block
475 Byte *cr = MtDec_GetCrossBuff(p);
476 if (!cr)
477 {
478 {
479 PRF(printf("\ncross alloc error error\n"));
480 // res = SZ_ERROR_MEM;
481 finish = True;
482 // p->allocError_for_Read_BlockIndex = blockIndex;
483 isAllocError = True;
484 break;
485 }
486 }
487
488 {
489 size_t crSize = size - parse.srcSize;
490 inDataSize -= crSize;
491 p->crossEnd = crSize;
492 p->crossStart = 0;
493 memcpy(cr, parseData + parse.srcSize, crSize);
494 }
495 }
496
497 // inDataSize_Full = inDataSize;
498 if (!prev)
499 inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
500
501 finish = False;
502 break;
503 }
504 }
505
506 if (parse.srcSize != size)
507 {
508 res = SZ_ERROR_FAIL;
509 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
510 break;
511 }
512 }
513 }
514
515 prev = link;
516 link = link->next;
517
518 if (crossSize != 0)
519 {
520 crossSize = 0;
521 p->crossStart = 0;
522 p->crossEnd = 0;
523 }
524 }
525 }
526
527 if (res == SZ_OK)
528 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
529 }
530
531 codeRes = SZ_OK;
532
533 if (res == SZ_OK && needCode && !wasInterrupted)
534 {
535 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
536 if (codeRes != SZ_OK)
537 {
538 needCode = False;
539 finish = True;
540 // SZ_ERROR_MEM is expected error here.
541 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
542 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
543 }
544 }
545
546 if (res != SZ_OK || wasInterrupted)
547 finish = True;
548
549 nextThread = NULL;
550 threadingErrorSRes = SZ_OK;
551
552 if (!finish)
553 {
554 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
555 {
556 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
557 if (res2 == SZ_OK)
558 {
559 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
560 p->numStartedThreads++;
561 }
562 else
563 {
564 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
565 if (p->numStartedThreads == 1)
566 {
567 // if only one thread is possible, we leave muti-threading code
568 finish = True;
569 needCode = False;
570 threadingErrorSRes = res2;
571 }
572 else
573 p->numStartedThreads_Limit = p->numStartedThreads;
574 }
575 }
576
577 if (!finish)
578 {
579 unsigned nextIndex = t->index + 1;
580 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
581 RINOK_THREAD(Event_Set(&nextThread->canRead))
582 // We have started executing for new iteration (with next thread)
583 // And that next thread now is responsible for possible exit from decoding (threading_code)
584 }
585 }
586
587 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
588 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
589 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
590 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
591 // - otherwise we stop decoding and exit from MtDec_ThreadFunc2()
592
593 // Don't change (finish) variable in the further code
594
595
596 // ---------- CODE ----------
597
598 inPrev = 0;
599 outPrev = 0;
600 inCodePos = 0;
601 outCodePos = 0;
602
603 if (res == SZ_OK && needCode && codeRes == SZ_OK)
604 {
605 BoolInt isStartBlock = True;
606 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
607
608 for (;;)
609 {
610 size_t inSize;
611 int stop;
612
613 if (isStartBlock)
614 inSize = inDataSize_Start;
615 else
616 {
617 UInt64 rem = inDataSize - inCodePos;
618 inSize = p->inBufSize;
619 if (inSize > rem)
620 inSize = (size_t)rem;
621 }
622
623 inCodePos += inSize;
624 stop = True;
625
626 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
627 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
628 (inCodePos == inDataSize), // srcFinished
629 &inCodePos, &outCodePos, &stop);
630
631 if (codeRes != SZ_OK)
632 {
633 PRF(printf("\nCode Interrupt error = %x\n", codeRes));
634 // we interrupt only later blocks
635 MtDec_Interrupt(p, blockIndex);
636 break;
637 }
638
639 if (stop || inCodePos == inDataSize)
640 break;
641
642 {
643 const UInt64 inDelta = inCodePos - inPrev;
644 const UInt64 outDelta = outCodePos - outPrev;
645 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
646 {
647 // Sleep(1);
648 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
649 if (res != SZ_OK || wasInterrupted)
650 break;
651 inPrev = inCodePos;
652 outPrev = outCodePos;
653 }
654 }
655
656 link = link->next;
657 isStartBlock = False;
658 }
659 }
660
661
662 // ---------- WRITE ----------
663
664 RINOK_THREAD(Event_Wait(&t->canWrite))
665
666 {
667 BoolInt isErrorMode = False;
668 BoolInt canRecode = True;
669 BoolInt needWriteToStream = needWrite;
670
671 if (p->exitThread) return 0; // it's never executed in normal cases
672
673 if (p->wasInterrupted)
674 wasInterrupted = True;
675 else
676 {
677 if (codeRes != SZ_OK) // || !needCode // check it !!!
678 {
679 p->wasInterrupted = True;
680 p->codeRes = codeRes;
681 if (codeRes == SZ_ERROR_MEM)
682 isAllocError = True;
683 }
684
685 if (threadingErrorSRes)
686 {
687 p->wasInterrupted = True;
688 p->threadingErrorSRes = threadingErrorSRes;
689 needWriteToStream = False;
690 }
691 if (isAllocError)
692 {
693 p->wasInterrupted = True;
694 p->isAllocError = True;
695 needWriteToStream = False;
696 }
697 if (overflow)
698 {
699 p->wasInterrupted = True;
700 p->overflow = True;
701 needWriteToStream = False;
702 }
703 }
704
705 if (needCode)
706 {
707 if (wasInterrupted)
708 {
709 inCodePos = 0;
710 outCodePos = 0;
711 }
712 {
713 const UInt64 inDelta = inCodePos - inPrev;
714 const UInt64 outDelta = outCodePos - outPrev;
715 // if (inDelta != 0 || outDelta != 0)
716 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
717 }
718 }
719
720 needContinue = (!finish);
721
722 // if (res == SZ_OK && needWrite && !wasInterrupted)
723 if (needWrite)
724 {
725 // p->inProcessed += inCodePos;
726
727 PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
728
729 res = p->mtCallback->Write(p->mtCallbackObject, t->index,
730 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
731 afterEndData, afterEndData_Size, afterEndData_IsCross,
732 &needContinue,
733 &canRecode);
734
735 // res = SZ_ERROR_FAIL; // for test
736
737 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
738 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
739
740 if (res != SZ_OK)
741 {
742 PRF(printf("\nWrite error = %d\n", res));
743 isErrorMode = True;
744 p->wasInterrupted = True;
745 }
746 if (res != SZ_OK
747 || (!needContinue && !finish))
748 {
749 PRF(printf("\nWrite Interrupt error = %x\n", res));
750 MtDec_Interrupt(p, blockIndex);
751 }
752 }
753
754 if (canRecode)
755 if (!needCode
756 || res != SZ_OK
757 || p->wasInterrupted
758 || codeRes != SZ_OK
759 || wasInterrupted
760 || p->numFilledThreads != 0
761 || isErrorMode)
762 {
763 if (p->numFilledThreads == 0)
764 p->filledThreadStart = t->index;
765 if (inDataSize != 0 || !finish)
766 {
767 t->inDataSize_Start = inDataSize_Start;
768 t->inDataSize = inDataSize;
769 p->numFilledThreads++;
770 }
771 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
772 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
773 }
774
775 if (!finish)
776 {
777 RINOK_THREAD(Event_Set(&nextThread->canWrite))
778 }
779 else
780 {
781 if (needContinue)
782 {
783 // we restore decoding with new iteration
784 RINOK_THREAD(Event_Set(&p->threads[0].canWrite))
785 }
786 else
787 {
788 // we exit from decoding
789 if (t->index == 0)
790 return SZ_OK;
791 p->exitThread = True;
792 }
793 RINOK_THREAD(Event_Set(&p->threads[0].canRead))
794 }
795 }
796 }
797 }
798
799 #ifdef _WIN32
800 #define USE_ALLOCA
801 #endif
802
803 #ifdef USE_ALLOCA
804 #ifdef _WIN32
805 #include <malloc.h>
806 #else
807 #include <stdlib.h>
808 #endif
809 #endif
810
811
MtDec_ThreadFunc1(void * pp)812 static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp)
813 {
814 WRes res;
815
816 CMtDecThread *t = (CMtDecThread *)pp;
817 CMtDec *p;
818
819 // fprintf(stdout, "\n%d = %p\n", t->index, &t);
820
821 res = MtDec_ThreadFunc2(t);
822 p = t->mtDec;
823 if (res == 0)
824 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
825 {
826 // it's unexpected situation for some threading function error
827 if (p->exitThreadWRes == 0)
828 p->exitThreadWRes = res;
829 PRF(printf("\nthread exit error = %d\n", res));
830 p->exitThread = True;
831 Event_Set(&p->threads[0].canRead);
832 Event_Set(&p->threads[0].canWrite);
833 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
834 }
835 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
836 }
837
MtDec_ThreadFunc(void * pp)838 static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp)
839 {
840 #ifdef USE_ALLOCA
841 CMtDecThread *t = (CMtDecThread *)pp;
842 // fprintf(stderr, "\n%d = %p - before", t->index, &t);
843 t->allocaPtr = alloca(t->index * 128);
844 #endif
845 return MtDec_ThreadFunc1(pp);
846 }
847
848
/*
  MtDec_PrepareRead() frees the buffers that contain no data for MtDec_Read():
    - the cross block is freed, if it is empty (crossStart == crossEnd),
    - the input buffers of a thread are freed, if the index of the thread is larger
      than (numStartedThreads), or if the thread is not one of (numFilledThreads)
      threads that follow (filledThreadStart) in the ring of started threads.
  returns: non-zero, if there is data in filled threads or in the cross block.
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC_THREADS_MAX; i++)
  {
    if (i <= p->numStartedThreads)
    {
      // the buffers are kept, if the ring distance from (filledThreadStart)
      // to (i) is smaller than (numFilledThreads)
      if (i >= p->filledThreadStart)
      {
        if (p->numFilledThreads > i - p->filledThreadStart)
          continue;
      }
      else if (p->numFilledThreads > i + p->numStartedThreads - p->filledThreadStart)
        continue;
    }
    MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
870
871
MtDec_Read(CMtDec * p,size_t * inLim)872 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
873 {
874 while (p->numFilledThreads != 0)
875 {
876 CMtDecThread *t = &p->threads[p->filledThreadStart];
877
878 if (*inLim != 0)
879 {
880 {
881 void *link = t->inBuf;
882 void *next = ((CMtDecBufLink *)link)->next;
883 ISzAlloc_Free(p->alloc, link);
884 t->inBuf = next;
885 }
886
887 if (t->inDataSize == 0)
888 {
889 MtDecThread_FreeInBufs(t);
890 if (--p->numFilledThreads == 0)
891 break;
892 if (++p->filledThreadStart == p->numStartedThreads)
893 p->filledThreadStart = 0;
894 t = &p->threads[p->filledThreadStart];
895 }
896 }
897
898 {
899 size_t lim = t->inDataSize_Start;
900 if (lim != 0)
901 t->inDataSize_Start = 0;
902 else
903 {
904 UInt64 rem = t->inDataSize;
905 lim = p->inBufSize;
906 if (lim > rem)
907 lim = (size_t)rem;
908 }
909 t->inDataSize -= lim;
910 *inLim = lim;
911 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
912 }
913 }
914
915 {
916 size_t crossSize = p->crossEnd - p->crossStart;
917 if (crossSize != 0)
918 {
919 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
920 *inLim = crossSize;
921 p->crossStart = 0;
922 p->crossEnd = 0;
923 return data;
924 }
925 *inLim = 0;
926 if (p->crossBlock)
927 {
928 ISzAlloc_Free(p->alloc, p->crossBlock);
929 p->crossBlock = NULL;
930 }
931 return NULL;
932 }
933 }
934
935
/*
  MtDec_Construct() sets the initial state of the object:
    - default values: (inBufSize) is (1 << 18), (numThreadsMax) is 0,
    - the pointers to stream, progress, allocator and callbacks are NULL,
    - the cross block is not allocated,
    - each of MTDEC_THREADS_MAX thread slots gets its index and the pointer
      to the object; its events and its thread are constructed, but not created,
    - the critical section of (p->mtProgress) is initialized.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;
  p->allocatedBufsSize = 0;

  p->numThreadsMax = 0;
  p->numFilledThreads = 0;

  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  // p->inData = NULL;
  // p->inDataSize = 0;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  for (i = 0; i < MTDEC_THREADS_MAX; i++)
  {
    CMtDecThread *t = &p->threads[i];
    t->index = i;
    t->mtDec = p;
    t->inBuf = NULL;
    Event_Construct(&t->canRead);
    Event_Construct(&t->canWrite);
    Thread_CONSTRUCT(&t->thread)
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}
978
979
/*
  MtDec_Free() stops all threads and frees all buffers of the object.
  (p->exitThread) is set before the threads are closed.
  Then the events, the thread and the input buffers of each thread slot are released,
  and the cross block is freed.
*/
static void MtDec_Free(CMtDec *p)
{
  CMtDecThread *t = &p->threads[0];
  unsigned rem = MTDEC_THREADS_MAX;

  p->exitThread = True;

  for (; rem != 0; rem--, t++)
    MtDecThread_Destruct(t);

  // Event_Close(&p->finishedEvent);

  if (!p->crossBlock)
    return;

  ISzAlloc_Free(p->alloc, p->crossBlock);
  p->crossBlock = NULL;
}
997
998
/*
  MtDec_Destruct() releases all resources of the object:
  MtDec_Free() is called first, and then the critical section
  of (p->mtProgress) is deleted.
*/
void MtDec_Destruct(CMtDec *p)
{
  // threads are closed in MtDec_Free() before the critical section is deleted
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
1005
1006
MtDec_Code(CMtDec * p)1007 SRes MtDec_Code(CMtDec *p)
1008 {
1009 unsigned i;
1010
1011 p->inProcessed = 0;
1012
1013 p->blockIndex = 1; // it must be larger than not_defined index (0)
1014 p->isAllocError = False;
1015 p->overflow = False;
1016 p->threadingErrorSRes = SZ_OK;
1017
1018 p->needContinue = True;
1019
1020 p->readWasFinished = False;
1021 p->needInterrupt = False;
1022 p->interruptIndex = (UInt64)(Int64)-1;
1023
1024 p->readProcessed = 0;
1025 p->readRes = SZ_OK;
1026 p->codeRes = SZ_OK;
1027 p->wasInterrupted = False;
1028
1029 p->crossStart = 0;
1030 p->crossEnd = 0;
1031
1032 p->filledThreadStart = 0;
1033 p->numFilledThreads = 0;
1034
1035 {
1036 unsigned numThreads = p->numThreadsMax;
1037 if (numThreads > MTDEC_THREADS_MAX)
1038 numThreads = MTDEC_THREADS_MAX;
1039 p->numStartedThreads_Limit = numThreads;
1040 p->numStartedThreads = 0;
1041 }
1042
1043 if (p->inBufSize != p->allocatedBufsSize)
1044 {
1045 for (i = 0; i < MTDEC_THREADS_MAX; i++)
1046 {
1047 CMtDecThread *t = &p->threads[i];
1048 if (t->inBuf)
1049 MtDecThread_FreeInBufs(t);
1050 }
1051 if (p->crossBlock)
1052 {
1053 ISzAlloc_Free(p->alloc, p->crossBlock);
1054 p->crossBlock = NULL;
1055 }
1056
1057 p->allocatedBufsSize = p->inBufSize;
1058 }
1059
1060 MtProgress_Init(&p->mtProgress, p->progress);
1061
1062 // RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
1063 p->exitThread = False;
1064 p->exitThreadWRes = 0;
1065
1066 {
1067 WRes wres;
1068 SRes sres;
1069 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1070 // wres = MtDecThread_CreateAndStart(nextThread);
1071 wres = MtDecThread_CreateEvents(nextThread);
1072 if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1073 if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1074 if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread);
1075 wres = (WRes)(UINT_PTR)res;
1076 if (wres != 0)
1077 {
1078 p->needContinue = False;
1079 MtDec_CloseThreads(p);
1080 }}}}
1081
1082 // wres = 17; // for test
1083 // wres = Event_Wait(&p->finishedEvent);
1084
1085 sres = MY_SRes_HRESULT_FROM_WRes(wres);
1086
1087 if (sres != 0)
1088 p->threadingErrorSRes = sres;
1089
1090 if (
1091 // wres == 0
1092 // wres != 0
1093 // || p->mtc.codeRes == SZ_ERROR_MEM
1094 p->isAllocError
1095 || p->threadingErrorSRes != SZ_OK
1096 || p->overflow)
1097 {
1098 // p->needContinue = True;
1099 }
1100 else
1101 p->needContinue = False;
1102
1103 if (p->needContinue)
1104 return SZ_OK;
1105
1106 // if (sres != SZ_OK)
1107 return sres;
1108 // return SZ_ERROR_FAIL;
1109 }
1110 }
1111
1112 #endif
1113
1114 #undef PRF
1115