twisted picker
[swift-upb.git] / transfer.cpp
1 /*
2  *  transfer.cpp
3  *  p2tp
4  *
5  *  Created by Victor Grishchenko on 10/6/09.
6  *  Copyright 2009 Delft University of Technology. All rights reserved.
7  *
8  */
9 #ifdef _WIN32
10 #include "compat/unixio.h"
11 #else
12 #include <sys/mman.h>
13 #endif
14 #include <errno.h>
15 #include <string>
16 #include <sstream>
17 #include "p2tp.h"
18 #include "compat/util.h"
19
20 using namespace p2tp;
21
22 std::vector<FileTransfer*> FileTransfer::files(20);
23
24 int FileTransfer::instance = 0;
25 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
26
27 #include "ext/seq_picker.cpp"
28
29 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
30
31 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
32     root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
33     peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
34     complete_(0), completek_(0), seq_complete_(0), hs_in_offset_(0)
35 {
36         fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
37         if (fd_<0)
38         return;
39     if (files.size()<fd_+1)
40         files.resize(fd_+1);
41     files[fd_] = this;
42     if (root_hash_==Sha1Hash::ZERO) // fresh submit, hash it
43         Submit();
44     else
45         RecoverProgress();
46     picker_ = new SeqPiecePicker(this);
47 }
48
49
50 void FileTransfer::LoadPeaks () {
51     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
52     int peakfd = open(file_name.c_str(),O_RDONLY,0);
53     if (peakfd<0)
54         return;
55     bin64_t peak;
56     char hash[128];
57     while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
58         read(peakfd,hash,Sha1Hash::SIZE);
59         OfferPeak(peak, Sha1Hash(false,hash));
60     }
61     close(peakfd);
62 }
63
64
65 /** Basically, simulated receiving every single packet, except
66     for some optimizations. */
67 void            FileTransfer::RecoverProgress () {
68     dry_run_ = true;
69     LoadPeaks();
70     if (!size())
71         return;
72     // at this point, we may use mmapd hashes already
73     // so, lets verify hashes and the data we've got
74     lseek(fd_,0,SEEK_SET);
75     for(int p=0; p<size_kilo(); p++) {
76         uint8_t buf[1<<10];
77         size_t rd = read(fd_,buf,1<<10);
78         OfferData(bin64_t(0,p), buf, rd);
79         if (rd<(1<<10))
80             break;
81     }
82     dry_run_ = false;
83 }
84
85
86 void    FileTransfer::SavePeaks () {
87     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".peaks"));
88     int peakfd = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
89     for(int i=0; i<peak_count(); i++) {
90         write(peakfd,&(peaks_[i]),sizeof(bin64_t));
91         write(peakfd,*peak_hashes_[i],Sha1Hash::SIZE);
92     }
93     close(peakfd);
94 }
95
96
97 void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
98     size_ = bytes;
99     completek_ = complete_ = seq_complete_ = 0;
100         sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
101
102         struct stat st;
103         fstat(fd_, &st);
104     if (st.st_size!=bytes)
105         if (ftruncate(fd_, bytes))
106             return; // remain in the 0-state
107     // mmap the hash file into memory
108     std::string file_name = GetTempFilename(root_hash_,instance,std::string(".hashes"));
109         hashfd_ = open(file_name.c_str(),O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
110     size_t expected_size_ = Sha1Hash::SIZE * sizek_ * 2;
111         struct stat hash_file_st;
112         fstat(hashfd_, &hash_file_st);
113     if ( hash_file_st.st_size != expected_size_ )
114         ftruncate(hashfd_, expected_size_);
115 #ifdef _WIN32
116     HANDLE hashhandle = (HANDLE)_get_osfhandle(hashfd_);
117     hashmaphandle_ = CreateFileMapping(hashhandle,
118                                               NULL,
119                                               PAGE_READWRITE,
120                                               0,
121                                               0,
122                                               NULL);
123         if (hashmaphandle_ != NULL)
124         {
125                 hashes_ = (Sha1Hash*)MapViewOfFile(hashmaphandle_,
126                                                          FILE_MAP_WRITE,
127                                                      0,
128                                                      0,
129                                                      0);
130
131         }
132         if (hashmaphandle_ == NULL || hashes_ == NULL)
133 #else
134     hashes_ = (Sha1Hash*) mmap (NULL, expected_size_, PROT_READ|PROT_WRITE,
135                                MAP_SHARED, hashfd_, 0);
136     if (hashes_==MAP_FAILED)
137 #endif
138     {
139         hashes_ = NULL;
140         size_ = sizek_ = complete_ = completek_ = seq_complete_ = 0;
141         error_ = strerror(errno); // FIXME dprintf()
142         perror("hash tree mmap failed");
143         return;
144     }
145     for(int i=0; i<peak_count_; i++)
146         hashes_[peaks_[i]] = peak_hashes_[i];
147     picker_->Randomize(rand()&31&(sizek_-1));
148 }
149
150
151 void            FileTransfer::Submit () {
152         struct stat st; // TODO:   AppendData()   and   streaming
153         fstat(fd_, &st);
154     size_ = st.st_size;
155         sizek_ = (size_>>10) + ((size_&1023) ? 1 : 0);
156     hashes_ = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek_*2);
157     peak_count_ = bin64_t::peaks(sizek_,peaks_);
158     for (int p=0; p<peak_count_; p++) {
159         for(bin64_t b=peaks_[p].left_foot(); b.within(peaks_[p]); b=b.next_dfsio(0))
160             if (b.is_base()) {
161                 uint8_t kilo[1<<10];
162                 size_t rd = pread(fd_,kilo,1<<10,b.base_offset()<<10);
163                 hashes_[b] = Sha1Hash(kilo,rd);
164             } else
165                 hashes_[b] = Sha1Hash(hashes_[b.left()],hashes_[b.right()]);
166         peak_hashes_[p] = hashes_[peaks_[p]];
167         //ack_out_.set(peaks_[p],bins::FILLED);
168         OnDataIn(peaks_[p]);
169     }
170     root_hash_ = DeriveRoot();
171     Sha1Hash *hash_tmp = hashes_;
172     SetSize(st.st_size);
173     SavePeaks();
174     seq_complete_ = complete_ = size_;
175     completek_ = sizek_;
176     memcpy(hashes_,hash_tmp,sizek_*Sha1Hash::SIZE*2);
177     free(hash_tmp);
178 }
179
180
181 bin64_t         FileTransfer::peak_for (bin64_t pos) const {
182     int pi=0;
183     while (pi<peak_count_ && !pos.within(peaks_[pi]))
184         pi++;
185     return pi==peak_count_ ? bin64_t(bin64_t::NONE) : peaks_[pi];
186 }
187
188
189 void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
190         if (!size_)  // only peak hashes are accepted at this point
191                 return OfferPeak(pos,hash);
192     int pi=0;
193     while (pi<peak_count_ && !pos.within(peaks_[pi]))
194         pi++;
195     if (pi==peak_count_)
196         return;
197     if (pos==peaks_[pi] && hash!=peak_hashes_[pi])
198         return;
199     else if (ack_out_.get(pos.parent())!=bins::EMPTY)
200         return; // have this hash already, even accptd data
201         hashes_[pos] = hash;
202 }
203
204
205 bool            FileTransfer::OfferData (bin64_t pos, const uint8_t* data, size_t length) {
206     if (!pos.is_base())
207         return false;
208     if (length<1024 && pos!=bin64_t(0,sizek_-1))
209         return false;
210     if (ack_out_.get(pos)==bins::FILLED)
211         return true; // ???
212     bin64_t peak = peak_for(pos);
213     if (peak==bin64_t::NONE)
214         return false;
215
216     Sha1Hash hash(data,length);
217     bin64_t p = pos;
218     while ( p!=peak && ack_out_.get(p)==bins::EMPTY ) {
219         hashes_[p] = hash;
220         p = p.parent();
221         hash = Sha1Hash(hashes_[p.left()],hashes_[p.right()]) ;
222     }
223     if (hash!=hashes_[p])
224         return false;
225
226     //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
227         // walk to the nearest proven hash   FIXME 0-layer peak
228     OnDataIn(pos);
229     pwrite(fd_,data,length,pos.base_offset()<<10);
230     complete_ += length;
231     completek_++;
232     if (length<1024) {
233         size_ -= 1024 - length;
234         ftruncate(fd_, size_);
235     }
236     while (ack_out_.get(bin64_t(0,seq_complete_>>10))==bins::FILLED)
237         seq_complete_+=1024;
238     if (seq_complete_>size_)
239         seq_complete_ = size_;
240     return true;
241 }
242
243
244 /*bin64_t         FileTransfer::RevealAck (uint64_t& offset) {
245     if (offset<data_in_off_)
246         offset = data_in_off_;
247     for(int off=offset-data_in_off_; off<data_in_.size(); off++) {
248         offset++;
249         if (data_in_[off]!=bin64_t::NONE) {
250             bin64_t parent = data_in_[off].parent();
251             if (ack_out_.get(parent)!=bins::FILLED)
252                 return data_in_[off];
253             else
254                 data_in_[off] = bin64_t::NONE;
255         }
256     }
257     return bin64_t::NONE;
258 }*/
259
260
261 void            FileTransfer::OnDataIn (bin64_t pos) {
262     ack_out_.set(pos,bins::FILLED);
263     /*bin64_t closed = pos;
264     while (ack_out_.get(closed.parent())==bins::FILLED) // TODO optimize
265         closed = closed.parent();
266     data_in_.push_back(closed);
267     // rotating the queue
268     bin64_t parent = data_in_.front().parent();
269     if (ack_out_.get(parent)!=bins::FILLED)
270         data_in_.push_back(data_in_.front());
271     data_in_.front() = bin64_t::NONE;
272     while ( !data_in_.empty() && data_in_.front()==bin64_t::NONE) {
273         data_in_.pop_front();
274         data_in_off_++;
275     }*/
276 }
277
278
279 Sha1Hash        FileTransfer::DeriveRoot () {
280         int c = peak_count_-1;
281         bin64_t p = peaks_[c];
282         Sha1Hash hash = peak_hashes_[c];
283         c--;
284         while (p!=bin64_t::ALL) {
285                 if (p.is_left()) {
286                         p = p.parent();
287                         hash = Sha1Hash(hash,Sha1Hash::ZERO);
288                 } else {
289                         if (c<0 || peaks_[c]!=p.sibling())
290                                 return Sha1Hash::ZERO;
291                         hash = Sha1Hash(peak_hashes_[c],hash);
292                         p = p.parent();
293                         c--;
294                 }
295         //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
296         }
297     return hash;
298 }
299
300
301 void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
302     assert(!size_);
303     if (peak_count_) {
304         bin64_t last_peak = peaks_[peak_count_-1];
305         if ( pos.layer()>=last_peak.layer() ||
306              pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
307             peak_count_ = 0;
308     }
309     peaks_[peak_count_] = pos;
310     peak_hashes_[peak_count_++] = hash;
311     // check whether peak hash candidates add up to the root hash
312     Sha1Hash mustbe_root = DeriveRoot();
313     if (mustbe_root!=root_hash_)
314         return;
315     // bingo, we now know the file size (rounded up to a KByte)
316     SetSize( (pos.base_offset()+pos.width()) << 10               );
317     SavePeaks();
318 }
319
320
321 FileTransfer::~FileTransfer ()
322 {
323 #ifdef _WIN32
324         UnmapViewOfFile(hashes_);
325         CloseHandle(hashmaphandle_);
326 #else
327     munmap(hashes_,sizek_*2*Sha1Hash::SIZE);
328     close(hashfd_);
329     close(fd_);
330     files[fd_] = NULL;
331 #endif
332 }
333
334
335 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
336     for(int i=0; i<files.size(); i++)
337         if (files[i] && files[i]->root_hash()==root_hash)
338             return files[i];
339     return NULL;
340 }
341
342
343 std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
344 {
345         std::string tempfile = gettmpdir();
346         std::stringstream ss;
347         ss << instance;
348         tempfile += std::string(".") + root_hash.hex() + std::string(".") + ss.str() + postfix;
349         return tempfile;
350 }
351
352
353 /*int      p2tp::Open (const char* filename, const Sha1Hash& hash) {
354     FileTransfer* ft = new FileTransfer(filename, hash);
355     int fdes = ft->file_descriptor();
356     if (fdes>0) {
357         if (FileTransfer::files.size()<fdes)
358             FileTransfer::files.resize(fdes);
359         FileTransfer::files[fdes] = ft;
360         return fdes;
361     } else {
362         delete ft;
363         return -1;
364     }
365 }*/
366
367
368 void            FileTransfer::OnPexIn (const Address& addr) {
369     for(int i=0; i<hs_in_.size(); i++) {
370         Channel* c = Channel::channels[hs_in_[i]];
371         if (c && c->file_==this && c->peer_==addr)
372             return; // already connected
373     }
374     if (hs_in_.size()<20) {
375         new Channel(this,Channel::sockets[0],addr);
376     } else {
377         pex_in_.push_back(addr);
378         if (pex_in_.size()>1000)
379             pex_in_.pop_front();
380     }
381 }
382
383
384 int        FileTransfer::RevealChannel (int& pex_out_) {
385     pex_out_ -= hs_in_offset_;
386     if (pex_out_<0)
387         pex_out_ = 0;
388     while (pex_out_<hs_in_.size()) {
389         Channel* c = Channel::channels[hs_in_[pex_out_]];
390         if (c && c->file_==this) {
391             pex_out_ += hs_in_offset_ + 1;
392             return c->id;
393         } else {
394             hs_in_[pex_out_] = hs_in_[0];
395             hs_in_.pop_front();
396             hs_in_offset_++;
397         }
398     }
399     pex_out_ += hs_in_offset_;
400     return -1;
401 }
402
403
404 /*
405  for(int i=0; i<peak_hash_count; i++) {
406  bin64_t x = peaks[i], end = x.sibling();
407  do {
408  while (!x.layer()>10) {
409  OfferHash(x.right(), hashes[x.right()]);
410  if ( ! OfferHash(x.left(), hashes[x.left()]) )
411  break;
412  x = x.left();
413  }
414
415  if (x.layer()==10) {
416  if (recheck_data) {
417  uint8_t data[1024];
418  size_t rd = pread(fd,data,2<<10,x.base_offset());
419  if (hashes[x]==Sha1Hash(data,rd))
420  ack_out->set(x,bins::FILLED);
421  // may avoid hashing by checking whether it is zero
422  // and whether the hash matches hash of zero
423  } else {
424  ack_out->set(x,bins::FILLED);
425  }
426  }
427
428  while (x.is_right() && x!=peaks[i])
429  x = x.parent();
430  x = x.sibling();
431  } while (x!=end);
432  }
433
434
435
436
437  // open file
438  if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
439  return;
440  // read root hash
441  char hashbuf[128];
442  uint64_t binbuf;
443  lseek(hashfd_,0,SEEK_SET);
444  read(hashfd_,&binbuf,sizeof(bin64_t));
445  read(hashfd_,hashbuf,Sha1Hash::SIZE);
446  Sha1Hash mustberoot(false,(const char*)hashbuf);
447  if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
448  ftruncate(hashfd_,Sha1Hash::SIZE*64);
449  return;
450  }
451  // read peak hashes
452  for(int i=1; i<64 && !this->size; i++){
453  read(hashfd_,&binbuf,sizeof(bin64_t));
454  read(hashfd_,hashbuf,Sha1Hash::SIZE);
455  Sha1Hash mustbepeak(false,(const char*)hashbuf);
456  if (mustbepeak==Sha1Hash::ZERO)
457  break;
458  OfferPeak(binbuf,mustbepeak);
459  }
460  if (!size)
461  return;
462
463
464  */