1 /* MtDec.c -- Multi-thread Decoder
2 2021-12-21 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 // #define SHOW_DEBUG_INFO
7
8 // #include <stdio.h>
9 #include <string.h>
10
11 #ifdef SHOW_DEBUG_INFO
12 #include <stdio.h>
13 #endif
14
15 #include "MtDec.h"
16
17 #ifndef _7ZIP_ST
18
19 #ifdef SHOW_DEBUG_INFO
20 #define PRF(x) x
21 #else
22 #define PRF(x)
23 #endif
24
25 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
26
MtProgress_Init(CMtProgress * p,ICompressProgress * progress)27 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
28 {
29 p->progress = progress;
30 p->res = SZ_OK;
31 p->totalInSize = 0;
32 p->totalOutSize = 0;
33 }
34
35
MtProgress_Progress_ST(CMtProgress * p)36 SRes MtProgress_Progress_ST(CMtProgress *p)
37 {
38 if (p->res == SZ_OK && p->progress)
39 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
40 p->res = SZ_ERROR_PROGRESS;
41 return p->res;
42 }
43
44
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)45 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
46 {
47 SRes res;
48 CriticalSection_Enter(&p->cs);
49
50 p->totalInSize += inSize;
51 p->totalOutSize += outSize;
52 if (p->res == SZ_OK && p->progress)
53 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
54 p->res = SZ_ERROR_PROGRESS;
55 res = p->res;
56
57 CriticalSection_Leave(&p->cs);
58 return res;
59 }
60
61
MtProgress_GetError(CMtProgress * p)62 SRes MtProgress_GetError(CMtProgress *p)
63 {
64 SRes res;
65 CriticalSection_Enter(&p->cs);
66 res = p->res;
67 CriticalSection_Leave(&p->cs);
68 return res;
69 }
70
71
MtProgress_SetError(CMtProgress * p,SRes res)72 void MtProgress_SetError(CMtProgress *p, SRes res)
73 {
74 CriticalSection_Enter(&p->cs);
75 if (p->res == SZ_OK)
76 p->res = res;
77 CriticalSection_Leave(&p->cs);
78 }
79
80
81 #define RINOK_THREAD(x) RINOK_WRes(x)
82
83
ArEvent_OptCreate_And_Reset(CEvent * p)84 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
85 {
86 if (Event_IsCreated(p))
87 return Event_Reset(p);
88 return AutoResetEvent_CreateNotSignaled(p);
89 }
90
91
92 struct __CMtDecBufLink
93 {
94 struct __CMtDecBufLink *next;
95 void *pad[3];
96 };
97
98 typedef struct __CMtDecBufLink CMtDecBufLink;
99
100 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
101 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
102
103
104
105 static THREAD_FUNC_DECL ThreadFunc(void *pp);
106
107
MtDecThread_CreateEvents(CMtDecThread * t)108 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
109 {
110 WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
111 if (wres == 0)
112 {
113 wres = ArEvent_OptCreate_And_Reset(&t->canRead);
114 if (wres == 0)
115 return SZ_OK;
116 }
117 return wres;
118 }
119
120
MtDecThread_CreateAndStart(CMtDecThread * t)121 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
122 {
123 WRes wres = MtDecThread_CreateEvents(t);
124 // wres = 17; // for test
125 if (wres == 0)
126 {
127 if (Thread_WasCreated(&t->thread))
128 return SZ_OK;
129 wres = Thread_Create(&t->thread, ThreadFunc, t);
130 if (wres == 0)
131 return SZ_OK;
132 }
133 return MY_SRes_HRESULT_FROM_WRes(wres);
134 }
135
136
MtDecThread_FreeInBufs(CMtDecThread * t)137 void MtDecThread_FreeInBufs(CMtDecThread *t)
138 {
139 if (t->inBuf)
140 {
141 void *link = t->inBuf;
142 t->inBuf = NULL;
143 do
144 {
145 void *next = ((CMtDecBufLink *)link)->next;
146 ISzAlloc_Free(t->mtDec->alloc, link);
147 link = next;
148 }
149 while (link);
150 }
151 }
152
153
MtDecThread_CloseThread(CMtDecThread * t)154 static void MtDecThread_CloseThread(CMtDecThread *t)
155 {
156 if (Thread_WasCreated(&t->thread))
157 {
158 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
159 Event_Set(&t->canRead);
160 Thread_Wait_Close(&t->thread);
161 }
162
163 Event_Close(&t->canRead);
164 Event_Close(&t->canWrite);
165 }
166
MtDec_CloseThreads(CMtDec * p)167 static void MtDec_CloseThreads(CMtDec *p)
168 {
169 unsigned i;
170 for (i = 0; i < MTDEC__THREADS_MAX; i++)
171 MtDecThread_CloseThread(&p->threads[i]);
172 }
173
MtDecThread_Destruct(CMtDecThread * t)174 static void MtDecThread_Destruct(CMtDecThread *t)
175 {
176 MtDecThread_CloseThread(t);
177 MtDecThread_FreeInBufs(t);
178 }
179
180
181
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)182 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
183 {
184 size_t size = *processedSize;
185 *processedSize = 0;
186 while (size != 0)
187 {
188 size_t cur = size;
189 SRes res = ISeqInStream_Read(stream, data, &cur);
190 *processedSize += cur;
191 data += cur;
192 size -= cur;
193 RINOK(res);
194 if (cur == 0)
195 return SZ_OK;
196 }
197 return SZ_OK;
198 }
199
200
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,BoolInt * wasInterrupted)201 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
202 {
203 SRes res;
204 CriticalSection_Enter(&p->mtProgress.cs);
205 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
206 res = p->mtProgress.res;
207 CriticalSection_Leave(&p->mtProgress.cs);
208 return res;
209 }
210
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,BoolInt * wasInterrupted)211 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
212 {
213 SRes res;
214 CriticalSection_Enter(&p->mtProgress.cs);
215
216 p->mtProgress.totalInSize += inSize;
217 p->mtProgress.totalOutSize += outSize;
218 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
219 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
220 p->mtProgress.res = SZ_ERROR_PROGRESS;
221
222 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
223 res = p->mtProgress.res;
224
225 CriticalSection_Leave(&p->mtProgress.cs);
226
227 return res;
228 }
229
MtDec_Interrupt(CMtDec * p,UInt64 interruptIndex)230 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
231 {
232 CriticalSection_Enter(&p->mtProgress.cs);
233 if (!p->needInterrupt || interruptIndex < p->interruptIndex)
234 {
235 p->interruptIndex = interruptIndex;
236 p->needInterrupt = True;
237 }
238 CriticalSection_Leave(&p->mtProgress.cs);
239 }
240
MtDec_GetCrossBuff(CMtDec * p)241 Byte *MtDec_GetCrossBuff(CMtDec *p)
242 {
243 Byte *cr = p->crossBlock;
244 if (!cr)
245 {
246 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
247 if (!cr)
248 return NULL;
249 p->crossBlock = cr;
250 }
251 return MTDEC__DATA_PTR_FROM_LINK(cr);
252 }
253
254
255 /*
256 ThreadFunc2() returns:
257 0 - in all normal cases (even for stream error or memory allocation error)
258 (!= 0) - WRes error return by system threading function
259 */
260
261 // #define MTDEC_ProgessStep (1 << 22)
262 #define MTDEC_ProgessStep (1 << 0)
263
ThreadFunc2(CMtDecThread * t)264 static WRes ThreadFunc2(CMtDecThread *t)
265 {
266 CMtDec *p = t->mtDec;
267
268 PRF_STR_INT("ThreadFunc2", t->index);
269
270 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
271
272 for (;;)
273 {
274 SRes res, codeRes;
275 BoolInt wasInterrupted, isAllocError, overflow, finish;
276 SRes threadingErrorSRes;
277 BoolInt needCode, needWrite, needContinue;
278
279 size_t inDataSize_Start;
280 UInt64 inDataSize;
281 // UInt64 inDataSize_Full;
282
283 UInt64 blockIndex;
284
285 UInt64 inPrev = 0;
286 UInt64 outPrev = 0;
287 UInt64 inCodePos;
288 UInt64 outCodePos;
289
290 Byte *afterEndData = NULL;
291 size_t afterEndData_Size = 0;
292 BoolInt afterEndData_IsCross = False;
293
294 BoolInt canCreateNewThread = False;
295 // CMtDecCallbackInfo parse;
296 CMtDecThread *nextThread;
297
298 PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
299
300 RINOK_THREAD(Event_Wait(&t->canRead));
301 if (p->exitThread)
302 return 0;
303
304 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
305
306 // if (t->index == 3) return 19; // for test
307
308 blockIndex = p->blockIndex++;
309
310 // PRF(printf("\ncanRead\n"))
311
312 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
313
314 finish = p->readWasFinished;
315 needCode = False;
316 needWrite = False;
317 isAllocError = False;
318 overflow = False;
319
320 inDataSize_Start = 0;
321 inDataSize = 0;
322 // inDataSize_Full = 0;
323
324 if (res == SZ_OK && !wasInterrupted)
325 {
326 // if (p->inStream)
327 {
328 CMtDecBufLink *prev = NULL;
329 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
330 size_t crossSize = p->crossEnd - p->crossStart;
331
332 PRF(printf("\ncrossSize = %d\n", crossSize));
333
334 for (;;)
335 {
336 if (!link)
337 {
338 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
339 if (!link)
340 {
341 finish = True;
342 // p->allocError_for_Read_BlockIndex = blockIndex;
343 isAllocError = True;
344 break;
345 }
346 link->next = NULL;
347 if (prev)
348 {
349 // static unsigned g_num = 0;
350 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
351 prev->next = link;
352 }
353 else
354 t->inBuf = (void *)link;
355 }
356
357 {
358 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
359 Byte *parseData = data;
360 size_t size;
361
362 if (crossSize != 0)
363 {
364 inDataSize = crossSize;
365 // inDataSize_Full = inDataSize;
366 inDataSize_Start = crossSize;
367 size = crossSize;
368 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
369 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
370 (int)p->crossStart, (int)p->crossEnd, (int)finish));
371 }
372 else
373 {
374 size = p->inBufSize;
375
376 res = FullRead(p->inStream, data, &size);
377
378 // size = 10; // test
379
380 inDataSize += size;
381 // inDataSize_Full = inDataSize;
382 if (!prev)
383 inDataSize_Start = size;
384
385 p->readProcessed += size;
386 finish = (size != p->inBufSize);
387 if (finish)
388 p->readWasFinished = True;
389
390 // res = E_INVALIDARG; // test
391
392 if (res != SZ_OK)
393 {
394 // PRF(printf("\nRead error = %d\n", res))
395 // we want to decode all data before error
396 p->readRes = res;
397 // p->readError_BlockIndex = blockIndex;
398 p->readWasFinished = True;
399 finish = True;
400 res = SZ_OK;
401 // break;
402 }
403
404 if (inDataSize - inPrev >= MTDEC_ProgessStep)
405 {
406 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
407 if (res != SZ_OK || wasInterrupted)
408 break;
409 inPrev = inDataSize;
410 }
411 }
412
413 {
414 CMtDecCallbackInfo parse;
415
416 parse.startCall = (prev == NULL);
417 parse.src = parseData;
418 parse.srcSize = size;
419 parse.srcFinished = finish;
420 parse.canCreateNewThread = True;
421
422 PRF(printf("\nParse size = %d\n", (unsigned)size));
423
424 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
425
426 PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
427
428 needWrite = True;
429 canCreateNewThread = parse.canCreateNewThread;
430
431 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
432
433 if (
434 // parseRes != SZ_OK ||
435 // inDataSize - (size - parse.srcSize) > p->inBlockMax
436 // ||
437 parse.state == MTDEC_PARSE_OVERFLOW
438 // || wasInterrupted
439 )
440 {
441 // Overflow or Parse error - switch from MT decoding to ST decoding
442 finish = True;
443 overflow = True;
444
445 {
446 PRF(printf("\n Overflow"));
447 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
448 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
449 }
450
451 if (crossSize != 0)
452 memcpy(data, parseData, size);
453 p->crossStart = 0;
454 p->crossEnd = 0;
455 break;
456 }
457
458 if (crossSize != 0)
459 {
460 memcpy(data, parseData, parse.srcSize);
461 p->crossStart += parse.srcSize;
462 }
463
464 if (parse.state != MTDEC_PARSE_CONTINUE || finish)
465 {
466 // we don't need to parse in current thread anymore
467
468 if (parse.state == MTDEC_PARSE_END)
469 finish = True;
470
471 needCode = True;
472 // p->crossFinished = finish;
473
474 if (parse.srcSize == size)
475 {
476 // full parsed - no cross transfer
477 p->crossStart = 0;
478 p->crossEnd = 0;
479 break;
480 }
481
482 if (parse.state == MTDEC_PARSE_END)
483 {
484 afterEndData = parseData + parse.srcSize;
485 afterEndData_Size = size - parse.srcSize;
486 if (crossSize != 0)
487 afterEndData_IsCross = True;
488 // we reduce data size to required bytes (parsed only)
489 inDataSize -= afterEndData_Size;
490 if (!prev)
491 inDataSize_Start = parse.srcSize;
492 break;
493 }
494
495 {
496 // partial parsed - need cross transfer
497 if (crossSize != 0)
498 inDataSize = parse.srcSize; // it's only parsed now
499 else
500 {
501 // partial parsed - is not in initial cross block - we need to copy new data to cross block
502 Byte *cr = MtDec_GetCrossBuff(p);
503 if (!cr)
504 {
505 {
506 PRF(printf("\ncross alloc error error\n"));
507 // res = SZ_ERROR_MEM;
508 finish = True;
509 // p->allocError_for_Read_BlockIndex = blockIndex;
510 isAllocError = True;
511 break;
512 }
513 }
514
515 {
516 size_t crSize = size - parse.srcSize;
517 inDataSize -= crSize;
518 p->crossEnd = crSize;
519 p->crossStart = 0;
520 memcpy(cr, parseData + parse.srcSize, crSize);
521 }
522 }
523
524 // inDataSize_Full = inDataSize;
525 if (!prev)
526 inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
527
528 finish = False;
529 break;
530 }
531 }
532
533 if (parse.srcSize != size)
534 {
535 res = SZ_ERROR_FAIL;
536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
537 break;
538 }
539 }
540 }
541
542 prev = link;
543 link = link->next;
544
545 if (crossSize != 0)
546 {
547 crossSize = 0;
548 p->crossStart = 0;
549 p->crossEnd = 0;
550 }
551 }
552 }
553
554 if (res == SZ_OK)
555 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
556 }
557
558 codeRes = SZ_OK;
559
560 if (res == SZ_OK && needCode && !wasInterrupted)
561 {
562 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
563 if (codeRes != SZ_OK)
564 {
565 needCode = False;
566 finish = True;
567 // SZ_ERROR_MEM is expected error here.
568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
570 }
571 }
572
573 if (res != SZ_OK || wasInterrupted)
574 finish = True;
575
576 nextThread = NULL;
577 threadingErrorSRes = SZ_OK;
578
579 if (!finish)
580 {
581 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
582 {
583 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
584 if (res2 == SZ_OK)
585 {
586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
587 p->numStartedThreads++;
588 }
589 else
590 {
591 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
592 if (p->numStartedThreads == 1)
593 {
594 // if only one thread is possible, we leave muti-threading code
595 finish = True;
596 needCode = False;
597 threadingErrorSRes = res2;
598 }
599 else
600 p->numStartedThreads_Limit = p->numStartedThreads;
601 }
602 }
603
604 if (!finish)
605 {
606 unsigned nextIndex = t->index + 1;
607 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
608 RINOK_THREAD(Event_Set(&nextThread->canRead))
609 // We have started executing for new iteration (with next thread)
610 // And that next thread now is responsible for possible exit from decoding (threading_code)
611 }
612 }
613
614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
618 // - otherwise we stop decoding and exit from ThreadFunc2()
619
620 // Don't change (finish) variable in the further code
621
622
623 // ---------- CODE ----------
624
625 inPrev = 0;
626 outPrev = 0;
627 inCodePos = 0;
628 outCodePos = 0;
629
630 if (res == SZ_OK && needCode && codeRes == SZ_OK)
631 {
632 BoolInt isStartBlock = True;
633 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
634
635 for (;;)
636 {
637 size_t inSize;
638 int stop;
639
640 if (isStartBlock)
641 inSize = inDataSize_Start;
642 else
643 {
644 UInt64 rem = inDataSize - inCodePos;
645 inSize = p->inBufSize;
646 if (inSize > rem)
647 inSize = (size_t)rem;
648 }
649
650 inCodePos += inSize;
651 stop = True;
652
653 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
654 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
655 (inCodePos == inDataSize), // srcFinished
656 &inCodePos, &outCodePos, &stop);
657
658 if (codeRes != SZ_OK)
659 {
660 PRF(printf("\nCode Interrupt error = %x\n", codeRes));
661 // we interrupt only later blocks
662 MtDec_Interrupt(p, blockIndex);
663 break;
664 }
665
666 if (stop || inCodePos == inDataSize)
667 break;
668
669 {
670 const UInt64 inDelta = inCodePos - inPrev;
671 const UInt64 outDelta = outCodePos - outPrev;
672 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
673 {
674 // Sleep(1);
675 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
676 if (res != SZ_OK || wasInterrupted)
677 break;
678 inPrev = inCodePos;
679 outPrev = outCodePos;
680 }
681 }
682
683 link = link->next;
684 isStartBlock = False;
685 }
686 }
687
688
689 // ---------- WRITE ----------
690
691 RINOK_THREAD(Event_Wait(&t->canWrite));
692
693 {
694 BoolInt isErrorMode = False;
695 BoolInt canRecode = True;
696 BoolInt needWriteToStream = needWrite;
697
698 if (p->exitThread) return 0; // it's never executed in normal cases
699
700 if (p->wasInterrupted)
701 wasInterrupted = True;
702 else
703 {
704 if (codeRes != SZ_OK) // || !needCode // check it !!!
705 {
706 p->wasInterrupted = True;
707 p->codeRes = codeRes;
708 if (codeRes == SZ_ERROR_MEM)
709 isAllocError = True;
710 }
711
712 if (threadingErrorSRes)
713 {
714 p->wasInterrupted = True;
715 p->threadingErrorSRes = threadingErrorSRes;
716 needWriteToStream = False;
717 }
718 if (isAllocError)
719 {
720 p->wasInterrupted = True;
721 p->isAllocError = True;
722 needWriteToStream = False;
723 }
724 if (overflow)
725 {
726 p->wasInterrupted = True;
727 p->overflow = True;
728 needWriteToStream = False;
729 }
730 }
731
732 if (needCode)
733 {
734 if (wasInterrupted)
735 {
736 inCodePos = 0;
737 outCodePos = 0;
738 }
739 {
740 const UInt64 inDelta = inCodePos - inPrev;
741 const UInt64 outDelta = outCodePos - outPrev;
742 // if (inDelta != 0 || outDelta != 0)
743 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
744 }
745 }
746
747 needContinue = (!finish);
748
749 // if (res == SZ_OK && needWrite && !wasInterrupted)
750 if (needWrite)
751 {
752 // p->inProcessed += inCodePos;
753
754 PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
755
756 res = p->mtCallback->Write(p->mtCallbackObject, t->index,
757 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
758 afterEndData, afterEndData_Size, afterEndData_IsCross,
759 &needContinue,
760 &canRecode);
761
762 // res = SZ_ERROR_FAIL; // for test
763
764 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
765 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
766
767 if (res != SZ_OK)
768 {
769 PRF(printf("\nWrite error = %d\n", res));
770 isErrorMode = True;
771 p->wasInterrupted = True;
772 }
773 if (res != SZ_OK
774 || (!needContinue && !finish))
775 {
776 PRF(printf("\nWrite Interrupt error = %x\n", res));
777 MtDec_Interrupt(p, blockIndex);
778 }
779 }
780
781 if (canRecode)
782 if (!needCode
783 || res != SZ_OK
784 || p->wasInterrupted
785 || codeRes != SZ_OK
786 || wasInterrupted
787 || p->numFilledThreads != 0
788 || isErrorMode)
789 {
790 if (p->numFilledThreads == 0)
791 p->filledThreadStart = t->index;
792 if (inDataSize != 0 || !finish)
793 {
794 t->inDataSize_Start = inDataSize_Start;
795 t->inDataSize = inDataSize;
796 p->numFilledThreads++;
797 }
798 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
799 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
800 }
801
802 if (!finish)
803 {
804 RINOK_THREAD(Event_Set(&nextThread->canWrite));
805 }
806 else
807 {
808 if (needContinue)
809 {
810 // we restore decoding with new iteration
811 RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
812 }
813 else
814 {
815 // we exit from decoding
816 if (t->index == 0)
817 return SZ_OK;
818 p->exitThread = True;
819 }
820 RINOK_THREAD(Event_Set(&p->threads[0].canRead));
821 }
822 }
823 }
824 }
825
826 #ifdef _WIN32
827 #define USE_ALLOCA
828 #endif
829
830 #ifdef USE_ALLOCA
831 #ifdef _WIN32
832 #include <malloc.h>
833 #else
834 #include <stdlib.h>
835 #endif
836 #endif
837
838
ThreadFunc1(void * pp)839 static THREAD_FUNC_DECL ThreadFunc1(void *pp)
840 {
841 WRes res;
842
843 CMtDecThread *t = (CMtDecThread *)pp;
844 CMtDec *p;
845
846 // fprintf(stdout, "\n%d = %p\n", t->index, &t);
847
848 res = ThreadFunc2(t);
849 p = t->mtDec;
850 if (res == 0)
851 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
852 {
853 // it's unexpected situation for some threading function error
854 if (p->exitThreadWRes == 0)
855 p->exitThreadWRes = res;
856 PRF(printf("\nthread exit error = %d\n", res));
857 p->exitThread = True;
858 Event_Set(&p->threads[0].canRead);
859 Event_Set(&p->threads[0].canWrite);
860 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
861 }
862 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
863 }
864
ThreadFunc(void * pp)865 static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)
866 {
867 #ifdef USE_ALLOCA
868 CMtDecThread *t = (CMtDecThread *)pp;
869 // fprintf(stderr, "\n%d = %p - before", t->index, &t);
870 t->allocaPtr = alloca(t->index * 128);
871 #endif
872 return ThreadFunc1(pp);
873 }
874
875
MtDec_PrepareRead(CMtDec * p)876 int MtDec_PrepareRead(CMtDec *p)
877 {
878 if (p->crossBlock && p->crossStart == p->crossEnd)
879 {
880 ISzAlloc_Free(p->alloc, p->crossBlock);
881 p->crossBlock = NULL;
882 }
883
884 {
885 unsigned i;
886 for (i = 0; i < MTDEC__THREADS_MAX; i++)
887 if (i > p->numStartedThreads
888 || p->numFilledThreads <=
889 (i >= p->filledThreadStart ?
890 i - p->filledThreadStart :
891 i + p->numStartedThreads - p->filledThreadStart))
892 MtDecThread_FreeInBufs(&p->threads[i]);
893 }
894
895 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
896 }
897
898
MtDec_Read(CMtDec * p,size_t * inLim)899 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
900 {
901 while (p->numFilledThreads != 0)
902 {
903 CMtDecThread *t = &p->threads[p->filledThreadStart];
904
905 if (*inLim != 0)
906 {
907 {
908 void *link = t->inBuf;
909 void *next = ((CMtDecBufLink *)link)->next;
910 ISzAlloc_Free(p->alloc, link);
911 t->inBuf = next;
912 }
913
914 if (t->inDataSize == 0)
915 {
916 MtDecThread_FreeInBufs(t);
917 if (--p->numFilledThreads == 0)
918 break;
919 if (++p->filledThreadStart == p->numStartedThreads)
920 p->filledThreadStart = 0;
921 t = &p->threads[p->filledThreadStart];
922 }
923 }
924
925 {
926 size_t lim = t->inDataSize_Start;
927 if (lim != 0)
928 t->inDataSize_Start = 0;
929 else
930 {
931 UInt64 rem = t->inDataSize;
932 lim = p->inBufSize;
933 if (lim > rem)
934 lim = (size_t)rem;
935 }
936 t->inDataSize -= lim;
937 *inLim = lim;
938 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
939 }
940 }
941
942 {
943 size_t crossSize = p->crossEnd - p->crossStart;
944 if (crossSize != 0)
945 {
946 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
947 *inLim = crossSize;
948 p->crossStart = 0;
949 p->crossEnd = 0;
950 return data;
951 }
952 *inLim = 0;
953 if (p->crossBlock)
954 {
955 ISzAlloc_Free(p->alloc, p->crossBlock);
956 p->crossBlock = NULL;
957 }
958 return NULL;
959 }
960 }
961
962
MtDec_Construct(CMtDec * p)963 void MtDec_Construct(CMtDec *p)
964 {
965 unsigned i;
966
967 p->inBufSize = (size_t)1 << 18;
968
969 p->numThreadsMax = 0;
970
971 p->inStream = NULL;
972
973 // p->inData = NULL;
974 // p->inDataSize = 0;
975
976 p->crossBlock = NULL;
977 p->crossStart = 0;
978 p->crossEnd = 0;
979
980 p->numFilledThreads = 0;
981
982 p->progress = NULL;
983 p->alloc = NULL;
984
985 p->mtCallback = NULL;
986 p->mtCallbackObject = NULL;
987
988 p->allocatedBufsSize = 0;
989
990 for (i = 0; i < MTDEC__THREADS_MAX; i++)
991 {
992 CMtDecThread *t = &p->threads[i];
993 t->mtDec = p;
994 t->index = i;
995 t->inBuf = NULL;
996 Event_Construct(&t->canRead);
997 Event_Construct(&t->canWrite);
998 Thread_Construct(&t->thread);
999 }
1000
1001 // Event_Construct(&p->finishedEvent);
1002
1003 CriticalSection_Init(&p->mtProgress.cs);
1004 }
1005
1006
MtDec_Free(CMtDec * p)1007 static void MtDec_Free(CMtDec *p)
1008 {
1009 unsigned i;
1010
1011 p->exitThread = True;
1012
1013 for (i = 0; i < MTDEC__THREADS_MAX; i++)
1014 MtDecThread_Destruct(&p->threads[i]);
1015
1016 // Event_Close(&p->finishedEvent);
1017
1018 if (p->crossBlock)
1019 {
1020 ISzAlloc_Free(p->alloc, p->crossBlock);
1021 p->crossBlock = NULL;
1022 }
1023 }
1024
1025
MtDec_Destruct(CMtDec * p)1026 void MtDec_Destruct(CMtDec *p)
1027 {
1028 MtDec_Free(p);
1029
1030 CriticalSection_Delete(&p->mtProgress.cs);
1031 }
1032
1033
MtDec_Code(CMtDec * p)1034 SRes MtDec_Code(CMtDec *p)
1035 {
1036 unsigned i;
1037
1038 p->inProcessed = 0;
1039
1040 p->blockIndex = 1; // it must be larger than not_defined index (0)
1041 p->isAllocError = False;
1042 p->overflow = False;
1043 p->threadingErrorSRes = SZ_OK;
1044
1045 p->needContinue = True;
1046
1047 p->readWasFinished = False;
1048 p->needInterrupt = False;
1049 p->interruptIndex = (UInt64)(Int64)-1;
1050
1051 p->readProcessed = 0;
1052 p->readRes = SZ_OK;
1053 p->codeRes = SZ_OK;
1054 p->wasInterrupted = False;
1055
1056 p->crossStart = 0;
1057 p->crossEnd = 0;
1058
1059 p->filledThreadStart = 0;
1060 p->numFilledThreads = 0;
1061
1062 {
1063 unsigned numThreads = p->numThreadsMax;
1064 if (numThreads > MTDEC__THREADS_MAX)
1065 numThreads = MTDEC__THREADS_MAX;
1066 p->numStartedThreads_Limit = numThreads;
1067 p->numStartedThreads = 0;
1068 }
1069
1070 if (p->inBufSize != p->allocatedBufsSize)
1071 {
1072 for (i = 0; i < MTDEC__THREADS_MAX; i++)
1073 {
1074 CMtDecThread *t = &p->threads[i];
1075 if (t->inBuf)
1076 MtDecThread_FreeInBufs(t);
1077 }
1078 if (p->crossBlock)
1079 {
1080 ISzAlloc_Free(p->alloc, p->crossBlock);
1081 p->crossBlock = NULL;
1082 }
1083
1084 p->allocatedBufsSize = p->inBufSize;
1085 }
1086
1087 MtProgress_Init(&p->mtProgress, p->progress);
1088
1089 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1090 p->exitThread = False;
1091 p->exitThreadWRes = 0;
1092
1093 {
1094 WRes wres;
1095 SRes sres;
1096 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1097 // wres = MtDecThread_CreateAndStart(nextThread);
1098 wres = MtDecThread_CreateEvents(nextThread);
1099 if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1100 if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1101 if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);
1102 wres = (WRes)(UINT_PTR)res;
1103 if (wres != 0)
1104 {
1105 p->needContinue = False;
1106 MtDec_CloseThreads(p);
1107 }}}}
1108
1109 // wres = 17; // for test
1110 // wres = Event_Wait(&p->finishedEvent);
1111
1112 sres = MY_SRes_HRESULT_FROM_WRes(wres);
1113
1114 if (sres != 0)
1115 p->threadingErrorSRes = sres;
1116
1117 if (
1118 // wres == 0
1119 // wres != 0
1120 // || p->mtc.codeRes == SZ_ERROR_MEM
1121 p->isAllocError
1122 || p->threadingErrorSRes != SZ_OK
1123 || p->overflow)
1124 {
1125 // p->needContinue = True;
1126 }
1127 else
1128 p->needContinue = False;
1129
1130 if (p->needContinue)
1131 return SZ_OK;
1132
1133 // if (sres != SZ_OK)
1134 return sres;
1135 // return SZ_ERROR_FAIL;
1136 }
1137 }
1138
1139 #endif
1140