tint Datagram::now = Datagram::Time();
tint Datagram::epoch = now;
-uint32_t Datagram::Address::LOCALHOST = INADDR_LOOPBACK;
+uint32_t Address::LOCALHOST = INADDR_LOOPBACK;
uint64_t Datagram::dgrams_up=0, Datagram::dgrams_down=0,
Datagram::bytes_up=0, Datagram::bytes_down=0;
#define INVALID_SOCKET -1
#endif
+
+struct Address {
+ struct sockaddr_in addr;
+ static uint32_t LOCALHOST;
+ void init(uint32_t ipv4=0, uint16_t port=0) {
+ memset(&addr,0,sizeof(struct sockaddr_in));
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(ipv4);
+ }
+ Address() { init(); }
+ Address(const char* ip, uint16_t port) {
+ init(LOCALHOST,port);
+ inet_aton(ip,&(addr.sin_addr));
+ }
+ Address(uint16_t port) {
+ init(LOCALHOST,port);
+ }
+ Address(uint32_t ipv4addr, uint16_t port) {
+ init(ipv4addr,port);
+ }
+ Address(const struct sockaddr_in& address) : addr(address) {}
+ uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
+ uint16_t port () const { return ntohs(addr.sin_port); }
+ operator sockaddr_in () const {return addr;}
+ bool operator == (const Address& b) const {
+ return addr.sin_family==b.addr.sin_family &&
+ addr.sin_port==b.addr.sin_port &&
+ addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
+ }
+ std::string str () const {
+ char s[32];
+ sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
+ (ipv4()>>8)&0xff,ipv4()&0xff,port());
+ return std::string(s);
+ }
+ bool operator != (const Address& b) const { return !(*this==b); }
+};
+
+
struct Datagram {
- struct Address {
- struct sockaddr_in addr;
- static uint32_t LOCALHOST;
- void init(uint32_t ipv4=0, uint16_t port=0) {
- memset(&addr,0,sizeof(struct sockaddr_in));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(ipv4);
- }
- Address() { init(); }
- Address(const char* ip, uint16_t port) {
- init(LOCALHOST,port);
- inet_aton(ip,&(addr.sin_addr));
- }
- Address(uint16_t port) {
- init(LOCALHOST,port);
- }
- Address(uint32_t ipv4addr, uint16_t port) {
- init(ipv4addr,port);
- }
- Address(const struct sockaddr_in& address) : addr(address) {}
- uint32_t ipv4 () const { return ntohl(addr.sin_addr.s_addr); }
- uint16_t port () const { return ntohs(addr.sin_port); }
- operator sockaddr_in () const {return addr;}
- bool operator == (const Address& b) const {
- return addr.sin_family==b.addr.sin_family &&
- addr.sin_port==b.addr.sin_port &&
- addr.sin_addr.s_addr==b.addr.sin_addr.s_addr;
- }
- std::string str () const {
- char s[32];
- sprintf(s,"%i.%i.%i.%i:%i",ipv4()>>24,(ipv4()>>16)&0xff,
- (ipv4()>>8)&0xff,ipv4()&0xff,port());
- return std::string(s);
- }
- bool operator != (const Address& b) const { return !(*this==b); }
- };
-
Address addr;
SOCKET sock;
int offset, length;
using namespace p2tp;
+/** P2TP downloader. Params: root hash, filename, tracker ip/port, own ip/port */
int main (int argn, char** args) {
- assert(0<p2tp::Listen(Datagram::Address(INADDR_ANY,7002)));
- p2tp::SetTracker(Datagram::Address("130.161.211.198",7001));
- unlink("doc/sofi-copy.jpg");
- int file = p2tp::Open("doc/sofi-copy.jpg",
- Sha1Hash(true,"282a863d5567695161721686a59f0c667250a35d"));
+ if (argn<4) {
+ fprintf(stderr,"parameters: root_hash filename tracker_ip/port [own_ip/port]\n");
+ return -1;
+ }
+ Sha1Hash root_hash(true,args[1]);
+ if (root_hash==Sha1Hash::ZERO) {
+ fprintf(stderr,"Sha1 hash format: hex, 40 symbols\n");
+ return -2;
+ }
+
+ char* filename = args[2];
+
+ Address tracker(args[3]), bindaddr(args[4]);
+
+ if (tracker==Address()) {
+ fprintf(stderr,"Tracker address format: [1.2.3.4:]12345\n");
+ return -2;
+ }
+ if (bindaddr==Address())
+ bindaddr = Address(rand()%10000+7000);
+
+ assert(0<p2tp::Listen(bindaddr));
+
+ p2tp::SetTracker(tracker);
+
+ int file = p2tp::Open(filename,root_hash);
while (!p2tp::Complete(file)) {
- p2tp::Loop(TINT_SEC/10);
+ p2tp::Loop(TINT_SEC/100);
printf("%lli dgram %lli bytes up, %lli dgram %lli bytes down\n",
Datagram::dgrams_up, Datagram::bytes_up,
Datagram::dgrams_down, Datagram::bytes_down );
}
tint next_send_time ( ){
- return cwnd_ ? Datagram::now + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
+ return cwnd_ ? NOW + (rtt_avg_/cwnd_) : TINT_NEVER; // TODO keepalives
}
void OnDataSent(bin64_t b) {
cwnd_ = 0; // nothing to send; suspend
} else {
last_bin_sent_ = b;
- last_send_time_ = Datagram::now;
+ last_send_time_ = NOW;
in_flight_++;
}
}
void OnDataRecvd(bin64_t b) {
- last_recv_time_ = Datagram::now;
+ last_recv_time_ = NOW;
}
void OnAckRcvd(bin64_t b, tint peer_stamp) {
if (last_bin_sent_!=b)
return;
if (peer_stamp!=TINT_NEVER)
- rtt_avg_ = (rtt_avg_*7 + (Datagram::now-last_send_time_)) >> 3; // van Jac
+ rtt_avg_ = (rtt_avg_*7 + (NOW-last_send_time_)) >> 3; // van Jac
in_flight_--;
}
}
void PingPongController::OnDataSent(bin64_t b) {
- if ( (ch_->last_recv_time_ && ch_->last_recv_time_<Datagram::now-TINT_SEC*3) || //no reply
+ if ( (ch_->last_recv_time_ && ch_->last_recv_time_<NOW-TINT_SEC*3) || //no reply
(b==bin64_t::ALL && MaySendData()) ) // nothing to send
Swap(new KeepAliveController(this));
}
dprintf("%s #%i maysend %i < %f & %s (rtt %lli)\n",Datagram::TimeStr(),
ch_->id,(int)ch_->data_out_.size(),cwnd_,Datagram::TimeStr(NextSendTime()),
ch_->rtt_avg_);
- return ch_->data_out_.size() < cwnd_ && Datagram::now >= NextSendTime();
+ return ch_->data_out_.size() < cwnd_ && NOW >= NextSendTime();
}
tint CwndController::NextSendTime () {
public:
SimpleSelector () {
}
- void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) {
+ void AddPeer (const Address& addr, const Sha1Hash& root) {
peers.push_front(memo_t(addr,root)); //,root.fingerprint() !!!
}
Address GetPeer (const Sha1Hash& for_root) {
this->id = channels.size();
channels.push_back(this);
cc_ = new PingPongController(this);
- RequeueSend(Datagram::now);
+ RequeueSend(NOW);
}
}
-int p2tp::Listen (Datagram::Address addr) {
+int p2tp::Listen (Address addr) {
int sock = Datagram::Bind(addr);
if (sock!=INVALID_SOCKET)
Channel::sockets[Channel::socket_count++] = sock;
}
-void p2tp::AddPeer (Datagram::Address address, const Sha1Hash& root) {
+void p2tp::AddPeer (Address address, const Sha1Hash& root) {
Channel::peer_selector->AddPeer(address,root);
}
namespace p2tp {
+ #define NOW Datagram::now
struct tintbin {
tint time;
bin64_t bin;
tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
tintbin() : time(0), bin(bin64_t::NONE) {}
tintbin(tint time_, bin64_t bin_) : time(time_), bin(bin_) {}
- tintbin(bin64_t bin_) : time(Datagram::now), bin(bin_) {}
+ tintbin(bin64_t bin_) : time(NOW), bin(bin_) {}
};
-
typedef std::deque<tintbin> tbqueue;
typedef std::deque<bin64_t> binqueue;
- typedef Datagram::Address Address;
+ typedef Address Address;
typedef enum {
P2TP_HANDSHAKE = 0,
we use a rotating queue of bin completion events. */
//bin64_t RevealAck (uint64_t& offset);
/** Rotating queue read for channels of this transmission. */
- uint32_t RevealChannel (int& i);
+ int RevealChannel (int& i);
static FileTransfer* Find (const Sha1Hash& hash);
static FileTransfer* file (int fd) {
bins& ack_out () { return ack_out_; }
int file_descriptor () const { return fd_; }
PiecePicker& picker () { return *picker_; }
- int channel_count () const { return handshake_in_.size(); }
+ int channel_count () const { return hs_in_.size(); }
static int instance; // FIXME this smells
char* error_;
/** Channels working for this transfer. */
- std::deque<int> handshake_in_;
+ binqueue hs_in_;
+ int hs_in_offset_;
std::deque<Address> pex_in_;
/** Messages we are accepting. */
uint64_t cap_out_;
class PeerSelector {
public:
- virtual void AddPeer (const Datagram::Address& addr, const Sha1Hash& root) = 0;
- virtual Datagram::Address GetPeer (const Sha1Hash& for_root) = 0;
+ virtual void AddPeer (const Address& addr, const Sha1Hash& root) = 0;
+ virtual Address GetPeer (const Sha1Hash& for_root) = 0;
};
/** Channel id: index in the channel array. */
uint32_t id;
/** Socket address of the peer. */
- Datagram::Address peer_;
+ Address peer_;
/** The UDP socket fd. */
int socket_;
/** Descriptor of the file in question. */
static Address tracker;
static std::vector<Channel*> channels;
- friend int Listen (Datagram::Address addr);
+ friend int Listen (Address addr);
friend void Shutdown (int sock_des);
- friend void AddPeer (Datagram::Address address, const Sha1Hash& root);
+ friend void AddPeer (Address address, const Sha1Hash& root);
friend void SetTracker(const Address& tracker);
friend int Open (const char*, const Sha1Hash&) ; // FIXME
/*************** The top-level API ****************/
/** Start listening a port. Returns socket descriptor. */
- int Listen (Datagram::Address addr);
+ int Listen (Address addr);
/** Run send/receive loop for the specified amount of time. */
void Loop (tint till);
/** Stop listening to a port. */
/** Add a possible peer which participares in a given transmission. In the case
root hash is zero, the peer might be talked to regarding any transmission
(likely, a tracker, cache or an archive). */
- void AddPeer (Datagram::Address address, const Sha1Hash& root=Sha1Hash::ZERO);
+ void AddPeer (Address address, const Sha1Hash& root=Sha1Hash::ZERO);
void SetTracker(const Address& tracker);
/*void Channel::CleanStaleHints () {
while ( !hint_out.empty() && file().ack_out().get(hint_out.front().bin)==bins::FILLED )
hint_out.pop_front(); // FIXME must normally clear fulfilled entries
- tint timed_out = Datagram::now - cc_->RoundTripTime()*8;
+ tint timed_out = NOW - cc_->RoundTripTime()*8;
while ( !hint_out.empty() && hint_out.front().time < timed_out ) {
file().picker()->Snubbed(hint_out.front().bin);
hint_out.pop_front();
void Channel::ClearStaleDataOut() {
int oldsize = data_out_.size();
while ( data_out_.size() && data_out_.front().time <
- Datagram::now - rtt_avg_ - dev_avg_*4 )
+ NOW - rtt_avg_ - dev_avg_*4 )
data_out_.pop_front();
if (data_out_.size()!=oldsize)
cc_->OnAckRcvd(bin64_t::NONE);
if ( is_established() ) {
AddAck(dgram);
AddHint(dgram);
+ AddPex(dgram);
ClearStaleDataOut();
if (cc_->MaySendData())
data = AddData(dgram);
if (dgram.size()==4) // only the channel id; bare keep-alive
data = bin64_t::ALL;
cc_->OnDataSent(data);
- last_send_time_ = Datagram::now;
+ last_send_time_ = NOW;
RequeueSend(cc_->NextSendTime());
}
while (!hint_out_.empty()) {
tintbin f = hint_out_.front();
- if (f.time<Datagram::now-rtt_avg_*8) {
+ if (f.time<NOW-rtt_avg_*8) {
hint_out_.pop_front();
} else {
int status = file().ack_out().get(f.bin);
}
}
/*while (!hint_out_.empty() &&
- (hint_out_.front().time<Datagram::now-TINT_SEC ||
+ (hint_out_.front().time<NOW-TINT_SEC ||
file().ack_out().get(hint_out_.front().bin)==bins::FILLED ) ) {
file().picker().Expired(hint_out_.front().bin);
hint_out_.pop_front();
void Channel::Recv (Datagram& dgram) {
if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
- rtt_avg_ = Datagram::now - last_send_time_;
+ rtt_avg_ = NOW - last_send_time_;
dev_avg_ = rtt_avg_;
dip_avg_ = rtt_avg_;
+ file_->hs_in_.push_back(id);
dprintf("%s #%i rtt init %lli\n",Datagram::TimeStr(),id,rtt_avg_);
}
bin64_t data = dgram.size() ? bin64_t::NONE : bin64_t::ALL;
}
}
cc_->OnDataRecvd(data);
- last_recv_time_ = Datagram::now;
+ last_recv_time_ = NOW;
if (data!=bin64_t::ALL)
- RequeueSend(Datagram::now);
+ RequeueSend(NOW);
}
bool ok = file().OfferData(pos, data, length) ;
dprintf("%s #%i %cdata (%lli)\n",Datagram::TimeStr(),id,ok?'-':'!',pos.offset());
if (ok) {
- data_in_ = tintbin(Datagram::now,pos);
+ data_in_ = tintbin(NOW,pos);
if (last_recv_time_) {
- tint dip = Datagram::now - last_recv_time_;
+ tint dip = NOW - last_recv_time_;
dip_avg_ = ( dip_avg_*3 + dip ) >> 2;
}
return pos;
dprintf("%s #%i -ack (%i,%lli)\n",Datagram::TimeStr(),id,ackd_pos.layer(),ackd_pos.offset());
for (int i=0; i<8 && i<data_out_.size(); i++)
if (data_out_[i].bin.within(ackd_pos)) {
- tint rtt = Datagram::now-data_out_[i].time;
+ tint rtt = NOW-data_out_[i].time;
rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
dev_avg_ = ( dev_avg_*3 + abs(rtt-rtt_avg_) ) >> 2;
dprintf("%s #%i rtt %lli dev %lli\n",
uint32_t ipv4 = dgram.Pull32();
uint16_t port = dgram.Pull16();
Address addr(ipv4,port);
- //if (peer_selector)
- // peer_selector->AddPeer(Datagram::Address(addr,port),file().root_hash());
- file_->pex_in_.push_back(addr);
- if (file_->pex_in_.size()>1000)
- file_->pex_in_.pop_front();
- static int ENOUGH_PEERS_THRESHOLD = 20;
- if (file_->channel_count()<ENOUGH_PEERS_THRESHOLD) {
- int i = 0, chno;
- while ( (chno=file_->RevealChannel(i)) != -1 ) {
- if (channels[i]->peer()==addr)
- break;
- }
- if (chno==-1)
- new Channel(file_,socket_,addr);
- }
+ dprintf("%s #%i -pex %s\n",Datagram::TimeStr(),id,addr.str().c_str());
+ file_->OnPexIn(addr);
}
void Channel::AddPex (Datagram& dgram) {
- int ch = file().RevealChannel(this->pex_out_);
- if (ch==-1)
+ int chid = file_->RevealChannel(pex_out_);
+ if (chid==-1 || chid==id)
return;
- Address a = channels[ch]->peer();
+ Address a = channels[chid]->peer();
dgram.Push8(P2TP_PEX_ADD);
dgram.Push32(a.ipv4());
dgram.Push16(a.port());
+ dprintf("%s #%i +pex %s\n",Datagram::TimeStr(),id,a.str().c_str());
}
RETLOG("datagram shorter than 4 bytes");
uint32_t mych = data.Pull32();
Sha1Hash hash;
- Channel* channel;
+ Channel* channel = NULL;
if (!mych) { // handshake initiated
if (data.size()<1+4+1+4+Sha1Hash::SIZE)
RETLOG ("incorrect size initial handshake packet");
if (!file)
RETLOG ("hash unknown, no such file");
dprintf("%s #0 -hash ALL %s\n",Datagram::TimeStr(),hash.hex().c_str());
+ for(binqueue::iterator i=file->hs_in_.begin(); i!=file->hs_in_.end(); i++)
+ if (channels[*i] && channels[*i]->peer_==data.addr)
+ RETLOG("have a channel already");
channel = new Channel(file, socket, data.address());
} else {
mych = DecodeID(mych);
}
if (send_time>limit)
send_time = limit;
- if (sender && send_time<=Datagram::now) {
+ if (sender && send_time<=NOW) {
dprintf("%s #%i sch_send %s\n",Datagram::TimeStr(),sender->id,
Datagram::TimeStr(send_time));
sender->Send();
pop_heap(send_queue.begin(), send_queue.end(), tblater);
send_queue.pop_back();
} else {
- tint towait = send_time - Datagram::now;
+ tint towait = send_time - NOW;
dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=-1)
if (send_queue.empty())
dprintf("%s empty send_queue\n", Datagram::TimeStr());
- while ( Datagram::now <= untiltime && !send_queue.empty() ) {
+ while ( NOW <= untiltime && !send_queue.empty() ) {
// BUG BUG BUG no scheduled sends => just listen
if (sender->next_send_time_!=next_send.time)
continue;
- if (wake_on<Datagram::now)
- wake_on = Datagram::now;
+ if (wake_on<NOW)
+ wake_on = NOW;
if (wake_on>untiltime)
wake_on = untiltime;
- tint towait = min(wake_on-Datagram::now,TINT_SEC);
+ tint towait = min(wake_on-NOW,TINT_SEC);
dprintf("%s waiting %lliusec\n",Datagram::TimeStr(),towait);
int rd = Datagram::Wait(socket_count,sockets,towait);
if (rd!=-1)
using namespace p2tp;
-/*TEST(P2TP, ConnectTest) {
-
- uint8_t buf[1024];
- int f = open("test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- for(char c='a'; c<='c'; c++) {
- memset(buf,c,1024);
- write(f,buf,1024);
- }
- close(f);
-
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(7001);
- addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-
- int sock = p2tp::Init(7001);
- ASSERT_TRUE(0<sock);
-
- int file = p2tp::Open("test_file");
- p2tp::File& fileobj = * p2tp::File::file(file);
- int copy = p2tp::Open(fileobj.root_hash(),"test_file_copy");
- p2tp::File& copyobj = * p2tp::File::file(copy);
- int chan = p2tp::Connect(copy,sock,addr); // TRICK: will open a channel to the first file
- p2tp::Loop(p2tp::TINT_1SEC);
- //ASSERT_EQ(p2tp::Channel::channel(chan)->state(),p2tp::Channel::HS_DONE); FIXME: status
- ASSERT_EQ(p2tp::file_size(file),p2tp::file_size(copy));
- ASSERT_EQ(p2tp::File::DONE,copyobj.status());
- p2tp::Close(file);
- p2tp::Close(copy);
-
- p2tp::Shutdown(sock);
-
-}*/
-
TEST(P2TP,CwndTest) {
ASSERT_EQ(0,stat("doc/sofi.jpg", &st));
int size = st.st_size;//, sizek = (st.st_size>>10) + (st.st_size%1024?1:0) ;
- /*int f = open("big_test_file",O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
- int size = 60<<10; //rand()%(1<<19) + (1<<19);
- int sizek = (size>>10) + ((size&1023)?1:0);
- char* b = (char*)malloc(size);
- for(int i=0; i<size; i++)
- b[i] = (i%1024!=1023) ? ('A' + rand()%('Z'-'A')) : ('\n');
- write(f,b,size);
- free(b);
- close(f);*/
-
- /*
- struct sockaddr_in addr1, addr2;
- addr1.sin_family = AF_INET;
- addr1.sin_port = htons(7003);
- addr1.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-
- addr2.sin_family = AF_INET;
- addr2.sin_port = htons(7004);
- addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
- */
int sock1 = p2tp::Listen(7001);
ASSERT_TRUE(sock1>=0);
- //ASSERT_TRUE(sock2>=0);
int file = p2tp::Open("doc/sofi.jpg");
FileTransfer* fileobj = FileTransfer::file(file);
FileTransfer::instance++;
- p2tp::SetTracker(Datagram::Address("127.0.0.1",7001));
+ p2tp::SetTracker(Address("127.0.0.1",7001));
int copy = p2tp::Open("doc/sofi-copy.jpg",fileobj->root_hash());
p2tp::Close(copy);
p2tp::Shutdown(sock1);
- //p2tp::Release(sock2);
}
int main (int argc, char** argv) {
- //bin::init();
- //bins::init();
google::InitGoogleLogging(argv[0]);
testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
addr.sin_family = AF_INET;
addr.sin_port = htons(7001);
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
- Datagram d(socket,addr); //Datagram::Address(7001));
+ Datagram d(socket,addr); //Address(7001));
const char * text = "text";
const uint8_t num8 = 0xab;
const uint16_t num16 = 0xabcd;
addr2.sin_port = htons(10002);
addr2.sin_addr.s_addr = htonl(INADDR_LOOPBACK);*/
- Datagram send(sock1,Datagram::Address(10002));
+ Datagram send(sock1,Address(10002));
send.Push32(1234);
send.Send();
FileTransfer::FileTransfer (const char* filename, const Sha1Hash& _root_hash) :
root_hash_(_root_hash), fd_(0), hashfd_(0), dry_run_(false),
peak_count_(0), hashes_(NULL), error_(NULL), size_(0), sizek_(0),
- complete_(0), completek_(0), seq_complete_(0) //, data_in_off_(0)
+ complete_(0), completek_(0), seq_complete_(0), hs_in_offset_(0)
{
fd_ = open(filename,O_RDWR|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (fd_<0)
}
-void FileTransfer::OnPexIn (const Address& addr) {
- pex_in_.push_back(addr);
- if (pex_in_.size()>1000)
- pex_in_.pop_front();
-}
-
-
std::string FileTransfer::GetTempFilename(Sha1Hash& root_hash, int instance, std::string postfix)
{
std::string tempfile = gettmpdir();
}*/
-uint32_t FileTransfer::RevealChannel (int& offset) {
- while (offset<Channel::channels.size() &&
- (!Channel::channels[offset] || Channel::channels[offset]->file_!=this) )
- offset++;
- return offset < Channel::channels.size() ? offset : -1;
+void FileTransfer::OnPexIn (const Address& addr) {
+ for(int i=0; i<hs_in_.size(); i++) {
+ Channel* c = Channel::channels[hs_in_[i]];
+ if (c && c->file_==this && c->peer_==addr)
+ return; // already connected
+ }
+ if (hs_in_.size()<20) {
+ new Channel(this,Channel::sockets[0],addr);
+ } else {
+ pex_in_.push_back(addr);
+ if (pex_in_.size()>1000)
+ pex_in_.pop_front();
+ }
+}
+
+
+int FileTransfer::RevealChannel (int& pex_out_) {
+ pex_out_ -= hs_in_offset_;
+ if (pex_out_<0)
+ pex_out_ = 0;
+ while (pex_out_<hs_in_.size()) {
+ Channel* c = Channel::channels[hs_in_[pex_out_]];
+ if (c && c->file_==this) {
+ pex_out_ += hs_in_offset_ + 1;
+ return c->id;
+ } else {
+ hs_in_[pex_out_] = hs_in_[0];
+ hs_in_.pop_front();
+ hs_in_offset_++;
+ }
+ }
+ pex_out_ += hs_in_offset_;
+ return -1;
}