Add the source files for the swift library.
[swifty.git] / src / libswift / transfer.cpp
1 /*
2  *  transfer.cpp
3  *  some transfer-scope code
4  *
5  *  Created by Victor Grishchenko on 10/6/09.
6  *  Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 #include <errno.h>
10 #include <string>
11 #include <sstream>
12 #include <cfloat>
13 #include "swift.h"
14
15 #include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME
16 #include "ext/vod_picker.cpp"
17
18 using namespace swift;
19
// Global table of live transfers. The constructor stores `this` at index fd()
// (growing the vector when needed) and the destructor resets that slot to NULL.
// It starts out with 20 NULL entries.
std::vector<FileTransfer*> FileTransfer::files(20);

// Combined size in bytes of one bin64_t plus one Sha1Hash.
#define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))


// Tracker reconnect back-off, used by ReConnectToTrackerIfAllowed():
// the retry interval starts at INTERVAL_START, is multiplied by INTERVAL_EXP
// after every attempt, and is capped at INTERVAL_MAX.
#define TRACKER_RETRY_INTERVAL_START    (5*TINT_SEC)
#define TRACKER_RETRY_INTERVAL_EXP              1.1                             // multiplicative growth factor applied to the current interval after each retry
#define TRACKER_RETRY_INTERVAL_MAX              (1800*TINT_SEC) // 30 minutes
28
29 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
30
31 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) :
32     file_(filename,_root_hash,chunk_size,NULL,force_check_diskvshash,check_netwvshash), cb_installed(0), mychannels_(),
33     speedzerocount_(0), tracker_(), tracker_retry_interval_(TRACKER_RETRY_INTERVAL_START), tracker_retry_time_(NOW)
34 {
35     if (files.size()<fd()+1)
36         files.resize(fd()+1);
37     files[fd()] = this;
38     if (ENABLE_VOD_PIECEPICKER) {
39         // Ric: init availability
40         availability_ = new Availability();
41         // Ric: TODO assign picker based on input params...
42         picker_ = new VodPiecePicker(this);
43     }
44     else
45         picker_ = new SeqPiecePicker(this);
46     picker_->Randomize(rand()&63);
47     init_time_ = Channel::Time();
48     cur_speed_[DDIR_UPLOAD] = MovingAverageSpeed();
49     cur_speed_[DDIR_DOWNLOAD] = MovingAverageSpeed();
50     max_speed_[DDIR_UPLOAD] = DBL_MAX;
51     max_speed_[DDIR_DOWNLOAD] = DBL_MAX;
52
53     // SAFECLOSE
54     evtimer_assign(&evclean_,Channel::evbase,&FileTransfer::LibeventCleanCallback,this);
55     evtimer_add(&evclean_,tint2tv(5*TINT_SEC));
56
57 }
58
59
60 // SAFECLOSE
61 void FileTransfer::LibeventCleanCallback(int fd, short event, void *arg)
62 {
63         // Arno, 2012-02-24: Why-oh-why, update NOW
64         Channel::Time();
65
66         FileTransfer *ft = (FileTransfer *)arg;
67         if (ft == NULL)
68                 return;
69
70         // STL and MS and conditional delete from set not a happy place :-(
71         std::set<Channel *>     delset;
72         std::set<Channel *>::iterator iter;
73         bool hasestablishedpeers=false;
74         for (iter=ft->mychannels_.begin(); iter!=ft->mychannels_.end(); iter++)
75         {
76                 Channel *c = *iter;
77                 if (c != NULL) {
78                         if (c->IsScheduled4Close())
79                                 delset.insert(c);
80
81                         if (c->is_established ()) {
82                                 hasestablishedpeers = true;
83                                 //fprintf(stderr,"%s peer %s\n", ft->file().root_hash().hex().c_str(), c->peer().str() );
84                         }
85                 }
86         }
87         for (iter=delset.begin(); iter!=delset.end(); iter++)
88         {
89                 Channel *c = *iter;
90                 dprintf("%s #%u clean cb close\n",tintstr(),c->id());
91                 c->Close();
92                 ft->mychannels_.erase(c);
93                 delete c;
94     }
95
96         // Arno, 2012-02-24: Check for liveliness.
97         ft->ReConnectToTrackerIfAllowed(hasestablishedpeers);
98
99         // Reschedule cleanup
100     evtimer_add(&(ft->evclean_),tint2tv(5*TINT_SEC));
101 }
102
103
104 void FileTransfer::ReConnectToTrackerIfAllowed(bool hasestablishedpeers)
105 {
106         // If I'm not connected to any
107         // peers, try to contact the tracker again.
108         if (!hasestablishedpeers)
109         {
110                 if (NOW > tracker_retry_time_)
111                 {
112                         ConnectToTracker();
113
114                         tracker_retry_interval_ *= TRACKER_RETRY_INTERVAL_EXP;
115                         if (tracker_retry_interval_ > TRACKER_RETRY_INTERVAL_MAX)
116                                 tracker_retry_interval_ = TRACKER_RETRY_INTERVAL_MAX;
117                         tracker_retry_time_ = NOW + tracker_retry_interval_;
118                 }
119         }
120         else
121         {
122                 tracker_retry_interval_ = TRACKER_RETRY_INTERVAL_START;
123                 tracker_retry_time_ = NOW + tracker_retry_interval_;
124         }
125 }
126
127
128 void FileTransfer::ConnectToTracker()
129 {
130         Channel *c = NULL;
131     if (tracker_ != Address())
132         c = new Channel(this,INVALID_SOCKET,tracker_);
133     else if (Channel::tracker!=Address())
134         c = new Channel(this);
135 }
136
137
138 Channel * FileTransfer::FindChannel(const Address &addr, Channel *notc)
139 {
140         std::set<Channel *>::iterator iter;
141         for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
142         {
143                 Channel *c = *iter;
144                 if (c != NULL) {
145                         if (c != notc && (c->peer() == addr || c->recv_peer() == addr)) {
146                                 return c;
147                         }
148                 }
149         }
150         return NULL;
151 }
152
153
154
155
156 void    Channel::CloseTransfer (FileTransfer* trans) {
157     for(int i=0; i<Channel::channels.size(); i++)
158         if (Channel::channels[i] && Channel::channels[i]->transfer_==trans)
159         {
160                 //fprintf(stderr,"Channel::CloseTransfer: delete #%i\n", Channel::channels[i]->id());
161                 Channel::channels[i]->Close(); // ARNO
162             delete Channel::channels[i];
163         }
164 }
165
166
167 void swift::AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg) {
168
169         //fprintf(stderr,"swift::AddProgressCallback: transfer %i\n", transfer );
170
171     FileTransfer* trans = FileTransfer::file(transfer);
172     if (!trans)
173         return;
174
175     //fprintf(stderr,"swift::AddProgressCallback: ft obj %p %p\n", trans, cb );
176
177     trans->cb_agg[trans->cb_installed] = agg;
178     trans->callbacks[trans->cb_installed] = cb;
179     trans->cb_installed++;
180 }
181
182
183 void swift::ExternallyRetrieved (int transfer,bin_t piece) {
184     FileTransfer* trans = FileTransfer::file(transfer);
185     if (!trans)
186         return;
187     trans->ack_out().set(piece); // that easy
188 }
189
190
191 void swift::RemoveProgressCallback (int transfer, ProgressCallback cb) {
192
193         //fprintf(stderr,"swift::RemoveProgressCallback: transfer %i\n", transfer );
194
195     FileTransfer* trans = FileTransfer::file(transfer);
196     if (!trans)
197         return;
198
199     //fprintf(stderr,"swift::RemoveProgressCallback: transfer %i ft obj %p %p\n", transfer, trans, cb );
200
201     for(int i=0; i<trans->cb_installed; i++)
202         if (trans->callbacks[i]==cb)
203             trans->callbacks[i]=trans->callbacks[--trans->cb_installed];
204
205     for(int i=0; i<trans->cb_installed; i++)
206     {
207         fprintf(stderr,"swift::RemoveProgressCallback: transfer %i remain %p\n", transfer, trans->callbacks[i] );
208     }
209 }
210
211
/**
 * Tears the transfer down: closes and deletes its channels, removes it from
 * the global files table, frees the piece picker and availability object and
 * cancels the periodic cleanup timer.
 */
FileTransfer::~FileTransfer ()
{
    // Close and delete every channel that still points at this transfer.
    Channel::CloseTransfer(this);
    // Free this transfer's slot in the global table.
    files[fd()] = NULL;
    delete picker_;
    // Deleting NULL is a no-op, so this is safe provided the constructor left
    // availability_ either NULL or pointing at a live object.
    delete availability_;
  
    // Arno, 2012-02-06: Cancel cleanup timer, otherwise chaos!
    evtimer_del(&evclean_);
}
222
223
224 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
225     for(int i=0; i<files.size(); i++)
226         if (files[i] && files[i]->root_hash()==root_hash)
227             return files[i];
228     return NULL;
229 }
230
231
232 int swift:: Find (Sha1Hash hash) {
233     FileTransfer* t = FileTransfer::Find(hash);
234     if (t)
235         return t->fd();
236     return -1;
237 }
238
239
240
241 bool FileTransfer::OnPexIn (const Address& addr) {
242
243         //fprintf(stderr,"FileTransfer::OnPexIn: %s\n", addr.str() );
244         // Arno: this brings safety, but prevents private swift installations.
245         // TODO: detect public internet.
246         //if (addr.is_private())
247         //      return false;
248     // Gertjan fix: PEX redo
249     if (hs_in_.size()<SWIFT_MAX_CONNECTIONS)
250     {
251         // Arno, 2012-02-27: Check if already connected to this peer.
252                 Channel *c = FindChannel(addr,NULL);
253                 if (c == NULL)
254                         new Channel(this,Channel::default_socket(),addr);
255                 else
256                         return false;
257     }
258     return true;
259 }
260
261 //Gertjan
262 int FileTransfer::RandomChannel (int own_id) {
263     binqueue choose_from;
264     int i;
265
266     for (i = 0; i < (int) hs_in_.size(); i++) {
267         if (hs_in_[i].toUInt() == own_id)
268             continue;
269         Channel *c = Channel::channel(hs_in_[i].toUInt());
270         if (c == NULL || c->transfer().fd() != this->fd()) {
271             /* Channel was closed or is not associated with this FileTransfer (anymore). */
272             hs_in_[i] = hs_in_[0];
273             hs_in_.pop_front();
274             i--;
275             continue;
276         }
277         if (!c->is_established())
278             continue;
279         choose_from.push_back(hs_in_[i]);
280     }
281     if (choose_from.size() == 0)
282         return -1;
283
284     return choose_from[rand() % choose_from.size()].toUInt();
285 }
286
287 void            FileTransfer::OnRecvData(int n)
288 {
289         // Got n ~ 32K
290         cur_speed_[DDIR_DOWNLOAD].AddPoint((uint64_t)n);
291 }
292
293 void            FileTransfer::OnSendData(int n)
294 {
295         // Sent n ~ 1K
296         cur_speed_[DDIR_UPLOAD].AddPoint((uint64_t)n);
297 }
298
299
300 void            FileTransfer::OnSendNoData()
301 {
302         // AddPoint(0) everytime we don't AddData gives bad speed measurement
303         // batch 32 such events into 1.
304         speedzerocount_++;
305         if (speedzerocount_ >= 32)
306         {
307                 cur_speed_[DDIR_UPLOAD].AddPoint((uint64_t)0);
308                 speedzerocount_ = 0;
309         }
310 }
311
312
313 double          FileTransfer::GetCurrentSpeed(data_direction_t ddir)
314 {
315         return cur_speed_[ddir].GetSpeedNeutral();
316 }
317
318
319 void            FileTransfer::SetMaxSpeed(data_direction_t ddir, double m)
320 {
321         max_speed_[ddir] = m;
322         // Arno, 2012-01-04: Be optimistic, forget history.
323         cur_speed_[ddir].Reset();
324 }
325
326
327 double          FileTransfer::GetMaxSpeed(data_direction_t ddir)
328 {
329         return max_speed_[ddir];
330 }
331
332
333 uint32_t        FileTransfer::GetNumLeechers()
334 {
335         uint32_t count = 0;
336         std::set<Channel *>::iterator iter;
337     for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
338     {
339             Channel *c = *iter;
340             if (c != NULL)
341                     if (!c->IsComplete()) // incomplete?
342                             count++;
343     }
344     return count;
345 }
346
347
348 uint32_t        FileTransfer::GetNumSeeders()
349 {
350         uint32_t count = 0;
351         std::set<Channel *>::iterator iter;
352     for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
353     {
354             Channel *c = *iter;
355             if (c != NULL)
356                     if (c->IsComplete()) // complete?
357                             count++;
358     }
359     return count;
360 }