1 /* SPDX-License-Identifier: MIT */
2 /*
3 * gcc -Wall -O2 -D_GNU_SOURCE -o ucontext-cp ucontext-cp.c -luring
4 */
5 #define _POSIX_C_SOURCE 199309L
6 #include <stdio.h>
7 #include <fcntl.h>
8 #include <string.h>
9 #include <stdlib.h>
10 #include <unistd.h>
11 #include <assert.h>
12 #include <errno.h>
13 #include <ucontext.h>
14 #include <signal.h>
15 #include <inttypes.h>
16 #include <sys/types.h>
17 #include <sys/ioctl.h>
18 #include <sys/timerfd.h>
19 #include <poll.h>
20 #include "liburing.h"
21
22 #define QD 64
23 #define BS 1024
24
25 #ifndef SIGSTKSZ
26 #define SIGSTKSZ 8192
27 #endif
28
29 typedef struct {
30 struct io_uring *ring;
31 unsigned char *stack_buf;
32 ucontext_t ctx_main, ctx_fnew;
33 } async_context;
34
35 typedef struct {
36 async_context *pctx;
37 int *psuccess;
38 int *pfailure;
39 int infd;
40 int outfd;
41 } arguments_bundle;
42
43 #define DEFINE_AWAIT_OP(operation) \
44 static ssize_t await_##operation( \
45 async_context *pctx, \
46 int fd, \
47 const struct iovec *ioves, \
48 unsigned int nr_vecs, \
49 off_t offset) \
50 { \
51 struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); \
52 struct io_uring_cqe *cqe; \
53 \
54 if (!sqe) \
55 return -1; \
56 \
57 io_uring_prep_##operation(sqe, fd, ioves, nr_vecs, offset); \
58 io_uring_sqe_set_data(sqe, pctx); \
59 swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); \
60 io_uring_peek_cqe(pctx->ring, &cqe); \
61 assert(cqe); \
62 io_uring_cqe_seen(pctx->ring, cqe); \
63 \
64 return cqe->res; \
65 }
66
67 DEFINE_AWAIT_OP(readv)
DEFINE_AWAIT_OP(writev)68 DEFINE_AWAIT_OP(writev)
69 #undef DEFINE_AWAIT_OP
70
71 static int await_delay(async_context *pctx, time_t seconds)
72 {
73 struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring);
74 struct io_uring_cqe *cqe;
75 struct __kernel_timespec ts = {
76 .tv_sec = seconds,
77 .tv_nsec = 0
78 };
79
80 if (!sqe)
81 return -1;
82
83 io_uring_prep_timeout(sqe, &ts, 0, 0);
84 io_uring_sqe_set_data(sqe, pctx);
85 swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
86 io_uring_peek_cqe(pctx->ring, &cqe);
87 assert(cqe);
88 io_uring_cqe_seen(pctx->ring, cqe);
89
90 return 0;
91 }
92
setup_context(async_context * pctx,struct io_uring * ring)93 static int setup_context(async_context *pctx, struct io_uring *ring)
94 {
95 int ret;
96
97 pctx->ring = ring;
98 ret = getcontext(&pctx->ctx_fnew);
99 if (ret < 0) {
100 perror("getcontext");
101 return -1;
102 }
103 pctx->stack_buf = malloc(SIGSTKSZ);
104 if (!pctx->stack_buf) {
105 perror("malloc");
106 return -1;
107 }
108 pctx->ctx_fnew.uc_stack.ss_sp = pctx->stack_buf;
109 pctx->ctx_fnew.uc_stack.ss_size = SIGSTKSZ;
110 pctx->ctx_fnew.uc_link = &pctx->ctx_main;
111
112 return 0;
113 }
114
copy_file(async_context * pctx,int infd,int outfd,struct iovec * piov)115 static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* piov)
116 {
117 off_t offset = 0;
118
119 for (;;) {
120 ssize_t bytes_read;
121
122 printf("%d->%d: readv %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
123 if ((bytes_read = await_readv(pctx, infd, piov, 1, offset)) < 0) {
124 perror("await_readv");
125 return 1;
126 }
127 if (bytes_read == 0)
128 return 0;
129
130 piov->iov_len = bytes_read;
131
132 printf("%d->%d: writev %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
133 if (await_writev(pctx, outfd, piov, 1, offset) != bytes_read) {
134 perror("await_writev");
135 return 1;
136 }
137 if (bytes_read < BS)
138 return 0;
139 offset += bytes_read;
140
141 printf("%d->%d: wait %ds\n", infd, outfd, 1);
142 await_delay(pctx, 1);
143 }
144 }
145
copy_file_wrapper(arguments_bundle * pbundle)146 static void copy_file_wrapper(arguments_bundle *pbundle)
147 {
148 struct iovec iov = {
149 .iov_base = malloc(BS),
150 .iov_len = BS,
151 };
152 async_context *pctx = pbundle->pctx;
153
154 int ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov);
155
156 printf("%d->%d: done with ret code %d\n", pbundle->infd, pbundle->outfd, ret);
157
158 if (ret == 0) {
159 ++*pbundle->psuccess;
160 } else {
161 ++*pbundle->pfailure;
162 }
163
164 free(iov.iov_base);
165 close(pbundle->infd);
166 close(pbundle->outfd);
167 free(pbundle->pctx->stack_buf);
168 free(pbundle->pctx);
169 free(pbundle);
170
171 swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
172 }
173
main(int argc,char * argv[])174 int main(int argc, char *argv[])
175 {
176 struct io_uring ring;
177 int i, req_count, ret;
178 int success = 0, failure = 0;
179
180 if (argc < 3) {
181 fprintf(stderr, "%s: infile1 outfile1 [infile2 outfile2 [...]]\n", argv[0]);
182 return 1;
183 }
184
185 ret = io_uring_queue_init(QD, &ring, 0);
186 if (ret < 0) {
187 fprintf(stderr, "queue_init: %s\n", strerror(-ret));
188 return -1;
189 }
190
191 req_count = (argc - 1) / 2;
192 printf("copying %d files...\n", req_count);
193
194 for (i = 1; i < argc; i += 2) {
195 int infd, outfd;
196
197 async_context *pctx = malloc(sizeof(*pctx));
198
199 if (!pctx || setup_context(pctx, &ring))
200 return 1;
201
202 infd = open(argv[i], O_RDONLY);
203 if (infd < 0) {
204 perror("open infile");
205 return 1;
206 }
207 outfd = open(argv[i + 1], O_WRONLY | O_CREAT | O_TRUNC, 0644);
208 if (outfd < 0) {
209 perror("open outfile");
210 return 1;
211 }
212
213 arguments_bundle *pbundle = malloc(sizeof(*pbundle));
214 pbundle->pctx = pctx;
215 pbundle->psuccess = &success;
216 pbundle->pfailure = &failure;
217 pbundle->infd = infd;
218 pbundle->outfd = outfd;
219
220 makecontext(&pctx->ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, pbundle);
221
222 if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
223 perror("swapcontext");
224 return 1;
225 }
226 }
227
228 /* event loop */
229 while (success + failure < req_count) {
230 struct io_uring_cqe *cqe;
231
232 /* usually be timed waiting */
233 ret = io_uring_submit_and_wait(&ring, 1);
234 if (ret < 0) {
235 fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret));
236 return 1;
237 }
238
239 ret = io_uring_wait_cqe(&ring, &cqe);
240 if (ret < 0) {
241 fprintf(stderr, "wait_cqe: %s\n", strerror(-ret));
242 return 1;
243 }
244
245 async_context *pctx = io_uring_cqe_get_data(cqe);
246
247 if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
248 perror("swapcontext");
249 return 1;
250 }
251 }
252
253 io_uring_queue_exit(&ring);
254
255 printf("finished with %d success(es) and %d failure(s)\n", success, failure);
256
257 return failure > 0;
258 }
259