X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Flibswift%2Fhttpgw.cpp;fp=src%2Flibswift%2Fhttpgw.cpp;h=542b1e5f30192646eba34b9462cccc6a5954367f;hb=45963a7511531cd1656ad5d3847d2dafd015c54d;hp=0000000000000000000000000000000000000000;hpb=d069796805ad79542fd7e4406d1e9c6d2d8c2ef7;p=swifty.git diff --git a/src/libswift/httpgw.cpp b/src/libswift/httpgw.cpp new file mode 100644 index 0000000..542b1e5 --- /dev/null +++ b/src/libswift/httpgw.cpp @@ -0,0 +1,509 @@ +/* + * httpgw.cpp + * gateway for serving swift content via HTTP, libevent2 based. + * + * Created by Victor Grishchenko, Arno Bakker + * Copyright 2010-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved. + * + */ +#include "swift.h" +#include +#include + +using namespace swift; + +#define HTTPGW_PROGRESS_STEP_BYTES (256*1024) +// For best performance make bigger than HTTPGW_PROGRESS_STEP_BYTES +#define HTTPGW_MAX_WRITE_BYTES (512*1024) + +// Report swift download progress every 2^layer * chunksize bytes (so 0 = report every chunk) +#define HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER 0 + +// Arno: libevent2 has a liberal understanding of socket writability, +// that may result in tens of megabytes being cached in memory. Limit that +// amount at app level. +#define HTTPGW_MAX_PREBUF_BYTES (2*1024*1024) + +#define HTTPGW_MAX_REQUEST 128 + +struct http_gw_t { + int id; + uint64_t offset; + uint64_t tosend; + int transfer; + uint64_t lastcpoffset; // last offset at which we checkpointed + struct evhttp_request *sinkevreq; + struct event *sinkevwrite; + char* xcontentdur; + bool closing; + +} http_requests[HTTPGW_MAX_REQUEST]; + + +int http_gw_reqs_open = 0; +int http_gw_reqs_count = 0; +struct evhttp *http_gw_event; +struct evhttp_bound_socket *http_gw_handle; +uint32_t httpgw_chunk_size = SWIFT_DEFAULT_CHUNK_SIZE; // Copy of cmdline param +double *httpgw_maxspeed = NULL; // Copy of cmdline param + +// Arno, 2010-11-30: for SwarmPlayer 3000 backend autoquit when no HTTP req is received +bool sawhttpconn = false; + + +http_gw_t *HttpGwFindRequestByEV(struct evhttp_request *evreq) { + for (int httpc=0; httpcid, req->sinkevreq); + + struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq); + + dprintf("%s @%i cleanup: before send reply\n",tintstr(),req->id); + + req->closing = true; + if (req->offset > 0) + evhttp_send_reply_end(req->sinkevreq); //WARNING: calls HttpGwLibeventCloseCallback + else + evhttp_request_free(req->sinkevreq); + + dprintf("%s @%i cleanup: reset evreq\n",tintstr(),req->id); + req->sinkevreq = NULL; + dprintf("%s @%i cleanup: after reset evreq\n",tintstr(),req->id); + + // Note: for some reason calling conn_free here prevents the last chunks + // to be sent to the requester? + // evhttp_connection_free(evconn); // WARNING: calls HttpGwLibeventCloseCallback + + // Current close policy: checkpoint and DO NOT close transfer, keep on + // seeding forever. More sophisticated clients should use CMD GW and issue + // REMOVE. + swift::Checkpoint(req->transfer); + + //swift::Close(req->transfer); + + *req = http_requests[--http_gw_reqs_open]; +} + + +void HttpGwLibeventCloseCallback(struct evhttp_connection *evconn, void *evreqvoid) { + // Called by libevent on connection close, either when the other side closes + // or when we close (because we call evhttp_connection_free()). To prevent + // doing cleanup twice, we see if there is a http_gw_req that has the + // passed evreqvoid as sinkevreq. If so, clean up, if not, ignore. + // I.e. evhttp_request * is used as sort of request ID + // + fprintf(stderr,"HttpGwLibeventCloseCallback: called\n"); + http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid); + if (req == NULL) + dprintf("%s http conn already closed\n",tintstr() ); + else { + dprintf("%s T%i http close conn\n",tintstr(),req->transfer); + if (req->closing) + dprintf("%s http conn already closing\n",tintstr() ); + else + HttpGwCloseConnection(req); + } +} + + + + +void HttpGwMayWriteCallback (int transfer) { + // Write some data to client + + http_gw_t* req = HttpGwFindRequestByTransfer(transfer); + if (req == NULL) { + print_error("httpgw: MayWrite: can't find req for transfer"); + return; + } + + uint64_t complete = swift::SeqComplete(req->transfer); + + //dprintf("%s @%i http write complete %lli offset %lli\n",tintstr(),req->id, complete, req->offset); + //fprintf(stderr,"offset %lli seqcomp %lli comp %lli\n",req->offset, complete, swift::Complete(req->transfer) ); + + struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq); + struct bufferevent* buffy = evhttp_connection_get_bufferevent(evconn); + struct evbuffer *outbuf = bufferevent_get_output(buffy); + + //fprintf(stderr,"httpgw: MayWrite avail %i bufev outbuf %i\n",complete-req->offset, evbuffer_get_length(outbuf) ); + + if (complete > req->offset && evbuffer_get_length(outbuf) < HTTPGW_MAX_PREBUF_BYTES) + { + // Received more than I pushed to player, send data + char buf[HTTPGW_MAX_WRITE_BYTES]; +// Arno, 2010-08-16, TODO +#ifdef WIN32 + uint64_t tosend = min(HTTPGW_MAX_WRITE_BYTES,complete-req->offset); +#else + uint64_t tosend = std::min((uint64_t)HTTPGW_MAX_WRITE_BYTES,complete-req->offset); +#endif + size_t rd = pread(req->transfer,buf,tosend,req->offset); // hope it is cached + if (rd<0) { + print_error("httpgw: MayWrite: error pread"); + HttpGwCloseConnection(req); + return; + } + + // Construct evbuffer and send incrementally + struct evbuffer *evb = evbuffer_new(); + int ret = evbuffer_add(evb,buf,rd); + if (ret < 0) { + print_error("httpgw: MayWrite: error evbuffer_add"); + evbuffer_free(evb); + HttpGwCloseConnection(req); + return; + } + + if (req->offset == 0) { + // Not just for chunked encoding, see libevent2's http.c + evhttp_send_reply_start(req->sinkevreq, 200, "OK"); + } + + evhttp_send_reply_chunk(req->sinkevreq, evb); + evbuffer_free(evb); + + int wn = rd; + dprintf("%s @%i http sent data %ib\n",tintstr(),req->id,(int)wn); + + req->offset += wn; + req->tosend -= wn; + + // PPPLUG + FileTransfer *ft = FileTransfer::file(transfer); + if (ft == NULL) + return; + ft->picker().updatePlaybackPos( wn/ft->file().chunk_size() ); + } + + // Arno, 2010-11-30: tosend is set to fuzzy len, so need extra/other test. + if (req->tosend==0 || req->offset == swift::Size(req->transfer)) { + // done; wait for new HTTP request + dprintf("%s @%i done\n",tintstr(),req->id); + //fprintf(stderr,"httpgw: MayWrite: done, wait for buffer empty before send_end_reply\n" ); + + if (evbuffer_get_length(outbuf) == 0) { + //fprintf(stderr,"httpgw: MayWrite: done, buffer empty, end req\n" ); + HttpGwCloseConnection(req); + } + } + else { + // wait for data + dprintf("%s @%i waiting for data\n",tintstr(),req->id); + } +} + +void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid ); + +void HttpGwSubscribeToWrite(http_gw_t * req) { + struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq); + struct event_base *evbase = evhttp_connection_get_base(evconn); + struct bufferevent* evbufev = evhttp_connection_get_bufferevent(evconn); + + if (req->sinkevwrite != NULL) + event_free(req->sinkevwrite); // FAXME: clean in CloseConn + + req->sinkevwrite = event_new(evbase,bufferevent_getfd(evbufev),EV_WRITE,HttpGwLibeventMayWriteCallback,req->sinkevreq); + struct timeval t; + t.tv_sec = 10; + int ret = event_add(req->sinkevwrite,&t); + //fprintf(stderr,"httpgw: HttpGwSubscribeToWrite: added event\n"); +} + + +void HttpGwLibeventMayWriteCallback(evutil_socket_t fd, short events, void *evreqvoid ) +{ + //fprintf(stderr,"httpgw: MayWrite: %d events %d evreq is %p\n", fd, events, evreqvoid); + + http_gw_t * req = HttpGwFindRequestByEV((struct evhttp_request *)evreqvoid); + if (req != NULL) { + //fprintf(stderr,"httpgw: MayWrite: %d events %d httpreq is %p\n", fd, events, req); + HttpGwMayWriteCallback(req->transfer); + + + // Arno, 2011-12-20: No autoreschedule, let HttpGwSwiftProgressCallback do that + //if (req->sinkevreq != NULL) // Conn closed + // HttpGwSubscribeToWrite(req); + + //fprintf(stderr,"GOTO WRITE %lli >= %lli\n", swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES, swift::Size(req->transfer) ); + + if (swift::Complete(req->transfer)+HTTPGW_MAX_WRITE_BYTES >= swift::Size(req->transfer)) { + + // We don't get progress callback for last chunk < chunk size, nor + // when all data is already on disk. In that case, just keep on + // subscribing to HTTP socket writability until all data is sent. + if (req->sinkevreq != NULL) // Conn closed + HttpGwSubscribeToWrite(req); + } + } +} + +void HttpGwSwiftProgressCallback (int transfer, bin_t bin) { + // Subsequent HTTPGW_PROGRESS_STEP_BYTES available + + dprintf("%s T%i http more progress\n",tintstr(),transfer); + http_gw_t* req = HttpGwFindRequestByTransfer(transfer); + if (req == NULL) + return; + + // Arno, 2011-12-20: We have new data to send, wait for HTTP socket writability + if (req->sinkevreq != NULL) { // Conn closed + HttpGwSubscribeToWrite(req); + } +} + + +void HttpGwFirstProgressCallback (int transfer, bin_t bin) { + // First chunk of data available + dprintf("%s T%i http first progress\n",tintstr(),transfer); + + if (!bin.contains(bin_t(0,0))) // need the first chunk + return; + + swift::RemoveProgressCallback(transfer,&HttpGwFirstProgressCallback); + int progresslayer = bytes2layer(HTTPGW_PROGRESS_STEP_BYTES,swift::ChunkSize(transfer)); + swift::AddProgressCallback(transfer,&HttpGwSwiftProgressCallback,progresslayer); + + http_gw_t* req = HttpGwFindRequestByTransfer(transfer); + if (req == NULL) + return; + if (req->tosend==0) { // FIXME states + uint64_t filesize = swift::Size(transfer); + char filesizestr[256]; + sprintf(filesizestr,"%lli",filesize); + + struct evkeyvalq *headers = evhttp_request_get_output_headers(req->sinkevreq); + //evhttp_add_header(headers, "Connection", "keep-alive" ); + evhttp_add_header(headers, "Connection", "close" ); + evhttp_add_header(headers, "Content-Type", "video/ogg" ); + evhttp_add_header(headers, "X-Content-Duration",req->xcontentdur ); + evhttp_add_header(headers, "Content-Length", filesizestr ); + evhttp_add_header(headers, "Accept-Ranges", "none" ); + + req->tosend = filesize; + dprintf("%s @%i headers_sent size %lli\n",tintstr(),req->id,filesize); + + /* + * Arno, 2011-10-17: Swift ProgressCallbacks are only called when + * the data is downloaded, not when it is already on disk. So we need + * to handle the situation where all or part of the data is already + * on disk. Subscribing to writability of the socket works, + * but requires libevent2 >= 2.1 (or our backported version) + */ + HttpGwSubscribeToWrite(req); + } +} + + +void HttpGwNewRequestCallback (struct evhttp_request *evreq, void *arg) { + + dprintf("%s @%i http new request\n",tintstr(),http_gw_reqs_count+1); + + if (evhttp_request_get_command(evreq) != EVHTTP_REQ_GET) { + return; + } + sawhttpconn = true; + + // Parse URI + const char *uri = evhttp_request_get_uri(evreq); + //struct evkeyvalq *headers = evhttp_request_get_input_headers(evreq); + //const char *contentrangestr =evhttp_find_header(headers,"Content-Range"); + + char *tokenuri = (char *)malloc(strlen(uri)+1); + strcpy(tokenuri,uri); + char * hashch=strtok(tokenuri,"/"), hash[41]; + while (hashch && (1!=sscanf(hashch,"%40[0123456789abcdefABCDEF]",hash) || strlen(hash)!=40)) + hashch = strtok(NULL,"/"); + free(tokenuri); + if (strlen(hash)!=40) { + evhttp_send_error(evreq,400,"Path must be root hash in hex, 40 bytes."); + return; + } + char *xcontentdur = NULL; + if (strlen(uri) > 42) { + xcontentdur = (char *)malloc(strlen(uri)-42+1); + strcpy(xcontentdur,&uri[42]); + } + else + xcontentdur = (char *)"0"; + dprintf("%s @%i demands %s %s\n",tintstr(),http_gw_reqs_open+1,hash,xcontentdur); + + // initiate transmission + Sha1Hash root_hash = Sha1Hash(true,hash); + int transfer = swift::Find(root_hash); + if (transfer==-1) { + transfer = swift::Open(hash,root_hash,Address(),false,true,httpgw_chunk_size); // ARNOTODO: allow for chunk size to be set via URL? + dprintf("%s @%i trying to HTTP GET swarm %s that has not been STARTed\n",tintstr(),http_gw_reqs_open+1,hash); + + // Arno, 2011-12-20: Only on new transfers, otherwise assume that CMD GW + // controls speed + FileTransfer *ft = FileTransfer::file(transfer); + ft->SetMaxSpeed(DDIR_DOWNLOAD,httpgw_maxspeed[DDIR_DOWNLOAD]); + ft->SetMaxSpeed(DDIR_UPLOAD,httpgw_maxspeed[DDIR_UPLOAD]); + } + + // Record request + http_gw_t* req = http_requests + http_gw_reqs_open++; + req->id = ++http_gw_reqs_count; + req->sinkevreq = evreq; + req->xcontentdur = xcontentdur; + req->offset = 0; + req->tosend = 0; + req->transfer = transfer; + req->lastcpoffset = 0; + req->sinkevwrite = NULL; + req->closing = false; + + fprintf(stderr,"httpgw: Opened %s\n",hash); + + // We need delayed replying, so take ownership. + // See http://code.google.com/p/libevent-longpolling/source/browse/trunk/main.c + // Careful: libevent docs are broken. It doesn't say that evhttp_send_reply_send + // actually calls evhttp_request_free, i.e. releases ownership for you. + // + evhttp_request_own(evreq); + + // Register callback for connection close + struct evhttp_connection *evconn = evhttp_request_get_connection(req->sinkevreq); + evhttp_connection_set_closecb(evconn,HttpGwLibeventCloseCallback,req->sinkevreq); + + if (swift::Size(transfer)) { + HttpGwFirstProgressCallback(transfer,bin_t(0,0)); + } else { + swift::AddProgressCallback(transfer,&HttpGwFirstProgressCallback,HTTPGW_FIRST_PROGRESS_BYTE_INTERVAL_AS_LAYER); + } +} + + +bool InstallHTTPGateway (struct event_base *evbase,Address bindaddr, uint32_t chunk_size, double *maxspeed) { + // Arno, 2011-10-04: From libevent's http-server.c example + + /* Create a new evhttp object to handle requests. */ + http_gw_event = evhttp_new(evbase); + if (!http_gw_event) { + print_error("httpgw: evhttp_new failed"); + return false; + } + + /* Install callback for all requests */ + evhttp_set_gencb(http_gw_event, HttpGwNewRequestCallback, NULL); + + /* Now we tell the evhttp what port to listen on */ + http_gw_handle = evhttp_bind_socket_with_handle(http_gw_event, bindaddr.ipv4str(), bindaddr.port()); + if (!http_gw_handle) { + print_error("httpgw: evhttp_bind_socket_with_handle failed"); + return false; + } + + httpgw_chunk_size = chunk_size; + httpgw_maxspeed = maxspeed; + return true; +} + + +uint64_t lastoffset=0; +uint64_t lastcomplete=0; +tint test_time = 0; + +/** For SwarmPlayer 3000's HTTP failover. We should exit if swift isn't + * delivering such that the extension can start talking HTTP to the backup. + */ +bool HTTPIsSending() +{ + if (http_gw_reqs_open > 0) + { + FileTransfer *ft = FileTransfer::file(http_requests[http_gw_reqs_open-1].transfer); + if (ft != NULL) { + fprintf(stderr,"httpgw: upload %lf\n",ft->GetCurrentSpeed(DDIR_UPLOAD)/1024.0); + fprintf(stderr,"httpgw: dwload %lf\n",ft->GetCurrentSpeed(DDIR_DOWNLOAD)/1024.0); + fprintf(stderr,"httpgw: seqcmp %llu\n", swift::SeqComplete(http_requests[http_gw_reqs_open-1].transfer)); + } + } + return true; + + // TODO: reactivate when used in SwiftTransport / SwarmPlayer 3000. + + if (test_time == 0) + { + test_time = NOW; + return true; + } + + if (NOW > test_time+5*1000*1000) + { + fprintf(stderr,"http alive: httpc count is %d\n", http_gw_reqs_open ); + + if (http_gw_reqs_open == 0 && !sawhttpconn) + { + fprintf(stderr,"http alive: no HTTP activity ever, quiting\n"); + return false; + } + else + sawhttpconn = true; + + for (int httpc=0; httpc= 100000) + { + fprintf(stderr,"http alive: 100K sent, quit\n"); + return false; + } + else + { + fprintf(stderr,"http alive: sent %lli\n", http_requests[httpc].offset ); + return true; + } + */ + + // If + // a. don't know anything about content (i.e., size still 0) or + // b. not sending to HTTP client and not at end, and + // not downloading from P2P and not at end + // then stop. + if ( swift::Size(http_requests[httpc].transfer) == 0 || \ + (http_requests[httpc].offset == lastoffset && + http_requests[httpc].offset != swift::Size(http_requests[httpc].transfer) && \ + swift::Complete(http_requests[httpc].transfer) == lastcomplete && \ + swift::Complete(http_requests[httpc].transfer) != swift::Size(http_requests[httpc].transfer))) + { + fprintf(stderr,"http alive: no progress, quiting\n"); + //getchar(); + return false; + } + + /* + if (http_requests[httpc].offset == swift::Size(http_requests[httpc].transfer)) + { + // TODO: seed for a while. + fprintf(stderr,"http alive: data delivered to client, quiting\n"); + return false; + } + */ + + lastoffset = http_requests[httpc].offset; + lastcomplete = swift::Complete(http_requests[httpc].transfer); + } + test_time = NOW; + + return true; + } + else + return true; +}