save ok
[swift-upb.git] / transfer.cpp
1 /*
2  *  transfer.cpp
3  *  p2tp
4  *
5  *  Created by Victor Grishchenko on 10/6/09.
6  *  Copyright 2009 Delft Technical University. All rights reserved.
7  *
8  */
9 #include <sys/mman.h>
10 #include <errno.h>
11 #include "p2tp.h"
12
13 using namespace p2tp;
14
15 std::vector<FileTransfer*> FileTransfer::files(20);
16 const char* FileTransfer::HASH_FILE_TEMPLATE = "/tmp/.%s.%i.hashes";
17 const char* FileTransfer::PEAK_FILE_TEMPLATE = "/tmp/.%s.%i.peaks";
18 int FileTransfer::instance = 0;
19 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
20
21 #include "ext/seq_picker.cpp"
22
23
24 FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
25     root_hash(_root_hash), fd(0), hashfd(0), dry_run(false), 
26     peak_count(0), hashes(NULL), error(NULL), size(0), sizek(0),
27     complete(0), completek(0), seq_complete(0)
28 {
29         fd = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
30         if (fd<0)
31         return;
32     if (root_hash==Sha1Hash::ZERO) // fresh submit, hash it
33         Submit();
34     else
35         RecoverProgress();
36     picker = new SeqPiecePicker(this);
37 }
38
39 void FileTransfer::LoadPeaks () {
40     char file_name[1024];
41     sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
42     int peakfd = open(file_name,O_RDONLY);
43     if (peakfd<0)
44         return;
45     bin64_t peak;
46     char hash[128];
47     while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
48         read(peakfd,hash,Sha1Hash::SIZE);
49         OfferPeak(peak, Sha1Hash(false,hash));
50     }
51     close(peakfd);
52 }
53
54
55 /** Basically, simulated receiving every single packet, except
56     for some optimizations. */
57 void            FileTransfer::RecoverProgress () {
58     dry_run = true;
59     LoadPeaks();
60     if (!size)
61         return;
62     // at this point, we may use mmapd hashes already
63     // so, lets verify hashes and the data we've got
64     lseek(fd,0,SEEK_SET);
65     for(int p=0; p<sizek; p++) {
66         uint8_t buf[1<<10];
67         size_t rd = read(fd,buf,1<<10);
68         OfferData(bin64_t(0,p), buf, rd);
69         if (rd<(1<<10))
70             break;
71     }
72     dry_run = false;
73 }
74
75
76 void    FileTransfer::SavePeaks () {
77     char file_name[1024];
78     sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
79     int peakfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
80     for(int i=0; i<peak_count; i++) {
81         write(peakfd,&(peaks[i]),sizeof(bin64_t));
82         write(peakfd,*peak_hashes[i],Sha1Hash::SIZE);
83     }
84     close(peakfd);
85 }
86
87
88 void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
89     size = bytes;
90     completek = complete = seq_complete = 0;
91         sizek = (size>>10) + ((size&1023) ? 1 : 0);
92     
93     char file_name[1024];
94         struct stat st;
95         fstat(fd, &st);
96     if (st.st_size!=bytes)
97         if (ftruncate(fd, bytes))
98             return; // remain in the 0-state
99     // mmap the hash file into memory
100     sprintf(file_name,HASH_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
101         hashfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
102     size_t expected_size = Sha1Hash::SIZE * sizek * 2;
103         struct stat hash_file_st;
104         fstat(hashfd, &hash_file_st);
105     if ( hash_file_st.st_size != expected_size )
106         ftruncate(hashfd, expected_size);
107     hashes = (Sha1Hash*) mmap (NULL, expected_size, PROT_READ|PROT_WRITE, 
108                                MAP_SHARED, hashfd, 0);
109     if (hashes==MAP_FAILED) {
110         hashes = NULL;
111         size = sizek = complete = completek = seq_complete = 0;
112         error = strerror(errno);
113         perror("hash tree mmap failed");
114         return;
115     }
116     for(int i=0; i<peak_count; i++)
117         hashes[peaks[i]] = peak_hashes[i];
118 }  
119
120
121 void            FileTransfer::Submit () {
122         struct stat st; // TODO:   AppendData()   and   streaming
123         fstat(fd, &st);
124     size = st.st_size;
125         sizek = (size>>10) + ((size&1023) ? 1 : 0);
126     hashes = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek*2);
127     peak_count = bin64_t::peaks(sizek,peaks);
128     for (int p=0; p<peak_count; p++) {
129         for(bin64_t b=peaks[p].left_foot(); b.within(peaks[p]); b=b.next_dfsio(0)) 
130             if (b.is_base()) {
131                 uint8_t kilo[1<<10];
132                 size_t rd = pread(fd,kilo,1<<10,b.base_offset()<<10);
133                 hashes[b] = Sha1Hash(kilo,rd);
134             } else
135                 hashes[b] = Sha1Hash(hashes[b.left()],hashes[b.right()]);
136         peak_hashes[p] = hashes[peaks[p]];
137         ack_out.set(peaks[p],bins::FILLED);
138     }
139     root_hash = DeriveRoot();
140     Sha1Hash *hash_tmp = hashes;
141     SetSize(st.st_size);
142     SavePeaks();
143     seq_complete = complete = size;
144     completek = sizek;
145     memcpy(hashes,hash_tmp,sizek*Sha1Hash::SIZE*2);
146     free(hash_tmp);
147 }
148
149
150 void            FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {    
151         if (!size)  // only peak hashes are accepted at this point
152                 return OfferPeak(pos,hash);
153         if (pos>=sizek*2)
154                 return;
155     if (ack_out.get(pos)!=bins::EMPTY)
156         return; // have this hash already, even accptd data
157         hashes[pos] = hash;
158 }
159
160
161 bool            FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
162     if (!pos.is_base())
163         return false;
164     if (length<1024 && pos!=bin64_t(0,sizek-1))
165         return false;
166     if (ack_out.get(pos)==bins::FILLED)
167         return true; // ???
168     int peak=0;
169     while (peak<peak_count && !pos.within(peaks[peak]))
170         peak++;
171     if (peak==peak_count)
172         return false;
173     Sha1Hash hash(data,length);
174     if (pos==peaks[peak]) {
175         if (hash!=peak_hashes[peak])
176             return false;
177     } else {
178         hashes[pos] = hash;
179         for(bin64_t p = pos.parent(); p.within(peaks[peak]) && ack_out.get(p)==bins::EMPTY; p=p.parent()) {
180             Sha1Hash phash = Sha1Hash(hashes[p.left()],hashes[p.right()]) ;
181             if (hashes[p]!=phash)
182                 return false; // hash mismatch
183         }
184     }
185     //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
186         // walk to the nearest proven hash   FIXME 0-layer peak
187     ack_out.set(pos,bins::FILLED);
188     pwrite(fd,data,length,pos.base_offset()<<10);
189     complete += length;
190     completek++;
191     if (length<1024) {
192         size -= 1024 - length;
193         ftruncate(fd, size);
194     }
195     while (ack_out.get(bin64_t(0,seq_complete>>10))==bins::FILLED)
196         seq_complete+=1024;
197     if (seq_complete>size)
198         seq_complete = size;
199     return true;
200 }
201
202
203 Sha1Hash        FileTransfer::DeriveRoot () {
204         int c = peak_count-1;
205         bin64_t p = peaks[c];
206         Sha1Hash hash = peak_hashes[c];
207         c--;
208         while (p!=bin64_t::ALL) {
209                 if (p.is_left()) {
210                         p = p.parent();
211                         hash = Sha1Hash(hash,Sha1Hash::ZERO);
212                 } else {
213                         if (c<0 || peaks[c]!=p.sibling())
214                                 return Sha1Hash::ZERO;
215                         hash = Sha1Hash(peak_hashes[c],hash);
216                         p = p.parent();
217                         c--;
218                 }
219         //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
220         }
221     return hash;
222 }
223
224
225 void            FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
226     assert(!size);
227     if (peak_count) {
228         bin64_t last_peak = peaks[peak_count-1];
229         if ( pos.layer()>=last_peak.layer() || 
230              pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
231             peak_count = 0;
232     }
233     peaks[peak_count] = pos;
234     peak_hashes[peak_count++] = hash;
235     // check whether peak hash candidates add up to the root hash
236     Sha1Hash mustbe_root = DeriveRoot();
237     if (mustbe_root!=root_hash)
238         return;
239     // bingo, we now know the file size (rounded up to a KByte)
240     SetSize( (pos.base_offset()+pos.width()) << 10               );
241     SavePeaks();
242 }
243
244
245 FileTransfer::~FileTransfer () {
246     munmap(hashes,sizek*2*Sha1Hash::SIZE);
247     close(hashfd);
248     close(fd);
249 }
250
251                            
252 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
253     for(int i=0; i<files.size(); i++)
254         if (files[i] && files[i]->root_hash==root_hash)
255             return files[i];
256     return NULL;
257 }
258
259
260 int      p2tp::Open (const char* filename) {
261     return Open(Sha1Hash::ZERO,filename);
262 }
263
264
265 int      p2tp::Open (const Sha1Hash& hash, const char* filename) {
266     FileTransfer* ft = new FileTransfer(hash, filename);
267     if (ft->fd>0) {
268         if (FileTransfer::files.size()<ft->fd)
269             FileTransfer::files.resize(ft->fd);
270         FileTransfer::files[ft->fd] = ft;
271         return ft->fd;
272     } else {
273         delete ft;
274         return -1;
275     }
276 }
277
278
279 void     Close (int fdes) {
280     // FIXME delete all channels
281     delete FileTransfer::files[fdes];
282     FileTransfer::files[fdes] = NULL;
283 }
284
285
286
287 /*
288  for(int i=0; i<peak_hash_count; i++) {
289  bin64_t x = peaks[i], end = x.sibling();
290  do {
291  while (!x.layer()>10) {
292  OfferHash(x.right(), hashes[x.right()]);
293  if ( ! OfferHash(x.left(), hashes[x.left()]) )
294  break;
295  x = x.left();
296  }
297  
298  if (x.layer()==10) {
299  if (recheck_data) {
300  uint8_t data[1024];
301  size_t rd = pread(fd,data,2<<10,x.base_offset());
302  if (hashes[x]==Sha1Hash(data,rd))
303  ack_out->set(x,bins::FILLED);
304  // may avoid hashing by checking whether it is zero
305  // and whether the hash matches hash of zero
306  } else {
307  ack_out->set(x,bins::FILLED);
308  }
309  }
310  
311  while (x.is_right() && x!=peaks[i])
312  x = x.parent();
313  x = x.sibling();
314  } while (x!=end);
315  }
316  
317  
318  
319  
320  // open file
321  if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
322  return;
323  // read root hash
324  char hashbuf[128];
325  uint64_t binbuf;
326  lseek(hashfd,0,SEEK_SET);
327  read(hashfd,&binbuf,sizeof(bin64_t));
328  read(hashfd,hashbuf,Sha1Hash::SIZE);
329  Sha1Hash mustberoot(false,(const char*)hashbuf);
330  if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
331  ftruncate(hashfd,Sha1Hash::SIZE*64);
332  return;
333  }
334  // read peak hashes
335  for(int i=1; i<64 && !this->size; i++){
336  read(hashfd,&binbuf,sizeof(bin64_t));
337  read(hashfd,hashbuf,Sha1Hash::SIZE);
338  Sha1Hash mustbepeak(false,(const char*)hashbuf);
339  if (mustbepeak==Sha1Hash::ZERO)
340  break;
341  OfferPeak(binbuf,mustbepeak);
342  }
343  if (!size)
344  return;
345  
346  
347  */