1 /***************************************************************************
2 * _ _ ____ _
3 * Project ___| | | | _ \| |
4 * / __| | | | |_) | |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
7 *
8 * Copyright (C) 1998 - 2016, Daniel Stenberg, <daniel@haxx.se>, et al.
9 *
10 * This software is licensed as described in the file COPYING, which
11 * you should have received as part of this distribution. The terms
12 * are also available at https://curl.haxx.se/docs/copyright.html.
13 *
14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15 * copies of the Software, and permit persons to whom the Software is
16 * furnished to do so, under the terms of the COPYING file.
17 *
18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19 * KIND, either express or implied.
20 *
21 ***************************************************************************/
22 /* <DESC>
23 * multi socket API usage together with with glib2
24 * </DESC>
25 */
26 /* Example application source code using the multi socket interface to
27 * download many files at once.
28 *
29 * Written by Jeff Pohlmeyer
30
31 Requires glib-2.x and a (POSIX?) system that has mkfifo().
32
33 This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
34 sample programs, adapted to use glib's g_io_channel in place of libevent.
35
36 When running, the program creates the named pipe "hiper.fifo"
37
38 Whenever there is input into the fifo, the program reads the input as a list
39 of URL's and creates some new easy handles to fetch each URL via the
40 curl_multi "hiper" API.
41
42
43 Thus, you can try a single URL:
44 % echo http://www.yahoo.com > hiper.fifo
45
46 Or a whole bunch of them:
47 % cat my-url-list > hiper.fifo
48
49 The fifo buffer is handled almost instantly, so you can even add more URL's
50 while the previous requests are still being downloaded.
51
52 This is purely a demo app, all retrieved data is simply discarded by the write
53 callback.
54
55 */
56
57 #include <glib.h>
58 #include <sys/stat.h>
59 #include <unistd.h>
60 #include <fcntl.h>
61 #include <stdlib.h>
62 #include <stdio.h>
63 #include <errno.h>
64 #include <curl/curl.h>
65
66 #define MSG_OUT g_print /* Change to "g_error" to write to stderr */
67 #define SHOW_VERBOSE 0 /* Set to non-zero for libcurl messages */
68 #define SHOW_PROGRESS 0 /* Set to non-zero to enable progress callback */
69
70 /* Global information, common to all connections */
71 typedef struct _GlobalInfo {
72 CURLM *multi;
73 guint timer_event;
74 int still_running;
75 } GlobalInfo;
76
77 /* Information associated with a specific easy handle */
78 typedef struct _ConnInfo {
79 CURL *easy;
80 char *url;
81 GlobalInfo *global;
82 char error[CURL_ERROR_SIZE];
83 } ConnInfo;
84
85 /* Information associated with a specific socket */
86 typedef struct _SockInfo {
87 curl_socket_t sockfd;
88 CURL *easy;
89 int action;
90 long timeout;
91 GIOChannel *ch;
92 guint ev;
93 GlobalInfo *global;
94 } SockInfo;
95
96 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)97 static void mcode_or_die(const char *where, CURLMcode code) {
98 if(CURLM_OK != code) {
99 const char *s;
100 switch (code) {
101 case CURLM_BAD_HANDLE: s="CURLM_BAD_HANDLE"; break;
102 case CURLM_BAD_EASY_HANDLE: s="CURLM_BAD_EASY_HANDLE"; break;
103 case CURLM_OUT_OF_MEMORY: s="CURLM_OUT_OF_MEMORY"; break;
104 case CURLM_INTERNAL_ERROR: s="CURLM_INTERNAL_ERROR"; break;
105 case CURLM_BAD_SOCKET: s="CURLM_BAD_SOCKET"; break;
106 case CURLM_UNKNOWN_OPTION: s="CURLM_UNKNOWN_OPTION"; break;
107 case CURLM_LAST: s="CURLM_LAST"; break;
108 default: s="CURLM_unknown";
109 }
110 MSG_OUT("ERROR: %s returns %s\n", where, s);
111 exit(code);
112 }
113 }
114
115 /* Check for completed transfers, and remove their easy handles */
check_multi_info(GlobalInfo * g)116 static void check_multi_info(GlobalInfo *g)
117 {
118 char *eff_url;
119 CURLMsg *msg;
120 int msgs_left;
121 ConnInfo *conn;
122 CURL *easy;
123 CURLcode res;
124
125 MSG_OUT("REMAINING: %d\n", g->still_running);
126 while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
127 if(msg->msg == CURLMSG_DONE) {
128 easy = msg->easy_handle;
129 res = msg->data.result;
130 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
131 curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
132 MSG_OUT("DONE: %s => (%d) %s\n", eff_url, res, conn->error);
133 curl_multi_remove_handle(g->multi, easy);
134 free(conn->url);
135 curl_easy_cleanup(easy);
136 free(conn);
137 }
138 }
139 }
140
141 /* Called by glib when our timeout expires */
timer_cb(gpointer data)142 static gboolean timer_cb(gpointer data)
143 {
144 GlobalInfo *g = (GlobalInfo *)data;
145 CURLMcode rc;
146
147 rc = curl_multi_socket_action(g->multi,
148 CURL_SOCKET_TIMEOUT, 0, &g->still_running);
149 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
150 check_multi_info(g);
151 return FALSE;
152 }
153
154 /* Update the event timer after curl_multi library calls */
update_timeout_cb(CURLM * multi,long timeout_ms,void * userp)155 static int update_timeout_cb(CURLM *multi, long timeout_ms, void *userp)
156 {
157 struct timeval timeout;
158 GlobalInfo *g=(GlobalInfo *)userp;
159 timeout.tv_sec = timeout_ms/1000;
160 timeout.tv_usec = (timeout_ms%1000)*1000;
161
162 MSG_OUT("*** update_timeout_cb %ld => %ld:%ld ***\n",
163 timeout_ms, timeout.tv_sec, timeout.tv_usec);
164
165 g->timer_event = g_timeout_add(timeout_ms, timer_cb, g);
166 return 0;
167 }
168
169 /* Called by glib when we get action on a multi socket */
event_cb(GIOChannel * ch,GIOCondition condition,gpointer data)170 static gboolean event_cb(GIOChannel *ch, GIOCondition condition, gpointer data)
171 {
172 GlobalInfo *g = (GlobalInfo*) data;
173 CURLMcode rc;
174 int fd=g_io_channel_unix_get_fd(ch);
175
176 int action =
177 (condition & G_IO_IN ? CURL_CSELECT_IN : 0) |
178 (condition & G_IO_OUT ? CURL_CSELECT_OUT : 0);
179
180 rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
181 mcode_or_die("event_cb: curl_multi_socket_action", rc);
182
183 check_multi_info(g);
184 if(g->still_running) {
185 return TRUE;
186 }
187 else {
188 MSG_OUT("last transfer done, kill timeout\n");
189 if(g->timer_event) {
190 g_source_remove(g->timer_event);
191 }
192 return FALSE;
193 }
194 }
195
196 /* Clean up the SockInfo structure */
remsock(SockInfo * f)197 static void remsock(SockInfo *f)
198 {
199 if(!f) {
200 return;
201 }
202 if(f->ev) {
203 g_source_remove(f->ev);
204 }
205 g_free(f);
206 }
207
208 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,CURL * e,int act,GlobalInfo * g)209 static void setsock(SockInfo*f, curl_socket_t s, CURL*e, int act, GlobalInfo*g)
210 {
211 GIOCondition kind =
212 (act&CURL_POLL_IN?G_IO_IN:0)|(act&CURL_POLL_OUT?G_IO_OUT:0);
213
214 f->sockfd = s;
215 f->action = act;
216 f->easy = e;
217 if(f->ev) {
218 g_source_remove(f->ev);
219 }
220 f->ev=g_io_add_watch(f->ch, kind, event_cb, g);
221 }
222
223 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,CURL * easy,int action,GlobalInfo * g)224 static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
225 {
226 SockInfo *fdp = g_malloc0(sizeof(SockInfo));
227
228 fdp->global = g;
229 fdp->ch=g_io_channel_unix_new(s);
230 setsock(fdp, s, easy, action, g);
231 curl_multi_assign(g->multi, s, fdp);
232 }
233
234 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)235 static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
236 {
237 GlobalInfo *g = (GlobalInfo*) cbp;
238 SockInfo *fdp = (SockInfo*) sockp;
239 static const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
240
241 MSG_OUT("socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
242 if(what == CURL_POLL_REMOVE) {
243 MSG_OUT("\n");
244 remsock(fdp);
245 }
246 else {
247 if(!fdp) {
248 MSG_OUT("Adding data: %s%s\n",
249 what&CURL_POLL_IN?"READ":"",
250 what&CURL_POLL_OUT?"WRITE":"");
251 addsock(s, e, what, g);
252 }
253 else {
254 MSG_OUT(
255 "Changing action from %d to %d\n", fdp->action, what);
256 setsock(fdp, s, e, what, g);
257 }
258 }
259 return 0;
260 }
261
262 /* CURLOPT_WRITEFUNCTION */
write_cb(void * ptr,size_t size,size_t nmemb,void * data)263 static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
264 {
265 size_t realsize = size * nmemb;
266 ConnInfo *conn = (ConnInfo*) data;
267 (void)ptr;
268 (void)conn;
269 return realsize;
270 }
271
272 /* CURLOPT_PROGRESSFUNCTION */
prog_cb(void * p,double dltotal,double dlnow,double ult,double uln)273 static int prog_cb (void *p, double dltotal, double dlnow, double ult,
274 double uln)
275 {
276 ConnInfo *conn = (ConnInfo *)p;
277 MSG_OUT("Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
278 return 0;
279 }
280
281 /* Create a new easy handle, and add it to the global curl_multi */
new_conn(char * url,GlobalInfo * g)282 static void new_conn(char *url, GlobalInfo *g)
283 {
284 ConnInfo *conn;
285 CURLMcode rc;
286
287 conn = g_malloc0(sizeof(ConnInfo));
288 conn->error[0]='\0';
289 conn->easy = curl_easy_init();
290 if(!conn->easy) {
291 MSG_OUT("curl_easy_init() failed, exiting!\n");
292 exit(2);
293 }
294 conn->global = g;
295 conn->url = g_strdup(url);
296 curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
297 curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
298 curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, &conn);
299 curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, (long)SHOW_VERBOSE);
300 curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
301 curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
302 curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, SHOW_PROGRESS?0L:1L);
303 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
304 curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
305 curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
306 curl_easy_setopt(conn->easy, CURLOPT_CONNECTTIMEOUT, 30L);
307 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 1L);
308 curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 30L);
309
310 MSG_OUT("Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
311 rc =curl_multi_add_handle(g->multi, conn->easy);
312 mcode_or_die("new_conn: curl_multi_add_handle", rc);
313
314 /* note that the add_handle() will set a time-out to trigger very soon so
315 that the necessary socket_action() call will be called by this app */
316 }
317
318 /* This gets called by glib whenever data is received from the fifo */
fifo_cb(GIOChannel * ch,GIOCondition condition,gpointer data)319 static gboolean fifo_cb (GIOChannel *ch, GIOCondition condition, gpointer data)
320 {
321 #define BUF_SIZE 1024
322 gsize len, tp;
323 gchar *buf, *tmp, *all=NULL;
324 GIOStatus rv;
325
326 do {
327 GError *err=NULL;
328 rv = g_io_channel_read_line(ch, &buf, &len, &tp, &err);
329 if(buf) {
330 if(tp) {
331 buf[tp]='\0';
332 }
333 new_conn(buf, (GlobalInfo*)data);
334 g_free(buf);
335 }
336 else {
337 buf = g_malloc(BUF_SIZE+1);
338 while(TRUE) {
339 buf[BUF_SIZE]='\0';
340 g_io_channel_read_chars(ch, buf, BUF_SIZE, &len, &err);
341 if(len) {
342 buf[len]='\0';
343 if(all) {
344 tmp=all;
345 all=g_strdup_printf("%s%s", tmp, buf);
346 g_free(tmp);
347 }
348 else {
349 all = g_strdup(buf);
350 }
351 }
352 else {
353 break;
354 }
355 }
356 if(all) {
357 new_conn(all, (GlobalInfo*)data);
358 g_free(all);
359 }
360 g_free(buf);
361 }
362 if(err) {
363 g_error("fifo_cb: %s", err->message);
364 g_free(err);
365 break;
366 }
367 } while((len) && (rv == G_IO_STATUS_NORMAL));
368 return TRUE;
369 }
370
init_fifo(void)371 int init_fifo(void)
372 {
373 struct stat st;
374 const char *fifo = "hiper.fifo";
375 int socket;
376
377 if(lstat (fifo, &st) == 0) {
378 if((st.st_mode & S_IFMT) == S_IFREG) {
379 errno = EEXIST;
380 perror("lstat");
381 exit (1);
382 }
383 }
384
385 unlink (fifo);
386 if(mkfifo (fifo, 0600) == -1) {
387 perror("mkfifo");
388 exit (1);
389 }
390
391 socket = open (fifo, O_RDWR | O_NONBLOCK, 0);
392
393 if(socket == -1) {
394 perror("open");
395 exit (1);
396 }
397 MSG_OUT("Now, pipe some URL's into > %s\n", fifo);
398
399 return socket;
400 }
401
main(int argc,char ** argv)402 int main(int argc, char **argv)
403 {
404 GlobalInfo *g;
405 CURLMcode rc;
406 GMainLoop*gmain;
407 int fd;
408 GIOChannel* ch;
409 g=g_malloc0(sizeof(GlobalInfo));
410
411 fd=init_fifo();
412 ch=g_io_channel_unix_new(fd);
413 g_io_add_watch(ch, G_IO_IN, fifo_cb, g);
414 gmain=g_main_loop_new(NULL, FALSE);
415 g->multi = curl_multi_init();
416 curl_multi_setopt(g->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
417 curl_multi_setopt(g->multi, CURLMOPT_SOCKETDATA, g);
418 curl_multi_setopt(g->multi, CURLMOPT_TIMERFUNCTION, update_timeout_cb);
419 curl_multi_setopt(g->multi, CURLMOPT_TIMERDATA, g);
420
421 /* we don't call any curl_multi_socket*() function yet as we have no handles
422 added! */
423
424 g_main_loop_run(gmain);
425 curl_multi_cleanup(g->multi);
426 return 0;
427 }
428