1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.haxx.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 ***************************************************************************/
22 /* <DESC>
 * multi socket API usage together with glib2
24 * </DESC>
25 */
26 /* Example application source code using the multi socket interface to
27 * download many files at once.
28 *
29 * Written by Jeff Pohlmeyer
30
31 Requires glib-2.x and a (POSIX?) system that has mkfifo().
32
33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
34 sample programs, adapted to use glib's g_io_channel in place of libevent.
35
36 When running, the program creates the named pipe "hiper.fifo"
37
Whenever there is input into the fifo, the program reads the input as a list
of URLs and creates a new easy handle to fetch each URL via the
curl_multi "hiper" API.
41
42
43 Thus, you can try a single URL:
44 % echo http://www.yahoo.com > hiper.fifo
45
46 Or a whole bunch of them:
47 % cat my-url-list > hiper.fifo
48
The fifo buffer is handled almost instantly, so you can even add more URLs
while the previous requests are still being downloaded.
51
This is purely a demo app; all retrieved data is simply discarded by the write
callback.
54
55 */
56
57 #include <glib.h>
58 #include <sys/stat.h>
59 #include <unistd.h>
60 #include <fcntl.h>
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <errno.h>
64 #include <curl/curl.h>
65
#define MSG_OUT g_print   /* Change to "g_printerr" to write to stderr */
67 #define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */
68 #define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */
69
70 /* Global information, common to all connections */
71 typedef struct _GlobalInfo {
72 CURLM *multi;
73 guint timer_event;
74 int still_running;
75 } GlobalInfo;
76
77 /* Information associated with a specific easy handle */
78 typedef struct _ConnInfo {
79 CURL *easy;
80 char *url;
81 GlobalInfo *global;
82 char error[CURL_ERROR_SIZE];
83 } ConnInfo;
84
85 /* Information associated with a specific socket */
86 typedef struct _SockInfo {
87 curl_socket_t sockfd;
88 CURL *easy;
89 int action;
90 long timeout;
91 GIOChannel *ch;
92 guint ev;
93 GlobalInfo *global;
94 } SockInfo;
95
96 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)97 static void mcode_or_die(const char *where, CURLMcode code)
98 {
99 if(CURLM_OK != code) {
100 const char *s;
101 switch(code) {
102 case CURLM_BAD_HANDLE: s = "CURLM_BAD_HANDLE"; break;
103 case CURLM_BAD_EASY_HANDLE: s = "CURLM_BAD_EASY_HANDLE"; break;
104 case CURLM_OUT_OF_MEMORY: s = "CURLM_OUT_OF_MEMORY"; break;
105 case CURLM_INTERNAL_ERROR: s = "CURLM_INTERNAL_ERROR"; break;
106 case CURLM_BAD_SOCKET: s = "CURLM_BAD_SOCKET"; break;
107 case CURLM_UNKNOWN_OPTION: s = "CURLM_UNKNOWN_OPTION"; break;
108 case CURLM_LAST: s = "CURLM_LAST"; break;
109 default: s = "CURLM_unknown";
110 }
111 MSG_OUT("ERROR: %s returns %s\n", where, s);
112 exit(code);
113 }
114 }
115
116 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)117 static void check_multi_info(GlobalInfo *g)
118 {
119 char *eff_url;
120 CURLMsg *msg;
121 int msgs_left;
122 ConnInfo *conn;
123 CURL *easy;
124 CURLcode res;
125
126 MSG_OUT("REMAINING: %d\n", g->still_running);
127 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
128 if(msg->msg == CURLMSG_DONE) {
129 easy = msg->easy_handle;
130 res = msg->data.result;
131 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
132 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
133 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
134 curl_multi_remove_handle(g->multi, easy);
135 free(conn->url);
136 curl_easy_cleanup(easy);
137 free(conn);
138 }
139 }
140 }
141
142 /* Called by glib when our timeout expires */
timer_cb(gpointer data)143 static gboolean timer_cb(gpointer data)
144 {
145 GlobalInfo *g = (GlobalInfo *)data;
146 CURLMcode rc;
147
148 rc = curl_multi_socket_action(g->multi,
149 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
150 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
151 check_multi_info(g);
152 return FALSE;
153 }
154
155 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)156 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
157 {
158 struct timeval timeout;
159 GlobalInfo *g = (GlobalInfo *)userp;
160 timeout.tv_sec = timeout_ms/1000;
161 timeout.tv_usec = (timeout_ms%1000)*1000;
162
163 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
164 timeout_ms, timeout.tv_sec, timeout.tv_usec);
165
166 /*
167 * if timeout_ms is -1, just delete the timer
168 *
169 * For other values of timeout_ms, this should set or *update* the timer to
170 * the new value
171 */
172 if(timeout_ms >= 0)
173 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
174 return 0;
175 }
176
177 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)178 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
179 {
180 GlobalInfo *g = (GlobalInfo*) data;
181 CURLMcode rc;
182 int fd = g_io_channel_unix_get_fd(ch);
183
184 int action =
185 ((condition & G_IO_IN) ? CURL_CSELECT_IN : 0) |
186 ((condition & G_IO_OUT) ? CURL_CSELECT_OUT : 0);
187
188 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
189 mcode_or_die("event_cb: curl_multi_socket_action", rc);
190
191 check_multi_info(g);
192 if(g->still_running) {
193 return TRUE;
194 }
195 else {
196 MSG_OUT("last transfer done, kill timeout\n");
197 if(g->timer_event) {
198 g_source_remove(g->timer_event);
199 }
200 return FALSE;
201 }
202 }
203
204 /* Clean up the SockInfo structure */
remsock(SockInfo * f)205 static void remsock(SockInfo *f)
206 {
207 if(!f) {
208 return;
209 }
210 if(f->ev) {
211 g_source_remove(f->ev);
212 }
213 g_free(f);
214 }
215
216 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)217 static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
218 GlobalInfo *g)
219 {
220 GIOCondition kind =
221 ((act & CURL_POLL_IN) ? G_IO_IN : 0) |
222 ((act & CURL_POLL_OUT) ? G_IO_OUT : 0);
223
224 f->sockfd = s;
225 f->action = act;
226 f->easy = e;
227 if(f->ev) {
228 g_source_remove(f->ev);
229 }
230 f->ev = g_io_add_watch(f->ch, kind, event_cb, g);
231 }
232
233 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)234 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
235 {
236 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
237
238 fdp->global = g;
239 fdp->ch = g_io_channel_unix_new(s);
240 setsock(fdp, s, easy, action, g);
241 curl_multi_assign(g->multi, s, fdp);
242 }
243
244 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)245 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
246 {
247 GlobalInfo *g = (GlobalInfo*) cbp;
248 SockInfo *fdp = (SockInfo*) sockp;
249 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
250
251 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
252 if(what == CURL_POLL_REMOVE) {
253 MSG_OUT("\n");
254 remsock(fdp);
255 }
256 else {
257 if(!fdp) {
258 MSG_OUT("Adding data: %s%s\n",
259 (what & CURL_POLL_IN) ? "READ" : "",
260 (what & CURL_POLL_OUT) ? "WRITE" : "");
261 addsock(s, e, what, g);
262 }
263 else {
264 MSG_OUT(
265 "Changing action from %d to %d\n", fdp->action, what);
266 setsock(fdp, s, e, what, g);
267 }
268 }
269 return 0;
270 }
271
/* CURLOPT_WRITEFUNCTION
 *
 * Discards the received data and reports all size * nmemb bytes as consumed.
 */
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
281
282 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)283 static int prog_cb(void *p, double dltotal, double dlnow, double ult,
284 double uln)
285 {
286 ConnInfo *conn = (ConnInfo *)p;
287 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
288 return 0;
289 }
290
291 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)292 static void new_conn(char *url, GlobalInfo *g)
293 {
294 ConnInfo *conn;
295 CURLMcode rc;
296
297 conn = g_malloc0(sizeof(ConnInfo));
298 conn->error[0]='\0';
299 conn->easy = curl_easy_init();
300 if(!conn->easy) {
301 MSG_OUT("curl_easy_init() failed, exiting!\n");
302 exit(2);
303 }
304 conn->global = g;
305 conn->url = g_strdup(url);
306 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
307 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
308 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
309 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
310 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
311 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
312 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L);
313 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
314 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
315 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
316 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
317 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
318 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
319
320 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
321 rc = curl_multi_add_handle(g->multi, conn->easy);
322 mcode_or_die("new_conn: curl_multi_add_handle", rc);
323
324 /* note that the add_handle() will set a time-out to trigger very soon so
325 that the necessary socket_action() call will be called by this app */
326 }
327
328 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)329 static gboolean fifo_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
330 {
331 #define BUF_SIZE 1024
332 gsize len, tp;
333 gchar *buf, *tmp, *all = NULL;
334 GIOStatus rv;
335
336 do {
337 GError *err = NULL;
338 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
339 if(buf) {
340 if(tp) {
341 buf[tp]='\0';
342 }
343 new_conn(buf, (GlobalInfo*)data);
344 g_free(buf);
345 }
346 else {
347 buf = g_malloc(BUF_SIZE + 1);
348 while(TRUE) {
349 buf[BUF_SIZE]='\0';
350 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
351 if(len) {
352 buf[len]='\0';
353 if(all) {
354 tmp = all;
355 all = g_strdup_printf("%s%s", tmp, buf);
356 g_free(tmp);
357 }
358 else {
359 all = g_strdup(buf);
360 }
361 }
362 else {
363 break;
364 }
365 }
366 if(all) {
367 new_conn(all, (GlobalInfo*)data);
368 g_free(all);
369 }
370 g_free(buf);
371 }
372 if(err) {
373 g_error("fifo_cb: %s", err->message);
374 g_free(err);
375 break;
376 }
377 } while((len) && (rv == G_IO_STATUS_NORMAL));
378 return TRUE;
379 }
380
/* Create the named pipe "hiper.fifo" in the current directory and open it
 * for non-blocking read/write.
 *
 * An existing regular file of that name is never touched: the program
 * exits instead. Anything else with that name is unlinked first. Returns
 * the open file descriptor; every failure ends the program with status 1.
 */
int init_fifo(void)
{
  const char *fifo = "hiper.fifo";
  struct stat st;
  int fd;

  if(lstat(fifo, &st) == 0 && S_ISREG(st.st_mode)) {
    errno = EEXIST;
    perror("lstat");
    exit(1);
  }

  unlink(fifo);
  if(mkfifo(fifo, 0600) == -1) {
    perror("mkfifo");
    exit(1);
  }

  fd = open(fifo, O_RDWR | O_NONBLOCK, 0);
  if(fd == -1) {
    perror("open");
    exit(1);
  }

  MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
  return fd;
}
411
main(int argc,char ** argv)412 int main(int argc, char **argv)
413 {
414 GlobalInfo *g;
415 GMainLoop*gmain;
416 int fd;
417 GIOChannel* ch;
418 g = g_malloc0(sizeof(GlobalInfo));
419
420 fd = init_fifo();
421 ch = g_io_channel_unix_new(fd);
422 g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
423 gmain = g_main_loop_new(NULL, FALSE);
424 g->multi = curl_multi_init();
425 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
426 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
427 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
428 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
429
430 /* we don't call any curl_multi_socket*() function yet as we have no handles
431 added! */
432
433 g_main_loop_run(gmain);
434 curl_multi_cleanup(g->multi);
435 return 0;
436 }
437