add .gitignore
[swift-upb.git] / transfer.cpp
index bb6cdb7..ad121f3 100644 (file)
 /*
  *  transfer.cpp
- *  p2tp
+ *  some transfer-scope code
  *
  *  Created by Victor Grishchenko on 10/6/09.
  *  Copyright 2009 Delft University of Technology. All rights reserved.
  *
  */
-#ifdef _MSC_VER
-#include "compat/unixio.h"
-#else
-#include <sys/mman.h>
-#endif
 #include <errno.h>
 #include <string>
 #include <sstream>
-#include "p2tp.h"
-#include "compat/util.h"
+#include "swift.h"
 
-using namespace p2tp;
+#include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME 
+
+using namespace swift;
 
 std::vector<FileTransfer*> FileTransfer::files(20);
 
-int FileTransfer::instance = 0;
 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
 
-#include "ext/seq_picker.cpp"
-
 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
 
 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
-    root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
-    peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
-    complete_(0), completek_(0), seq_complete_(0)
+    file_(filename,_root_hash), hs_in_offset_(0), cb_installed(0)
 {
-       fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       if (fd_<0)
-        return;
-    if (files.size()<fd_+1)
-        files.resize(fd_+1);
-    files[fd_] = this;
-    if (root_hash_==Sha1Hash::ZERO) // fresh submit, hash it
-        Submit();
-    else
-        RecoverProgress();
+    if (files.size()<fd()+1)
+        files.resize(fd()+1);
+    files[fd()] = this;
     picker_ = new SeqPiecePicker(this);
+    picker_->Randomize(rand()&63);
+    init_time_ = Datagram::Time();
 }
 
 
-void FileTransfer::LoadPeaks () {
-    std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
-    int peakfd = open(file_name.c_str(),O_RDONLY,0);
-    if (peakfd<0)
-        return;
-    bin64_t peak;
-    char hash[128];
-    while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
-        read(peakfd,hash,Sha1Hash::SIZE);
-        OfferPeak(peak, Sha1Hash(false,hash));
-    }
-    close(peakfd);
+void    Channel::CloseTransfer (FileTransfer* trans) {
+    for(int i=0; i<Channel::channels.size(); i++) 
+        if (Channel::channels[i] && Channel::channels[i]->transfer_==trans) 
+            delete Channel::channels[i];
 }
 
 
-/** Basically, simulated receiving every single packet, except
-    for some optimizations. */
-void            FileTransfer::RecoverProgress () {
-    dry_run_ = true;
-    LoadPeaks();
-    if (!size())
+void swift::AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg) {
+    FileTransfer* trans = FileTransfer::file(transfer);
+    if (!trans)
         return;
-    // at this point, we may use mmapd hashes already
-    // so, lets verify hashes and the data we've got
-    lseek(fd_,0,SEEK_SET);
-    for(int p=0; p<size_kilo(); p++) {
-        uint8_t buf[1<<10];
-        size_t rd = read(fd_,buf,1<<10);
-        OfferData(bin64_t(0,p), buf, rd);
-        if (rd<(1<<10))
-            break;
-    }
-    dry_run_ = false;
-}
-
-
-void    FileTransfer::SavePeaks () {
-    std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
-    int peakfd = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-    for(int i=0; i<peak_count(); i++) {
-        write(peakfd,&(peaks_[i]),sizeof(bin64_t));
-        write(peakfd,*peak_hashes_[i],Sha1Hash::SIZE);
-    }
-    close(peakfd);
+    trans->cb_agg[trans->cb_installed] = agg;
+    trans->callbacks[trans->cb_installed] = cb;
+    trans->cb_installed++;
 }
 
 
-void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
-    size_ = bytes;
-    completek_ = complete_ = seq_complete_ = 0;
-       sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
-
-       struct stat st;
-       fstat(fd_, &st);
-    if (st.st_size!=bytes)
-        if (ftruncate(fd_, bytes))
-            return; // remain in the 0-state
-    // mmap the hash file into memory
-    std::string file_name = GetTempFilename(root_hash_,instance,std::string(".hashes"));
-       hashfd_ = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-    size_t expected_size_ = Sha1Hash::SIZE * sizek_ * 2;
-       struct stat hash_file_st;
-       fstat(hashfd_, &hash_file_st);
-    if ( hash_file_st.st_size != expected_size_ )
-        ftruncate(hashfd_, expected_size_);
-#ifdef _MSC_VER
-    HANDLE hashhandle = (HANDLE)_get_osfhandle(hashfd_);
-    hashmaphandle_ = CreateFileMapping(hashhandle,
-                                             NULL,
-                                             PAGE_READWRITE,
-                                             0,
-                                             0,
-                                             NULL);
-       if (hashmaphandle_ != NULL)
-       {
-               hashes_ = (Sha1Hash*)MapViewOfFile(hashmaphandle_,
-                                                        FILE_MAP_WRITE,
-                                                    0,
-                                                    0,
-                                                    0);
-
-       }
-       if (hashmaphandle_ == NULL || hashes_ == NULL)
-#else
-    hashes_ = (Sha1Hash*) mmap (NULL, expected_size, PROT_READ|PROT_WRITE,
-                               MAP_SHARED, hashfd_, 0);
-    if (hashes_==MAP_FAILED)
-#endif
-    {
-        hashes_ = NULL;
-        size_ = sizek_ = complete_ = completek_ = seq_complete_ = 0;
-        error_ = strerror(errno); // FIXME dprintf()
-        perror("hash tree mmap failed");
+void swift::ExternallyRetrieved (int transfer,bin64_t piece) {
+    FileTransfer* trans = FileTransfer::file(transfer);
+    if (!trans)
         return;
-    }
-    for(int i=0; i<peak_count_; i++)
-        hashes_[peaks_[i]] = peak_hashes_[i];
-}
-
-
-void            FileTransfer::Submit () {
-       struct stat st; // TODO:   AppendData()   and   streaming
-       fstat(fd_, &st);
-    size_ = st.st_size;
-       sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
-    hashes_ = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek_*2);
-    peak_count_ = bin64_t::peaks(sizek_,peaks_);
-    for (int p=0; p<peak_count_; p++) {
-        for(bin64_t b=peaks_[p].left_foot(); b.within(peaks_[p]); b=b.next_dfsio(0))
-            if (b.is_base()) {
-                uint8_t kilo[1<<10];
-                size_t rd = pread(fd_,kilo,1<<10,b.base_offset()<<10);
-                hashes_[b] = Sha1Hash(kilo,rd);
-            } else
-                hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]);
-        peak_hashes_[p] = hashes_[peaks_[p]];
-        ack_out_.set(peaks_[p],bins::FILLED);
-    }
-    root_hash_ = DeriveRoot();
-    Sha1Hash *hash_tmp = hashes_;
-    SetSize(st.st_size);
-    SavePeaks();
-    seq_complete_ = complete_ = size_;
-    completek_ = sizek_;
-    memcpy(hashes_,hash_tmp,sizek_*Sha1Hash::SIZE*2);
-    free(hash_tmp);
-}
-
-
-bin64_t         FileTransfer::peak_for (bin64_t pos) const {
-    int pi=0;
-    while (pi<peak_count_ && !pos.within(peaks_[pi]))
-        pi++;
-    return pi==peak_count_ ? bin64_t(bin64_t::NONE) : peaks_[pi];
+    trans->ack_out().set(piece); // that easy
 }
 
 
-void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
-       if (!size_)  // only peak hashes are accepted at this point
-               return OfferPeak(pos,hash);
-    int pi=0;
-    while (pi<peak_count_ && !pos.within(peaks_[pi]))
-        pi++;
-    if (pi==peak_count_)
+void swift::RemoveProgressCallback (int transfer, ProgressCallback cb) {
+    FileTransfer* trans = FileTransfer::file(transfer);
+    if (!trans)
         return;
-    if (pos==peaks_[pi] && hash!=peak_hashes_[pi])
-        return;
-    else if (ack_out_.get(pos.parent())!=bins::EMPTY)
-        return; // have this hash already, even accptd data
-       hashes_[pos] = hash;
-}
-
-
-bin64_t         FileTransfer::data_in (int offset) {
-    if (offset>data_in_.size())
-        return bin64_t::NONE;
-    return data_in_[offset];
-}
-
-
-bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
-    if (!pos.is_base())
-        return false;
-    if (length<1024 && pos!=bin64_t(0,sizek_-1))
-        return false;
-    if (ack_out_.get(pos)==bins::FILLED)
-        return true; // ???
-    bin64_t peak = peak_for(pos);
-    if (peak==bin64_t::NONE)
-        return false;
-
-    Sha1Hash hash(data,length);
-    bin64_t p = pos;
-    while ( p!=peak && ack_out_.get(p)==bins::EMPTY ) {
-        hashes_[p] = hash;
-        p = p.parent();
-        hash = Sha1Hash(hashes_[p.left()],hashes_[p.right()]) ;
-    }
-    if (hash!=hashes_[p])
-        return false;
-
-    //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
-       // walk to the nearest proven hash   FIXME 0-layer peak
-    ack_out_.set(pos,bins::FILLED);
-    pwrite(fd_,data,length,pos.base_offset()<<10);
-    complete_ += length;
-    completek_++;
-    if (length<1024) {
-        size_ -= 1024 - length;
-        ftruncate(fd_, size_);
-    }
-    while (ack_out_.get(bin64_t(0,seq_complete_>>10))==bins::FILLED)
-        seq_complete_+=1024;
-    if (seq_complete_>size_)
-        seq_complete_ = size_;
-    data_in_.push_back(pos);
-    return true;
-}
-
-
-Sha1Hash        FileTransfer::DeriveRoot () {
-       int c = peak_count_-1;
-       bin64_t p = peaks_[c];
-       Sha1Hash hash = peak_hashes_[c];
-       c--;
-       while (p!=bin64_t::ALL) {
-               if (p.is_left()) {
-                       p = p.parent();
-                       hash = Sha1Hash(hash,Sha1Hash::ZERO);
-               } else {
-                       if (c<0 || peaks_[c]!=p.sibling())
-                               return Sha1Hash::ZERO;
-                       hash = Sha1Hash(peak_hashes_[c],hash);
-                       p = p.parent();
-                       c--;
-               }
-        //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
-       }
-    return hash;
-}
-
-
-void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
-    assert(!size_);
-    if (peak_count_) {
-        bin64_t last_peak = peaks_[peak_count_-1];
-        if ( pos.layer()>=last_peak.layer() ||
-             pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
-            peak_count_ = 0;
-    }
-    peaks_[peak_count_] = pos;
-    peak_hashes_[peak_count_++] = hash;
-    // check whether peak hash candidates add up to the root hash
-    Sha1Hash mustbe_root = DeriveRoot();
-    if (mustbe_root!=root_hash_)
-        return;
-    // bingo, we now know the file size (rounded up to a KByte)
-    SetSize( (pos.base_offset()+pos.width()) << 10              );
-    SavePeaks();
+    for(int i=0; i<trans->cb_installed; i++)
+        if (trans->callbacks[i]==cb)
+            trans->callbacks[i]=trans->callbacks[--trans->cb_installed];
 }
 
 
 FileTransfer::~FileTransfer ()
 {
-#ifdef _MSC_VER
-       UnmapViewOfFile(hashes_);
-       CloseHandle(hashmaphandle_);
-#else
-    munmap(hashes_,sizek_*2*Sha1Hash::SIZE);
-    close(hashfd_);
-    close(fd_);
-    files[fd_] = NULL;
-#endif
+    Channel::CloseTransfer(this);
+    files[fd()] = NULL;
+    delete picker_;
 }
 
 
 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
     for(int i=0; i<files.size(); i++)
-        if (files[i] && files[i]->root_hash_==root_hash)
+        if (files[i] && files[i]->root_hash()==root_hash)
             return files[i];
     return NULL;
 }
 
 
-
-std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
-{
-       std::string tempfile = gettmpdir();
-       std::stringstream ss;
-       ss << instance;
-       tempfile += std::string(".") + root_hash.hex() + std::string(".") + ss.str() + postfix;
-       return tempfile;
+int       swift:: Find (Sha1Hash hash) {
+    FileTransfer* t = FileTransfer::Find(hash);
+    if (t)
+        return t->fd();
+    return -1;
 }
 
 
 
-int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
-    FileTransfer* ft = new FileTransfer(filename, hash);
-    int fdes = ft->file_descriptor();
-    if (fdes>0) {
-        if (FileTransfer::files.size()<fdes)
-            FileTransfer::files.resize(fdes);
-        FileTransfer::files[fdes] = ft;
-        return fdes;
+void            FileTransfer::OnPexIn (const Address& addr) {
+    for(int i=0; i<hs_in_.size(); i++) {
+        Channel* c = Channel::channel(hs_in_[i]);
+        if (c && c->transfer().fd()==this->fd() && c->peer()==addr)
+            return; // already connected
+    }
+    if (hs_in_.size()<20) {
+        new Channel(this,Datagram::default_socket(),addr);
     } else {
-        delete ft;
-        return -1;
+        pex_in_.push_back(addr);
+        if (pex_in_.size()>1000)
+            pex_in_.pop_front();
     }
 }
 
 
-/*
- for(int i=0; i<peak_hash_count; i++) {
- bin64_t x = peaks[i], end = x.sibling();
- do {
- while (!x.layer()>10) {
- OfferHash(x.right(), hashes[x.right()]);
- if ( ! OfferHash(x.left(), hashes[x.left()]) )
- break;
- x = x.left();
- }
-
- if (x.layer()==10) {
- if (recheck_data) {
- uint8_t data[1024];
- size_t rd = pread(fd,data,2<<10,x.base_offset());
- if (hashes[x]==Sha1Hash(data,rd))
- ack_out->set(x,bins::FILLED);
- // may avoid hashing by checking whether it is zero
- // and whether the hash matches hash of zero
- } else {
- ack_out->set(x,bins::FILLED);
- }
- }
-
- while (x.is_right() && x!=peaks[i])
- x = x.parent();
- x = x.sibling();
- } while (x!=end);
- }
-
-
-
-
- // open file
- if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
- return;
- // read root hash
- char hashbuf[128];
- uint64_t binbuf;
- lseek(hashfd_,0,SEEK_SET);
- read(hashfd_,&binbuf,sizeof(bin64_t));
- read(hashfd_,hashbuf,Sha1Hash::SIZE);
- Sha1Hash mustberoot(false,(const char*)hashbuf);
- if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
- ftruncate(hashfd_,Sha1Hash::SIZE*64);
- return;
- }
- // read peak hashes
- for(int i=1; i<64 && !this->size; i++){
- read(hashfd_,&binbuf,sizeof(bin64_t));
- read(hashfd_,hashbuf,Sha1Hash::SIZE);
- Sha1Hash mustbepeak(false,(const char*)hashbuf);
- if (mustbepeak==Sha1Hash::ZERO)
- break;
- OfferPeak(binbuf,mustbepeak);
- }
- if (!size)
- return;
-
+int        FileTransfer::RevealChannel (int& pex_out_) { // FIXME brainfuck
+    pex_out_ -= hs_in_offset_;
+    if (pex_out_<0)
+        pex_out_ = 0;
+    while (pex_out_<hs_in_.size()) {
+        Channel* c = Channel::channel(hs_in_[pex_out_]);
+        if (c && c->transfer().fd()==this->fd()) {
+            if (c->is_established()) {
+                pex_out_ += hs_in_offset_ + 1;
+                return c->id();
+            } else
+                pex_out_++;
+        } else {
+            hs_in_[pex_out_] = hs_in_[0];
+            hs_in_.pop_front();
+            hs_in_offset_++;
+        }
+    }
+    pex_out_ += hs_in_offset_;
+    return -1;
+}
 
- */