• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Meta Platforms, Inc. and affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  * You may select, at your option, one of the above-listed licenses.
9  */
10 
11  /*
12   * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
13   * Current implementation relies on having one thread that reads and one that
14   * writes.
15   * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
16   * are performed serially by the appropriate worker thread.
17   * Most systems expose better primitives to perform asynchronous IO, such as
18   * io_uring on newer linux systems. The API is built in such a way that in the
19   * future we could replace the threads with better solutions when available.
20   */
21 
22 #ifndef ZSTD_FILEIO_ASYNCIO_H
23 #define ZSTD_FILEIO_ASYNCIO_H
24 
25 #include "../lib/common/mem.h"     /* U32, U64 */
26 #include "fileio_types.h"
27 #include "platform.h"
28 #include "util.h"
29 #include "../lib/common/pool.h"
30 #include "../lib/common/threading.h"
31 
/* MAX_IO_JOBS:
 * Upper bound on the number of IO jobs a single pool can track.
 * Sizes the availableJobs array below (and completedJobs in ReadPoolCtx_t). */
#define MAX_IO_JOBS          (10)

/* IOPoolCtx_t:
 * Base state shared by the read and write pools.
 * Embedded as the first member (`base`) of ReadPoolCtx_t / WritePoolCtx_t,
 * so a pointer to either derived context is usable as an IOPoolCtx_t*. */
typedef struct {
    /* These struct fields should be set only on creation and not changed afterwards */
    POOL_ctx* threadPool;        /* worker thread pool that executes queued IO jobs */
    int threadPoolActive;        /* nonzero when async mode is active — presumably toggled
                                  * via AIO_*Pool_setAsync; confirm in fileio_asyncio.c */
    int totalIoJobs;             /* total number of jobs owned by this pool */
    const FIO_prefs_t* prefs;    /* user preferences; not owned by the pool */
    POOL_function poolFunction;  /* function run by the worker thread for each queued job */

    /* Controls the file we currently write to, make changes only by using provided utility functions */
    FILE* file;

    /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
     * only be mutated after locking the mutex */
    ZSTD_pthread_mutex_t ioJobsMutex;
    void* availableJobs[MAX_IO_JOBS];  /* stack of jobs currently free to be acquired */
    int availableJobsCount;            /* number of valid entries in availableJobs */
    size_t jobBufferSize;              /* size of the buffer attached to each job */
} IOPoolCtx_t;
52 
/* ReadPoolCtx_t:
 * Context of the read pool: extends IOPoolCtx_t (kept as the first member)
 * with state for the file currently being read. */
typedef struct {
    IOPoolCtx_t base;  /* must remain the first member */

    /* State regarding the currently read file */
    int reachedEof;        /* nonzero once the source file has hit EOF */
    U64 nextReadOffset;    /* file offset the next read job will start at */
    U64 waitingOnOffset;   /* offset whose completion the consumer is currently waiting for */

    /* We may hold an IOJob object as needed if we actively expose its buffer. */
    void *currentJobHeld;

    /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
     * the first of them. Shouldn't be accessed from outside of utility functions. */
    U8 *coalesceBuffer;

    /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
     * change when consuming / refilling buffer. */
    U8 *srcBuffer;
    size_t srcBufferLoaded;  /* number of valid bytes currently available at srcBuffer */

    /* We need to know what tasks completed so we can use their buffers when their time comes.
     * Should only be accessed after locking base.ioJobsMutex . */
    void* completedJobs[MAX_IO_JOBS];
    int completedJobsCount;
    ZSTD_pthread_cond_t jobCompletedCond;  /* signaled when a read job completes */
} ReadPoolCtx_t;
79 
/* WritePoolCtx_t:
 * Context of the write pool: extends IOPoolCtx_t (kept as the first member)
 * with sparse-write bookkeeping. */
typedef struct {
    IOPoolCtx_t base;      /* must remain the first member */
    unsigned storedSkips;  /* bytes skipped (not yet materialized) during sparse writing;
                            * flushed by AIO_WritePool_sparseWriteEnd */
} WritePoolCtx_t;
84 
/* IOJob_t:
 * A single unit of IO work: a buffer plus the file and offset it targets.
 * Acquired from a pool, filled by the caller, then enqueued for execution. */
typedef struct {
    /* These fields are automatically set and shouldn't be changed by non WritePool code. */
    void *ctx;          /* owning pool context — set by the pool on acquisition */
    FILE* file;         /* destination/source file for this job */
    void *buffer;       /* job-owned buffer of bufferSize bytes */
    size_t bufferSize;  /* capacity of buffer */

    /* This field should be changed before a job is queued for execution and should contain the number
     * of bytes to write from the buffer. */
    size_t usedBufferSize;
    U64 offset;         /* file offset at which this job operates */
} IOJob_t;
97 
/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);


/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job);

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Enqueues a write job for execution and acquires a new one.
 * After execution `job`'s pointed value would change to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);

/* AIO_WritePool_closeFile:
 * Ends sparse write and closes the writePool's current file and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.  */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_fillBuffer:
 * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
 * Returns when srcBuffer has at least n bytes loaded or when we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);

/* AIO_ReadPool_setFile:
 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);

/* AIO_ReadPool_closeFile:
 * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
194 
195 #endif /* ZSTD_FILEIO_ASYNCIO_H */
196