towards swarming
[swift-upb.git] / transfer.cpp
1 /*
2  *  transfer.cpp
3  *  p2tp
4  *
5  *  Created by Victor Grishchenko on 10/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #ifdef _WIN32
10 #include "compat/unixio.h"
11 #else
12 #include <sys/mman.h>
13 #endif
14 #include <errno.h>
15 #include <string>
16 #include <sstream>
17 #include "p2tp.h"
18 #include "compat/util.h"
19
20 using namespace p2tp;
21
// Registry of open transfers; the constructor stores each transfer at the
// index equal to its data file descriptor (see files[fd_] = this).
std::vector<FileTransfer*> FileTransfer::files(20);

// Instance number that GetTempFilename() embeds into temp file names.
int FileTransfer::instance = 0;
// Byte size of one (bin64_t, Sha1Hash) pair.
#define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
26
27 #include "ext/seq_picker.cpp"
28
29 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
30
31 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
32     root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
33     peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
34     complete_(0), completek_(0), seq_complete_(0), hs_in_offset_(0)
35 {
36         fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
37         if (fd_<0)
38         return;
39     if (files.size()<fd_+1)
40         files.resize(fd_+1);
41     files[fd_] = this;
42     if (root_hash_==Sha1Hash::ZERO) // fresh submit, hash it
43         Submit();
44     else
45         RecoverProgress();
46     picker_ = new SeqPiecePicker(this);
47 }
48
49
50 void FileTransfer::LoadPeaks () {
51     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
52     int peakfd = open(file_name.c_str(),O_RDONLY,0);
53     if (peakfd<0)
54         return;
55     bin64_t peak;
56     char hash[128];
57     while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
58         read(peakfd,hash,Sha1Hash::SIZE);
59         OfferPeak(peak, Sha1Hash(false,hash));
60     }
61     close(peakfd);
62 }
63
64
65 /** Basically, simulated receiving every single packet, except
66     for some optimizations. */
67 void            FileTransfer::RecoverProgress () {
68     dry_run_ = true;
69     LoadPeaks();
70     if (!size())
71         return;
72     // at this point, we may use mmapd hashes already
73     // so, lets verify hashes and the data we've got
74     lseek(fd_,0,SEEK_SET);
75     for(int p=0; p<size_kilo(); p++) {
76         uint8_t buf[1<<10];
77         size_t rd = read(fd_,buf,1<<10);
78         OfferData(bin64_t(0,p), buf, rd);
79         if (rd<(1<<10))
80             break;
81     }
82     dry_run_ = false;
83 }
84
85
86 void    FileTransfer::SavePeaks () {
87     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
88     int peakfd = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
89     for(int i=0; i<peak_count(); i++) {
90         write(peakfd,&(peaks_[i]),sizeof(bin64_t));
91         write(peakfd,*peak_hashes_[i],Sha1Hash::SIZE);
92     }
93     close(peakfd);
94 }
95
96
97 void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
98     size_ = bytes;
99     completek_ = complete_ = seq_complete_ = 0;
100         sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
101
102         struct stat st;
103         fstat(fd_, &st);
104     if (st.st_size!=bytes)
105         if (ftruncate(fd_, bytes))
106             return; // remain in the 0-state
107     // mmap the hash file into memory
108     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".hashes"));
109         hashfd_ = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
110     size_t expected_size_ = Sha1Hash::SIZE * sizek_ * 2;
111         struct stat hash_file_st;
112         fstat(hashfd_, &hash_file_st);
113     if ( hash_file_st.st_size != expected_size_ )
114         ftruncate(hashfd_, expected_size_);
115 #ifdef _WIN32
116     HANDLE hashhandle = (HANDLE)_get_osfhandle(hashfd_);
117     hashmaphandle_ = CreateFileMapping(hashhandle,
118                                               NULL,
119                                               PAGE_READWRITE,
120                                               0,
121                                               0,
122                                               NULL);
123         if (hashmaphandle_ != NULL)
124         {
125                 hashes_ = (Sha1Hash*)MapViewOfFile(hashmaphandle_,
126                                                          FILE_MAP_WRITE,
127                                                      0,
128                                                      0,
129                                                      0);
130
131         }
132         if (hashmaphandle_ == NULL || hashes_ == NULL)
133 #else
134     hashes_ = (Sha1Hash*) mmap (NULL, expected_size_, PROT_READ|PROT_WRITE,
135                                MAP_SHARED, hashfd_, 0);
136     if (hashes_==MAP_FAILED)
137 #endif
138     {
139         hashes_ = NULL;
140         size_ = sizek_ = complete_ = completek_ = seq_complete_ = 0;
141         error_ = strerror(errno); // FIXME dprintf()
142         perror("hash tree mmap failed");
143         return;
144     }
145     for(int i=0; i<peak_count_; i++)
146         hashes_[peaks_[i]] = peak_hashes_[i];
147 }
148
149
150 void            FileTransfer::Submit () {
151         struct stat st; // TODO:   AppendData()   and   streaming
152         fstat(fd_, &st);
153     size_ = st.st_size;
154         sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
155     hashes_ = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek_*2);
156     peak_count_ = bin64_t::peaks(sizek_,peaks_);
157     for (int p=0; p<peak_count_; p++) {
158         for(bin64_t b=peaks_[p].left_foot(); b.within(peaks_[p]); b=b.next_dfsio(0))
159             if (b.is_base()) {
160                 uint8_t kilo[1<<10];
161                 size_t rd = pread(fd_,kilo,1<<10,b.base_offset()<<10);
162                 hashes_[b] = Sha1Hash(kilo,rd);
163             } else
164                 hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]);
165         peak_hashes_[p] = hashes_[peaks_[p]];
166         //ack_out_.set(peaks_[p],bins::FILLED);
167         OnDataIn(peaks_[p]);
168     }
169     root_hash_ = DeriveRoot();
170     Sha1Hash *hash_tmp = hashes_;
171     SetSize(st.st_size);
172     SavePeaks();
173     seq_complete_ = complete_ = size_;
174     completek_ = sizek_;
175     memcpy(hashes_,hash_tmp,sizek_*Sha1Hash::SIZE*2);
176     free(hash_tmp);
177 }
178
179
180 bin64_t         FileTransfer::peak_for (bin64_t pos) const {
181     int pi=0;
182     while (pi<peak_count_ && !pos.within(peaks_[pi]))
183         pi++;
184     return pi==peak_count_ ? bin64_t(bin64_t::NONE) : peaks_[pi];
185 }
186
187
188 void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
189         if (!size_)  // only peak hashes are accepted at this point
190                 return OfferPeak(pos,hash);
191     int pi=0;
192     while (pi<peak_count_ && !pos.within(peaks_[pi]))
193         pi++;
194     if (pi==peak_count_)
195         return;
196     if (pos==peaks_[pi] && hash!=peak_hashes_[pi])
197         return;
198     else if (ack_out_.get(pos.parent())!=bins::EMPTY)
199         return; // have this hash already, even accptd data
200         hashes_[pos] = hash;
201 }
202
203
204 bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
205     if (!pos.is_base())
206         return false;
207     if (length<1024 && pos!=bin64_t(0,sizek_-1))
208         return false;
209     if (ack_out_.get(pos)==bins::FILLED)
210         return true; // ???
211     bin64_t peak = peak_for(pos);
212     if (peak==bin64_t::NONE)
213         return false;
214
215     Sha1Hash hash(data,length);
216     bin64_t p = pos;
217     while ( p!=peak && ack_out_.get(p)==bins::EMPTY ) {
218         hashes_[p] = hash;
219         p = p.parent();
220         hash = Sha1Hash(hashes_[p.left()],hashes_[p.right()]) ;
221     }
222     if (hash!=hashes_[p])
223         return false;
224
225     //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
226         // walk to the nearest proven hash   FIXME 0-layer peak
227     OnDataIn(pos);
228     pwrite(fd_,data,length,pos.base_offset()<<10);
229     complete_ += length;
230     completek_++;
231     if (length<1024) {
232         size_ -= 1024 - length;
233         ftruncate(fd_, size_);
234     }
235     while (ack_out_.get(bin64_t(0,seq_complete_>>10))==bins::FILLED)
236         seq_complete_+=1024;
237     if (seq_complete_>size_)
238         seq_complete_ = size_;
239     return true;
240 }
241
242
243 /*bin64_t         FileTransfer::RevealAck (uint64_t& offset) {
244     if (offset<data_in_off_)
245         offset = data_in_off_;
246     for(int off=offset-data_in_off_; off<data_in_.size(); off++) {
247         offset++;
248         if (data_in_[off]!=bin64_t::NONE) {
249             bin64_t parent = data_in_[off].parent();
250             if (ack_out_.get(parent)!=bins::FILLED)
251                 return data_in_[off];
252             else
253                 data_in_[off] = bin64_t::NONE;
254         }
255     }
256     return bin64_t::NONE;
257 }*/
258
259
260 void            FileTransfer::OnDataIn (bin64_t pos) {
261     ack_out_.set(pos,bins::FILLED);
262     /*bin64_t closed = pos;
263     while (ack_out_.get(closed.parent())==bins::FILLED) // TODO optimize
264         closed = closed.parent();
265     data_in_.push_back(closed);
266     // rotating the queue
267     bin64_t parent = data_in_.front().parent();
268     if (ack_out_.get(parent)!=bins::FILLED)
269         data_in_.push_back(data_in_.front());
270     data_in_.front() = bin64_t::NONE;
271     while ( !data_in_.empty() && data_in_.front()==bin64_t::NONE) {
272         data_in_.pop_front();
273         data_in_off_++;
274     }*/
275 }
276
277
278 Sha1Hash        FileTransfer::DeriveRoot () {
279         int c = peak_count_-1;
280         bin64_t p = peaks_[c];
281         Sha1Hash hash = peak_hashes_[c];
282         c--;
283         while (p!=bin64_t::ALL) {
284                 if (p.is_left()) {
285                         p = p.parent();
286                         hash = Sha1Hash(hash,Sha1Hash::ZERO);
287                 } else {
288                         if (c<0 || peaks_[c]!=p.sibling())
289                                 return Sha1Hash::ZERO;
290                         hash = Sha1Hash(peak_hashes_[c],hash);
291                         p = p.parent();
292                         c--;
293                 }
294         //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
295         }
296     return hash;
297 }
298
299
300 void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
301     assert(!size_);
302     if (peak_count_) {
303         bin64_t last_peak = peaks_[peak_count_-1];
304         if ( pos.layer()>=last_peak.layer() ||
305              pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
306             peak_count_ = 0;
307     }
308     peaks_[peak_count_] = pos;
309     peak_hashes_[peak_count_++] = hash;
310     // check whether peak hash candidates add up to the root hash
311     Sha1Hash mustbe_root = DeriveRoot();
312     if (mustbe_root!=root_hash_)
313         return;
314     // bingo, we now know the file size (rounded up to a KByte)
315     SetSize( (pos.base_offset()+pos.width()) << 10               );
316     SavePeaks();
317 }
318
319
320 FileTransfer::~FileTransfer ()
321 {
322 #ifdef _WIN32
323         UnmapViewOfFile(hashes_);
324         CloseHandle(hashmaphandle_);
325 #else
326     munmap(hashes_,sizek_*2*Sha1Hash::SIZE);
327     close(hashfd_);
328     close(fd_);
329     files[fd_] = NULL;
330 #endif
331 }
332
333
334 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
335     for(int i=0; i<files.size(); i++)
336         if (files[i] && files[i]->root_hash()==root_hash)
337             return files[i];
338     return NULL;
339 }
340
341
342 std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
343 {
344         std::string tempfile = gettmpdir();
345         std::stringstream ss;
346         ss << instance;
347         tempfile += std::string(".") + root_hash.hex() + std::string(".") + ss.str() + postfix;
348         return tempfile;
349 }
350
351
352 /*int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
353     FileTransfer* ft = new FileTransfer(filename, hash);
354     int fdes = ft->file_descriptor();
355     if (fdes>0) {
356         if (FileTransfer::files.size()<fdes)
357             FileTransfer::files.resize(fdes);
358         FileTransfer::files[fdes] = ft;
359         return fdes;
360     } else {
361         delete ft;
362         return -1;
363     }
364 }*/
365
366
367 void            FileTransfer::OnPexIn (const Address& addr) {
368     for(int i=0; i<hs_in_.size(); i++) {
369         Channel* c = Channel::channels[hs_in_[i]];
370         if (c && c->file_==this && c->peer_==addr)
371             return; // already connected
372     }
373     if (hs_in_.size()<20) {
374         new Channel(this,Channel::sockets[0],addr);
375     } else {
376         pex_in_.push_back(addr);
377         if (pex_in_.size()>1000)
378             pex_in_.pop_front();
379     }
380 }
381
382
383 int        FileTransfer::RevealChannel (int& pex_out_) {
384     pex_out_ -= hs_in_offset_;
385     if (pex_out_<0)
386         pex_out_ = 0;
387     while (pex_out_<hs_in_.size()) {
388         Channel* c = Channel::channels[hs_in_[pex_out_]];
389         if (c && c->file_==this) {
390             pex_out_ += hs_in_offset_ + 1;
391             return c->id;
392         } else {
393             hs_in_[pex_out_] = hs_in_[0];
394             hs_in_.pop_front();
395             hs_in_offset_++;
396         }
397     }
398     pex_out_ += hs_in_offset_;
399     return -1;
400 }
401
402
403 /*
404  for(int i=0; i<peak_hash_count; i++) {
405  bin64_t x = peaks[i], end = x.sibling();
406  do {
407  while (!x.layer()>10) {
408  OfferHash(x.right(), hashes[x.right()]);
409  if ( ! OfferHash(x.left(), hashes[x.left()]) )
410  break;
411  x = x.left();
412  }
413
414  if (x.layer()==10) {
415  if (recheck_data) {
416  uint8_t data[1024];
417  size_t rd = pread(fd,data,2<<10,x.base_offset());
418  if (hashes[x]==Sha1Hash(data,rd))
419  ack_out->set(x,bins::FILLED);
420  // may avoid hashing by checking whether it is zero
421  // and whether the hash matches hash of zero
422  } else {
423  ack_out->set(x,bins::FILLED);
424  }
425  }
426
427  while (x.is_right() && x!=peaks[i])
428  x = x.parent();
429  x = x.sibling();
430  } while (x!=end);
431  }
432
433
434
435
436  // open file
437  if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
438  return;
439  // read root hash
440  char hashbuf[128];
441  uint64_t binbuf;
442  lseek(hashfd_,0,SEEK_SET);
443  read(hashfd_,&binbuf,sizeof(bin64_t));
444  read(hashfd_,hashbuf,Sha1Hash::SIZE);
445  Sha1Hash mustberoot(false,(const char*)hashbuf);
446  if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
447  ftruncate(hashfd_,Sha1Hash::SIZE*64);
448  return;
449  }
450  // read peak hashes
451  for(int i=1; i<64 && !this->size; i++){
452  read(hashfd_,&binbuf,sizeof(bin64_t));
453  read(hashfd_,hashbuf,Sha1Hash::SIZE);
454  Sha1Hash mustbepeak(false,(const char*)hashbuf);
455  if (mustbepeak==Sha1Hash::ZERO)
456  break;
457  OfferPeak(binbuf,mustbepeak);
458  }
459  if (!size)
460  return;
461
462
463  */