add .gitignore
[swift-upb.git] / sendrecv.cpp
index 978618b..1f808ba 100644 (file)
@@ -24,7 +24,6 @@ void    Channel::AddPeakHashes (Datagram& dgram) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)peak);
         dgram.PushHash(file().peak_hash(i));
-        //DLOG(INFO)<<"#"<<id<<" +pHASH"<<file().peak(i);
         dprintf("%s #%u +phash %s\n",tintstr(),id_,peak.str());
     }
 }
@@ -38,7 +37,6 @@ void    Channel::AddUncleHashes (Datagram& dgram, bin64_t pos) {
         dgram.Push8(SWIFT_HASH);
         dgram.Push32((uint32_t)uncle);
         dgram.PushHash( file().hash(uncle) );
-        //DLOG(INFO)<<"#"<<id<<" +uHASH"<<uncle;
         dprintf("%s #%u +hash %s\n",tintstr(),id_,uncle.str());
         pos = pos.parent();
     }
@@ -135,6 +133,7 @@ void    Channel::Send () {
     last_send_time_ = NOW;
     sent_since_recv_++;
     dgrams_sent_++;
+    Reschedule();
 }
 
 
@@ -185,7 +184,7 @@ bin64_t        Channel::AddData (Datagram& dgram) {
         }
     } else
         dprintf("%s #%u sendctrl wait cwnd %f data_out %i next %s\n",
-                tintstr(),id_,cwnd_,data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
+                tintstr(),id_,cwnd_,(int)data_out_.size(),tintstr(last_data_out_time_+NOW-send_interval_));
 
     if (tosend==bin64_t::NONE)// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
         return bin64_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
@@ -284,6 +283,7 @@ void    Channel::Recv (Datagram& dgram) {
     }
     last_recv_time_ = NOW;
     sent_since_recv_ = 0;
+    Reschedule();
 }
 
 
@@ -320,11 +320,16 @@ bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted da
     bin64_t pos = dgram.Pull32();
     uint8_t *data;
     int length = dgram.Pull(&data,1024);
-    bool ok = (pos==bin64_t::NONE) || file().OfferData(pos, (char*)data, length) ;
+    bool ok = (pos==bin64_t::NONE) || 
+        (!file().ack_out().get(pos) && file().OfferData(pos, (char*)data, length) );
     dprintf("%s #%u %cdata %s\n",tintstr(),id_,ok?'-':'!',pos.str());
     data_in_ = tintbin(NOW,bin64_t::NONE);
     if (!ok)
         return bin64_t::NONE;
+    bin64_t cover = transfer().ack_out().cover(pos);
+    for(int i=0; i<transfer().cb_installed; i++)
+        if (cover.layer()>=transfer().cb_agg[i])
+            transfer().callbacks[i](transfer().fd(),cover);  // FIXME
     data_in_.bin = pos;
     if (pos!=bin64_t::NONE) {
         if (last_data_in_time_) {
@@ -340,7 +345,7 @@ bin64_t Channel::OnData (Datagram& dgram) {  // TODO: HAVE NONE for corrupted da
 
 void    Channel::OnAck (Datagram& dgram) {
     bin64_t ackd_pos = dgram.Pull32();
-    tint peer_time_ = dgram.Pull64(); // FIXME 32
+    tint peer_time = dgram.Pull64(); // FIXME 32
     // FIXME FIXME: wrap around here
     if (ackd_pos==bin64_t::NONE)
         return; // likely, brocken packet / insufficient hashes
@@ -359,16 +364,16 @@ void    Channel::OnAck (Datagram& dgram) {
     while (  ri<data_out_tmo_.size() && !data_out_tmo_[ri].bin.within(ackd_pos) )
         ri++;
     dprintf("%s #%u %cack %s %lli\n",tintstr(),id_,
-            di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time_);
+            di==data_out_.size()?'?':'-',ackd_pos.str(),peer_time);
     if (di!=data_out_.size() && ri==data_out_tmo_.size()) { // not a retransmit
             // round trip time calculations
         tint rtt = NOW-data_out_[di].time;
         rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
-        dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
+        dev_avg_ = ( dev_avg_*3 + ::abs(rtt-rtt_avg_) ) >> 2;
         assert(data_out_[di].time!=TINT_NEVER);
             // one-way delay calculations
-        tint owd = peer_time_ - data_out_[di].time;
-        owd_cur_bin_ = (owd_cur_bin_+1) & 3;
+        tint owd = peer_time - data_out_[di].time;
+        owd_cur_bin_ = 0;//(owd_cur_bin_+1) & 3;
         owd_current_[owd_cur_bin_] = owd;
         if ( owd_min_bin_start_+TINT_SEC*30 < NOW ) {
             owd_min_bin_start_ = NOW;
@@ -474,17 +479,17 @@ void    Channel::AddPex (Datagram& dgram) {
 }
 
 
-Channel*    Channel::RecvDatagram (int socket) {
+void    Channel::RecvDatagram (SOCKET socket) {
     Datagram data(socket);
     data.Recv();
     const Address& addr = data.address();
-#define return_log(...) { printf(__VA_ARGS__); return NULL; }
+#define return_log(...) { fprintf(stderr,__VA_ARGS__); return; }
     if (data.size()<4)
         return_log("datagram shorter than 4 bytes %s\n",addr.str());
     uint32_t mych = data.Pull32();
     Sha1Hash hash;
     Channel* channel = NULL;
-    if (!mych) { // handshake initiated
+    if (mych==0) { // handshake initiated
         if (data.size()<1+4+1+4+Sha1Hash::SIZE)
             return_log ("%s #0 incorrect size %i initial handshake packet %s\n",
                         tintstr(),data.size(),addr.str());
@@ -519,7 +524,6 @@ Channel*    Channel::RecvDatagram (int socket) {
     }
     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
     channel->Recv(data);
-    return channel;
 }
 
 
@@ -545,18 +549,12 @@ void    Channel::Loop (tint howlong) {
             dprintf("%s #%u sch_send %s\n",tintstr(),sender->id(),
                     tintstr(send_time));
             sender->Send();
-            sender->Reschedule();
 
         } else {  // it's too early, wait
 
             tint towait = min(limit,send_time) - NOW;
             dprintf("%s #0 waiting %lliusec\n",tintstr(),towait);
-            int rd = Datagram::Wait(socket_count,sockets,towait);
-            if (rd!=INVALID_SOCKET) { // in meantime, received something
-                Channel* receiver = RecvDatagram(rd);
-                if (receiver) // receiver's state may have changed
-                    receiver->Reschedule();
-            }
+            Datagram::Wait(towait);
             if (sender)  // get back to that later
                 send_queue.push(tintbin(send_time,sender->id()));