/*
* transfer.cpp
- * p2tp
+ * some transfer-scope code
*
* Created by Victor Grishchenko on 10/6/09.
- * Copyright 2009 Delft Technical University. All rights reserved.
+ * Copyright 2009 Delft University of Technology. All rights reserved.
*
*/
-#include "p2tp.h"
-#include <sys/mman.h>
+#include <errno.h>
+#include <string>
+#include <sstream>
+#include "swift.h"
-using namespace p2tp;
+#include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME
+
+using namespace swift;
std::vector<FileTransfer*> FileTransfer::files(20);
+#define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
+
+// FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
-FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
- root_hash(_root_hash), fd(0), hashfd(0), dry_run(false), peak_count(0), hashes(NULL)
+FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
+ file_(filename,_root_hash), hs_in_offset_(0), cb_installed(0)
{
- fd = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- hashfd = open("/tmp/hahs",O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); // FIXME
- if (fd<0 || hashfd<0)
- return;
- if (root_hash==Sha1Hash::ZERO) // fresh submit, hash it
- Submit();
- else
- RecoverProgress();
+ if (files.size()<fd()+1)
+ files.resize(fd()+1);
+ files[fd()] = this;
+ picker_ = new SeqPiecePicker(this);
+ picker_->Randomize(rand()&63);
+ init_time_ = Datagram::Time();
}
-/** Basically, simulated receiving every single packet, except
- for some optimizations. */
-void FileTransfer::RecoverProgress () {
- // open file
- struct stat hash_file_st;
- fstat(fd, &hash_file_st);
- if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
- return;
- // read root hash
- char hashbuf[128];
- uint64_t binbuf;
- lseek(hashfd,0,SEEK_SET);
- read(hashfd,&binbuf,sizeof(bin64_t));
- read(hashfd,hashbuf,Sha1Hash::SIZE);
- Sha1Hash mustberoot(false,(const char*)hashbuf);
- if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
- ftruncate(hashfd,Sha1Hash::SIZE*64);
- return;
- }
- // read peak hashes
- for(int i=1; i<64 && !this->size; i++){
- read(hashfd,&binbuf,sizeof(bin64_t));
- read(hashfd,hashbuf,Sha1Hash::SIZE);
- Sha1Hash mustbepeak(false,(const char*)hashbuf);
- if (mustbepeak==Sha1Hash::ZERO)
- break;
- OfferPeak(binbuf,mustbepeak);
- }
- if (!size)
- return;
- // at this point, we may use mmapd hashes already
- // so, lets verify hashes and the data we've got
- dry_run = true;
- lseek(fd,0,SEEK_SET);
- for(int p=0; p<sizek; p++) {
- uint8_t buf[1<<10];
- size_t rd = read(fd,buf,1<<10);
- OfferData(bin64_t(10,p), buf, rd);
- if (rd<(1<<10))
- break;
- }
- dry_run = false;
+void Channel::CloseTransfer (FileTransfer* trans) {
+ for(int i=0; i<Channel::channels.size(); i++)
+ if (Channel::channels[i] && Channel::channels[i]->transfer_==trans)
+ delete Channel::channels[i];
}
-void FileTransfer::SetSize (size_t bytes) {
- struct stat st;
- fstat(fd, &st);
- if (st.st_size!=bytes)
- if (ftruncate(fd, bytes))
- return; // remain in the 0-state
- complete = size = bytes;
- completek = sizek = (size>>10) + (size%1023 ? 1 : 0);
- peak_count = bin64_t::peaks(sizek,peaks);
- ftruncate( hashfd, sizek*2*Sha1Hash::SIZE +
- (sizeof(bin64_t)+Sha1Hash::SIZE)*64 );
- lseek(hashfd,0,SEEK_SET);
- write(hashfd,&bin64_t::ALL,sizeof(bin64_t));
- write(hashfd,*root_hash,Sha1Hash::SIZE);
- for(int i=0; i<peak_count; i++) {
- write(hashfd,&(peaks[i]),sizeof(bin64_t));
- write(hashfd,*(peak_hashes[i]),Sha1Hash::SIZE);
- }
- uint8_t zeros[256];
- memset(zeros,0,256);
- for(int i=peak_count; i<63; i++)
- write(hashfd,zeros,Sha1Hash::SIZE+sizeof(bin64_t));
- hashes = (Sha1Hash*) mmap (NULL, sizek*2*sizeof(Sha1Hash), PROT_READ|PROT_WRITE, MAP_FILE,
- hashfd, (sizeof(bin64_t)+sizeof(Sha1Hash))*64 );
+void swift::AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg) {
+ FileTransfer* trans = FileTransfer::file(transfer);
+ if (!trans)
+ return;
+ trans->cb_agg[trans->cb_installed] = agg;
+ trans->callbacks[trans->cb_installed] = cb;
+ trans->cb_installed++;
}
-void FileTransfer::Submit () {
- struct stat st;
- fstat(fd, &st);
- SetSize(st.st_size);
- if (!size)
+
+void swift::ExternallyRetrieved (int transfer,bin64_t piece) {
+ FileTransfer* trans = FileTransfer::file(transfer);
+ if (!trans)
return;
- for (int p=0; p<peak_count; p++) {
- for(bin64_t b=peaks[p].left_foot(); b.within(peaks[p]); b=b.next_dfsio(10))
- if (b.is_base()) {
- uint8_t kilo[1<<10];
- size_t rd = pread(fd,kilo,1<<10,b.base_offset());
- hashes[b] = Sha1Hash(kilo,rd);
- } else
- hashes[b] = Sha1Hash(hashes[b.left()],hashes[b.right()]);
- peak_hashes[peaks[p]] = hashes[peaks[p]];
- }
- root_hash = DeriveRoot();
+ trans->ack_out().set(piece); // that easy
}
-void FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
- if (!size) // only peak hashes are accepted at this point
- return OfferPeak(pos,hash);
- if (pos.base_offset()>=sizek)
- return;
- if (ack_out.get(pos)!=bins::EMPTY)
- return; // have this hash already, even accptd data
- hashes[pos] = hash;
+void swift::RemoveProgressCallback (int transfer, ProgressCallback cb) {
+ FileTransfer* trans = FileTransfer::file(transfer);
+ if (!trans)
+ return;
+ for(int i=0; i<trans->cb_installed; i++)
+ if (trans->callbacks[i]==cb)
+ trans->callbacks[i]=trans->callbacks[--trans->cb_installed];
}
-bool FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
- if (pos.layer()!=10)
- return false;
- if (ack_out.get(pos)==bins::FILLED)
- return true; // ???
- int peak=0;
- while (peak<peak_count && !pos.within(peaks[peak]))
- peak++;
- if (peak==peak_count)
- return false;
- Sha1Hash hash(data,length);
- hashes[pos] = hash;
- // walk to the nearest proven hash
- for(bin64_t p = pos.parent(); p.within(peaks[peak]) && ack_out.get(p)==bins::EMPTY; p=p.parent())
- if (hashes[p]!=Sha1Hash(hashes[p.left()],hashes[p.right()]))
- return false; // hash mismatch
- ack_out.set(pos,bins::FILLED);
- pwrite(fd,data,length,pos.base_offset());
- return true;
-}
-Sha1Hash FileTransfer::DeriveRoot () {
- int c = peak_count-1;
- bin64_t p = peaks[c];
- Sha1Hash hash = peak_hashes[c];
- c--;
- while (p<bin64_t::ALL) {
- if (p.is_left()) {
- p = p.parent();
- hash = Sha1Hash(hash,Sha1Hash::ZERO);
- } else {
- if (c<0 || peaks[c]!=p.sibling())
- return Sha1Hash::ZERO;
- hash = Sha1Hash(peak_hashes[c],hash);
- p = p.parent();
- c--;
- }
- }
- return hash;
+FileTransfer::~FileTransfer ()
+{
+ Channel::CloseTransfer(this);
+ files[fd()] = NULL;
+ delete picker_;
}
-void FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
- assert(!size);
- if (peak_count) {
- bin64_t last_peak = peaks[peak_count-1];
- if ( pos.layer()>=last_peak.layer() ||
- pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
- peak_count = 0;
- }
- peaks[peak_count] = pos;
- peak_hashes[peak_count++] = hash;
- // check whether peak hash candidates add up to the root hash
- Sha1Hash mustbe_root = DeriveRoot();
- if (hash!=root_hash)
- return;
- // bingo, we now know the file size (rounded up to a KByte)
- SetSize(pos.base_offset()+pos.width());
-}
-FileTransfer::~FileTransfer () {
- munmap(hashes,sizek*2*Sha1Hash::SIZE);
- close(hashfd);
- close(fd);
-}
-
FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
for(int i=0; i<files.size(); i++)
- if (files[i] && files[i]->root_hash==root_hash)
+ if (files[i] && files[i]->root_hash()==root_hash)
return files[i];
return NULL;
}
-int p2tp::Open (const char* filename) {
- return Open(Sha1Hash::ZERO,filename);
+
+int swift:: Find (Sha1Hash hash) {
+ FileTransfer* t = FileTransfer::Find(hash);
+ if (t)
+ return t->fd();
+ return -1;
}
-int p2tp::Open (const Sha1Hash& hash, const char* filename) {
- FileTransfer* ft = new FileTransfer(hash, filename);
- if (ft->fd>0) {
- if (FileTransfer::files.size()<ft->fd)
- FileTransfer::files.resize(ft->fd);
- FileTransfer::files[ft->fd] = ft;
- return ft->fd;
+
+
+void FileTransfer::OnPexIn (const Address& addr) {
+ for(int i=0; i<hs_in_.size(); i++) {
+ Channel* c = Channel::channel(hs_in_[i]);
+ if (c && c->transfer().fd()==this->fd() && c->peer()==addr)
+ return; // already connected
+ }
+ if (hs_in_.size()<20) {
+ new Channel(this,Datagram::default_socket(),addr);
} else {
- delete ft;
- return -1;
+ pex_in_.push_back(addr);
+ if (pex_in_.size()>1000)
+ pex_in_.pop_front();
}
}
-void Close (int fdes) {
- // FIXME delete all channels
- delete FileTransfer::files[fdes];
- FileTransfer::files[fdes] = NULL;
-}
-
+int FileTransfer::RevealChannel (int& pex_out_) { // FIXME brainfuck
+ pex_out_ -= hs_in_offset_;
+ if (pex_out_<0)
+ pex_out_ = 0;
+ while (pex_out_<hs_in_.size()) {
+ Channel* c = Channel::channel(hs_in_[pex_out_]);
+ if (c && c->transfer().fd()==this->fd()) {
+ if (c->is_established()) {
+ pex_out_ += hs_in_offset_ + 1;
+ return c->id();
+ } else
+ pex_out_++;
+ } else {
+ hs_in_[pex_out_] = hs_in_[0];
+ hs_in_.pop_front();
+ hs_in_offset_++;
+ }
+ }
+ pex_out_ += hs_in_offset_;
+ return -1;
+}
-/*
- for(int i=0; i<peak_hash_count; i++) {
- bin64_t x = peaks[i], end = x.sibling();
- do {
- while (!x.layer()>10) {
- OfferHash(x.right(), hashes[x.right()]);
- if ( ! OfferHash(x.left(), hashes[x.left()]) )
- break;
- x = x.left();
- }
-
- if (x.layer()==10) {
- if (recheck_data) {
- uint8_t data[1024];
- size_t rd = pread(fd,data,2<<10,x.base_offset());
- if (hashes[x]==Sha1Hash(data,rd))
- ack_out->set(x,bins::FILLED);
- // may avoid hashing by checking whether it is zero
- // and whether the hash matches hash of zero
- } else {
- ack_out->set(x,bins::FILLED);
- }
- }
-
- while (x.is_right() && x!=peaks[i])
- x = x.parent();
- x = x.sibling();
- } while (x!=end);
- }
-
- */