add .gitignore
[swift-upb.git] / sendrecv.cpp
index bfe1ee7..1f808ba 100644 (file)
@@ -24,7 +24,6 @@ void    Channel::AddPeakHashes (Datagram& dgram) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)peak);
         dgram.PushHash(file().peak_hash(i));
-        //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
     }
 }
@@ -38,7 +37,6 @@ void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)uncle);
         dgram.PushHash( file().hash(uncle) );
-        //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
         pos = pos.parent();
     }
@@ -113,6 +111,7 @@ void    Channel::Send () {
     bin64_t data = bin64_t::NONE;
     if ( is_established() ) {
         // FIXME: seeder check
+        AddHave(dgram);
         AddAck(dgram);
         if (!file().is_complete())
             AddHint(dgram);
@@ -121,6 +120,7 @@ void    Channel::Send () {
         data = AddData(dgram);
     } else {
         AddHandshake(dgram);
+        AddHave(dgram);
         AddAck(dgram);
     }
     dprintf("%s #%u sent %ib %s:%x\n",
@@ -133,6 +133,7 @@ void    Channel::Send () {
     last_send_time_ = NOW;
     sent_since_recv_++;
     dgrams_sent_++;
+    Reschedule();
 }
 
 
@@ -183,7 +184,7 @@ bin64_t        Channel::AddData (Datagram& dgram) {
         }
     } else
         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
-                tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
+                tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
 
     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
@@ -256,7 +257,7 @@ void    Channel::AddHave (Datagram& dgram) {
 
 
 void    Channel::Recv (Datagram& dgram) {
-    dprintf("%s #%u recvd %i\n",tintstr(),id_,dgram.size()+4);
+    dprintf("%s #%u recvd %ib\n",tintstr(),id_,dgram.size()+4);
     dgrams_rcvd_++;
     if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
         rtt_avg_ = NOW - last_send_time_;
@@ -282,6 +283,7 @@ void    Channel::Recv (Datagram& dgram) {
     }
     last_recv_time_ = NOW;
     sent_since_recv_ = 0;
+    Reschedule();
 }
 
 
@@ -318,11 +320,16 @@ bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted da
     bin64_t pos = dgram.Pull32();
     uint8_t *data;
     int length = dgram.Pull(&data,1024);
-    bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
+    bool ok = (pos==bin64_t::NONE) || 
+        (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
     data_in_ = tintbin(NOW,bin64_t::NONE);
     if (!ok)
         return bin64_t::NONE;
+    bin64_t cover = transfer().ack_out().cover(pos);
+    for(int i=0; i<transfer().cb_installed; i++)
+        if (cover.layer()>=transfer().cb_agg[i])
+            transfer().callbacks[i](transfer().fd(),cover);  // FIXME
     data_in_.bin = pos;
     if (pos!=bin64_t::NONE) {
         if (last_data_in_time_) {
@@ -338,7 +345,7 @@ bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted da
 
 void    Channel::OnAck (Datagram& dgram) {
     bin64_t ackd_pos = dgram.Pull32();
-    tint peer_time_ = dgram.Pull64(); // FIXME 32
+    tint peer_time = dgram.Pull64(); // FIXME 32
     // FIXME FIXME: wrap around here
     if (ackd_pos==bin64_t::NONE)
         return; // likely, brocken packet / insufficient hashes
@@ -357,16 +364,16 @@ void    Channel::OnAck (Datagram& dgram) {
     while (  ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
         ri++;
     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
-            di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time_);
+            di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
             // round trip time calculations
         tint rtt = NOW-data_out_[di].time;
         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
-        dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+        dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
         assert(data_out_[di].time!=TINT_NEVER);
             // one-way delay calculations
-        tint owd = peer_time_ - data_out_[di].time;
-        owd_cur_bin_ = (owd_cur_bin_+1) & 3;
+        tint owd = peer_time - data_out_[di].time;
+        owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
         owd_current_[owd_cur_bin_] = owd;
         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
             owd_min_bin_start_ = NOW;
@@ -378,18 +385,19 @@ void    Channel::OnAck (Datagram& dgram) {
         dprintf("%s #%u sendctrl rtt %lli dev %lli based on %s\n",
                 tintstr(),id_,rtt_avg_,dev_avg_,data_out_[di].bin.str());
         ack_rcvd_recent_++;
-        data_out_[di]=tintbin();
-    }
-    // early loss detection by packet reordering
-    for (int re=0; re<di-MAX_REORDERING; re++) {
-        if (data_out_[re]==tintbin())
-            continue;
-        ack_not_rcvd_recent_++;
-        data_out_tmo_.push_back(data_out_.front().bin);
-        dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
-        data_out_cap_ = bin64_t::ALL;
-        data_out_[ri] = tintbin();
+        // early loss detection by packet reordering
+        for (int re=0; re<di-MAX_REORDERING; re++) {
+            if (data_out_[re]==tintbin())
+                continue;
+            ack_not_rcvd_recent_++;
+            data_out_tmo_.push_back(data_out_[re].bin);
+            dprintf("%s #%u Rdata %s\n",tintstr(),id_,data_out_.front().bin.str());
+            data_out_cap_ = bin64_t::ALL;
+            data_out_[re] = tintbin();
+        }
     }
+    if (di!=data_out_.size())
+        data_out_[di]=tintbin();
     // clear zeroed items
     while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
             ack_in_.is_filled(data_out_.front().bin) ) )
@@ -401,7 +409,8 @@ void    Channel::OnAck (Datagram& dgram) {
 void Channel::TimeoutDataOut ( ) {
     // losses: timeouted packets
     tint timeout = NOW - ack_timeout();
-    for (int i=0; i<data_out_.size() && data_out_[i].time<timeout; i++) {
+    while (!data_out_.empty() && 
+        ( data_out_.front().time<timeout || data_out_.front()==tintbin() ) ) {
         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
             ack_not_rcvd_recent_++;
             data_out_cap_ = bin64_t::ALL;
@@ -470,17 +479,17 @@ void    Channel::AddPex (Datagram& dgram) {
 }
 
 
-Channel*    Channel::RecvDatagram (int socket) {
+void    Channel::RecvDatagram (SOCKET socket) {
     Datagram data(socket);
     data.Recv();
     const Address& addr = data.address();
-#define return_log(...) { printf(__VA_ARGS__); return NULL; }
+#define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
     if (data.size()<4)
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
     Sha1Hash hash;
     Channel* channel = NULL;
-    if (!mych) { // handshake initiated
+    if (mych==0) { // handshake initiated
         if (data.size()<1+4+1+4+Sha1Hash::SIZE)
             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
                         tintstr(),data.size(),addr.str());
@@ -515,7 +524,6 @@ Channel*    Channel::RecvDatagram (int socket) {
     }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
     channel->Recv(data);
-    return channel;
 }
 
 
@@ -541,18 +549,12 @@ void    Channel::Loop (tint howlong) {
             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
                     tintstr(send_time));
             sender->Send();
-            sender->Reschedule();
 
         } else {  // it's too early, wait
 
             tint towait = min(limit,send_time) - NOW;
             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
-            int rd = Datagram::Wait(socket_count,sockets,towait);
-            if (rd!=INVALID_SOCKET) { // in meantime, received something
-                Channel* receiver = RecvDatagram(rd);
-                if (receiver) // receiver's state may have changed
-                    receiver->Reschedule();
-            }
+            Datagram::Wait(towait);
             if (sender)  // get back to that later
                 send_queue.push(tintbin(send_time,sender->id()));
 
@@ -569,7 +571,6 @@ void Channel::Close () {
 
 
 void Channel::Reschedule () {
-    TimeoutDataOut(); // precaution to know free cwnd
     next_send_time_ = NextSendTime();
     if (next_send_time_!=TINT_NEVER) {
         assert(next_send_time_<NOW+TINT_MIN);