5 * Created by Victor Grishchenko on 10/6/09.
6 * Copyright 2009 Delft Technical University. All rights reserved.
15 std::vector<FileTransfer*> FileTransfer::files(20);
16 const char* FileTransfer::HASH_FILE_TEMPLATE = "/tmp/.%s.%i.hashes";
17 const char* FileTransfer::PEAK_FILE_TEMPLATE = "/tmp/.%s.%i.peaks";
18 int FileTransfer::instance = 0;
19 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
21 #include "ext/seq_picker.cpp"
24 FileTransfer::FileTransfer (const Sha1Hash& _root_hash, const char* filename) :
25 root_hash(_root_hash), fd(0), hashfd(0), dry_run(false),
26 peak_count(0), hashes(NULL), error(NULL), size(0), sizek(0),
27 complete(0), completek(0), seq_complete(0)
29 fd = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
32 if (root_hash==Sha1Hash::ZERO) // fresh submit, hash it
36 picker = new SeqPiecePicker(this);
39 void FileTransfer::LoadPeaks () {
41 sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
42 int peakfd = open(file_name,O_RDONLY);
47 while (sizeof(bin64_t)==read(peakfd,&peak,sizeof(bin64_t))) {
48 read(peakfd,hash,Sha1Hash::SIZE);
49 OfferPeak(peak, Sha1Hash(false,hash));
55 /** Basically, simulated receiving every single packet, except
56 for some optimizations. */
57 void FileTransfer::RecoverProgress () {
62 // at this point, we may use mmapd hashes already
63 // so, lets verify hashes and the data we've got
65 for(int p=0; p<sizek; p++) {
67 size_t rd = read(fd,buf,1<<10);
68 OfferData(bin64_t(0,p), buf, rd);
76 void FileTransfer::SavePeaks () {
78 sprintf(file_name,PEAK_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
79 int peakfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
80 for(int i=0; i<peak_count; i++) {
81 write(peakfd,&(peaks[i]),sizeof(bin64_t));
82 write(peakfd,*peak_hashes[i],Sha1Hash::SIZE);
88 void FileTransfer::SetSize (size_t bytes) { // peaks/root must be already set
90 completek = complete = seq_complete = 0;
91 sizek = (size>>10) + ((size&1023) ? 1 : 0);
96 if (st.st_size!=bytes)
97 if (ftruncate(fd, bytes))
98 return; // remain in the 0-state
99 // mmap the hash file into memory
100 sprintf(file_name,HASH_FILE_TEMPLATE,root_hash.hex().c_str(),instance);
101 hashfd = open(file_name,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
102 size_t expected_size = Sha1Hash::SIZE * sizek * 2;
103 struct stat hash_file_st;
104 fstat(hashfd, &hash_file_st);
105 if ( hash_file_st.st_size != expected_size )
106 ftruncate(hashfd, expected_size);
107 hashes = (Sha1Hash*) mmap (NULL, expected_size, PROT_READ|PROT_WRITE,
108 MAP_SHARED, hashfd, 0);
109 if (hashes==MAP_FAILED) {
111 size = sizek = complete = completek = seq_complete = 0;
112 error = strerror(errno);
113 perror("hash tree mmap failed");
116 for(int i=0; i<peak_count; i++)
117 hashes[peaks[i]] = peak_hashes[i];
121 void FileTransfer::Submit () {
122 struct stat st; // TODO: AppendData() and streaming
125 sizek = (size>>10) + ((size&1023) ? 1 : 0);
126 hashes = (Sha1Hash*) malloc(Sha1Hash::SIZE*sizek*2);
127 peak_count = bin64_t::peaks(sizek,peaks);
128 for (int p=0; p<peak_count; p++) {
129 for(bin64_t b=peaks[p].left_foot(); b.within(peaks[p]); b=b.next_dfsio(0))
132 size_t rd = pread(fd,kilo,1<<10,b.base_offset()<<10);
133 hashes[b] = Sha1Hash(kilo,rd);
135 hashes[b] = Sha1Hash(hashes[b.left()],hashes[b.right()]);
136 peak_hashes[p] = hashes[peaks[p]];
137 ack_out.set(peaks[p],bins::FILLED);
139 root_hash = DeriveRoot();
140 Sha1Hash *hash_tmp = hashes;
143 seq_complete = complete = size;
145 memcpy(hashes,hash_tmp,sizek*Sha1Hash::SIZE*2);
150 void FileTransfer::OfferHash (bin64_t pos, const Sha1Hash& hash) {
151 if (!size) // only peak hashes are accepted at this point
152 return OfferPeak(pos,hash);
155 if (ack_out.get(pos)!=bins::EMPTY)
156 return; // have this hash already, even accptd data
161 bool FileTransfer::OfferData (bin64_t pos, uint8_t* data, size_t length) {
164 if (length<1024 && pos!=bin64_t(0,sizek-1))
166 if (ack_out.get(pos)==bins::FILLED)
169 while (peak<peak_count && !pos.within(peaks[peak]))
171 if (peak==peak_count)
173 Sha1Hash hash(data,length);
174 if (pos==peaks[peak]) {
175 if (hash!=peak_hashes[peak])
179 for(bin64_t p = pos.parent(); p.within(peaks[peak]) && ack_out.get(p)==bins::EMPTY; p=p.parent()) {
180 Sha1Hash phash = Sha1Hash(hashes[p.left()],hashes[p.right()]) ;
181 if (hashes[p]!=phash)
182 return false; // hash mismatch
185 //printf("g %lli %s\n",(uint64_t)pos,hash.hex().c_str());
186 // walk to the nearest proven hash FIXME 0-layer peak
187 ack_out.set(pos,bins::FILLED);
188 pwrite(fd,data,length,pos.base_offset()<<10);
192 size -= 1024 - length;
195 while (ack_out.get(bin64_t(0,seq_complete>>10))==bins::FILLED)
197 if (seq_complete>size)
203 Sha1Hash FileTransfer::DeriveRoot () {
204 int c = peak_count-1;
205 bin64_t p = peaks[c];
206 Sha1Hash hash = peak_hashes[c];
208 while (p!=bin64_t::ALL) {
211 hash = Sha1Hash(hash,Sha1Hash::ZERO);
213 if (c<0 || peaks[c]!=p.sibling())
214 return Sha1Hash::ZERO;
215 hash = Sha1Hash(peak_hashes[c],hash);
219 //printf("p %lli %s\n",(uint64_t)p,hash.hex().c_str());
225 void FileTransfer::OfferPeak (bin64_t pos, const Sha1Hash& hash) {
228 bin64_t last_peak = peaks[peak_count-1];
229 if ( pos.layer()>=last_peak.layer() ||
230 pos.base_offset()!=last_peak.base_offset()+last_peak.width() )
233 peaks[peak_count] = pos;
234 peak_hashes[peak_count++] = hash;
235 // check whether peak hash candidates add up to the root hash
236 Sha1Hash mustbe_root = DeriveRoot();
237 if (mustbe_root!=root_hash)
239 // bingo, we now know the file size (rounded up to a KByte)
240 SetSize( (pos.base_offset()+pos.width()) << 10 );
245 FileTransfer::~FileTransfer () {
246 munmap(hashes,sizek*2*Sha1Hash::SIZE);
252 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
253 for(int i=0; i<files.size(); i++)
254 if (files[i] && files[i]->root_hash==root_hash)
260 int p2tp::Open (const char* filename) {
261 return Open(Sha1Hash::ZERO,filename);
265 int p2tp::Open (const Sha1Hash& hash, const char* filename) {
266 FileTransfer* ft = new FileTransfer(hash, filename);
268 if (FileTransfer::files.size()<ft->fd)
269 FileTransfer::files.resize(ft->fd);
270 FileTransfer::files[ft->fd] = ft;
279 void Close (int fdes) {
280 // FIXME delete all channels
281 delete FileTransfer::files[fdes];
282 FileTransfer::files[fdes] = NULL;
288 for(int i=0; i<peak_hash_count; i++) {
289 bin64_t x = peaks[i], end = x.sibling();
291 while (!x.layer()>10) {
292 OfferHash(x.right(), hashes[x.right()]);
293 if ( ! OfferHash(x.left(), hashes[x.left()]) )
301 size_t rd = pread(fd,data,2<<10,x.base_offset());
302 if (hashes[x]==Sha1Hash(data,rd))
303 ack_out->set(x,bins::FILLED);
304 // may avoid hashing by checking whether it is zero
305 // and whether the hash matches hash of zero
307 ack_out->set(x,bins::FILLED);
311 while (x.is_right() && x!=peaks[i])
321 if ( hash_file_st.st_size < (sizeof(bin64_t)+Sha1Hash::SIZE)*64 )
326 lseek(hashfd,0,SEEK_SET);
327 read(hashfd,&binbuf,sizeof(bin64_t));
328 read(hashfd,hashbuf,Sha1Hash::SIZE);
329 Sha1Hash mustberoot(false,(const char*)hashbuf);
330 if ( binbuf!=bin64_t::ALL || mustberoot != this->root_hash ) {
331 ftruncate(hashfd,Sha1Hash::SIZE*64);
335 for(int i=1; i<64 && !this->size; i++){
336 read(hashfd,&binbuf,sizeof(bin64_t));
337 read(hashfd,hashbuf,Sha1Hash::SIZE);
338 Sha1Hash mustbepeak(false,(const char*)hashbuf);
339 if (mustbepeak==Sha1Hash::ZERO)
341 OfferPeak(binbuf,mustbepeak);