transfer test OK
authorvictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Tue, 13 Oct 2009 08:07:47 +0000 (08:07 +0000)
committervictor <victor@e16421f0-f15b-0410-abcd-98678b794739>
Tue, 13 Oct 2009 08:07:47 +0000 (08:07 +0000)
git-svn-id: https://ttuki.vtt.fi/svn/p2p-next/TUD/p2tp/trunk@402 e16421f0-f15b-0410-abcd-98678b794739

ext/seq_picker.cpp
p2tp.h
tests/transfertest.cpp
transfer.cpp

index dbb7e83..95dc351 100644 (file)
@@ -18,7 +18,7 @@ class SeqPiecePicker : public PiecePicker {
     
 public:
     
-    SeqPiecePicker (FileTransfer* file_) : file(file_) {
+    SeqPiecePicker (FileTransfer* file_) : file(file_), hint_out() {
     }
     
     virtual bin64_t Pick (bins& from, uint8_t layer) {
diff --git a/p2tp.h b/p2tp.h
index 7286975..2e3016f 100644 (file)
--- a/p2tp.h
+++ b/p2tp.h
@@ -108,6 +108,8 @@ namespace p2tp {
     public:
 
                static std::vector<FileTransfer*> files;
+        static const char* HASH_FILE_TEPMLATE;
+        static const char* PEAK_FILE_TEPMLATE;
                        
                /**     file descriptor. */
                int                             fd;
@@ -136,6 +138,8 @@ namespace p2tp {
         Sha1Hash*       hashes;
         /** for recovering saved state */
         bool            dry_run;
+        /** Error encountered */
+        char*           error;
         
     protected:
         void            SetSize(size_t bytes);
@@ -143,6 +147,8 @@ namespace p2tp {
         void            RecoverProgress();
         void            OfferPeak (bin64_t pos, const Sha1Hash& hash);
         Sha1Hash        DeriveRoot();
+        void            SavePeaks();
+        void            LoadPeaks();
 
                friend class Channel;
        };
index f8c6993..5e60ec6 100644 (file)
@@ -25,11 +25,19 @@ TEST(TransferTest,TransferFile) {
     E000 = Sha1Hash(E0,Sha1Hash::ZERO);
     ABCDE000 = Sha1Hash(ABCD,E000);
     ROOT = ABCDE000;
-    for (bin64_t pos(3,0); pos!=bin64_t::ALL; pos=pos.parent())
+    for (bin64_t pos(3,0); pos!=bin64_t::ALL; pos=pos.parent()) {
         ROOT = Sha1Hash(ROOT,Sha1Hash::ZERO);
+        //printf("m %lli %s\n",(uint64_t)pos.parent(),ROOT.hex().c_str());
+    }
     
     // submit a new file
     FileTransfer* seed = new FileTransfer(Sha1Hash::ZERO,BTF);
+    EXPECT_TRUE(A==seed->hashes[0]);
+    EXPECT_TRUE(E==seed->hashes[bin64_t(0,4)]);
+    EXPECT_TRUE(ABCD==seed->hashes[bin64_t(2,0)]);
+    EXPECT_TRUE(ROOT==seed->root_hash);
+    EXPECT_TRUE(ABCD==seed->peak_hashes[0]);
+    EXPECT_TRUE(E==seed->peak_hashes[1]);
     EXPECT_TRUE(ROOT==seed->root_hash);
     EXPECT_EQ(4100,seed->size);
     EXPECT_EQ(5,seed->sizek);
@@ -37,31 +45,33 @@ TEST(TransferTest,TransferFile) {
     EXPECT_EQ(4100,seed->seq_complete);
     
     // retrieve it
+    unlink("copy");
     FileTransfer* leech = new FileTransfer(seed->root_hash,"copy");
     // transfer peak hashes
     for(int i=0; i<seed->peak_count; i++)
         leech->OfferHash(seed->peaks[i],seed->peak_hashes[i]);
-    ASSERT_EQ(1<<12,leech->size);
-    ASSERT_EQ(4,leech->sizek);
+    ASSERT_EQ(5<<10,leech->size);
+    ASSERT_EQ(5,leech->sizek);
     ASSERT_EQ(0,leech->complete);
     // transfer data and hashes
-    //        ABCDHASH
-    //    ABHASH    CDHASH
-    //  AAAA BBBB  CCCC DDDD  E
+    //           ABCD            E000
+    //     AB         CD       E0    0
+    //  AAAA BBBB  CCCC DDDD  E  0  0  0
     leech->OfferHash(bin64_t(1,0), seed->hashes[bin64_t(1,0)]);
     leech->OfferHash(bin64_t(1,1), seed->hashes[bin64_t(1,1)]);
-    for (int i=0; i<4; i++) {
+    for (int i=0; i<5; i++) {
         /*if (leech->seq_complete==3) {
             delete leech;
             leech = new FileTransfer(seed,"copy");
             EXPECT_EQ(3,leech->complete);
         }*/
         bin64_t next = leech->picker->Pick(seed->ack_out,0);
+        ASSERT_NE(bin64_t::NONE,next);
         uint8_t buf[1024];         //size_t len = seed->storer->ReadData(next,&buf);
-        size_t len = pread(seed->fd,buf,1024,next.base_offset());
+        size_t len = pread(seed->fd,buf,1024,next.base_offset()<<10); // FIXME TEST FOR ERROR
         bin64_t sibling = next.sibling();
-        leech->OfferHash(sibling, seed->hashes[sibling]);
-        leech->OfferData(next, buf, len);
+        leech->OfferHash(sibling, seed->hashes[sibling]); // i=4 => out of bounds
+        EXPECT_TRUE(leech->OfferData(next, buf, len));
     }
     EXPECT_EQ(4100,leech->size);
     EXPECT_EQ(5,leech->sizek);
@@ -71,10 +81,17 @@ TEST(TransferTest,TransferFile) {
     unlink("copy");
     
 }
-
+/*
+ FIXME
+ - always rehashes (even fresh files)
+ - different heights => bins::remove is buggy
+ */
 
 int main (int argc, char** argv) {
     
+    unlink("/tmp/.70196e6065a42835b1f08227ac3e2fb419cf78c8.hashes");
+    unlink("/tmp/.70196e6065a42835b1f08227ac3e2fb419cf78c8.peaks");
+    
        int f = open(BTF,O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
     uint8_t buf[1024];
     memset(buf,'A',1024);
index 8bb74c9..15b8404 100644 (file)
@@ -6,66 +6,64 @@
  *  Copyright 2009 Delft Technical University. All rights reserved.
  *
  */
-#include "p2tp.h"
 #include <sys/mman.h>
+#include <errno.h>
+#include "p2tp.h"
 
 using namespace p2tp;
 
 std::vector<FileTransfer*> FileTransfer::files(20);
+const char* FileTransfer::HASH_FILE_TEPMLATE = "/tmp/.%s.hashes";
+const char* FileTransfer::PEAK_FILE_TEPMLATE = "/tmp/.%s.peaks";
+#define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
+
+#include "ext/seq_picker.cpp"
 
 
 FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
-    root_hash(_root_hash), fd(0), hashfd(0), dry_run(false), peak_count(0), hashes(NULL)
+    root_hash(_root_hash), fd(0), hashfd(0), dry_run(false), 
+    peak_count(0), hashes(NULL), error(NULL)
 {
        fd = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
-       hashfd = open("/tmp/hahs",O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); // FIXME
-       if (fd<0 || hashfd<0)
+       if (fd<0)
         return;
     if (root_hash==Sha1Hash::ZERO) // fresh submit, hash it
         Submit();
     else
-        RecoverProgress();    
+        RecoverProgress();
+    picker = new SeqPiecePicker(this);
+}
+
+void FileTransfer::LoadPeaks () {
+    char file_name[1024];
+    sprintf(file_name,PEAK_FILE_TEPMLATE,root_hash.hex().c_str());
+    int peakfd = open(file_name,O_RDONLY);
+    if (peakfd<0)
+        return;
+    bin64_t peak;
+    char hash[128];
+    while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
+        read(peakfd,hash,Sha1Hash::SIZE);
+        OfferPeak(peak, Sha1Hash(false,hash));
+    }
+    close(peakfd);
 }
 
 
 /** Basically, simulated receiving every single packet, except
     for some optimizations. */
 void            FileTransfer::RecoverProgress () {
-    // open file
-       struct stat hash_file_st;
-       fstat(fd, &hash_file_st);
-    if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
-        return;
-    // read root hash
-    char hashbuf[128];
-    uint64_t binbuf;
-    lseek(hashfd,0,SEEK_SET);
-    read(hashfd,&binbuf,sizeof(bin64_t));
-    read(hashfd,hashbuf,Sha1Hash::SIZE);
-    Sha1Hash mustberoot(false,(const char*)hashbuf);
-    if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
-        ftruncate(hashfd,Sha1Hash::SIZE*64);
-        return;
-    }
-    // read peak hashes
-    for(int i=1; i<64 && !this->size; i++){
-        read(hashfd,&binbuf,sizeof(bin64_t));
-        read(hashfd,hashbuf,Sha1Hash::SIZE);
-        Sha1Hash mustbepeak(false,(const char*)hashbuf);
-        if (mustbepeak==Sha1Hash::ZERO)
-            break;
-        OfferPeak(binbuf,mustbepeak);
-    }
+    dry_run = true;
+    LoadPeaks();
     if (!size)
         return;
     // at this point, we may use mmapd hashes already
     // so, lets verify hashes and the data we've got
-    dry_run = true;
     lseek(fd,0,SEEK_SET);
     for(int p=0; p<sizek; p++) {
         uint8_t buf[1<<10];
         size_t rd = read(fd,buf,1<<10);
-        OfferData(bin64_t(10,p), buf, rd);
+        OfferData(bin64_t(0,p), buf, rd);
         if (rd<(1<<10))
             break;
     }
@@ -73,64 +71,95 @@ void            FileTransfer::RecoverProgress () {
 }
 
 
-void FileTransfer::SetSize (size_t bytes) {
+void    FileTransfer::SavePeaks () {
+    char file_name[1024];
+    sprintf(file_name,PEAK_FILE_TEPMLATE,root_hash.hex().c_str());
+    int peakfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
+    for(int i=0; i<peak_count; i++) {
+        write(peakfd,&(peaks[i]),sizeof(bin64_t));
+        write(peakfd,*peak_hashes[i],Sha1Hash::SIZE);
+    }
+    close(peakfd);
+}
+
+
+void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
+    size = bytes;
+    completek = complete = seq_complete = 0;
+       sizek = (size>>10) + ((size&1023) ? 1 : 0);
+    
+    char file_name[1024];
        struct stat st;
        fstat(fd, &st);
     if (st.st_size!=bytes)
         if (ftruncate(fd, bytes))
             return; // remain in the 0-state
-    complete = size = bytes;
-       completek = sizek = (size>>10) + (size%1023 ? 1 : 0);
-    peak_count = bin64_t::peaks(sizek,peaks);
-    ftruncate( hashfd, sizek*2*Sha1Hash::SIZE + 
-               (sizeof(bin64_t)+Sha1Hash::SIZE)*64 );
-    lseek(hashfd,0,SEEK_SET);
-    write(hashfd,&bin64_t::ALL,sizeof(bin64_t));
-    write(hashfd,*root_hash,Sha1Hash::SIZE);
-    for(int i=0; i<peak_count; i++) {
-        write(hashfd,&(peaks[i]),sizeof(bin64_t));
-        write(hashfd,*(peak_hashes[i]),Sha1Hash::SIZE);
+    // mmap the hash file into memory
+    sprintf(file_name,HASH_FILE_TEPMLATE,root_hash.hex().c_str());
+       hashfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
+    size_t expected_size = Sha1Hash::SIZE * sizek * 2;
+       struct stat hash_file_st;
+       fstat(hashfd, &hash_file_st);
+    if ( hash_file_st.st_size != expected_size )
+        ftruncate(hashfd, expected_size);
+    hashes = (Sha1Hash*) mmap (NULL, expected_size, PROT_READ|PROT_WRITE, 
+                               MAP_PRIVATE, hashfd, 0);
+    if (hashes==MAP_FAILED) {
+        hashes = NULL;
+        size = sizek = complete = completek = seq_complete = 0;
+        error = strerror(errno);
+        perror("hash tree mmap failed");
+        return;
     }
-    uint8_t zeros[256];
-    memset(zeros,0,256);
-    for(int i=peak_count; i<63; i++) 
-        write(hashfd,zeros,Sha1Hash::SIZE+sizeof(bin64_t));
-    hashes = (Sha1Hash*) mmap (NULL, sizek*2*sizeof(Sha1Hash), PROT_READ|PROT_WRITE, MAP_FILE,
-                   hashfd, (sizeof(bin64_t)+sizeof(Sha1Hash))*64 );
-}
+    for(int i=0; i<peak_count; i++)
+        hashes[peaks[i]] = peak_hashes[i];
+}  
+
 
 void            FileTransfer::Submit () {
-       struct stat st;
+       struct stat st; // TODO:   AppendData()   and   streaming
        fstat(fd, &st);
-    SetSize(st.st_size);
-    if (!size)
-        return;
+    size = st.st_size;
+       sizek = (size>>10) + ((size&1023) ? 1 : 0);
+    hashes = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek*2);
+    peak_count = bin64_t::peaks(sizek,peaks);
     for (int p=0; p<peak_count; p++) {
-        for(bin64_t b=peaks[p].left_foot(); b.within(peaks[p]); b=b.next_dfsio(10)) 
+        for(bin64_t b=peaks[p].left_foot(); b.within(peaks[p]); b=b.next_dfsio(0)) 
             if (b.is_base()) {
                 uint8_t kilo[1<<10];
-                size_t rd = pread(fd,kilo,1<<10,b.base_offset());
+                size_t rd = pread(fd,kilo,1<<10,b.base_offset()<<10);
                 hashes[b] = Sha1Hash(kilo,rd);
             } else
                 hashes[b] = Sha1Hash(hashes[b.left()],hashes[b.right()]);
-        peak_hashes[peaks[p]] = hashes[peaks[p]];
+        peak_hashes[p] = hashes[peaks[p]];
+        ack_out.set(peaks[p],bins::FILLED);
     }
     root_hash = DeriveRoot();
+    Sha1Hash *hash_tmp = hashes;
+    SetSize(st.st_size);
+    SavePeaks();
+    seq_complete = complete = size;
+    completek = sizek;
+    memcpy(hashes,hash_tmp,sizek*Sha1Hash::SIZE*2);
+    free(hash_tmp);
 }
 
 
 void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {    
        if (!size)  // only peak hashes are accepted at this point
                return OfferPeak(pos,hash);
-       if (pos.base_offset()>=sizek)
+       if (pos>=sizek*2)
                return;
     if (ack_out.get(pos)!=bins::EMPTY)
         return; // have this hash already, even accptd data
        hashes[pos] = hash;
 }
 
+
 bool            FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
-    if (pos.layer()!=10)
+    if (!pos.is_base())
+        return false;
+    if (length<1024 && pos!=bin64_t(0,sizek-1))
         return false;
     if (ack_out.get(pos)==bins::FILLED)
         return true; // ???
@@ -140,22 +169,41 @@ bool            FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t leng
     if (peak==peak_count)
         return false;
     Sha1Hash hash(data,length);
-    hashes[pos] = hash;
-       // walk to the nearest proven hash
-    for(bin64_t p = pos.parent(); p.within(peaks[peak]) && ack_out.get(p)==bins::EMPTY; p=p.parent())
-        if (hashes[p]!=Sha1Hash(hashes[p.left()],hashes[p.right()]))
-            return false; // hash mismatch
+    if (pos==peaks[peak]) {
+        if (hash!=peak_hashes[peak])
+            return false;
+    } else {
+        hashes[pos] = hash;
+        for(bin64_t p = pos.parent(); p.within(peaks[peak]) && ack_out.get(p)==bins::EMPTY; p=p.parent()) {
+            Sha1Hash phash = Sha1Hash(hashes[p.left()],hashes[p.right()]) ;
+            if (hashes[p]!=phash)
+                return false; // hash mismatch
+        }
+    }
+    //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
+       // walk to the nearest proven hash   FIXME 0-layer peak
     ack_out.set(pos,bins::FILLED);
     pwrite(fd,data,length,pos.base_offset());
+    complete += length;
+    completek++;
+    if (length<1024) {
+        size -= 1024 - length;
+        ftruncate(fd, size);
+    }
+    while (ack_out.get(bin64_t(0,seq_complete>>10))==bins::FILLED)
+        seq_complete+=1024;
+    if (seq_complete>size)
+        seq_complete = size;
     return true;
 }
 
+
 Sha1Hash        FileTransfer::DeriveRoot () {
        int c = peak_count-1;
        bin64_t p = peaks[c];
        Sha1Hash hash = peak_hashes[c];
        c--;
-       while (p<bin64_t::ALL) {
+       while (p!=bin64_t::ALL) {
                if (p.is_left()) {
                        p = p.parent();
                        hash = Sha1Hash(hash,Sha1Hash::ZERO);
@@ -166,10 +214,12 @@ Sha1Hash        FileTransfer::DeriveRoot () {
                        p = p.parent();
                        c--;
                }
+        //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
        }
     return hash;
 }
 
+
 void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
     assert(!size);
     if (peak_count) {
@@ -182,17 +232,20 @@ void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
     peak_hashes[peak_count++] = hash;
     // check whether peak hash candidates add up to the root hash
     Sha1Hash mustbe_root = DeriveRoot();
-    if (hash!=root_hash)
+    if (mustbe_root!=root_hash)
         return;
     // bingo, we now know the file size (rounded up to a KByte)
-    SetSize(pos.base_offset()+pos.width());
+    SetSize( (pos.base_offset()+pos.width()) << 10              );
+    SavePeaks();
 }
 
+
 FileTransfer::~FileTransfer () {
     munmap(hashes,sizek*2*Sha1Hash::SIZE);
     close(hashfd);
     close(fd);
 }
+
                            
 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
     for(int i=0; i<files.size(); i++)
@@ -201,10 +254,12 @@ FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
     return NULL;
 }
 
+
 int      p2tp::Open (const char* filename) {
     return Open(Sha1Hash::ZERO,filename);
 }
 
+
 int      p2tp::Open (const Sha1Hash& hash, const char* filename) {
     FileTransfer* ft = new FileTransfer(hash, filename);
     if (ft->fd>0) {
@@ -218,6 +273,7 @@ int      p2tp::Open (const Sha1Hash& hash, const char* filename) {
     }
 }
 
+
 void     Close (int fdes) {
     // FIXME delete all channels
     delete FileTransfer::files[fdes];
@@ -256,4 +312,34 @@ void     Close (int fdes) {
  } while (x!=end);
  }
  
+ // open file
+ if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
+ return;
+ // read root hash
+ char hashbuf[128];
+ uint64_t binbuf;
+ lseek(hashfd,0,SEEK_SET);
+ read(hashfd,&binbuf,sizeof(bin64_t));
+ read(hashfd,hashbuf,Sha1Hash::SIZE);
+ Sha1Hash mustberoot(false,(const char*)hashbuf);
+ if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
+ ftruncate(hashfd,Sha1Hash::SIZE*64);
+ return;
+ }
+ // read peak hashes
+ for(int i=1; i<64 && !this->size; i++){
+ read(hashfd,&binbuf,sizeof(bin64_t));
+ read(hashfd,hashbuf,Sha1Hash::SIZE);
+ Sha1Hash mustbepeak(false,(const char*)hashbuf);
+ if (mustbepeak==Sha1Hash::ZERO)
+ break;
+ OfferPeak(binbuf,mustbepeak);
+ }
+ if (!size)
+ return;
  */