3 * some transfer-scope code
5 * Created by Victor Grishchenko on 10/6/09.
6 * Copyright 2009-2012 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
15 #include "ext/seq_picker.cpp" // FIXME FIXME FIXME FIXME
16 #include "ext/vod_picker.cpp"
18 using namespace swift;
// Static table of every FileTransfer, created with 20 null slots.
// NOTE(review): presumably indexed by fd() (the constructor compares
// files.size() with fd()+1) - confirm.
20 std::vector<FileTransfer*> FileTransfer::files(20);
// Byte size of one bin64_t plus one Sha1Hash.
22 #define BINHASHSIZE (sizeof(bin64_t)+sizeof(Sha1Hash))
// Tracker retry back-off: initial interval, growth factor and upper bound
// (all three are used by ReConnectToTrackerIfAllowed).
25 #define TRACKER_RETRY_INTERVAL_START (5*TINT_SEC)
26 #define TRACKER_RETRY_INTERVAL_EXP 1.1 // factor the retry interval is multiplied by to grow it (a product, not a power)
27 #define TRACKER_RETRY_INTERVAL_MAX (1800*TINT_SEC) // 30 minutes
29 // FIXME: separate Bootstrap() and Download(), then Size(), Progress(), SeqProgress()
// Builds a transfer for `filename`, identified by `_root_hash`.
//  filename, _root_hash, chunk_size, force_check_diskvshash, check_netwvshash:
//      all forwarded unchanged to the constructor of the file_ member.
// Also chooses the piece picker, initialises speed accounting and arms the
// periodic clean-up timer.
// NOTE(review): this listing skips some physical lines (braces and possibly
// statements), so the comments describe only the statements that are shown.
31 FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash, bool force_check_diskvshash, bool check_netwvshash, uint32_t chunk_size) :
32 file_(filename,_root_hash,chunk_size,NULL,force_check_diskvshash,check_netwvshash), cb_installed(0), mychannels_(),
33 speedzerocount_(0), tracker_(), tracker_retry_interval_(TRACKER_RETRY_INTERVAL_START), tracker_retry_time_(NOW)
// Tests whether the static files table has fewer than fd()+1 slots; the
// statement guarded by this test is not shown here.
35 if (files.size()<fd()+1)
38 if (ENABLE_VOD_PIECEPICKER) {
39 // Ric: init availability
40 availability_ = new Availability();
41 // Ric: TODO assign picker based on input params...
42 picker_ = new VodPiecePicker(this);
// Non-VOD picker; presumably the else branch of the test above - confirm.
45 picker_ = new SeqPiecePicker(this);
// Hand the picker a pseudo-random value in the range 0..63.
46 picker_->Randomize(rand()&63);
47 init_time_ = Channel::Time();
// Fresh speed estimators for both directions.
48 cur_speed_[DDIR_UPLOAD] = MovingAverageSpeed();
49 cur_speed_[DDIR_DOWNLOAD] = MovingAverageSpeed();
// Both speed limits start at DBL_MAX.
50 max_speed_[DDIR_UPLOAD] = DBL_MAX;
51 max_speed_[DDIR_DOWNLOAD] = DBL_MAX;
// Bind evclean_ to LibeventCleanCallback on Channel::evbase with this object
// as the callback argument, then schedule it with a 5*TINT_SEC timeout.
54 evtimer_assign(&evclean_,Channel::evbase,&FileTransfer::LibeventCleanCallback,this);
55 evtimer_add(&evclean_,tint2tv(5*TINT_SEC));
// libevent timer callback for a transfer's evclean_ timer.
//  fd, event: not used by any statement shown here.
//  arg:       the FileTransfer passed at registration (cast back below).
// Walks mychannels_ looking at IsScheduled4Close() and is_established(),
// erases every channel found in delset from mychannels_, calls
// ReConnectToTrackerIfAllowed(), and finally re-arms the timer.
// NOTE(review): several physical lines are missing from this listing, so only
// the visible statements are described.
61 void FileTransfer::LibeventCleanCallback(int fd, short event, void *arg)
63 // Arno, 2012-02-24: Why-oh-why, update NOW
66 FileTransfer *ft = (FileTransfer *)arg;
// Two passes: removal candidates live in delset so that mychannels_ is not
// modified while it is being iterated.
70 // STL and MS and conditional delete from set not a happy place :-(
71 std::set<Channel *> delset;
72 std::set<Channel *>::iterator iter;
73 bool hasestablishedpeers=false;
74 for (iter=ft->mychannels_.begin(); iter!=ft->mychannels_.end(); iter++)
// NOTE(review): the statement guarded by this test is not shown; presumably it
// inserts c into delset - confirm.
78 if (c->IsScheduled4Close())
// Remember that at least one channel is established.
81 if (c->is_established ()) {
82 hasestablishedpeers = true;
83 //fprintf(stderr,"%s peer %s\n", ft->file().root_hash().hex().c_str(), c->peer().str() );
// Second pass: remove each collected channel from the transfer's channel set.
87 for (iter=delset.begin(); iter!=delset.end(); iter++)
90 dprintf("%s #%u clean cb close\n",tintstr(),c->id());
92 ft->mychannels_.erase(c);
96 // Arno, 2012-02-24: Check for liveliness.
97 ft->ReConnectToTrackerIfAllowed(hasestablishedpeers);
// Re-arm the timer so this callback runs again after 5*TINT_SEC.
100 evtimer_add(&(ft->evclean_),tint2tv(5*TINT_SEC));
// Tracker retry back-off.
//  hasestablishedpeers: true when the caller found at least one established channel.
// With no established peers and tracker_retry_time_ in the past, the retry
// interval is multiplied by TRACKER_RETRY_INTERVAL_EXP, capped at
// TRACKER_RETRY_INTERVAL_MAX, and the next retry time is set to NOW + interval.
// NOTE(review): the lines between the time check and the interval update are
// not shown; presumably the tracker is contacted there - confirm.
104 void FileTransfer::ReConnectToTrackerIfAllowed(bool hasestablishedpeers)
106 // If I'm not connected to any
107 // peers, try to contact the tracker again.
108 if (!hasestablishedpeers)
110 if (NOW > tracker_retry_time_)
114 tracker_retry_interval_ *= TRACKER_RETRY_INTERVAL_EXP;
115 if (tracker_retry_interval_ > TRACKER_RETRY_INTERVAL_MAX)
116 tracker_retry_interval_ = TRACKER_RETRY_INTERVAL_MAX;
117 tracker_retry_time_ = NOW + tracker_retry_interval_;
// Reset the back-off to its starting interval; presumably the branch taken
// when peers are established (the branch keyword is not shown) - confirm.
122 tracker_retry_interval_ = TRACKER_RETRY_INTERVAL_START;
123 tracker_retry_time_ = NOW + tracker_retry_interval_;
// Creates a Channel for contacting a tracker.
// The declaration of c and whatever follows the channel creation are not shown here.
128 void FileTransfer::ConnectToTracker()
// tracker_ differs from a default-constructed Address: create the channel with
// INVALID_SOCKET and tracker_ as the peer address.
131 if (tracker_ != Address())
132 c = new Channel(this,INVALID_SOCKET,tracker_);
// Otherwise, if Channel::tracker differs from a default-constructed Address,
// create the channel from the transfer alone.
133 else if (Channel::tracker!=Address())
134 c = new Channel(this);
// Looks through mychannels_ for a channel talking to `addr`.
//  addr: address compared against each channel's peer() and recv_peer().
//  notc: channel to exclude from the search (callers may pass NULL).
// NOTE(review): the handling of a match and the not-found return value are
// not shown here; presumably the match or NULL is returned - confirm.
138 Channel * FileTransfer::FindChannel(const Address &addr, Channel *notc)
140 std::set<Channel *>::iterator iter;
141 for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
// A channel matches when it is not notc and either of its addresses equals addr.
145 if (c != notc && (c->peer() == addr || c->recv_peer() == addr)) {
// Closes and deletes every channel in Channel::channels that belongs to `trans`.
//  trans: transfer whose channels are to be torn down.
// NOTE(review): i is a signed int compared against channels.size(); whether
// the slot is cleared after the delete is not shown here.
156 void Channel::CloseTransfer (FileTransfer* trans) {
157 for(int i=0; i<Channel::channels.size(); i++)
// Skip empty slots and channels owned by other transfers.
158 if (Channel::channels[i] && Channel::channels[i]->transfer_==trans)
160 //fprintf(stderr,"Channel::CloseTransfer: delete #%i\n", Channel::channels[i]->id());
161 Channel::channels[i]->Close(); // ARNO
162 delete Channel::channels[i];
// Registers a progress callback on a transfer.
//  transfer: id passed to FileTransfer::file() to look up the transfer.
//  cb:       callback stored in the next free slot of callbacks[].
//  agg:      value stored in the same slot of cb_agg[].
// NOTE(review): no capacity check on cb_installed and no handling of a failed
// lookup are visible in this listing - confirm they exist in the omitted lines.
167 void swift::AddProgressCallback (int transfer,ProgressCallback cb,uint8_t agg) {
169 //fprintf(stderr,"swift::AddProgressCallback: transfer %i\n", transfer );
171 FileTransfer* trans = FileTransfer::file(transfer);
175 //fprintf(stderr,"swift::AddProgressCallback: ft obj %p %p\n", trans, cb );
// cb_agg[] and callbacks[] are parallel arrays indexed by the same slot.
177 trans->cb_agg[trans->cb_installed] = agg;
178 trans->callbacks[trans->cb_installed] = cb;
179 trans->cb_installed++;
// Sets `piece` in the ack_out() of the given transfer.
//  transfer: id passed to FileTransfer::file() to look up the transfer.
//  piece:    bin to set.
// NOTE(review): handling of a failed lookup is not shown in this listing.
183 void swift::ExternallyRetrieved (int transfer,bin_t piece) {
184 FileTransfer* trans = FileTransfer::file(transfer);
187 trans->ack_out().set(piece); // that easy
// Unregisters a progress callback from a transfer.
//  transfer: id passed to FileTransfer::file() to look up the transfer.
//  cb:       callback to remove from callbacks[].
// Afterwards prints every remaining callback to stderr.
// NOTE(review): handling of a failed lookup is not shown in this listing.
191 void swift::RemoveProgressCallback (int transfer, ProgressCallback cb) {
193 //fprintf(stderr,"swift::RemoveProgressCallback: transfer %i\n", transfer );
195 FileTransfer* trans = FileTransfer::file(transfer);
199 //fprintf(stderr,"swift::RemoveProgressCallback: transfer %i ft obj %p %p\n", transfer, trans, cb );
// Swap-remove: a matching slot is overwritten with the last installed callback
// and cb_installed shrinks by one.
// NOTE(review): cb_agg[] is filled in parallel by AddProgressCallback but is
// not moved here, and the entry moved into slot i is not compared against cb
// again - verify both against the intended behaviour.
201 for(int i=0; i<trans->cb_installed; i++)
202 if (trans->callbacks[i]==cb)
203 trans->callbacks[i]=trans->callbacks[--trans->cb_installed];
// Diagnostic output of the callbacks still installed (this fprintf is active).
205 for(int i=0; i<trans->cb_installed; i++)
207 fprintf(stderr,"swift::RemoveProgressCallback: transfer %i remain %p\n", transfer, trans->callbacks[i] );
// Tears the transfer down: closes its channels via Channel::CloseTransfer,
// deletes availability_ and cancels the evclean_ timer.
// NOTE(review): the visible constructor assigns availability_ only when
// ENABLE_VOD_PIECEPICKER is set; a matching guard may sit on a line omitted
// from this listing - confirm.
212 FileTransfer::~FileTransfer ()
214 Channel::CloseTransfer(this);
217 delete availability_;
219 // Arno, 2012-02-06: Cancel cleanup timer, otherwise chaos!
220 evtimer_del(&evclean_);
// Linear search of the static files table for a transfer with the given root hash.
//  root_hash: hash compared against each non-null entry's root_hash().
// NOTE(review): the return statements are not shown; presumably the matching
// entry or a null pointer is returned - confirm.
224 FileTransfer* FileTransfer::Find (const Sha1Hash& root_hash) {
225 for(int i=0; i<files.size(); i++)
// Null slots are skipped before the hash comparison.
226 if (files[i] && files[i]->root_hash()==root_hash)
// Looks up a transfer by root hash through FileTransfer::Find.
//  hash: root hash to search for (taken by value).
// NOTE(review): how the result is turned into the int return value is not
// shown in this listing.
232 int swift:: Find (Sha1Hash hash) {
233 FileTransfer* t = FileTransfer::Find(hash);
// Handles a peer address received for this transfer.
//  addr: address of the advertised peer.
// While hs_in_ holds fewer than SWIFT_MAX_CONNECTIONS entries, looks for an
// existing channel to addr and opens a new Channel on the default socket.
// NOTE(review): the test applied to c and the bool return values are not
// shown; presumably a channel is created only when none exists - confirm.
241 bool FileTransfer::OnPexIn (const Address& addr) {
243 //fprintf(stderr,"FileTransfer::OnPexIn: %s\n", addr.str() );
244 // Arno: this brings safety, but prevents private swift installations.
245 // TODO: detect public internet.
246 //if (addr.is_private())
248 // Gertjan fix: PEX redo
249 if (hs_in_.size()<SWIFT_MAX_CONNECTIONS)
251 // Arno, 2012-02-27: Check if already connected to this peer.
252 Channel *c = FindChannel(addr,NULL);
// The new Channel pointer is not stored by this statement.
254 new Channel(this,Channel::default_socket(),addr);
// Returns the id of a randomly chosen channel taken from hs_in_.
//  own_id: channel id compared against each hs_in_ entry.
// Candidates are gathered in choose_from and one of them is picked with rand().
// NOTE(review): the statements guarded by the own_id, is_established() and
// empty-list tests are not shown; presumably they skip the entry or return a
// "none" value - confirm.
262 int FileTransfer::RandomChannel (int own_id) {
263 binqueue choose_from;
266 for (i = 0; i < (int) hs_in_.size(); i++) {
267 if (hs_in_[i].toUInt() == own_id)
269 Channel *c = Channel::channel(hs_in_[i].toUInt());
270 if (c == NULL || c->transfer().fd() != this->fd()) {
271 /* Channel was closed or is not associated with this FileTransfer (anymore). */
// Overwrite the stale entry with the first entry of hs_in_; any follow-up
// clean-up is on lines omitted from this listing.
272 hs_in_[i] = hs_in_[0];
277 if (!c->is_established())
279 choose_from.push_back(hs_in_[i]);
281 if (choose_from.size() == 0)
// Pick one candidate; the index is rand() modulo the number of candidates.
284 return choose_from[rand() % choose_from.size()].toUInt();
// Feeds a data point into the download speed estimator.
//  n: amount received (presumably bytes - confirm); it is cast to uint64_t,
//     so a negative value would become a very large number.
287 void FileTransfer::OnRecvData(int n)
290 cur_speed_[DDIR_DOWNLOAD].AddPoint((uint64_t)n);
// Feeds a data point into the upload speed estimator.
//  n: amount sent (presumably bytes - confirm); it is cast to uint64_t,
//     so a negative value would become a very large number.
293 void FileTransfer::OnSendData(int n)
296 cur_speed_[DDIR_UPLOAD].AddPoint((uint64_t)n);
// Accounts for a send opportunity in which nothing was sent.
// A zero data point goes to the upload speed estimator once speedzerocount_
// has reached 32.
// NOTE(review): the statements that increment and reset speedzerocount_ are
// not shown in this listing.
300 void FileTransfer::OnSendNoData()
302 // AddPoint(0) everytime we don't AddData gives bad speed measurement
303 // batch 32 such events into 1.
305 if (speedzerocount_ >= 32)
307 cur_speed_[DDIR_UPLOAD].AddPoint((uint64_t)0);
// Returns the current speed estimate for one direction.
//  ddir: index into cur_speed_ (DDIR_UPLOAD or DDIR_DOWNLOAD).
// The value comes from the estimator's GetSpeedNeutral().
313 double FileTransfer::GetCurrentSpeed(data_direction_t ddir)
315 return cur_speed_[ddir].GetSpeedNeutral();
// Sets the speed limit for one direction and clears that direction's speed history.
//  ddir: index into max_speed_ and cur_speed_ (DDIR_UPLOAD or DDIR_DOWNLOAD).
//  m:    new limit, stored as given.
319 void FileTransfer::SetMaxSpeed(data_direction_t ddir, double m)
321 max_speed_[ddir] = m;
322 // Arno, 2012-01-04: Be optimistic, forget history.
323 cur_speed_[ddir].Reset();
// Returns the stored speed limit for one direction.
//  ddir: index into max_speed_ (DDIR_UPLOAD or DDIR_DOWNLOAD).
// The constructor initialises both limits to DBL_MAX.
327 double FileTransfer::GetMaxSpeed(data_direction_t ddir)
329 return max_speed_[ddir];
// Walks mychannels_ and tests each channel with IsComplete().
// NOTE(review): the counter and the return statement are not shown;
// presumably the number of channels that are not complete is returned - confirm.
333 uint32_t FileTransfer::GetNumLeechers()
336 std::set<Channel *>::iterator iter;
337 for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
341 if (!c->IsComplete()) // incomplete?
348 uint32_t FileTransfer::GetNumSeeders()
351 std::set<Channel *>::iterator iter;
352 for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++)
356 if (c->IsComplete()) // complete?