Add the source files for the swift library.
[swifty.git] / src / libswift / httpgw.cpp
1 /*
2  *  httpgw.cpp
3  *  gateway for serving swift content via HTTP, libevent2 based.
4  *
5  *  Created by Victor Grishchenko, Arno Bakker
6  *  Copyright 2010-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 #include "swift.h"
10 #include <event2/http.h>
11 #include <event2/bufferevent.h>
12
13 using namespace swift;
14
// Step, in bytes, at which swift progress callbacks are requested once the
// first chunk has arrived (converted to a layer with bytes2layer()).
#define HTTPGW_PROGRESS_STEP_BYTES              (256*1024)
// Upper bound on the bytes read and handed to the HTTP client per write
// callback; also the size of the read buffer used for that copy.
// For best performance make bigger than HTTPGW_PROGRESS_STEP_BYTES
#define HTTPGW_MAX_WRITE_BYTES                  (512*1024)

// Report swift download progress every 2^layer * chunksize bytes (so 0 = report every chunk)
#define HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER    0

// Arno: libevent2 has a liberal understanding of socket writability,
// that may result in tens of megabytes being cached in memory. Limit that
// amount at app level.
#define HTTPGW_MAX_PREBUF_BYTES                 (2*1024*1024)

// Number of slots in the http_requests table.
#define HTTPGW_MAX_REQUEST 128
28
// Bookkeeping for one HTTP request that is being answered from a swift
// transfer. Live entries occupy the first http_gw_reqs_open slots of
// http_requests.
struct http_gw_t {
    int      id;            // sequence number of the request, used in log lines
    uint64_t offset;        // bytes of content already handed to libevent
    uint64_t tosend;        // bytes still to send; 0 until the reply headers are prepared
    int      transfer;      // swift transfer the content is read from
    uint64_t lastcpoffset; // last offset at which we checkpointed
    struct evhttp_request *sinkevreq;   // libevent request being answered; NULL once closed
    struct event                  *sinkevwrite; // socket-writability event, or NULL if none was created
    char*    xcontentdur;   // value emitted as the X-Content-Duration header
    bool   closing;         // true while HttpGwCloseConnection is tearing the request down

} http_requests[HTTPGW_MAX_REQUEST];
41
42
int http_gw_reqs_open = 0;   // number of live entries at the front of http_requests
int http_gw_reqs_count = 0;  // running total of accepted requests; source of http_gw_t::id
struct evhttp *http_gw_event;                // evhttp server created by InstallHTTPGateway
struct evhttp_bound_socket *http_gw_handle;  // listening socket returned by the bind call
uint32_t httpgw_chunk_size = SWIFT_DEFAULT_CHUNK_SIZE; // Copy of cmdline param
double *httpgw_maxspeed = NULL;                                          // Copy of cmdline param

// Arno, 2010-11-30: for SwarmPlayer 3000 backend autoquit when no HTTP req is received
bool sawhttpconn = false;
52
53
54 http_gw_t *HttpGwFindRequestByEV(struct evhttp_request *evreq) {
55         for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
56                 if (http_requests[httpc].sinkevreq==evreq)
57                         return &http_requests[httpc];
58     }
59         return NULL;
60 }
61
62 http_gw_t *HttpGwFindRequestByTransfer(int transfer) {
63         for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
64                 if (http_requests[httpc].transfer==transfer) {
65                         return &http_requests[httpc];
66                 }
67         }
68         return NULL;
69 }
70
71 void HttpGwCloseConnection (http_gw_t* req) {
72         dprintf("%s @%i cleanup http request evreq %p\n",tintstr(),req->id, req->sinkevreq);
73
74         struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
75
76         dprintf("%s @%i cleanup: before send reply\n",tintstr(),req->id);
77
78         req->closing = true;
79         if (req->offset > 0)
80                 evhttp_send_reply_end(req->sinkevreq); //WARNING: calls HttpGwLibeventCloseCallback
81         else
82                 evhttp_request_free(req->sinkevreq);
83
84         dprintf("%s @%i cleanup: reset evreq\n",tintstr(),req->id);
85         req->sinkevreq = NULL;
86         dprintf("%s @%i cleanup: after reset evreq\n",tintstr(),req->id);
87
88         // Note: for some reason calling conn_free here prevents the last chunks
89         // to be sent to the requester?
90         //      evhttp_connection_free(evconn); // WARNING: calls HttpGwLibeventCloseCallback
91
92         // Current close policy: checkpoint and DO NOT close transfer, keep on
93         // seeding forever. More sophisticated clients should use CMD GW and issue
94         // REMOVE.
95         swift::Checkpoint(req->transfer);
96
97         //swift::Close(req->transfer);
98
99         *req = http_requests[--http_gw_reqs_open];
100 }
101
102
103 void HttpGwLibeventCloseCallback(struct evhttp_connection *evconn, void *evreqvoid) {
104         // Called by libevent on connection close, either when the other side closes
105         // or when we close (because we call evhttp_connection_free()). To prevent
106         // doing cleanup twice, we see if there is a http_gw_req that has the
107         // passed evreqvoid as sinkevreq. If so, clean up, if not, ignore.
108         // I.e. evhttp_request * is used as sort of request ID
109         //
110         fprintf(stderr,"HttpGwLibeventCloseCallback: called\n");
111         http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid);
112         if (req == NULL)
113                 dprintf("%s http conn already closed\n",tintstr() );
114         else {
115                 dprintf("%s T%i http close conn\n",tintstr(),req->transfer);
116                 if (req->closing)
117                         dprintf("%s http conn already closing\n",tintstr() );
118                 else
119                         HttpGwCloseConnection(req);
120         }
121 }
122
123
124
125
126 void HttpGwMayWriteCallback (int transfer) {
127         // Write some data to client
128
129         http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
130         if (req == NULL) {
131         print_error("httpgw: MayWrite: can't find req for transfer");
132         return;
133     }
134
135     uint64_t complete = swift::SeqComplete(req->transfer);
136
137     //dprintf("%s @%i http write complete %lli offset %lli\n",tintstr(),req->id, complete, req->offset);
138     //fprintf(stderr,"offset %lli seqcomp %lli comp %lli\n",req->offset, complete, swift::Complete(req->transfer) );
139
140         struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
141         struct bufferevent* buffy = evhttp_connection_get_bufferevent(evconn);
142         struct evbuffer *outbuf = bufferevent_get_output(buffy);
143
144         //fprintf(stderr,"httpgw: MayWrite avail %i bufev outbuf %i\n",complete-req->offset, evbuffer_get_length(outbuf) );
145
146     if (complete > req->offset && evbuffer_get_length(outbuf) < HTTPGW_MAX_PREBUF_BYTES)
147     {
148         // Received more than I pushed to player, send data
149         char buf[HTTPGW_MAX_WRITE_BYTES];
150 // Arno, 2010-08-16, TODO
151 #ifdef WIN32
152         uint64_t tosend = min(HTTPGW_MAX_WRITE_BYTES,complete-req->offset);
153 #else
154         uint64_t tosend = std::min((uint64_t)HTTPGW_MAX_WRITE_BYTES,complete-req->offset);
155 #endif
156         size_t rd = pread(req->transfer,buf,tosend,req->offset); // hope it is cached
157         if (rd<0) {
158                 print_error("httpgw: MayWrite: error pread");
159             HttpGwCloseConnection(req);
160             return;
161         }
162
163         // Construct evbuffer and send incrementally
164         struct evbuffer *evb = evbuffer_new();
165         int ret = evbuffer_add(evb,buf,rd);
166         if (ret < 0) {
167                 print_error("httpgw: MayWrite: error evbuffer_add");
168                 evbuffer_free(evb);
169             HttpGwCloseConnection(req);
170             return;
171         }
172
173         if (req->offset == 0) {
174                 // Not just for chunked encoding, see libevent2's http.c
175                 evhttp_send_reply_start(req->sinkevreq, 200, "OK");
176         }
177
178         evhttp_send_reply_chunk(req->sinkevreq, evb);
179         evbuffer_free(evb);
180
181         int wn = rd;
182         dprintf("%s @%i http sent data %ib\n",tintstr(),req->id,(int)wn);
183
184         req->offset += wn;
185         req->tosend -= wn;
186
187         // PPPLUG
188         FileTransfer *ft = FileTransfer::file(transfer);
189         if (ft == NULL)
190                 return;
191         ft->picker().updatePlaybackPos( wn/ft->file().chunk_size() );
192     }
193
194     // Arno, 2010-11-30: tosend is set to fuzzy len, so need extra/other test.
195     if (req->tosend==0 || req->offset ==  swift::Size(req->transfer)) {
196         // done; wait for new HTTP request
197         dprintf("%s @%i done\n",tintstr(),req->id);
198         //fprintf(stderr,"httpgw: MayWrite: done, wait for buffer empty before send_end_reply\n" );
199
200         if (evbuffer_get_length(outbuf) == 0) {
201                 //fprintf(stderr,"httpgw: MayWrite: done, buffer empty, end req\n" );
202                 HttpGwCloseConnection(req);
203         }
204     }
205     else {
206         // wait for data
207         dprintf("%s @%i waiting for data\n",tintstr(),req->id);
208     }
209 }
210
211 void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid );
212
213 void HttpGwSubscribeToWrite(http_gw_t * req) {
214         struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
215         struct event_base *evbase =     evhttp_connection_get_base(evconn);
216         struct bufferevent* evbufev = evhttp_connection_get_bufferevent(evconn);
217
218         if (req->sinkevwrite != NULL)
219                 event_free(req->sinkevwrite); // FAXME: clean in CloseConn
220
221         req->sinkevwrite = event_new(evbase,bufferevent_getfd(evbufev),EV_WRITE,HttpGwLibeventMayWriteCallback,req->sinkevreq);
222         struct timeval t;
223         t.tv_sec = 10;
224         int ret = event_add(req->sinkevwrite,&t);
225         //fprintf(stderr,"httpgw: HttpGwSubscribeToWrite: added event\n");
226 }
227
228
229 void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid )
230 {
231         //fprintf(stderr,"httpgw: MayWrite: %d events %d evreq is %p\n", fd, events, evreqvoid);
232
233         http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid);
234         if (req != NULL) {
235                 //fprintf(stderr,"httpgw: MayWrite: %d events %d httpreq is %p\n", fd, events, req);
236                 HttpGwMayWriteCallback(req->transfer);
237
238
239                 // Arno, 2011-12-20: No autoreschedule, let HttpGwSwiftProgressCallback do that
240                 //if (req->sinkevreq != NULL) // Conn closed
241                 //      HttpGwSubscribeToWrite(req);
242
243                 //fprintf(stderr,"GOTO WRITE %lli >= %lli\n", swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES, swift::Size(req->transfer) );
244
245                 if (swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES >= swift::Size(req->transfer)) {
246
247                 // We don't get progress callback for last chunk < chunk size, nor
248                 // when all data is already on disk. In that case, just keep on
249                         // subscribing to HTTP socket writability until all data is sent.
250                         if (req->sinkevreq != NULL) // Conn closed
251                                 HttpGwSubscribeToWrite(req);
252                 }
253         }
254 }
255
256 void HttpGwSwiftProgressCallback (int transfer, bin_t bin) {
257         // Subsequent HTTPGW_PROGRESS_STEP_BYTES available
258
259         dprintf("%s T%i http more progress\n",tintstr(),transfer);
260         http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
261         if (req == NULL)
262                 return;
263
264         // Arno, 2011-12-20: We have new data to send, wait for HTTP socket writability
265         if (req->sinkevreq != NULL) { // Conn closed
266                 HttpGwSubscribeToWrite(req);
267         }
268 }
269
270
271 void HttpGwFirstProgressCallback (int transfer, bin_t bin) {
272         // First chunk of data available
273         dprintf("%s T%i http first progress\n",tintstr(),transfer);
274
275         if (!bin.contains(bin_t(0,0))) // need the first chunk
276         return;
277
278         swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback);
279     int progresslayer = bytes2layer(HTTPGW_PROGRESS_STEP_BYTES,swift::ChunkSize(transfer));
280     swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback,progresslayer);
281
282         http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
283         if (req == NULL)
284                 return;
285         if (req->tosend==0) { // FIXME states
286                 uint64_t filesize = swift::Size(transfer);
287                 char filesizestr[256];
288                 sprintf(filesizestr,"%lli",filesize);
289
290                 struct evkeyvalq *headers = evhttp_request_get_output_headers(req->sinkevreq);
291                 //evhttp_add_header(headers, "Connection", "keep-alive" );
292                 evhttp_add_header(headers, "Connection", "close" );
293                 evhttp_add_header(headers, "Content-Type", "video/ogg" );
294                 evhttp_add_header(headers, "X-Content-Duration",req->xcontentdur );
295                 evhttp_add_header(headers, "Content-Length", filesizestr );
296                 evhttp_add_header(headers, "Accept-Ranges", "none" );
297
298                 req->tosend = filesize;
299                 dprintf("%s @%i headers_sent size %lli\n",tintstr(),req->id,filesize);
300
301                 /*
302                  * Arno, 2011-10-17: Swift ProgressCallbacks are only called when
303                  * the data is downloaded, not when it is already on disk. So we need
304                  * to handle the situation where all or part of the data is already
305                  * on disk. Subscribing to writability of the socket works,
306                  * but requires libevent2 >= 2.1 (or our backported version)
307                  */
308                 HttpGwSubscribeToWrite(req);
309     }
310 }
311
312
313 void HttpGwNewRequestCallback (struct evhttp_request *evreq, void *arg) {
314
315     dprintf("%s @%i http new request\n",tintstr(),http_gw_reqs_count+1);
316
317     if (evhttp_request_get_command(evreq) != EVHTTP_REQ_GET) {
318             return;
319     }
320         sawhttpconn = true;
321
322     // Parse URI
323     const char *uri = evhttp_request_get_uri(evreq);
324     //struct evkeyvalq *headers =       evhttp_request_get_input_headers(evreq);
325     //const char *contentrangestr =evhttp_find_header(headers,"Content-Range");
326
327     char *tokenuri = (char *)malloc(strlen(uri)+1);
328     strcpy(tokenuri,uri);
329     char * hashch=strtok(tokenuri,"/"), hash[41];
330     while (hashch && (1!=sscanf(hashch,"%40[0123456789abcdefABCDEF]",hash) || strlen(hash)!=40))
331         hashch = strtok(NULL,"/");
332     free(tokenuri);
333     if (strlen(hash)!=40) {
334         evhttp_send_error(evreq,400,"Path must be root hash in hex, 40 bytes.");
335         return;
336     }
337     char *xcontentdur = NULL;
338     if (strlen(uri) > 42) {
339         xcontentdur = (char *)malloc(strlen(uri)-42+1);
340         strcpy(xcontentdur,&uri[42]);
341     }
342     else
343         xcontentdur = (char *)"0";
344     dprintf("%s @%i demands %s %s\n",tintstr(),http_gw_reqs_open+1,hash,xcontentdur);
345
346     // initiate transmission
347     Sha1Hash root_hash = Sha1Hash(true,hash);
348     int transfer = swift::Find(root_hash);
349     if (transfer==-1) {
350         transfer = swift::Open(hash,root_hash,Address(),false,true,httpgw_chunk_size); // ARNOTODO: allow for chunk size to be set via URL?
351         dprintf("%s @%i trying to HTTP GET swarm %s that has not been STARTed\n",tintstr(),http_gw_reqs_open+1,hash);
352
353         // Arno, 2011-12-20: Only on new transfers, otherwise assume that CMD GW
354         // controls speed
355         FileTransfer *ft = FileTransfer::file(transfer);
356         ft->SetMaxSpeed(DDIR_DOWNLOAD,httpgw_maxspeed[DDIR_DOWNLOAD]);
357         ft->SetMaxSpeed(DDIR_UPLOAD,httpgw_maxspeed[DDIR_UPLOAD]);
358     }
359
360     // Record request
361     http_gw_t* req = http_requests + http_gw_reqs_open++;
362     req->id = ++http_gw_reqs_count;
363     req->sinkevreq = evreq;
364     req->xcontentdur = xcontentdur;
365     req->offset = 0;
366     req->tosend = 0;
367     req->transfer = transfer;
368     req->lastcpoffset = 0;
369     req->sinkevwrite = NULL;
370     req->closing = false;
371
372     fprintf(stderr,"httpgw: Opened %s\n",hash);
373
374     // We need delayed replying, so take ownership.
375     // See http://code.google.com/p/libevent-longpolling/source/browse/trunk/main.c
376         // Careful: libevent docs are broken. It doesn't say that evhttp_send_reply_send
377         // actually calls evhttp_request_free, i.e. releases ownership for you.
378         //
379     evhttp_request_own(evreq);
380
381     // Register callback for connection close
382     struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
383     evhttp_connection_set_closecb(evconn,HttpGwLibeventCloseCallback,req->sinkevreq);
384
385     if (swift::Size(transfer)) {
386         HttpGwFirstProgressCallback(transfer,bin_t(0,0));
387     } else {
388         swift::AddProgressCallback(transfer,&HttpGwFirstProgressCallback,HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER);
389     }
390 }
391
392
393 bool InstallHTTPGateway (struct event_base *evbase,Address bindaddr, uint32_t chunk_size, double *maxspeed) {
394         // Arno, 2011-10-04: From libevent's http-server.c example
395
396         /* Create a new evhttp object to handle requests. */
397         http_gw_event = evhttp_new(evbase);
398         if (!http_gw_event) {
399                 print_error("httpgw: evhttp_new failed");
400                 return false;
401         }
402
403         /* Install callback for all requests */
404         evhttp_set_gencb(http_gw_event, HttpGwNewRequestCallback, NULL);
405
406         /* Now we tell the evhttp what port to listen on */
407         http_gw_handle = evhttp_bind_socket_with_handle(http_gw_event, bindaddr.ipv4str(), bindaddr.port());
408         if (!http_gw_handle) {
409                 print_error("httpgw: evhttp_bind_socket_with_handle failed");
410                 return false;
411         }
412
413         httpgw_chunk_size = chunk_size;
414         httpgw_maxspeed = maxspeed;
415         return true;
416 }
417
418
419 uint64_t lastoffset=0;
420 uint64_t lastcomplete=0;
421 tint test_time = 0;
422
423 /** For SwarmPlayer 3000's HTTP failover. We should exit if swift isn't
424  * delivering such that the extension can start talking HTTP to the backup.
425  */
426 bool HTTPIsSending()
427 {
428         if (http_gw_reqs_open > 0)
429         {
430                 FileTransfer *ft = FileTransfer::file(http_requests[http_gw_reqs_open-1].transfer);
431                 if (ft != NULL) {
432                         fprintf(stderr,"httpgw: upload %lf\n",ft->GetCurrentSpeed(DDIR_UPLOAD)/1024.0);
433                         fprintf(stderr,"httpgw: dwload %lf\n",ft->GetCurrentSpeed(DDIR_DOWNLOAD)/1024.0);
434                         fprintf(stderr,"httpgw: seqcmp %llu\n", swift::SeqComplete(http_requests[http_gw_reqs_open-1].transfer));
435                 }
436         }
437     return true;
438
439     // TODO: reactivate when used in SwiftTransport / SwarmPlayer 3000.
440
441         if (test_time == 0)
442         {
443                 test_time = NOW;
444                 return true;
445         }
446
447         if (NOW > test_time+5*1000*1000)
448         {
449                 fprintf(stderr,"http alive: httpc count is %d\n", http_gw_reqs_open );
450
451                 if (http_gw_reqs_open == 0 && !sawhttpconn)
452                 {
453                         fprintf(stderr,"http alive: no HTTP activity ever, quiting\n");
454                         return false;
455                 }
456                 else
457                         sawhttpconn = true;
458
459             for (int httpc=0; httpc<http_gw_reqs_open; httpc++)
460             {
461
462                 /*
463                 if (http_requests[httpc].offset >= 100000)
464                 {
465                         fprintf(stderr,"http alive: 100K sent, quit\n");
466                                 return false;
467                 }
468                 else
469                 {
470                         fprintf(stderr,"http alive: sent %lli\n", http_requests[httpc].offset );
471                         return true;
472                 }
473                 */
474
475                         // If
476                         // a. don't know anything about content (i.e., size still 0) or
477                         // b. not sending to HTTP client and not at end, and
478                         //    not downloading from P2P and not at end
479                         // then stop.
480                         if ( swift::Size(http_requests[httpc].transfer) == 0 || \
481                                  (http_requests[httpc].offset == lastoffset &&
482                                  http_requests[httpc].offset != swift::Size(http_requests[httpc].transfer) && \
483                              swift::Complete(http_requests[httpc].transfer) == lastcomplete && \
484                              swift::Complete(http_requests[httpc].transfer) != swift::Size(http_requests[httpc].transfer)))
485                         {
486                                 fprintf(stderr,"http alive: no progress, quiting\n");
487                                 //getchar();
488                                 return false;
489                         }
490
491                         /*
492                         if (http_requests[httpc].offset == swift::Size(http_requests[httpc].transfer))
493                         {
494                                 // TODO: seed for a while.
495                                 fprintf(stderr,"http alive: data delivered to client, quiting\n");
496                                 return false;
497                         }
498                         */
499
500                         lastoffset = http_requests[httpc].offset;
501                         lastcomplete = swift::Complete(http_requests[httpc].transfer);
502             }
503                 test_time = NOW;
504
505             return true;
506         }
507         else
508                 return true;
509 }