/* MtCoder.c -- Multi-thread Coder
2018-07-04 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 #include "MtCoder.h"
7
8 #ifndef _7ZIP_ST
9
MtProgressThunk_Progress(const ICompressProgress * pp,UInt64 inSize,UInt64 outSize)10 SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
11 {
12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
13 UInt64 inSize2 = 0;
14 UInt64 outSize2 = 0;
15 if (inSize != (UInt64)(Int64)-1)
16 {
17 inSize2 = inSize - thunk->inSize;
18 thunk->inSize = inSize;
19 }
20 if (outSize != (UInt64)(Int64)-1)
21 {
22 outSize2 = outSize - thunk->outSize;
23 thunk->outSize = outSize;
24 }
25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
26 }
27
28
/* Installs MtProgressThunk_Progress as the Progress callback of p->vt.
   No other field of *p is touched here. */
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  p->vt.Progress = MtProgressThunk_Progress;
}
33
34
35
/* Returns SZ_ERROR_THREAD from the enclosing function when (x) is nonzero.
   The expansion is a braced block (not do/while(0)) because call sites in
   this file use it both with and without a trailing semicolon. */
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37
38
ArEvent_OptCreate_And_Reset(CEvent * p)39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
40 {
41 if (Event_IsCreated(p))
42 return Event_Reset(p);
43 return AutoResetEvent_CreateNotSignaled(p);
44 }
45
46
47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
48
49
MtCoderThread_CreateAndStart(CMtCoderThread * t)50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
51 {
52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
53 if (wres == 0)
54 {
55 t->stop = False;
56 if (!Thread_WasCreated(&t->thread))
57 wres = Thread_Create(&t->thread, ThreadFunc, t);
58 if (wres == 0)
59 wres = Event_Set(&t->startEvent);
60 }
61 if (wres == 0)
62 return SZ_OK;
63 return MY_SRes_HRESULT_FROM_WRes(wres);
64 }
65
66
/*
  Releases everything owned by worker *t:
    - if the thread was created: sets the stop flag, signals startEvent so
      that ThreadFunc can observe the flag, waits for the thread and closes
      its handle;
    - closes startEvent;
    - frees the input buffer (through mtCoder->allocBig) and clears the pointer.
  Return values of the Event / Thread calls are not checked here.
*/
static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* True / False are used for this flag elsewhere in the file */
    t->stop = True;
    Event_Set(&t->startEvent);
    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  if (t->inBuf)
  {
    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
    t->inBuf = NULL;
  }
}
85
86
87
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)88 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
89 {
90 size_t size = *processedSize;
91 *processedSize = 0;
92 while (size != 0)
93 {
94 size_t cur = size;
95 SRes res = ISeqInStream_Read(stream, data, &cur);
96 *processedSize += cur;
97 data += cur;
98 size -= cur;
99 RINOK(res);
100 if (cur == 0)
101 return SZ_OK;
102 }
103 return SZ_OK;
104 }
105
106
107 /*
108 ThreadFunc2() returns:
109 SZ_OK - in all normal cases (even for stream error or memory allocation error)
110 SZ_ERROR_THREAD - in case of failure in system synch function
111 */
112
ThreadFunc2(CMtCoderThread * t)113 static SRes ThreadFunc2(CMtCoderThread *t)
114 {
115 CMtCoder *mtc = t->mtCoder;
116
117 for (;;)
118 {
119 unsigned bi;
120 SRes res;
121 SRes res2;
122 BoolInt finished;
123 unsigned bufIndex;
124 size_t size;
125 const Byte *inData;
126 UInt64 readProcessed = 0;
127
128 RINOK_THREAD(Event_Wait(&mtc->readEvent))
129
130 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
131
132 if (mtc->stopReading)
133 {
134 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
135 }
136
137 res = MtProgress_GetError(&mtc->mtProgress);
138
139 size = 0;
140 inData = NULL;
141 finished = True;
142
143 if (res == SZ_OK)
144 {
145 size = mtc->blockSize;
146 if (mtc->inStream)
147 {
148 if (!t->inBuf)
149 {
150 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
151 if (!t->inBuf)
152 res = SZ_ERROR_MEM;
153 }
154 if (res == SZ_OK)
155 {
156 res = FullRead(mtc->inStream, t->inBuf, &size);
157 readProcessed = mtc->readProcessed + size;
158 mtc->readProcessed = readProcessed;
159 }
160 if (res != SZ_OK)
161 {
162 mtc->readRes = res;
163 /* after reading error - we can stop encoding of previous blocks */
164 MtProgress_SetError(&mtc->mtProgress, res);
165 }
166 else
167 finished = (size != mtc->blockSize);
168 }
169 else
170 {
171 size_t rem;
172 readProcessed = mtc->readProcessed;
173 rem = mtc->inDataSize - (size_t)readProcessed;
174 if (size > rem)
175 size = rem;
176 inData = mtc->inData + (size_t)readProcessed;
177 readProcessed += size;
178 mtc->readProcessed = readProcessed;
179 finished = (mtc->inDataSize == (size_t)readProcessed);
180 }
181 }
182
183 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
184
185 res2 = SZ_OK;
186
187 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
188 {
189 res2 = SZ_ERROR_THREAD;
190 if (res == SZ_OK)
191 {
192 res = res2;
193 // MtProgress_SetError(&mtc->mtProgress, res);
194 }
195 }
196
197 bi = mtc->blockIndex;
198
199 if (++mtc->blockIndex >= mtc->numBlocksMax)
200 mtc->blockIndex = 0;
201
202 bufIndex = (unsigned)(int)-1;
203
204 if (res == SZ_OK)
205 res = MtProgress_GetError(&mtc->mtProgress);
206
207 if (res != SZ_OK)
208 finished = True;
209
210 if (!finished)
211 {
212 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
213 && mtc->expectedDataSize != readProcessed)
214 {
215 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
216 if (res == SZ_OK)
217 mtc->numStartedThreads++;
218 else
219 {
220 MtProgress_SetError(&mtc->mtProgress, res);
221 finished = True;
222 }
223 }
224 }
225
226 if (finished)
227 mtc->stopReading = True;
228
229 RINOK_THREAD(Event_Set(&mtc->readEvent))
230
231 if (res2 != SZ_OK)
232 return res2;
233
234 if (res == SZ_OK)
235 {
236 CriticalSection_Enter(&mtc->cs);
237 bufIndex = mtc->freeBlockHead;
238 mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
239 CriticalSection_Leave(&mtc->cs);
240
241 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
242 mtc->inStream ? t->inBuf : inData, size, finished);
243
244 // MtProgress_Reinit(&mtc->mtProgress, t->index);
245
246 if (res != SZ_OK)
247 MtProgress_SetError(&mtc->mtProgress, res);
248 }
249
250 {
251 CMtCoderBlock *block = &mtc->blocks[bi];
252 block->res = res;
253 block->bufIndex = bufIndex;
254 block->finished = finished;
255 }
256
257 #ifdef MTCODER__USE_WRITE_THREAD
258 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
259 #else
260 {
261 unsigned wi;
262 {
263 CriticalSection_Enter(&mtc->cs);
264 wi = mtc->writeIndex;
265 if (wi == bi)
266 mtc->writeIndex = (unsigned)(int)-1;
267 else
268 mtc->ReadyBlocks[bi] = True;
269 CriticalSection_Leave(&mtc->cs);
270 }
271
272 if (wi != bi)
273 {
274 if (res != SZ_OK || finished)
275 return 0;
276 continue;
277 }
278
279 if (mtc->writeRes != SZ_OK)
280 res = mtc->writeRes;
281
282 for (;;)
283 {
284 if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
285 {
286 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
287 if (res != SZ_OK)
288 {
289 mtc->writeRes = res;
290 MtProgress_SetError(&mtc->mtProgress, res);
291 }
292 }
293
294 if (++wi >= mtc->numBlocksMax)
295 wi = 0;
296 {
297 BoolInt isReady;
298
299 CriticalSection_Enter(&mtc->cs);
300
301 if (bufIndex != (unsigned)(int)-1)
302 {
303 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
304 mtc->freeBlockHead = bufIndex;
305 }
306
307 isReady = mtc->ReadyBlocks[wi];
308
309 if (isReady)
310 mtc->ReadyBlocks[wi] = False;
311 else
312 mtc->writeIndex = wi;
313
314 CriticalSection_Leave(&mtc->cs);
315
316 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
317
318 if (!isReady)
319 break;
320 }
321
322 {
323 CMtCoderBlock *block = &mtc->blocks[wi];
324 if (res == SZ_OK && block->res != SZ_OK)
325 res = block->res;
326 bufIndex = block->bufIndex;
327 finished = block->finished;
328 }
329 }
330 }
331 #endif
332
333 if (finished || res != SZ_OK)
334 return 0;
335 }
336 }
337
338
ThreadFunc(void * pp)339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
340 {
341 CMtCoderThread *t = (CMtCoderThread *)pp;
342 for (;;)
343 {
344 if (Event_Wait(&t->startEvent) != 0)
345 return SZ_ERROR_THREAD;
346 if (t->stop)
347 return 0;
348 {
349 SRes res = ThreadFunc2(t);
350 CMtCoder *mtc = t->mtCoder;
351 if (res != SZ_OK)
352 {
353 MtProgress_SetError(&mtc->mtProgress, res);
354 }
355
356 #ifndef MTCODER__USE_WRITE_THREAD
357 {
358 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
359 if (numFinished == mtc->numStartedThreads)
360 if (Event_Set(&mtc->finishedEvent) != 0)
361 return SZ_ERROR_THREAD;
362 }
363 #endif
364 }
365 }
366 }
367
368
369
/*
  Puts *p into its initial state. No threads, events or buffers are created
  here:
    - settings are cleared (blockSize = 0, numThreadsMax = 0,
      expectedDataSize = (UInt64)(Int64)-1, all pointers NULL);
    - readEvent, blocksSemaphore, every thread slot and the write / finished
      events are "constructed" only;
    - each of the MTCODER__THREADS_MAX thread slots gets its back pointer to
      p and its own index;
    - the critical sections p->cs and p->mtProgress.cs are initialized.
*/
void MtCoder_Construct(CMtCoder *p)
{
  unsigned i;

  p->blockSize = 0;
  p->numThreadsMax = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  p->progress = NULL;
  p->allocBig = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  /* block size for which the threads' inBuf buffers were allocated */
  p->allocatedBufsSize = 0;

  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  for (i = 0; i < MTCODER__THREADS_MAX; i++)
  {
    CMtCoderThread *t = &p->threads[i];
    t->mtCoder = p;
    t->index = i;
    t->inBuf = NULL;
    t->stop = False;
    Event_Construct(&t->startEvent);
    Thread_Construct(&t->thread);
  }

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Construct(&p->writeEvents[i]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}
414
415
416
417
/*
  Destroys all worker threads (MtCoderThread_Destruct() for every slot, which
  also frees their input buffers) and closes readEvent, blocksSemaphore and
  the write / finished events. The critical sections are not touched here;
  MtCoder_Destruct() deletes them.
*/
static void MtCoder_Free(CMtCoder *p)
{
  unsigned i;

  /*
  p->stopReading = True;
  if (Event_IsCreated(&p->readEvent))
    Event_Set(&p->readEvent);
  */

  for (i = 0; i < MTCODER__THREADS_MAX; i++)
    MtCoderThread_Destruct(&p->threads[i]);

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Close(&p->writeEvents[i]);
  #else
    Event_Close(&p->finishedEvent);
  #endif
}
441
442
/* Releases all resources of *p: threads, events and semaphore through
   MtCoder_Free(), then the two critical sections initialized by
   MtCoder_Construct(). */
void MtCoder_Destruct(CMtCoder *p)
{
  MtCoder_Free(p);

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&p->mtProgress.cs);
}
450
451
MtCoder_Code(CMtCoder * p)452 SRes MtCoder_Code(CMtCoder *p)
453 {
454 unsigned numThreads = p->numThreadsMax;
455 unsigned numBlocksMax;
456 unsigned i;
457 SRes res = SZ_OK;
458
459 if (numThreads > MTCODER__THREADS_MAX)
460 numThreads = MTCODER__THREADS_MAX;
461 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
462
463 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
464 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
465 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
466
467 if (numBlocksMax > MTCODER__BLOCKS_MAX)
468 numBlocksMax = MTCODER__BLOCKS_MAX;
469
470 if (p->blockSize != p->allocatedBufsSize)
471 {
472 for (i = 0; i < MTCODER__THREADS_MAX; i++)
473 {
474 CMtCoderThread *t = &p->threads[i];
475 if (t->inBuf)
476 {
477 ISzAlloc_Free(p->allocBig, t->inBuf);
478 t->inBuf = NULL;
479 }
480 }
481 p->allocatedBufsSize = p->blockSize;
482 }
483
484 p->readRes = SZ_OK;
485
486 MtProgress_Init(&p->mtProgress, p->progress);
487
488 #ifdef MTCODER__USE_WRITE_THREAD
489 for (i = 0; i < numBlocksMax; i++)
490 {
491 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
492 }
493 #else
494 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
495 #endif
496
497 {
498 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
499
500 if (Semaphore_IsCreated(&p->blocksSemaphore))
501 {
502 RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
503 }
504 RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
505 }
506
507 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
508 p->freeBlockList[i] = i + 1;
509 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
510 p->freeBlockHead = 0;
511
512 p->readProcessed = 0;
513 p->blockIndex = 0;
514 p->numBlocksMax = numBlocksMax;
515 p->stopReading = False;
516
517 #ifndef MTCODER__USE_WRITE_THREAD
518 p->writeIndex = 0;
519 p->writeRes = SZ_OK;
520 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
521 p->ReadyBlocks[i] = False;
522 p->numFinishedThreads = 0;
523 #endif
524
525 p->numStartedThreadsLimit = numThreads;
526 p->numStartedThreads = 0;
527
528 // for (i = 0; i < numThreads; i++)
529 {
530 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
531 RINOK(MtCoderThread_CreateAndStart(nextThread));
532 }
533
534 RINOK_THREAD(Event_Set(&p->readEvent))
535
536 #ifdef MTCODER__USE_WRITE_THREAD
537 {
538 unsigned bi = 0;
539
540 for (;; bi++)
541 {
542 if (bi >= numBlocksMax)
543 bi = 0;
544
545 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
546
547 {
548 const CMtCoderBlock *block = &p->blocks[bi];
549 unsigned bufIndex = block->bufIndex;
550 BoolInt finished = block->finished;
551 if (res == SZ_OK && block->res != SZ_OK)
552 res = block->res;
553
554 if (bufIndex != (unsigned)(int)-1)
555 {
556 if (res == SZ_OK)
557 {
558 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
559 if (res != SZ_OK)
560 MtProgress_SetError(&p->mtProgress, res);
561 }
562
563 CriticalSection_Enter(&p->cs);
564 {
565 p->freeBlockList[bufIndex] = p->freeBlockHead;
566 p->freeBlockHead = bufIndex;
567 }
568 CriticalSection_Leave(&p->cs);
569 }
570
571 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
572
573 if (finished)
574 break;
575 }
576 }
577 }
578 #else
579 {
580 WRes wres = Event_Wait(&p->finishedEvent);
581 res = MY_SRes_HRESULT_FROM_WRes(wres);
582 }
583 #endif
584
585 if (res == SZ_OK)
586 res = p->readRes;
587
588 if (res == SZ_OK)
589 res = p->mtProgress.res;
590
591 #ifndef MTCODER__USE_WRITE_THREAD
592 if (res == SZ_OK)
593 res = p->writeRes;
594 #endif
595
596 if (res != SZ_OK)
597 MtCoder_Free(p);
598 return res;
599 }
600
601 #endif
602