1 /* MtCoder.c -- Multi-thread Coder
2 2021-12-21 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 #include "MtCoder.h"
7
8 #ifndef _7ZIP_ST
9
MtProgressThunk_Progress(const ICompressProgress * pp,UInt64 inSize,UInt64 outSize)10 static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
11 {
12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
13 UInt64 inSize2 = 0;
14 UInt64 outSize2 = 0;
15 if (inSize != (UInt64)(Int64)-1)
16 {
17 inSize2 = inSize - thunk->inSize;
18 thunk->inSize = inSize;
19 }
20 if (outSize != (UInt64)(Int64)-1)
21 {
22 outSize2 = outSize - thunk->outSize;
23 thunk->outSize = outSize;
24 }
25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
26 }
27
28
MtProgressThunk_CreateVTable(CMtProgressThunk * p)29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
30 {
31 p->vt.Progress = MtProgressThunk_Progress;
32 }
33
34
35
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37
38
ArEvent_OptCreate_And_Reset(CEvent * p)39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
40 {
41 if (Event_IsCreated(p))
42 return Event_Reset(p);
43 return AutoResetEvent_CreateNotSignaled(p);
44 }
45
46
47 static THREAD_FUNC_DECL ThreadFunc(void *pp);
48
49
MtCoderThread_CreateAndStart(CMtCoderThread * t)50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
51 {
52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
53 if (wres == 0)
54 {
55 t->stop = False;
56 if (!Thread_WasCreated(&t->thread))
57 wres = Thread_Create(&t->thread, ThreadFunc, t);
58 if (wres == 0)
59 wres = Event_Set(&t->startEvent);
60 }
61 if (wres == 0)
62 return SZ_OK;
63 return MY_SRes_HRESULT_FROM_WRes(wres);
64 }
65
66
MtCoderThread_Destruct(CMtCoderThread * t)67 static void MtCoderThread_Destruct(CMtCoderThread *t)
68 {
69 if (Thread_WasCreated(&t->thread))
70 {
71 t->stop = 1;
72 Event_Set(&t->startEvent);
73 Thread_Wait_Close(&t->thread);
74 }
75
76 Event_Close(&t->startEvent);
77
78 if (t->inBuf)
79 {
80 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
81 t->inBuf = NULL;
82 }
83 }
84
85
86
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)87 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
88 {
89 size_t size = *processedSize;
90 *processedSize = 0;
91 while (size != 0)
92 {
93 size_t cur = size;
94 SRes res = ISeqInStream_Read(stream, data, &cur);
95 *processedSize += cur;
96 data += cur;
97 size -= cur;
98 RINOK(res);
99 if (cur == 0)
100 return SZ_OK;
101 }
102 return SZ_OK;
103 }
104
105
106 /*
107 ThreadFunc2() returns:
108 SZ_OK - in all normal cases (even for stream error or memory allocation error)
109 SZ_ERROR_THREAD - in case of failure in system synch function
110 */
111
ThreadFunc2(CMtCoderThread * t)112 static SRes ThreadFunc2(CMtCoderThread *t)
113 {
114 CMtCoder *mtc = t->mtCoder;
115
116 for (;;)
117 {
118 unsigned bi;
119 SRes res;
120 SRes res2;
121 BoolInt finished;
122 unsigned bufIndex;
123 size_t size;
124 const Byte *inData;
125 UInt64 readProcessed = 0;
126
127 RINOK_THREAD(Event_Wait(&mtc->readEvent))
128
129 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
130
131 if (mtc->stopReading)
132 {
133 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
134 }
135
136 res = MtProgress_GetError(&mtc->mtProgress);
137
138 size = 0;
139 inData = NULL;
140 finished = True;
141
142 if (res == SZ_OK)
143 {
144 size = mtc->blockSize;
145 if (mtc->inStream)
146 {
147 if (!t->inBuf)
148 {
149 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
150 if (!t->inBuf)
151 res = SZ_ERROR_MEM;
152 }
153 if (res == SZ_OK)
154 {
155 res = FullRead(mtc->inStream, t->inBuf, &size);
156 readProcessed = mtc->readProcessed + size;
157 mtc->readProcessed = readProcessed;
158 }
159 if (res != SZ_OK)
160 {
161 mtc->readRes = res;
162 /* after reading error - we can stop encoding of previous blocks */
163 MtProgress_SetError(&mtc->mtProgress, res);
164 }
165 else
166 finished = (size != mtc->blockSize);
167 }
168 else
169 {
170 size_t rem;
171 readProcessed = mtc->readProcessed;
172 rem = mtc->inDataSize - (size_t)readProcessed;
173 if (size > rem)
174 size = rem;
175 inData = mtc->inData + (size_t)readProcessed;
176 readProcessed += size;
177 mtc->readProcessed = readProcessed;
178 finished = (mtc->inDataSize == (size_t)readProcessed);
179 }
180 }
181
182 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
183
184 res2 = SZ_OK;
185
186 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
187 {
188 res2 = SZ_ERROR_THREAD;
189 if (res == SZ_OK)
190 {
191 res = res2;
192 // MtProgress_SetError(&mtc->mtProgress, res);
193 }
194 }
195
196 bi = mtc->blockIndex;
197
198 if (++mtc->blockIndex >= mtc->numBlocksMax)
199 mtc->blockIndex = 0;
200
201 bufIndex = (unsigned)(int)-1;
202
203 if (res == SZ_OK)
204 res = MtProgress_GetError(&mtc->mtProgress);
205
206 if (res != SZ_OK)
207 finished = True;
208
209 if (!finished)
210 {
211 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
212 && mtc->expectedDataSize != readProcessed)
213 {
214 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
215 if (res == SZ_OK)
216 mtc->numStartedThreads++;
217 else
218 {
219 MtProgress_SetError(&mtc->mtProgress, res);
220 finished = True;
221 }
222 }
223 }
224
225 if (finished)
226 mtc->stopReading = True;
227
228 RINOK_THREAD(Event_Set(&mtc->readEvent))
229
230 if (res2 != SZ_OK)
231 return res2;
232
233 if (res == SZ_OK)
234 {
235 CriticalSection_Enter(&mtc->cs);
236 bufIndex = mtc->freeBlockHead;
237 mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
238 CriticalSection_Leave(&mtc->cs);
239
240 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
241 mtc->inStream ? t->inBuf : inData, size, finished);
242
243 // MtProgress_Reinit(&mtc->mtProgress, t->index);
244
245 if (res != SZ_OK)
246 MtProgress_SetError(&mtc->mtProgress, res);
247 }
248
249 {
250 CMtCoderBlock *block = &mtc->blocks[bi];
251 block->res = res;
252 block->bufIndex = bufIndex;
253 block->finished = finished;
254 }
255
256 #ifdef MTCODER__USE_WRITE_THREAD
257 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
258 #else
259 {
260 unsigned wi;
261 {
262 CriticalSection_Enter(&mtc->cs);
263 wi = mtc->writeIndex;
264 if (wi == bi)
265 mtc->writeIndex = (unsigned)(int)-1;
266 else
267 mtc->ReadyBlocks[bi] = True;
268 CriticalSection_Leave(&mtc->cs);
269 }
270
271 if (wi != bi)
272 {
273 if (res != SZ_OK || finished)
274 return 0;
275 continue;
276 }
277
278 if (mtc->writeRes != SZ_OK)
279 res = mtc->writeRes;
280
281 for (;;)
282 {
283 if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
284 {
285 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
286 if (res != SZ_OK)
287 {
288 mtc->writeRes = res;
289 MtProgress_SetError(&mtc->mtProgress, res);
290 }
291 }
292
293 if (++wi >= mtc->numBlocksMax)
294 wi = 0;
295 {
296 BoolInt isReady;
297
298 CriticalSection_Enter(&mtc->cs);
299
300 if (bufIndex != (unsigned)(int)-1)
301 {
302 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
303 mtc->freeBlockHead = bufIndex;
304 }
305
306 isReady = mtc->ReadyBlocks[wi];
307
308 if (isReady)
309 mtc->ReadyBlocks[wi] = False;
310 else
311 mtc->writeIndex = wi;
312
313 CriticalSection_Leave(&mtc->cs);
314
315 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
316
317 if (!isReady)
318 break;
319 }
320
321 {
322 CMtCoderBlock *block = &mtc->blocks[wi];
323 if (res == SZ_OK && block->res != SZ_OK)
324 res = block->res;
325 bufIndex = block->bufIndex;
326 finished = block->finished;
327 }
328 }
329 }
330 #endif
331
332 if (finished || res != SZ_OK)
333 return 0;
334 }
335 }
336
337
ThreadFunc(void * pp)338 static THREAD_FUNC_DECL ThreadFunc(void *pp)
339 {
340 CMtCoderThread *t = (CMtCoderThread *)pp;
341 for (;;)
342 {
343 if (Event_Wait(&t->startEvent) != 0)
344 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
345 if (t->stop)
346 return 0;
347 {
348 SRes res = ThreadFunc2(t);
349 CMtCoder *mtc = t->mtCoder;
350 if (res != SZ_OK)
351 {
352 MtProgress_SetError(&mtc->mtProgress, res);
353 }
354
355 #ifndef MTCODER__USE_WRITE_THREAD
356 {
357 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
358 if (numFinished == mtc->numStartedThreads)
359 if (Event_Set(&mtc->finishedEvent) != 0)
360 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
361 }
362 #endif
363 }
364 }
365 }
366
367
368
MtCoder_Construct(CMtCoder * p)369 void MtCoder_Construct(CMtCoder *p)
370 {
371 unsigned i;
372
373 p->blockSize = 0;
374 p->numThreadsMax = 0;
375 p->expectedDataSize = (UInt64)(Int64)-1;
376
377 p->inStream = NULL;
378 p->inData = NULL;
379 p->inDataSize = 0;
380
381 p->progress = NULL;
382 p->allocBig = NULL;
383
384 p->mtCallback = NULL;
385 p->mtCallbackObject = NULL;
386
387 p->allocatedBufsSize = 0;
388
389 Event_Construct(&p->readEvent);
390 Semaphore_Construct(&p->blocksSemaphore);
391
392 for (i = 0; i < MTCODER__THREADS_MAX; i++)
393 {
394 CMtCoderThread *t = &p->threads[i];
395 t->mtCoder = p;
396 t->index = i;
397 t->inBuf = NULL;
398 t->stop = False;
399 Event_Construct(&t->startEvent);
400 Thread_Construct(&t->thread);
401 }
402
403 #ifdef MTCODER__USE_WRITE_THREAD
404 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
405 Event_Construct(&p->writeEvents[i]);
406 #else
407 Event_Construct(&p->finishedEvent);
408 #endif
409
410 CriticalSection_Init(&p->cs);
411 CriticalSection_Init(&p->mtProgress.cs);
412 }
413
414
415
416
MtCoder_Free(CMtCoder * p)417 static void MtCoder_Free(CMtCoder *p)
418 {
419 unsigned i;
420
421 /*
422 p->stopReading = True;
423 if (Event_IsCreated(&p->readEvent))
424 Event_Set(&p->readEvent);
425 */
426
427 for (i = 0; i < MTCODER__THREADS_MAX; i++)
428 MtCoderThread_Destruct(&p->threads[i]);
429
430 Event_Close(&p->readEvent);
431 Semaphore_Close(&p->blocksSemaphore);
432
433 #ifdef MTCODER__USE_WRITE_THREAD
434 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
435 Event_Close(&p->writeEvents[i]);
436 #else
437 Event_Close(&p->finishedEvent);
438 #endif
439 }
440
441
MtCoder_Destruct(CMtCoder * p)442 void MtCoder_Destruct(CMtCoder *p)
443 {
444 MtCoder_Free(p);
445
446 CriticalSection_Delete(&p->cs);
447 CriticalSection_Delete(&p->mtProgress.cs);
448 }
449
450
MtCoder_Code(CMtCoder * p)451 SRes MtCoder_Code(CMtCoder *p)
452 {
453 unsigned numThreads = p->numThreadsMax;
454 unsigned numBlocksMax;
455 unsigned i;
456 SRes res = SZ_OK;
457
458 if (numThreads > MTCODER__THREADS_MAX)
459 numThreads = MTCODER__THREADS_MAX;
460 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
461
462 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
463 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
464 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
465
466 if (numBlocksMax > MTCODER__BLOCKS_MAX)
467 numBlocksMax = MTCODER__BLOCKS_MAX;
468
469 if (p->blockSize != p->allocatedBufsSize)
470 {
471 for (i = 0; i < MTCODER__THREADS_MAX; i++)
472 {
473 CMtCoderThread *t = &p->threads[i];
474 if (t->inBuf)
475 {
476 ISzAlloc_Free(p->allocBig, t->inBuf);
477 t->inBuf = NULL;
478 }
479 }
480 p->allocatedBufsSize = p->blockSize;
481 }
482
483 p->readRes = SZ_OK;
484
485 MtProgress_Init(&p->mtProgress, p->progress);
486
487 #ifdef MTCODER__USE_WRITE_THREAD
488 for (i = 0; i < numBlocksMax; i++)
489 {
490 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
491 }
492 #else
493 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
494 #endif
495
496 {
497 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
498 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
499 }
500
501 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
502 p->freeBlockList[i] = i + 1;
503 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
504 p->freeBlockHead = 0;
505
506 p->readProcessed = 0;
507 p->blockIndex = 0;
508 p->numBlocksMax = numBlocksMax;
509 p->stopReading = False;
510
511 #ifndef MTCODER__USE_WRITE_THREAD
512 p->writeIndex = 0;
513 p->writeRes = SZ_OK;
514 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
515 p->ReadyBlocks[i] = False;
516 p->numFinishedThreads = 0;
517 #endif
518
519 p->numStartedThreadsLimit = numThreads;
520 p->numStartedThreads = 0;
521
522 // for (i = 0; i < numThreads; i++)
523 {
524 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
525 RINOK(MtCoderThread_CreateAndStart(nextThread));
526 }
527
528 RINOK_THREAD(Event_Set(&p->readEvent))
529
530 #ifdef MTCODER__USE_WRITE_THREAD
531 {
532 unsigned bi = 0;
533
534 for (;; bi++)
535 {
536 if (bi >= numBlocksMax)
537 bi = 0;
538
539 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
540
541 {
542 const CMtCoderBlock *block = &p->blocks[bi];
543 unsigned bufIndex = block->bufIndex;
544 BoolInt finished = block->finished;
545 if (res == SZ_OK && block->res != SZ_OK)
546 res = block->res;
547
548 if (bufIndex != (unsigned)(int)-1)
549 {
550 if (res == SZ_OK)
551 {
552 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
553 if (res != SZ_OK)
554 MtProgress_SetError(&p->mtProgress, res);
555 }
556
557 CriticalSection_Enter(&p->cs);
558 {
559 p->freeBlockList[bufIndex] = p->freeBlockHead;
560 p->freeBlockHead = bufIndex;
561 }
562 CriticalSection_Leave(&p->cs);
563 }
564
565 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
566
567 if (finished)
568 break;
569 }
570 }
571 }
572 #else
573 {
574 WRes wres = Event_Wait(&p->finishedEvent);
575 res = MY_SRes_HRESULT_FROM_WRes(wres);
576 }
577 #endif
578
579 if (res == SZ_OK)
580 res = p->readRes;
581
582 if (res == SZ_OK)
583 res = p->mtProgress.res;
584
585 #ifndef MTCODER__USE_WRITE_THREAD
586 if (res == SZ_OK)
587 res = p->writeRes;
588 #endif
589
590 if (res != SZ_OK)
591 MtCoder_Free(p);
592 return res;
593 }
594
595 #endif
596