From: Victor Grishchenko Date: Thu, 25 Feb 2010 17:47:29 +0000 (+0100) Subject: Towards ACK : HAVE split. X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=9af0d3f7a1cbd5adba8fdb1da74092fc9a93904b;p=swift-upb.git Towards ACK : HAVE split. One message for all worked well on a single connection; in a swarm, creates a mess. --- diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a3c4828 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +CPPFLAGS=-I. + +all: swift + +swift: swift.o sha1.o compat.o sendrecv.o send_control.o hashtree.o bin64.o bins.o channel.o datagram.o transfer.o + g++ -I. *.o -o swift + diff --git a/sendrecv.cpp b/sendrecv.cpp index b36b41e..e357a02 100644 --- a/sendrecv.cpp +++ b/sendrecv.cpp @@ -45,20 +45,25 @@ void Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) { } +bin64_t Channel::ImposeHint () { + uint64_t twist = peer_channel_id_; // got no hints, send something randomly + twist &= file().peak(0); // FIXME may make it semi-seq here + file().ack_out().twist(twist); + ack_in_.twist(twist); + bin64_t my_pick = + file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED); + while (my_pick.width()>max(1,(int)cwnd_)) + my_pick = my_pick.left(); + file().ack_out().twist(0); + ack_in_.twist(0); + return my_pick.twisted(twist); +} + + bin64_t Channel::DequeueHint () { - if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) { - uint64_t twist = peer_channel_id_; // got no hints, send something randomly - twist &= file().peak(0); // may make it semi-seq here - file().ack_out().twist(twist); - ack_in_.twist(twist); - bin64_t my_pick = - file().ack_out().find_filtered(ack_in_,bin64_t::ALL,binmap_t::FILLED); - while (my_pick.width()>max(1,(int)cwnd_)) - my_pick = my_pick.left(); - file().ack_out().twist(0); - ack_in_.twist(0); + if (hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) { + bin64_t my_pick = ImposeHint(); // FIXME move to the loop if (my_pick!=bin64_t::NONE) { - my_pick = my_pick.twisted(twist); hint_in_.push_back(my_pick); dprintf("%s #%u *hint %s\n",tintstr(),id_,my_pick.str()); } @@ -74,7 +79,7 @@ bin64_t Channel::DequeueHint () { } //if (time < NOW-TINT_SEC*3/2 ) // continue; bad idea - if (ack_in_.get(hint)!=binmap_t::FILLED) + if (ack_in_.get(hint)!=binmap_t::FILLED) send = hint; } uint64_t mass = 0; @@ -112,7 +117,7 @@ void Channel::Send () { if (!file().is_complete()) AddHint(dgram); AddPex(dgram); - CleanDataOut(); + TimeoutDataOut(); data = AddData(dgram); } else { AddHandshake(dgram); @@ -122,18 +127,6 @@ void Channel::Send () { tintstr(),id_,dgram.size(),peer().str(),peer_channel_id_); if (dgram.size()==4) {// only the channel id; bare keep-alive data = bin64_t::ALL; - //dprintf("%s #%u considering keepalive %i %f %s\n", - // tintstr(),id_,(int)data_out_.size(),cwnd_,SEND_CONTROL_MODES[send_control_]); - //if (data_out_.size()>4; // may wake up a bit earlier if (data_out_.size()NOW-TINT_SEC || data_out_.empty())) + + if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty())) return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED - + if (ack_in_.is_empty() && file().size()) AddPeakHashes(dgram); AddUncleHashes(dgram,tosend); @@ -208,12 +198,12 @@ bin64_t Channel::AddData (Datagram& dgram) { dgram.Send(); // kind of fragmentation dgram.Push32(peer_channel_id_); } - + dgram.Push8(SWIFT_DATA); dgram.Push32(tosend.to32()); - + uint8_t buf[1024]; - size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); + size_t r = pread(file().file_descriptor(),buf,1024,tosend.base_offset()<<10); // TODO: corrupted data, retries, caching if (r<0) { print_error("error on reading"); @@ -221,56 +211,52 @@ bin64_t Channel::AddData (Datagram& dgram) { } assert(dgram.space()>=r+4+1); dgram.Push(buf,r); - + last_data_out_time_ = NOW; data_out_.push_back(tosend); dprintf("%s #%u +data %s\n",tintstr(),id_,tosend.str()); - + return tosend; } -void Channel::AddTs (Datagram& dgram) { - dgram.Push8(SWIFT_TS); - dgram.Push64(data_in_.time); - dprintf("%s #%u +ts %s\n",tintstr(),id_,tintstr(data_in_.time)); +void Channel::AddAck (Datagram& dgram) { + if (data_in_==tintbin()) + return; + dgram.Push8(SWIFT_ACK); + dgram.Push32(data_in_.bin.to32()); + dgram.Push32(data_in_.time); + ack_out_.set(data_in_.bin); + dprintf("%s #%u +ack %s %s\n", + tintstr(),id_,data_in_.bin.str(),tintstr(data_in_.time)); + if (data_in_.bin.layer()>2) + data_in_dbl_ = data_in_.bin; + data_in_ = tintbin(); } -void Channel::AddAck (Datagram& dgram) { +void Channel::AddHave (Datagram& dgram) { if (data_in_dbl_!=bin64_t::NONE) { // TODO: do redundancy better - dgram.Push8(SWIFT_ACK); + dgram.Push8(SWIFT_HAVE); dgram.Push32(data_in_dbl_.to32()); data_in_dbl_=bin64_t::NONE; } - if (data_in_.time!=TINT_NEVER) { // TODO: ACK NONE for corrupted data - AddTs(dgram); - bin64_t pos = data_in_.bin; // be precise file().ack_out().cover(data_in_.bin); - dgram.Push8(SWIFT_ACK); - dgram.Push32(pos.to32()); - //dgram.Push64(data_in_.time); - ack_out_.set(pos); - dprintf("%s #%u +ack %s %s\n",tintstr(),id_,pos.str(),tintstr(data_in_.time)); - data_in_ = tintbin(TINT_NEVER,bin64_t::NONE); - if (pos.layer()>2) - data_in_dbl_ = pos; - } for(int count=0; count<4; count++) { - bin64_t ack = file().ack_out().find_filtered(ack_out_, bin64_t::ALL, binmap_t::FILLED); + bin64_t ack = file().ack_out().find_filtered // FIXME: do rotating queue + (ack_out_, bin64_t::ALL, binmap_t::FILLED); if (ack==bin64_t::NONE) break; ack = file().ack_out().cover(ack); ack_out_.set(ack); - dgram.Push8(SWIFT_ACK); + dgram.Push8(SWIFT_HAVE); dgram.Push32(ack.to32()); - dprintf("%s #%u +ack %s\n",tintstr(),id_,ack.str()); + dprintf("%s #%u +have %s\n",tintstr(),id_,ack.str()); } } void Channel::Recv (Datagram& dgram) { dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4); - peer_send_time_ = 0; // has scope of 1 datagram dgrams_rcvd_++; if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) { rtt_avg_ = NOW - last_send_time_; @@ -284,7 +270,7 @@ void Channel::Recv (Datagram& dgram) { switch (type) { case SWIFT_HANDSHAKE: OnHandshake(dgram); break; case SWIFT_DATA: data=OnData(dgram); break; - case SWIFT_TS: OnTs(dgram); break; + case SWIFT_HAVE: OnHave(dgram); break; case SWIFT_ACK: OnAck(dgram); break; case SWIFT_HASH: OnHash(dgram); break; case SWIFT_HINT: OnHint(dgram); break; @@ -303,7 +289,6 @@ void Channel::OnHash (Datagram& dgram) { bin64_t pos = dgram.Pull32(); Sha1Hash hash = dgram.PullHash(); file().OfferHash(pos,hash); - //DLOG(INFO)<<"#"<bin==data_out_[i].bin) - peer_send_time_=0; // possibly retransmit - if (peer_send_time_) { // well, it is sorta ACK ACK - tint rtt = NOW-data_out_[i].time; - rtt_avg_ = (rtt_avg_*7 + rtt) >> 3; - dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2; - assert(data_out_[i].time!=TINT_NEVER); - tint owd = peer_send_time_ - data_out_[i].time; - owd_cur_bin_ = (owd_cur_bin_+1) & 3; - owd_current_[owd_cur_bin_] = owd; - if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) { - owd_min_bin_start_ = NOW; - owd_min_bin_ = (owd_min_bin_+1) & 3; - owd_min_bins_[owd_min_bin_] = TINT_NEVER; - } - if (owd_min_bins_[owd_min_bin_]>owd) - owd_min_bins_[owd_min_bin_] = owd; - peer_send_time_ = 0; - } - dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n", - tintstr(),id_,rtt_avg_,dev_avg_,data_out_[i].bin.str()); - bin64_t pos = data_out_[i].bin; - ack_rcvd_recent_++; - data_out_[i]=tintbin(); - max_ack_off = i; - if (ackd_pos==pos) - break; - } - } - while (!data_out_.empty() && data_out_.front().bin==bin64_t::NONE) { - data_out_.pop_front(); - max_ack_off--; - } - static const int MAX_REORDERING = 2; // the triple-ACK principle - if (max_ack_off>MAX_REORDERING) { - while (max_ack_off && (data_out_.front().bin==bin64_t::NONE - || ack_in_.is_filled(data_out_.front().bin)) ) { - data_out_.pop_front(); - max_ack_off--; - } - while (max_ack_off>MAX_REORDERING) { - ack_not_rcvd_recent_++; - data_out_tmo_.push_back(data_out_.front().bin); - dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str()); - data_out_.pop_front(); - max_ack_off--; - data_out_cap_ = bin64_t::ALL; - } +void Channel::OnAck (Datagram& dgram) { + bin64_t ackd_pos = dgram.Pull32(); + tint peer_time_ = dgram.Pull64(); + if (ackd_pos==bin64_t::NONE) + return; // likely, brocken packet / insufficient hashes + if (file().size() && ackd_pos.base_offset()>=file().packet_size()) { + eprintf("invalid ack: %s\n",ackd_pos.str()); + return; + } + ack_in_.set(ackd_pos); + int di = 0, ri = 0; + // find an entry for the send (data out) event + while ( di> 3; + dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2; + assert(data_out_[di].time!=TINT_NEVER); + // one-way delay calculations + tint owd = peer_time_ - data_out_[di].time; + owd_cur_bin_ = (owd_cur_bin_+1) & 3; + owd_current_[owd_cur_bin_] = owd; + if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) { + owd_min_bin_start_ = NOW; + owd_min_bin_ = (owd_min_bin_+1) & 3; + owd_min_bins_[owd_min_bin_] = TINT_NEVER; } - peer_send_time_ = 0; + if (owd_min_bins_[owd_min_bin_]>owd) + owd_min_bins_[owd_min_bin_] = owd; + dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n", + tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str()); + ack_rcvd_recent_++; + data_out_[di]=tintbin(); + } + // early loss detection by packet reordering + for (int re=0; re=file().packet_size()) { - eprintf("invalid ack: %s\n",ackd_pos.str()); - return; - } - dprintf("%s #%u -ack %s\n",tintstr(),id_,ackd_pos.str()); + return; // wow, peer has hashes ack_in_.set(ackd_pos); - CleanDataOut(ackd_pos); // FIXME do AFTER all ACKs -} - - -void Channel::OnTs (Datagram& dgram) { - peer_send_time_ = dgram.Pull64(); - dprintf("%s #%u -ts %lli\n",tintstr(),id_,peer_send_time_); + dprintf("%s #%u -have %s\n",tintstr(),id_,ackd_pos.str()); } void Channel::OnHint (Datagram& dgram) { bin64_t hint = dgram.Pull32(); + // FIXME: wake up here hint_in_.push_back(hint); - //ack_in_.set(hint,binmap_t::EMPTY); - //RequeueSend(cc_->OnHintRecvd(hint)); dprintf("%s #%u -hint %s\n",tintstr(),id_,hint.str()); } @@ -501,40 +474,40 @@ Channel* Channel::RecvDatagram (int socket) { data.Recv(); const Address& addr = data.address(); #define return_log(...) { printf(__VA_ARGS__); return NULL; } - if (data.size()<4) + if (data.size()<4) return_log("datagram shorter than 4 bytes %s\n",addr.str()); uint32_t mych = data.Pull32(); Sha1Hash hash; Channel* channel = NULL; if (!mych) { // handshake initiated - if (data.size()<1+4+1+4+Sha1Hash::SIZE) + if (data.size()<1+4+1+4+Sha1Hash::SIZE) return_log ("%s #0 incorrect size %i initial handshake packet %s\n", tintstr(),data.size(),addr.str()); uint8_t hashid = data.Pull8(); - if (hashid!=SWIFT_HASH) + if (hashid!=SWIFT_HASH) return_log ("%s #0 no hash in the initial handshake %s\n", tintstr(),addr.str()); bin64_t pos = data.Pull32(); - if (pos!=bin64_t::ALL) + if (pos!=bin64_t::ALL) return_log ("%s #0 that is not the root hash %s\n",tintstr(),addr.str()); hash = data.PullHash(); FileTransfer* file = FileTransfer::Find(hash); - if (!file) + if (!file) return_log ("%s #0 hash %s unknown, no such file %s\n",tintstr(),hash.hex().c_str(),addr.str()); dprintf("%s #0 -hash ALL %s\n",tintstr(),hash.hex().c_str()); for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++) - if (channels[*i] && channels[*i]->peer_==data.address() && + if (channels[*i] && channels[*i]->peer_==data.address() && channels[*i]->last_recv_time_>NOW-TINT_SEC*2) return_log("%s #0 have a channel already to %s\n",tintstr(),addr.str()); channel = new Channel(file, socket, data.address()); } else { mych = DecodeID(mych); - if (mych>=channels.size()) + if (mych>=channels.size()) return_log("%s invalid channel #%u, %s\n",tintstr(),mych,addr.str()); channel = channels[mych]; - if (!channel) + if (!channel) return_log ("%s #%u is already closed\n",tintstr(),mych,addr.str()); - if (channel->peer() != addr) + if (channel->peer() != addr) return_log ("%s #%u invalid peer address %s!=%s\n", tintstr(),mych,channel->peer().str(),addr.str()); channel->own_id_mentioned_ = true; @@ -545,10 +518,10 @@ Channel* Channel::RecvDatagram (int socket) { } -void Channel::Loop (tint howlong) { - +void Channel::Loop (tint howlong) { + tint limit = Datagram::Time() + howlong; - + do { tint send_time(TINT_NEVER); @@ -561,16 +534,16 @@ void Channel::Loop (tint howlong) { sender->next_send_time_!=TINT_NEVER ) sender = NULL; // it was a stale entry } - + if ( sender!=NULL && send_time<=NOW ) { // it's time - + dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(), tintstr(send_time)); sender->Send(); sender->Reschedule(); - + } else { // it's too early, wait - + tint towait = min(limit,send_time) - NOW; dprintf("%s #0 waiting %lliusec\n",tintstr(),towait); int rd = Datagram::Wait(socket_count,sockets,towait); @@ -581,20 +554,21 @@ void Channel::Loop (tint howlong) { } if (sender) // get back to that later send_queue.push(tintbin(send_time,sender->id())); - + } - + } while (NOWSwitchSendControl(CLOSE_CONTROL); } void Channel::Reschedule () { + TimeoutDataOut(); // precaution to know free cwnd next_send_time_ = NextSendTime(); if (next_send_time_!=TINT_NEVER) { assert(next_send_time_ plus some nice operators. - Most frequently used in different queues (acknowledgements, requests, + Most frequently used in different queues (acknowledgements, requests, etc). */ struct tintbin { tint time; @@ -72,7 +72,7 @@ namespace swift { tintbin() : time(TINT_NEVER), bin(bin64_t::NONE) {} tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {} tintbin(bin64_t bin_) : time(NOW), bin(bin_) {} - bool operator < (const tintbin& b) const + bool operator < (const tintbin& b) const { return time > b.time; } bool operator == (const tintbin& b) const { return time==b.time && bin==b.bin; } @@ -110,13 +110,12 @@ namespace swift { SWIFT_HANDSHAKE = 0, SWIFT_DATA = 1, SWIFT_ACK = 2, - SWIFT_TS = 8, - SWIFT_HINT = 3, + SWIFT_HAVE = 3, SWIFT_HASH = 4, SWIFT_PEX_ADD = 5, SWIFT_PEX_RM = 6, SWIFT_SIGNED_HASH = 7, - SWIFT_MSGTYPE_SENT = 8, + SWIFT_HINT = 8, SWIFT_MSGTYPE_RCVD = 9, SWIFT_MESSAGE_COUNT = 10 } messageid_t; @@ -131,8 +130,8 @@ namespace swift { public: - /** A constructor. Open/submit/retrieve a file. - * @param file_name the name of the file + /** A constructor. Open/submit/retrieve a file. + * @param file_name the name of the file * @param root_hash the root hash of the file; zero hash if the file is newly submitted */ FileTransfer(const char *file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO); @@ -184,7 +183,7 @@ namespace swift { /** Messages we are accepting. */ uint64_t cap_out_; - + tint init_time_; public: @@ -209,7 +208,7 @@ namespace swift { public: virtual void Randomize (uint64_t twist) = 0; /** The piece picking method itself. - * @param offered the daata acknowledged by the peer + * @param offered the daata acknowledged by the peer * @param max_width maximum number of packets to ask for * @param expires (not used currently) when to consider request expired * @return the bin number to request */ @@ -238,11 +237,11 @@ namespace swift { being transferred between two peers. As we don't need buffers and lots of other TCP stuff, sizeof(Channel+members) must be below 1K. Normally, API users do not deal with this class. */ - class Channel { + class Channel { public: Channel (FileTransfer* file, int socket=-1, Address peer=Address()); ~Channel(); - + typedef enum { KEEP_ALIVE_CONTROL, PING_PONG_CONTROL, @@ -251,7 +250,7 @@ namespace swift { LEDBAT_CONTROL, CLOSE_CONTROL } send_control_t; - + static const char* SEND_CONTROL_MODES[]; static Channel* @@ -263,7 +262,7 @@ namespace swift { void Close (); void OnAck (Datagram& dgram); - void OnTs (Datagram& dgram); + void OnHave (Datagram& dgram); bin64_t OnData (Datagram& dgram); void OnHint (Datagram& dgram); void OnHash (Datagram& dgram); @@ -272,7 +271,7 @@ namespace swift { void AddHandshake (Datagram& dgram); bin64_t AddData (Datagram& dgram); void AddAck (Datagram& dgram); - void AddTs (Datagram& dgram); + void AddHave (Datagram& dgram); void AddHint (Datagram& dgram); void AddUncleHashes (Datagram& dgram, bin64_t pos); void AddPeakHashes (Datagram& dgram); @@ -287,7 +286,7 @@ namespace swift { tint SlowStartNextSendTime (); tint AimdNextSendTime (); tint LedbatNextSendTime (); - + static int MAX_REORDERING; static tint TIMEOUT; static tint MIN_DEV; @@ -298,7 +297,7 @@ namespace swift { static bool SELF_CONN_OK; static tint MAX_POSSIBLE_RTT; static FILE* debug_file; - + const std::string id_string () const; /** A channel is "established" if had already sent and received packets. */ bool is_established () { return peer_channel_id_ && own_id_mentioned_; } @@ -308,10 +307,10 @@ namespace swift { tint ack_timeout () { tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_; tint tmo = rtt_avg_ + dev * 4; - return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC; + return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC; } uint32_t id () const { return id_; } - + static int DecodeID(int scrambled); static int EncodeID(int unscrambled); static Channel* channel(int i) { @@ -363,7 +362,6 @@ namespace swift { tint last_data_in_time_; tint last_loss_time_; tint next_send_time_; - tint peer_send_time_; /** Congestion window; TODO: int, bytes. */ float cwnd_; /** Data sending interval. */ @@ -391,7 +389,8 @@ namespace swift { } /** Get a request for one packet from the queue of peer's requests. */ bin64_t DequeueHint(); - void CleanDataOut (bin64_t acks_pos=bin64_t::NONE); + bin64_t ImposeHint(); + void TimeoutDataOut (); void CleanStaleHintOut(); void CleanHintOut(bin64_t pos); void Reschedule(); @@ -401,7 +400,7 @@ namespace swift { static SOCKET sockets[8]; static int socket_count; static tint last_tick; - static tbheap send_queue; + static tbheap send_queue; static Address tracker; static std::vector channels;