/* MtDec.h -- Multi-thread Decoder
2023-04-02 : Igor Pavlov : Public domain */
3 
4 #ifndef ZIP7_INC_MT_DEC_H
5 #define ZIP7_INC_MT_DEC_H
6 
7 #include "7zTypes.h"
8 
9 #ifndef Z7_ST
10 #include "Threads.h"
11 #endif
12 
13 EXTERN_C_BEGIN
14 
15 #ifndef Z7_ST
16 
/* Capacity of the CMtDec::threads[] array.
   No single-thread fallback value is defined here: this whole section of
   the header is compiled only when Z7_ST is not defined, so a nested test
   of Z7_ST could never select a different value. */
#define MTDEC_THREADS_MAX 32
22 
23 
24 typedef struct
25 {
26   ICompressProgressPtr progress;
27   SRes res;
28   UInt64 totalInSize;
29   UInt64 totalOutSize;
30   CCriticalSection cs;
31 } CMtProgress;
32 
33 void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress);
34 SRes MtProgress_Progress_ST(CMtProgress *p);
35 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize);
36 SRes MtProgress_GetError(CMtProgress *p);
37 void MtProgress_SetError(CMtProgress *p, SRes res);
38 
39 struct CMtDec;
40 
41 typedef struct
42 {
43   struct CMtDec_ *mtDec;
44   unsigned index;
45   void *inBuf;
46 
47   size_t inDataSize_Start; // size of input data in start block
48   UInt64 inDataSize;       // total size of input data in all blocks
49 
50   CThread thread;
51   CAutoResetEvent canRead;
52   CAutoResetEvent canWrite;
53   void  *allocaPtr;
54 } CMtDecThread;
55 
56 void MtDecThread_FreeInBufs(CMtDecThread *t);
57 
58 
/* Values of CMtDecCallbackInfo::state (an output field of that structure). */
typedef enum
{
  MTDEC_PARSE_CONTINUE = 0, /* continue this block with more input data */
  MTDEC_PARSE_OVERFLOW = 1, /* MT buffers overflow, need switch to single-thread */
  MTDEC_PARSE_NEW      = 2, /* new block */
  MTDEC_PARSE_END      = 3  /* end of block threading; threading can still be
                               resumed after Write(&needContinue) */
} EMtDecParseState;
66 
67 typedef struct
68 {
69   // in
70   int startCall;
71   const Byte *src;
72   size_t srcSize;
73       // in  : (srcSize == 0) is allowed
74       // out : it's allowed to return less that actually was used ?
75   int srcFinished;
76 
77   // out
78   EMtDecParseState state;
79   BoolInt canCreateNewThread;
80   UInt64 outPos; // check it (size_t)
81 } CMtDecCallbackInfo;
82 
83 
84 typedef struct
85 {
86   void (*Parse)(void *p, unsigned coderIndex, CMtDecCallbackInfo *ci);
87 
88   // PreCode() and Code():
89   // (SRes_return_result != SZ_OK) means stop decoding, no need another blocks
90   SRes (*PreCode)(void *p, unsigned coderIndex);
91   SRes (*Code)(void *p, unsigned coderIndex,
92       const Byte *src, size_t srcSize, int srcFinished,
93       UInt64 *inCodePos, UInt64 *outCodePos, int *stop);
94   // stop - means stop another Code calls
95 
96 
97   /* Write() must be called, if Parse() was called
98       set (needWrite) if
99       {
100          && (was not interrupted by progress)
101          && (was not interrupted in previous block)
102       }
103 
104     out:
105       if (*needContinue), decoder still need to continue decoding with new iteration,
106          even after MTDEC_PARSE_END
107       if (*canRecode), we didn't flush current block data, so we still can decode current block later.
108   */
109   SRes (*Write)(void *p, unsigned coderIndex,
110       BoolInt needWriteToStream,
111       const Byte *src, size_t srcSize, BoolInt isCross,
112       // int srcFinished,
113       BoolInt *needContinue,
114       BoolInt *canRecode);
115 
116 } IMtDecCallback2;
117 
118 
119 
120 typedef struct CMtDec_
121 {
122   /* input variables */
123 
124   size_t inBufSize;        /* size of input block */
125   unsigned numThreadsMax;
126   // size_t inBlockMax;
127   unsigned numThreadsMax_2;
128 
129   ISeqInStreamPtr inStream;
130   // const Byte *inData;
131   // size_t inDataSize;
132 
133   ICompressProgressPtr progress;
134   ISzAllocPtr alloc;
135 
136   IMtDecCallback2 *mtCallback;
137   void *mtCallbackObject;
138 
139 
140   /* internal variables */
141 
142   size_t allocatedBufsSize;
143 
144   BoolInt exitThread;
145   WRes exitThreadWRes;
146 
147   UInt64 blockIndex;
148   BoolInt isAllocError;
149   BoolInt overflow;
150   SRes threadingErrorSRes;
151 
152   BoolInt needContinue;
153 
154   // CAutoResetEvent finishedEvent;
155 
156   SRes readRes;
157   SRes codeRes;
158 
159   BoolInt wasInterrupted;
160 
161   unsigned numStartedThreads_Limit;
162   unsigned numStartedThreads;
163 
164   Byte *crossBlock;
165   size_t crossStart;
166   size_t crossEnd;
167   UInt64 readProcessed;
168   BoolInt readWasFinished;
169   UInt64 inProcessed;
170 
171   unsigned filledThreadStart;
172   unsigned numFilledThreads;
173 
174   #ifndef Z7_ST
175   BoolInt needInterrupt;
176   UInt64 interruptIndex;
177   CMtProgress mtProgress;
178   CMtDecThread threads[MTDEC_THREADS_MAX];
179   #endif
180 } CMtDec;
181 
182 
183 void MtDec_Construct(CMtDec *p);
184 void MtDec_Destruct(CMtDec *p);
185 
186 /*
187 MtDec_Code() returns:
188   SZ_OK - in most cases
189   MY_SRes_HRESULT_FROM_WRes(WRes_error) - in case of unexpected error in threading function
190 */
191 
192 SRes MtDec_Code(CMtDec *p);
193 Byte *MtDec_GetCrossBuff(CMtDec *p);
194 
195 int MtDec_PrepareRead(CMtDec *p);
196 const Byte *MtDec_Read(CMtDec *p, size_t *inLim);
197 
198 #endif
199 
200 EXTERN_C_END
201 
202 #endif
203