3 * gateway for serving swift content via HTTP, libevent2 based.
5 * Created by Victor Grishchenko, Arno Bakker
6 * Copyright 2010-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
10 #include <event2/http.h>
11 #include <event2/bufferevent.h>
13 using namespace swift;
// Tuning constants for the HTTP gateway.
//
// Byte interval that is converted into a progress-callback layer (via
// bytes2layer) once the first chunk of a transfer has arrived.
15 #define HTTPGW_PROGRESS_STEP_BYTES (256*1024)
16 // For best performance make bigger than HTTPGW_PROGRESS_STEP_BYTES
// Upper bound on the bytes read and queued per write callback; it is also the
// size of the local read buffer used in HttpGwMayWriteCallback.
17 #define HTTPGW_MAX_WRITE_BYTES (512*1024)
19 // Report swift download progress every 2^layer * chunksize bytes (so 0 = report every chunk)
20 #define HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER 0
22 // Arno: libevent2 has a liberal understanding of socket writability,
23 // that may result in tens of megabytes being cached in memory. Limit that
24 // amount at app level.
// New data is only queued while the connection output buffer holds fewer
// bytes than this.
25 #define HTTPGW_MAX_PREBUF_BYTES (2*1024*1024)
// Capacity of the http_requests table.
27 #define HTTPGW_MAX_REQUEST 128
// Tail of the per-request record, followed by the gateway-wide state.
// NOTE(review): the opening of the record type and its first fields are not
// visible in this excerpt; other code in this file also reads id, transfer,
// offset, tosend, xcontentdur and closing from these records.
34 uint64_t lastcpoffset; // last offset at which we checkpointed
// libevent request this entry is serving; reset to NULL by HttpGwCloseConnection.
35 struct evhttp_request *sinkevreq;
// Write-readiness event created by HttpGwSubscribeToWrite, or NULL if none yet.
36 struct event *sinkevwrite;
// Fixed-size request table; only the first http_gw_reqs_open entries are live.
40 } http_requests[HTTPGW_MAX_REQUEST];
// Number of live entries at the front of http_requests.
43 int http_gw_reqs_open = 0;
// Running counter used to hand out request ids (incremented per new request).
44 int http_gw_reqs_count = 0;
// evhttp server object and its bound listening socket, set by InstallHTTPGateway.
45 struct evhttp *http_gw_event;
46 struct evhttp_bound_socket *http_gw_handle;
47 uint32_t httpgw_chunk_size = SWIFT_DEFAULT_CHUNK_SIZE; // Copy of cmdline param
// Indexed with DDIR_DOWNLOAD and DDIR_UPLOAD when a new transfer is opened.
48 double *httpgw_maxspeed = NULL; // Copy of cmdline param
50 // Arno, 2010-11-30: for SwarmPlayer 3000 backend autoquit when no HTTP req is received
51 bool sawhttpconn = false;
// Looks up the live request whose sinkevreq pointer equals evreq.
// Performs a linear scan over the first http_gw_reqs_open table entries and
// returns a pointer into http_requests for the first match.
// NOTE(review): the value returned when nothing matches is not visible in
// this excerpt; callers appear to compare the result against NULL - confirm.
54 http_gw_t *HttpGwFindRequestByEV(struct evhttp_request *evreq) {
55 for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
56 if (http_requests[httpc].sinkevreq==evreq)
57 return &http_requests[httpc];
// Looks up the live request bound to the given swift transfer id.
// Performs a linear scan over the first http_gw_reqs_open table entries and
// returns a pointer to the first entry whose transfer field matches, so if
// several requests share one transfer only the earliest slot is found.
// NOTE(review): the no-match return value is not visible in this excerpt.
62 http_gw_t *HttpGwFindRequestByTransfer(int transfer) {
63 for (int httpc=0; httpc<http_gw_reqs_open; httpc++) {
64 if (http_requests[httpc].transfer==transfer) {
65 return &http_requests[httpc];
// Finishes the HTTP side of a request and removes it from the request table.
// The swift transfer itself is checkpointed but left open (see policy note
// below).
// NOTE(review): several lines of this function are missing from the excerpt,
// including whatever conditions select between evhttp_send_reply_end and
// evhttp_request_free; do not assume both are called unconditionally.
71 void HttpGwCloseConnection (http_gw_t* req) {
72 dprintf("%s @%i cleanup http request evreq %p\n",tintstr(),req->id, req->sinkevreq);
// evconn is only referenced by the commented-out evhttp_connection_free below.
74 struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
76 dprintf("%s @%i cleanup: before send reply\n",tintstr(),req->id);
80 evhttp_send_reply_end(req->sinkevreq); //WARNING: calls HttpGwLibeventCloseCallback
82 evhttp_request_free(req->sinkevreq);
84 dprintf("%s @%i cleanup: reset evreq\n",tintstr(),req->id);
// Clearing sinkevreq marks the entry as closed for later lookups by evreq.
85 req->sinkevreq = NULL;
86 dprintf("%s @%i cleanup: after reset evreq\n",tintstr(),req->id);
88 // Note: for some reason calling conn_free here prevents the last chunks
89 // to be sent to the requester?
90 // evhttp_connection_free(evconn); // WARNING: calls HttpGwLibeventCloseCallback
92 // Current close policy: checkpoint and DO NOT close transfer, keep on
93 // seeding forever. More sophisticated clients should use CMD GW and issue
95 swift::Checkpoint(req->transfer);
97 //swift::Close(req->transfer);
// Compact the table: the last live entry is copied over this slot and the
// live count shrinks by one. After this line req no longer describes the
// request that was closed, and any pointer to the moved entry is stale.
99 *req = http_requests[--http_gw_reqs_open];
// libevent connection-close callback, registered per request in
// HttpGwNewRequestCallback with the evhttp_request pointer as user data.
// evconn is not used in the visible lines; evreqvoid is cast back to an
// evhttp_request pointer and used as the lookup key.
// NOTE(review): the if/else lines that choose between the three log messages
// and the HttpGwCloseConnection call are missing from this excerpt.
103 void HttpGwLibeventCloseCallback(struct evhttp_connection *evconn, void *evreqvoid) {
104 // Called by libevent on connection close, either when the other side closes
105 // or when we close (because we call evhttp_connection_free()). To prevent
106 // doing cleanup twice, we see if there is a http_gw_req that has the
107 // passed evreqvoid as sinkevreq. If so, clean up, if not, ignore.
108 // I.e. evhttp_request * is used as sort of request ID
110 fprintf(stderr,"HttpGwLibeventCloseCallback: called\n");
111 http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid);
113 dprintf("%s http conn already closed\n",tintstr() );
115 dprintf("%s T%i http close conn\n",tintstr(),req->transfer);
117 dprintf("%s http conn already closing\n",tintstr() );
119 HttpGwCloseConnection(req);
// Pushes the next slice of already-downloaded data for a transfer to its HTTP
// client, and closes the request once everything has been delivered.
// transfer: swift transfer id; the matching request is found by table lookup.
// NOTE(review): many control-flow lines (braces, returns, error tests, the
// bookkeeping that advances the offset) are missing from this excerpt, so the
// comments below describe only what the visible statements do.
126 void HttpGwMayWriteCallback (int transfer) {
127 // Write some data to client
129 http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
131 print_error("httpgw: MayWrite: can't find req for transfer");
// Sequentially complete byte count reported by swift for this transfer.
135 uint64_t complete = swift::SeqComplete(req->transfer);
137 //dprintf("%s @%i http write complete %lli offset %lli\n",tintstr(),req->id, complete, req->offset);
138 //fprintf(stderr,"offset %lli seqcomp %lli comp %lli\n",req->offset, complete, swift::Complete(req->transfer) );
// Reach through the request to the connection output buffer so its backlog
// can be measured.
140 struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
141 struct bufferevent* buffy = evhttp_connection_get_bufferevent(evconn);
142 struct evbuffer *outbuf = bufferevent_get_output(buffy);
144 //fprintf(stderr,"httpgw: MayWrite avail %i bufev outbuf %i\n",complete-req->offset, evbuffer_get_length(outbuf) );
// Only queue more when there is unsent data and the backlog is below the cap.
146 if (complete > req->offset && evbuffer_get_length(outbuf) < HTTPGW_MAX_PREBUF_BYTES)
148 // Received more than I pushed to player, send data
// NOTE(review): this is a 512 KiB automatic array; confirm the stack budget.
149 char buf[HTTPGW_MAX_WRITE_BYTES];
150 // Arno, 2010-08-16, TODO
// NOTE(review): two declarations of tosend follow; presumably they sit in
// alternative preprocessor branches that are not visible here - confirm.
152 uint64_t tosend = min(HTTPGW_MAX_WRITE_BYTES,complete-req->offset);
154 uint64_t tosend = std::min((uint64_t)HTTPGW_MAX_WRITE_BYTES,complete-req->offset);
// NOTE(review): rd is declared size_t (unsigned), so a negative pread result
// cannot be detected with a less-than-zero test; the test itself is not
// visible in this excerpt.
156 size_t rd = pread(req->transfer,buf,tosend,req->offset); // hope it is cached
158 print_error("httpgw: MayWrite: error pread");
159 HttpGwCloseConnection(req);
163 // Construct evbuffer and send incrementally
164 struct evbuffer *evb = evbuffer_new();
165 int ret = evbuffer_add(evb,buf,rd);
167 print_error("httpgw: MayWrite: error evbuffer_add");
169 HttpGwCloseConnection(req);
// The reply is started exactly once, before the first body chunk is sent.
173 if (req->offset == 0) {
174 // Not just for chunked encoding, see libevent2's http.c
175 evhttp_send_reply_start(req->sinkevreq, 200, "OK");
178 evhttp_send_reply_chunk(req->sinkevreq, evb);
// NOTE(review): wn is used below but its declaration is not visible here.
182 dprintf("%s @%i http sent data %ib\n",tintstr(),req->id,(int)wn);
// Tell the piece picker about the playback position, expressed in chunks.
188 FileTransfer *ft = FileTransfer::file(transfer);
191 ft->picker().updatePlaybackPos( wn/ft->file().chunk_size() );
194 // Arno, 2010-11-30: tosend is set to fuzzy len, so need extra/other test.
195 if (req->tosend==0 || req->offset == swift::Size(req->transfer)) {
196 // done; wait for new HTTP request
197 dprintf("%s @%i done\n",tintstr(),req->id);
198 //fprintf(stderr,"httpgw: MayWrite: done, wait for buffer empty before send_end_reply\n" );
// Close only once libevent has drained everything queued for the socket.
200 if (evbuffer_get_length(outbuf) == 0) {
201 //fprintf(stderr,"httpgw: MayWrite: done, buffer empty, end req\n" );
// After this call req must not be used for the closed request: the table
// slot it points at has been overwritten by HttpGwCloseConnection.
202 HttpGwCloseConnection(req);
207 dprintf("%s @%i waiting for data\n",tintstr(),req->id);
// Forward declaration: the callback is defined after HttpGwSubscribeToWrite,
// which needs its address for event_new.
211 void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid );
// Arms a write-readiness event on the socket underlying the request, so that
// HttpGwLibeventMayWriteCallback runs when the socket becomes writable.
// The event is created with EV_WRITE only (no EV_PERSIST flag is passed) and
// carries the evhttp_request pointer as its callback argument.
213 void HttpGwSubscribeToWrite(http_gw_t * req) {
214 struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
215 struct event_base *evbase = evhttp_connection_get_base(evconn);
216 struct bufferevent* evbufev = evhttp_connection_get_bufferevent(evconn);
// Release the previously created event before replacing it.
218 if (req->sinkevwrite != NULL)
219 event_free(req->sinkevwrite); // FAXME: clean in CloseConn
221 req->sinkevwrite = event_new(evbase,bufferevent_getfd(evbufev),EV_WRITE,HttpGwLibeventMayWriteCallback,req->sinkevreq);
// NOTE(review): the declaration of the timeout t is not visible in this
// excerpt, and ret is not inspected in the visible lines.
224 int ret = event_add(req->sinkevwrite,&t);
225 //fprintf(stderr,"httpgw: HttpGwSubscribeToWrite: added event\n");
// libevent callback fired when the client socket is writable. Maps the
// evhttp_request pointer back to its table entry, writes pending data, and
// re-subscribes itself when the transfer is close to (or at) completion.
// fd and events are not used in the visible lines.
// NOTE(review): no NULL guard on req is visible in this excerpt before it is
// dereferenced - confirm one exists on an omitted line.
229 void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid )
231 //fprintf(stderr,"httpgw: MayWrite: %d events %d evreq is %p\n", fd, events, evreqvoid);
233 http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid);
235 //fprintf(stderr,"httpgw: MayWrite: %d events %d httpreq is %p\n", fd, events, req);
236 HttpGwMayWriteCallback(req->transfer);
// NOTE(review): HttpGwMayWriteCallback may close the request, which moves a
// different table entry into the slot req points at; the uses of req below
// then refer to that other entry (or to a slot past the live range).
239 // Arno, 2011-12-20: No autoreschedule, let HttpGwSwiftProgressCallback do that
240 //if (req->sinkevreq != NULL) // Conn closed
241 // HttpGwSubscribeToWrite(req);
243 //fprintf(stderr,"GOTO WRITE %lli >= %lli\n", swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES, swift::Size(req->transfer) );
// True when at most HTTPGW_MAX_WRITE_BYTES of the content remain undownloaded.
245 if (swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES >= swift::Size(req->transfer)) {
247 // We don't get progress callback for last chunk < chunk size, nor
248 // when all data is already on disk. In that case, just keep on
249 // subscribing to HTTP socket writability until all data is sent.
250 if (req->sinkevreq != NULL) // Conn closed
251 HttpGwSubscribeToWrite(req);
// swift progress callback installed by HttpGwFirstProgressCallback. When more
// data has been downloaded it asks to be notified of socket writability so
// the data can be forwarded to the HTTP client.
// bin is not used in the visible lines.
// NOTE(review): the handling of a failed lookup (req not found) sits on lines
// that are missing from this excerpt.
256 void HttpGwSwiftProgressCallback (int transfer, bin_t bin) {
257 // Subsequent HTTPGW_PROGRESS_STEP_BYTES available
259 dprintf("%s T%i http more progress\n",tintstr(),transfer);
260 http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
264 // Arno, 2011-12-20: We have new data to send, wait for HTTP socket writability
// A NULL sinkevreq means the connection was already cleaned up.
265 if (req->sinkevreq != NULL) { // Conn closed
266 HttpGwSubscribeToWrite(req);
// Runs once the first chunk of a transfer is available: swaps itself for the
// coarser HttpGwSwiftProgressCallback, fills in the HTTP response headers and
// subscribes to socket writability so body data starts flowing.
// It is also called directly by HttpGwNewRequestCallback when the transfer
// size is already known.
// NOTE(review): the early-return after the bin test and the handling of a
// failed request lookup are on lines missing from this excerpt.
271 void HttpGwFirstProgressCallback (int transfer, bin_t bin) {
272 // First chunk of data available
273 dprintf("%s T%i http first progress\n",tintstr(),transfer);
275 if (!bin.contains(bin_t(0,0))) // need the first chunk
// From here on progress is reported per HTTPGW_PROGRESS_STEP_BYTES worth of
// chunks instead of at the first-progress interval.
278 swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback);
279 int progresslayer = bytes2layer(HTTPGW_PROGRESS_STEP_BYTES,swift::ChunkSize(transfer));
280 swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback,progresslayer);
282 http_gw_t* req = HttpGwFindRequestByTransfer(transfer);
// tosend == 0 is used as the marker that headers have not been prepared yet.
285 if (req->tosend==0) { // FIXME states
286 uint64_t filesize = swift::Size(transfer);
287 char filesizestr[256];
// NOTE(review): filesize is unsigned 64-bit but is formatted with the signed
// conversion %lli, and sprintf is unbounded; consider an unsigned conversion
// with snprintf.
288 sprintf(filesizestr,"%lli",filesize);
290 struct evkeyvalq *headers = evhttp_request_get_output_headers(req->sinkevreq);
291 //evhttp_add_header(headers, "Connection", "keep-alive" );
292 evhttp_add_header(headers, "Connection", "close" );
// The content type is hard-coded regardless of what the swarm carries.
293 evhttp_add_header(headers, "Content-Type", "video/ogg" );
294 evhttp_add_header(headers, "X-Content-Duration",req->xcontentdur );
295 evhttp_add_header(headers, "Content-Length", filesizestr );
296 evhttp_add_header(headers, "Accept-Ranges", "none" );
298 req->tosend = filesize;
299 dprintf("%s @%i headers_sent size %lli\n",tintstr(),req->id,filesize);
302 * Arno, 2011-10-17: Swift ProgressCallbacks are only called when
303 * the data is downloaded, not when it is already on disk. So we need
304 * to handle the situation where all or part of the data is already
305 * on disk. Subscribing to writability of the socket works,
306 * but requires libevent2 >= 2.1 (or our backported version)
308 HttpGwSubscribeToWrite(req);
// evhttp generic request handler (installed by InstallHTTPGateway). Parses the
// swarm root hash out of the request URI, finds or opens the matching swift
// transfer, records the request in http_requests and arranges for data to be
// streamed back as it becomes available.
// evreq: the incoming libevent HTTP request. arg is not used in the visible
// lines.
// NOTE(review): a number of lines (returns after error replies, braces, some
// field initialisations) are missing from this excerpt.
313 void HttpGwNewRequestCallback (struct evhttp_request *evreq, void *arg) {
315 dprintf("%s @%i http new request\n",tintstr(),http_gw_reqs_count+1);
// Anything other than GET is handled on lines not visible here.
317 if (evhttp_request_get_command(evreq) != EVHTTP_REQ_GET) {
323 const char *uri = evhttp_request_get_uri(evreq);
324 //struct evkeyvalq *headers = evhttp_request_get_input_headers(evreq);
325 //const char *contentrangestr =evhttp_find_header(headers,"Content-Range");
// strtok modifies its input, so the URI is tokenised on a private copy.
// NOTE(review): no free of tokenuri is visible in this excerpt - confirm.
327 char *tokenuri = (char *)malloc(strlen(uri)+1);
328 strcpy(tokenuri,uri);
// Walk the path components until one parses as exactly 40 hex digits.
329 char * hashch=strtok(tokenuri,"/"), hash[41];
330 while (hashch && (1!=sscanf(hashch,"%40[0123456789abcdefABCDEF]",hash) || strlen(hash)!=40))
331 hashch = strtok(NULL,"/");
// NOTE(review): hash has no initialiser; if no component ever matched the
// sscanf pattern this strlen reads an uninitialised buffer, unless an omitted
// line sets it first.
333 if (strlen(hash)!=40) {
334 evhttp_send_error(evreq,400,"Path must be root hash in hex, 40 bytes.");
// Everything from URI index 42 onward is taken as the content duration.
// NOTE(review): presumably 42 = leading slash + 40 hex digits + one separator,
// which assumes the hash is the first path component - confirm.
337 char *xcontentdur = NULL;
338 if (strlen(uri) > 42) {
339 xcontentdur = (char *)malloc(strlen(uri)-42+1);
340 strcpy(xcontentdur,&uri[42]);
// Fallback value; note this is a string literal, unlike the malloc case above.
343 xcontentdur = (char *)"0";
344 dprintf("%s @%i demands %s %s\n",tintstr(),http_gw_reqs_open+1,hash,xcontentdur);
346 // initiate transmission
347 Sha1Hash root_hash = Sha1Hash(true,hash);
348 int transfer = swift::Find(root_hash);
// NOTE(review): the condition guarding the Open call below is not visible;
// presumably it runs only when Find did not locate an existing transfer.
350 transfer = swift::Open(hash,root_hash,Address(),false,true,httpgw_chunk_size); // ARNOTODO: allow for chunk size to be set via URL?
351 dprintf("%s @%i trying to HTTP GET swarm %s that has not been STARTed\n",tintstr(),http_gw_reqs_open+1,hash);
353 // Arno, 2011-12-20: Only on new transfers, otherwise assume that CMD GW
355 FileTransfer *ft = FileTransfer::file(transfer);
356 ft->SetMaxSpeed(DDIR_DOWNLOAD,httpgw_maxspeed[DDIR_DOWNLOAD]);
357 ft->SetMaxSpeed(DDIR_UPLOAD,httpgw_maxspeed[DDIR_UPLOAD]);
// Claim the next free table slot.
// NOTE(review): no check against HTTPGW_MAX_REQUEST is visible in this
// excerpt before the slot is taken - confirm the table cannot overflow.
361 http_gw_t* req = http_requests + http_gw_reqs_open++;
362 req->id = ++http_gw_reqs_count;
363 req->sinkevreq = evreq;
364 req->xcontentdur = xcontentdur;
367 req->transfer = transfer;
368 req->lastcpoffset = 0;
369 req->sinkevwrite = NULL;
370 req->closing = false;
372 fprintf(stderr,"httpgw: Opened %s\n",hash);
374 // We need delayed replying, so take ownership.
375 // See http://code.google.com/p/libevent-longpolling/source/browse/trunk/main.c
376 // Careful: libevent docs are broken. It doesn't say that evhttp_send_reply_end
377 // actually calls evhttp_request_free, i.e. releases ownership for you.
379 evhttp_request_own(evreq);
381 // Register callback for connection close
382 struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq);
383 evhttp_connection_set_closecb(evconn,HttpGwLibeventCloseCallback,req->sinkevreq);
// A non-zero size means content metadata is already known, so the first
// progress step is run immediately; otherwise it is registered as a swift
// progress callback (the else keyword is on a line not visible here).
385 if (swift::Size(transfer)) {
386 HttpGwFirstProgressCallback(transfer,bin_t(0,0));
388 swift::AddProgressCallback(transfer,&HttpGwFirstProgressCallback,HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER);
// Creates the evhttp server on the given event base, routes every request to
// HttpGwNewRequestCallback, binds the listening socket, and stores copies of
// the command-line chunk size and speed limits for later requests.
// evbase: libevent base that will drive the server.
// bindaddr: address whose IPv4 string and port are used for binding.
// chunk_size: stored in httpgw_chunk_size.
// maxspeed: pointer stored in httpgw_maxspeed (not copied); it is later
// indexed with DDIR_DOWNLOAD and DDIR_UPLOAD, so it must stay valid.
// NOTE(review): the return statements are on lines missing from this excerpt;
// presumably false after each print_error and true at the end - confirm.
393 bool InstallHTTPGateway (struct event_base *evbase,Address bindaddr, uint32_t chunk_size, double *maxspeed) {
394 // Arno, 2011-10-04: From libevent's http-server.c example
396 /* Create a new evhttp object to handle requests. */
397 http_gw_event = evhttp_new(evbase);
398 if (!http_gw_event) {
399 print_error("httpgw: evhttp_new failed");
403 /* Install callback for all requests */
404 evhttp_set_gencb(http_gw_event, HttpGwNewRequestCallback, NULL);
406 /* Now we tell the evhttp what port to listen on */
407 http_gw_handle = evhttp_bind_socket_with_handle(http_gw_event, bindaddr.ipv4str(), bindaddr.port());
408 if (!http_gw_handle) {
409 print_error("httpgw: evhttp_bind_socket_with_handle failed");
413 httpgw_chunk_size = chunk_size;
414 httpgw_maxspeed = maxspeed;
// State and body of the gateway liveness check.
// lastoffset and lastcomplete remember the offset and swift::Complete value
// seen on the previous pass, so that a later pass can detect a stall.
// NOTE(review): the signature of the function that follows, the end of its
// doc comment, the declaration of test_time and most control-flow lines are
// missing from this excerpt. From the visible lines: it prints the current
// upload and download speed and the sequentially complete byte count of the
// most recently added request, and then (inside code that a TODO marks for
// reactivation) logs reasons to quit: no HTTP activity ever, 100K sent, no
// progress since the previous pass, or all data delivered. What each branch
// returns cannot be determined from here.
// NOTE(review): lastoffset and lastcomplete are single globals but are
// updated inside a loop over all open requests, so with more than one request
// they hold the values of whichever request was visited last - confirm this
// is intended.
419 uint64_t lastoffset=0;
420 uint64_t lastcomplete=0;
423 /** For SwarmPlayer 3000's HTTP failover. We should exit if swift isn't
424 * delivering such that the extension can start talking HTTP to the backup.
428 if (http_gw_reqs_open > 0)
430 FileTransfer *ft = FileTransfer::file(http_requests[http_gw_reqs_open-1].transfer);
432 fprintf(stderr,"httpgw: upload %lf\n",ft->GetCurrentSpeed(DDIR_UPLOAD)/1024.0);
433 fprintf(stderr,"httpgw: dwload %lf\n",ft->GetCurrentSpeed(DDIR_DOWNLOAD)/1024.0);
434 fprintf(stderr,"httpgw: seqcmp %llu\n", swift::SeqComplete(http_requests[http_gw_reqs_open-1].transfer));
439 // TODO: reactivate when used in SwiftTransport / SwarmPlayer 3000.
447 if (NOW > test_time+5*1000*1000)
449 fprintf(stderr,"http alive: httpc count is %d\n", http_gw_reqs_open );
451 if (http_gw_reqs_open == 0 && !sawhttpconn)
453 fprintf(stderr,"http alive: no HTTP activity ever, quiting\n");
459 for (int httpc=0; httpc<http_gw_reqs_open; httpc++)
463 if (http_requests[httpc].offset >= 100000)
465 fprintf(stderr,"http alive: 100K sent, quit\n");
470 fprintf(stderr,"http alive: sent %lli\n", http_requests[httpc].offset );
476 // a. don't know anything about content (i.e., size still 0) or
477 // b. not sending to HTTP client and not at end, and
478 // not downloading from P2P and not at end
480 if ( swift::Size(http_requests[httpc].transfer) == 0 || \
481 (http_requests[httpc].offset == lastoffset &&
482 http_requests[httpc].offset != swift::Size(http_requests[httpc].transfer) && \
483 swift::Complete(http_requests[httpc].transfer) == lastcomplete && \
484 swift::Complete(http_requests[httpc].transfer) != swift::Size(http_requests[httpc].transfer)))
486 fprintf(stderr,"http alive: no progress, quiting\n");
492 if (http_requests[httpc].offset == swift::Size(http_requests[httpc].transfer))
494 // TODO: seed for a while.
495 fprintf(stderr,"http alive: data delivered to client, quiting\n");
500 lastoffset = http_requests[httpc].offset;
501 lastcomplete = swift::Complete(http_requests[httpc].transfer);