1 # Written by Arno Bakker, Bram Cohen, Jie Yang, George Milescu
2 # see LICENSE.txt for license information
4 # Please apply networking code fixes also to DialbackConnHandler.py
6 from cStringIO import StringIO
7 from struct import pack,unpack
8 from threading import currentThread
10 from traceback import print_exc,print_stack
13 from BaseLib.Core.BitTornado.BT1.MessageID import protocol_name,option_pattern,getMessageName
14 from BaseLib.Core.BitTornado.BT1.convert import tobinary,toint
15 from BaseLib.Core.BitTornado.__init__ import createPeerID
16 from BaseLib.Core.CacheDB.sqlitecachedb import safe_dict,bin2str
17 from BaseLib.Core.Overlay.permid import ChallengeResponse
18 from BaseLib.Core.Utilities.utilities import show_permid_short,hostname_or_ip2ip
19 from BaseLib.Core.simpledefs import *
# All-zero infohash that marks a connection as belonging to the overlay
# swarm; it is compared against the 20-byte download id field read during
# the handshake.
overlay_infohash = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

# Overlay-protocol version numbers in use in the wild
OLPROTO_VER_FIRST = 1 # Internally used only.
OLPROTO_VER_SECOND = 2 # First public release, >= 3.3.4
OLPROTO_VER_THIRD = 3 # Second public release, >= 3.6.0, Dialback, BuddyCast2
OLPROTO_VER_FOURTH = 4 # Third public release, >= 3.7.0, BuddyCast3
OLPROTO_VER_FIFTH = 5 # Fourth public release, >= 4.0.0, SOCIAL_OVERLAP
OLPROTO_VER_SIXTH = 6 # Fifth public release, >= 4.1.0, extra BC fields, remote query
OLPROTO_VER_SEVENTH = 7 # Sixth public release, >= 4.5.0, supports CRAWLER_REQUEST and CRAWLER_REPLY messages
OLPROTO_VER_EIGHTH = 8 # Seventh public release, >= 5.0, supporting BuddyCast with clicklog info.
OLPROTO_VER_NINETH = 9 # Eighth public release, >= 5.1, additional torrent_size in remote search query reply.
OLPROTO_VER_TENTH = 10 # Ninth public release, M18, simplified the VOD statistics (this code is not likely to be used in public, but still).
OLPROTO_VER_ELEVENTH = 11 # Tenth public release, trial M23, swarm size info part of BC message
OLPROTO_VER_TWELFTH = 12 # 11th public release M24, SIMPLE+METADATA query + ChannelCast BASE64.
OLPROTO_VER_THIRTEENTH = 13 # 12th public release >= 5.2, ChannelCast binary.
OLPROTO_VER_FOURTEENTH = 14 # 13th public release >= M30, ProxyService + Subtitle dissemination through ChannelCast + SUBS and GET_SUBS messages

# Overlay-swarm protocol version numbers
# Version this implementation speaks; also advertised in our peer id.
OLPROTO_VER_CURRENT = OLPROTO_VER_FOURTEENTH
# Oldest version still accepted; also advertised in our peer id.
OLPROTO_VER_LOWEST = OLPROTO_VER_SECOND
# Every version from OLPROTO_VER_LOWEST up to and including OLPROTO_VER_CURRENT.
SupportedVersions = range(OLPROTO_VER_LOWEST, OLPROTO_VER_CURRENT+1)
54 # States for overlay connection
56 STATE_HS_FULL_WAIT = 1
57 STATE_HS_PEERID_WAIT = 2
63 EXPIRE_THRESHOLD = 300 # seconds:: keep consistent with sockethandler
64 EXPIRE_CHECK_INTERVAL = 60 # seconds
65 NO_REMOTE_LISTEN_PORT_KNOWN = -481
72 if SecureOverlay.__single:
73 raise RuntimeError, "SecureOverlay is Singleton"
74 SecureOverlay.__single = self
75 self.olproto_ver_current = OLPROTO_VER_CURRENT
76 self.usermsghandler = None
77 self.userconnhandler = None
78 # ARNOCOMMENT: Remove this, DB should be fast enough. Don't want caches allover
79 self.dns = safe_dict()
83 # Interface for upper layer
@staticmethod
def getInstance(*args, **kw):
    """ Return the single SecureOverlay object, creating it on first use.

    The arguments are forwarded to the constructor, and only when no
    instance exists yet. The constructor itself stores the new object in
    the class-level singleton slot, so its return value is not needed here.
    """
    if SecureOverlay.__single is not None:
        return SecureOverlay.__single
    SecureOverlay(*args, **kw)
    return SecureOverlay.__single
91 def register(self,launchmanycore, max_len):
92 self.lm = launchmanycore
93 self.rawserver = self.lm.rawserver
94 self.sock_hand = self.rawserver.sockethandler
95 self.multihandler = self.lm.multihandler
96 self.overlay_rawserver = self.multihandler.newRawServer(overlay_infohash,
97 self.rawserver.doneflag,
99 self.max_len = max_len
100 self.iplport2oc = {} # (IP,listen port) -> OverlayConnection
101 self.peer_db = self.lm.peer_db
102 self.mykeypair = self.lm.session.keypair
103 self.permid = self.lm.session.get_permid()
104 self.myip = self.lm.get_ext_ip()
105 self.myport = self.lm.session.get_listen_port()
106 self.myid = create_my_peer_id(self.myport)
108 # 25/01/10 boudewijn: because there is no 'keep alive' message
109 # the last_activity check is prone to get false positives.
110 # The higher-ups decided that this feature should be removed
112 # self.last_activity = time()
def resetSingleton(self):
    """ For testing purposes: clear the class-level singleton slot so that
    a new SecureOverlay can be constructed (the constructor refuses to run
    while the slot is set, and getInstance() only builds one when it is
    None). """
    SecureOverlay.__single = None
118 def start_listening(self):
119 self.overlay_rawserver.start_listening(self)
120 # self.overlay_rawserver.add_task(self.secover_mon_netwact, 2)
122 # 25/01/10 boudewijn: because there is no 'keep alive' message the
123 # last_activity check is prone to get false positives. The
124 # higher-ups decided that this feature should be removed entirely.
125 # def secover_mon_netwact(self):
127 # periodically notify the network status
129 # diff = time() - self.last_activity
131 # # 120 is set as the check_period for buddycast until a
132 # # KEEP_ALIVE message is send
135 # msg = "network active"
136 # self.lm.set_activity(NTFY_ACT_ACTIVE, msg, diff)
137 # self.overlay_rawserver.add_task(self.secover_mon_netwact, 2)
139 def connect_dns(self,dns,callback):
140 """ Connects to the indicated endpoint and determines the permid
141 at that endpoint. Non-blocking.
143 Pre: "dns" must be an IP address, not a hostname.
145 Network thread calls "callback(exc,dns,permid,selver)" when the connection
146 is established or when an error occurs during connection
147 establishment. In the former case, exc is None, otherwise
148 it contains an Exception.
150 The established connection will auto close after EXPIRE_THRESHOLD
151 seconds of inactivity.
154 print >> sys.stderr,"secover: connect_dns",dns
155 # To prevent concurrency problems on sockets the calling thread
156 # delegates to the network thread.
157 task = Task(self._connect_dns,dns,callback)
158 self.rawserver.add_task(task.start, 0)
161 def connect(self,permid,callback):
162 """ Connects to the indicated permid. Non-blocking.
164 Network thread calls "callback(exc,dns,permid,selver)" when the connection
165 is established or when an error occurs during connection
166 establishment. In the former case, exc is None, otherwise
167 it contains an Exception.
169 The established connection will auto close after EXPIRE_THRESHOLD
170 seconds of inactivity.
173 print >> sys.stderr,"secover: connect",show_permid_short(permid), currentThread().getName()
174 # To prevent concurrency problems on sockets the calling thread
175 # delegates to the network thread.
177 dns = self.get_dns_from_peerdb(permid)
178 task = Task(self._connect,permid,dns,callback)
181 print >> sys.stderr,"secover: connect",show_permid_short(permid),"currently at",dns
183 self.rawserver.add_task(task.start, 0)
def send(self,permid,msg,callback):
    """ Hand "msg" over for delivery to the peer "permid". Non-blocking.

    Pre: connection to permid must have been established successfully.

    The peer's (ip, port) is looked up here, on the calling thread; the
    send itself is scheduled on the network thread, which calls
    "callback(exc,permid)" when the message is sent or when an error
    occurs during sending. exc is None on success, otherwise it contains
    an Exception.
    """
    peer_dns = self.get_dns_from_peerdb(permid)
    # To prevent concurrency problems on sockets, the calling thread only
    # schedules the work; _send runs on the network thread.
    send_task = Task(self._send, permid, peer_dns, msg, callback)
    self.rawserver.add_task(send_task.start, 0)
def close(self,permid):
    """ Closes any connection to the indicated permid. Non-blocking.

    Pre: connection to permid must have been established successfully.

    This method takes no callback; it only schedules _close(permid) on
    the network thread.
    """
    # To prevent concurrency problems on sockets, the calling thread
    # delegates the actual close to the network thread.
    close_task = Task(self._close, permid)
    self.rawserver.add_task(close_task.start, 0)
def register_recv_callback(self,callback):
    """ Register a callback to be called when receiving a message from
    any permid. Non-blocking.

    got_message() invokes it as "callback(permid,selver,msg)"; no
    exception argument is passed. Only one callback is kept:
    registering again replaces the previous one.

    The callback must return True to keep the connection open. It should
    always return True or False; judging by the diagnostic printed in
    got_message(), a None return value is treated as an internal error.
    """
    self.usermsghandler = callback
def register_conns_callback(self,callback):
    """ Register a callback to be called when a connection to or from any
    permid is established or closed. Non-blocking.

    It is invoked as "callback(exc,permid,selver,locally_initiated,dns)":
    - when a connection is established (locally initiated or remote),
      exc is None and dns is the peer's (ip, authenticated listen port);
    - when an authenticated connection is closed locally or remotely,
      exc contains an Exception and dns is None.

    Only one callback is kept: registering again replaces the previous one.

    Note that this means that if a callback is registered via this method,
    both this callback and the callbacks queued by a connect() call are
    invoked when the connection is established.
    """
    self.userconnhandler = callback
248 def _connect_dns(self,dns,callback):
251 print >> sys.stderr,"secover: actual connect_dns",dns
252 if dns[0] == self.myip and int(dns[1]) == self.myport:
253 callback(KeyError('IP and port of the target is the same as myself'),dns,None,0)
254 iplport = ip_and_port2str(dns[0],dns[1])
257 oc = self.iplport2oc[iplport]
261 oc = self.start_connection(dns)
262 self.iplport2oc[iplport] = oc
263 if not oc.is_auth_done():
264 oc.queue_callback(dns,callback)
266 callback(None,dns,oc.get_auth_permid(),oc.get_sel_proto_ver())
267 except Exception,exc:
270 callback(exc,dns,None,0)
272 def _connect(self,expectedpermid,dns,callback):
274 print >> sys.stderr,"secover: actual connect",show_permid_short(expectedpermid), currentThread().getName()
275 if expectedpermid == self.permid:
276 callback(KeyError('The target permid is the same as my permid'),None,expectedpermid,0)
278 oc = self.get_oc_by_permid(expectedpermid)
281 callback(KeyError('IP address + port for permid unknown'),dns,expectedpermid,0)
283 self._connect_dns(dns,lambda exc,dns2,peerpermid,selver:\
284 self._whoishe_callback(exc,dns2,peerpermid,selver,expectedpermid,callback))
286 # We already have a connection to this permid
287 self._whoishe_callback(None,(oc.get_ip(),oc.get_auth_listen_port()),expectedpermid,oc.get_sel_proto_ver(),expectedpermid,callback)
288 except Exception,exc:
291 callback(exc,None,expectedpermid,0)
293 def _whoishe_callback(self,exc,dns,peerpermid,selver,expectedpermid,callback):
294 """ Called by network thread after the permid on the other side is known
300 if peerpermid == expectedpermid:
301 callback(None,dns,expectedpermid,selver)
303 # Someone else answered the phone
304 callback(KeyError('Recorded IP address + port now of other permid'),
305 dns,expectedpermid,0)
307 callback(exc,dns,expectedpermid,0)
308 except Exception,exc:
311 callback(exc,dns,expectedpermid,0)
313 def _send(self,permid,dns,message,callback):
315 print >> sys.stderr,"secover: actual send",getMessageName(message[0]),\
316 "to",show_permid_short(permid), currentThread().getName()
319 callback(KeyError('IP address + port for permid unknown'),permid)
321 iplport = ip_and_port2str(dns[0],dns[1])
324 oc = self.iplport2oc[iplport]
328 callback(KeyError('Not connected to permid'),permid)
329 elif oc.is_auth_done():
330 if oc.get_auth_permid() == permid:
331 oc.send_message(message)
332 callback(None,permid)
334 callback(KeyError('Recorded IP address + port now of other permid'),permid)
336 callback(KeyError('Connection not yet established'),permid)
337 except Exception,exc:
343 def _close(self,permid):
345 print >> sys.stderr,"secover: actual close",show_permid_short(permid)
347 oc = self.get_oc_by_permid(permid)
350 print >> sys.stderr,"secover: error - actual close, but no connection to peer in admin"
357 # Interface for SocketHandler
359 def get_handler(self):
362 def external_connection_made(self,singsock):
363 """ incoming connection (never used) """
365 print >> sys.stderr,"secover: external_connection_made",singsock.get_ip(),singsock.get_port()
366 # self.last_activity = time()
367 oc = OverlayConnection(self,singsock,self.rawserver)
368 singsock.set_handler(oc)
370 def connection_flushed(self,singsock):
371 """ sockethandler flushes connection """
373 print >> sys.stderr,"secover: connection_flushed",singsock.get_ip(),singsock.get_port()
376 # Interface for ServerPortHandler
378 def externally_handshaked_connection_made(self, singsock, options, msg_remainder):
379 """ incoming connection, handshake partially read to identity
380 as an it as overlay connection (used always)
383 print >> sys.stderr,"secover: externally_handshaked_connection_made",\
384 singsock.get_ip(),singsock.get_port()
385 oc = OverlayConnection(self,singsock,self.rawserver,ext_handshake = True, options = options)
386 singsock.set_handler(oc)
388 oc.data_came_in(singsock,msg_remainder)
393 # Interface for OverlayConnection
395 def got_auth_connection(self,oc):
396 """ authentication of peer via identity protocol succesful """
398 print >> sys.stderr,"secover: got_auth_connection", \
399 show_permid_short(oc.get_auth_permid()),oc.get_ip(),oc.get_auth_listen_port(), currentThread().getName()
401 if oc.is_locally_initiated() and oc.get_port() != oc.get_auth_listen_port():
403 print >> sys.stderr,"secover: got_auth_connection: closing because auth", \
404 "listen port not as expected",oc.get_port(),oc.get_auth_listen_port()
405 self.cleanup_admin_and_callbacks(oc,Exception('closing because auth listen port not as expected'))
408 # self.last_activity = time()
411 iplport = ip_and_port2str(oc.get_ip(),oc.get_auth_listen_port())
412 known = iplport in self.iplport2oc
414 self.iplport2oc[iplport] = oc
415 elif known and not oc.is_locally_initiated():
416 # Locally initiated connections will already be registered,
417 # so if it's not a local connection and we already have one
418 # we have a duplicate, and we close the new one.
420 print >> sys.stderr,"secover: got_auth_connection:", \
421 "closing because we already have a connection to",iplport
422 self.cleanup_admin_and_callbacks(oc,
423 Exception('closing because we already have a connection to peer'))
427 if oc.is_auth_done():
428 hisdns = (oc.get_ip(),oc.get_auth_listen_port())
433 # print >>sys.stderr,"secover: userconnhandler is",self.userconnhandler
435 if self.userconnhandler is not None:
437 self.userconnhandler(None,oc.get_auth_permid(),oc.get_sel_proto_ver(),oc.is_locally_initiated(),hisdns)
441 oc.dequeue_callbacks()
444 def local_close(self,oc):
445 """ our side is closing the connection """
447 print >> sys.stderr,"secover: local_close"
448 self.cleanup_admin_and_callbacks(oc,CloseException('local close',oc.is_auth_done()))
450 def connection_lost(self,oc):
451 """ overlay connection telling us to clear admin """
453 print >> sys.stderr,"secover: connection_lost"
454 self.cleanup_admin_and_callbacks(oc,CloseException('connection lost',oc.is_auth_done()))
457 def got_message(self,permid,message,selversion):
458 """ received message from authenticated peer, pass to upper layer """
460 print >> sys.stderr,"secover: got_message",getMessageName(message[0]),\
462 # self.last_activity = time()
463 if self.usermsghandler is None:
465 print >> sys.stderr,"secover: User receive callback not set"
470 # print >>sys.stderr,"secover: usermsghandler is",self.usermsghandler
472 ret = self.usermsghandler(permid,selversion,message)
475 print >> sys.stderr,"secover: INTERNAL ERROR:", \
476 "User receive callback returned None, not True or False"
479 print >> sys.stderr,"secover: message handler returned",ret
487 def get_max_len(self):
490 def get_my_peer_id(self):
def get_my_keypair(self):
    """ Return our own keypair, as taken from the session in register().
    OverlayConnection passes it to ChallengeResponse. """
    return self.mykeypair
496 def measurefunc(self,length):
500 # Interface for OverlayThreadingBridge
502 def get_dns_from_peerdb(self,permid,use_cache=True):
503 # Called by any thread, except NetworkThread
505 if currentThread().getName().startswith("NetworkThread"):
506 print >>sys.stderr,"secover: get_dns_from_peerdb: called by NetworkThread!"
509 dns = self.dns.get(permid, None)
512 values = ('ip', 'port')
513 peer = self.peer_db.getOne(values, permid=bin2str(permid))
514 if peer and peer[0] and peer[1]:
515 ip = hostname_or_ip2ip(peer[0])
516 dns = (ip, int(peer[1]))
519 def add_peer_to_db(self,permid,dns,selversion):
520 """ add a connected peer to database """
521 # Called by OverlayThread
523 if currentThread().getName().startswith("NetworkThread"):
524 print >>sys.stderr,"secover: add_peer_to_peerdb: called by NetworkThread!"
527 print >>sys.stderr,"secover: add_peer_to_peerdb: called by",currentThread().getName()
529 self.dns[permid] = dns # cache it to avoid querying db later
531 peer_data = {'permid':permid, 'ip':dns[0], 'port':dns[1], 'oversion':selversion, 'last_seen':now, 'last_connected':now}
532 self.peer_db.addPeer(permid, peer_data, update_dns=True, update_connected=True, commit=True)
533 #self.peer_db.updateTimes(permid, 'connected_times', 1, commit=True)
536 def update_peer_status(self,permid,authwasdone):
537 """ update last_seen and last_connected in peer db when close """
538 # Called by OverlayThread
540 if currentThread().getName().startswith("NetworkThread"):
541 print >>sys.stderr,"secover: update_peer_status: called by NetworkThread!"
546 self.peer_db.updatePeer(permid, last_seen=now, last_connected=now)
547 self.lm.session.uch.notify(NTFY_PEERS, NTFY_CONNECTION, permid, False)
549 # Interface for debugging
551 def debug_get_live_connections(self):
552 """ return a list of (permid,dns) tuples of the peers with which we
553 are connected. Like all methods here it must be called by the network thread
556 for iplport in self.iplport2oc:
557 oc = self.iplport2oc[iplport]
559 peer_permid = oc.get_auth_permid()
561 live_conn.append((peer_permid,(oc.get_ip(),oc.get_port())))
568 def start_connection(self,dns):
570 print >> sys.stderr,"secover: Attempt to connect to",dns
571 singsock = self.sock_hand.start_connection(dns)
572 oc = OverlayConnection(self,singsock,self.rawserver,
573 locally_initiated=True,specified_dns=dns)
574 singsock.set_handler(oc)
577 def cleanup_admin_and_callbacks(self,oc,exc):
578 oc.cleanup_callbacks(exc)
579 self.cleanup_admin(oc)
580 if oc.is_auth_done() and self.userconnhandler is not None:
581 self.userconnhandler(exc,oc.get_auth_permid(),oc.get_sel_proto_ver(),
582 oc.is_locally_initiated(),None)
584 def cleanup_admin(self,oc):
587 for key in self.iplport2oc.keys():
588 #print "***** iplport2oc:", key, self.iplport2oc[key]
589 if self.iplport2oc[key] == oc:
590 del self.iplport2oc[key]
591 #print "*****!!! del", key, oc
594 def get_oc_by_permid(self, permid):
595 """ return the OverlayConnection instance given a permid """
597 for iplport in self.iplport2oc:
598 oc = self.iplport2oc[iplport]
599 if oc.get_auth_permid() == permid:
606 def __init__(self,method,*args, **kwargs):
613 print >> sys.stderr,"secover: task: start",self.method
615 self.method(*self.args,**self.kwargs)
618 class CloseException(Exception):
619 def __init__(self,msg=None,authdone=False):
620 Exception.__init__(self,msg)
621 self.authdone= authdone
624 return str(self.__class__)+': '+Exception.__str__(self)
626 def was_auth_done(self):
630 class OverlayConnection:
631 def __init__(self,handler,singsock,rawserver,locally_initiated = False,
632 specified_dns = None, ext_handshake = False,options = None):
633 self.handler = handler
634 self.singsock = singsock # for writing
635 self.rawserver = rawserver
636 self.buffer = StringIO()
638 self.auth_permid = None
639 self.unauth_peer_id = None
640 self.auth_peer_id = None
641 self.auth_listen_port = None
642 self.low_proto_ver = 0
643 self.cur_proto_ver = 0
644 self.sel_proto_ver = 0
646 self.locally_initiated = locally_initiated
647 self.specified_dns = specified_dns
648 self.last_use = time()
650 self.state = STATE_INITIAL
651 self.write(chr(len(protocol_name)) + protocol_name +
652 option_pattern + overlay_infohash + self.handler.get_my_peer_id())
654 self.state = STATE_HS_PEERID_WAIT
656 self.next_func = self.read_peer_id
657 self.set_options(options)
659 self.state = STATE_HS_FULL_WAIT
661 self.next_func = self.read_header_len
663 # Leave autoclose here instead of SecureOverlay, as that doesn't record
664 # remotely-initiated OverlayConnections before authentication is done.
665 self.rawserver.add_task(self._olconn_auto_close, EXPIRE_CHECK_INTERVAL)
668 # Interface for SocketHandler
670 def data_came_in(self, singsock, data):
671 """ sockethandler received data """
672 # now we got something we can ask for the peer's real port
673 dummy_port = singsock.get_port(True)
676 print >> sys.stderr,"olconn: data_came_in",singsock.get_ip(),singsock.get_port()
677 self.handler.measurefunc(len(data))
678 self.last_use = time()
680 if self.state == STATE_CLOSED:
682 i = self.next_len - self.buffer.tell()
684 self.buffer.write(data)
686 self.buffer.write(data[:i])
688 m = self.buffer.getvalue()
690 self.buffer.truncate()
693 print >> sys.stderr,"olconn: Trying to read",self.next_len #,"using",self.next_func
694 x = self.next_func(m)
696 self.next_len, self.next_func = 1, self.read_dead
702 print >> sys.stderr,"olconn: next_func returned None",self.next_func
705 self.next_len, self.next_func = x
707 def connection_lost(self,singsock):
708 """ kernel or socket handler reports connection lost """
710 print >> sys.stderr,"olconn: connection_lost",singsock.get_ip(),singsock.get_port(),self.state
711 if self.state != STATE_CLOSED:
712 self.state = STATE_CLOSED
713 self.handler.connection_lost(self)
715 def connection_flushed(self,singsock):
716 """ sockethandler flushes connection """
720 # Interface for SecureOverlay
722 def send_message(self,message):
723 self.last_use = time()
724 s = tobinary(len(message))+message
def is_locally_initiated(self):
    """ Return the locally_initiated flag given to the constructor: True
    when this side opened the connection, False (the default) otherwise. """
    return self.locally_initiated
731 return self.singsock.get_ip()
734 return self.singsock.get_port()
def is_auth_done(self):
    """ Return True once the peer's permid has been recorded by
    got_auth_connection(); auth_permid is None until then. """
    return self.auth_permid is not None
def get_auth_permid(self):
    """ Return the permid stored by got_auth_connection() (as a str), or
    None when authentication has not completed. """
    return self.auth_permid
def get_auth_listen_port(self):
    """ Return the listen port that got_auth_connection() decoded from the
    peer id, or None when authentication has not completed. """
    return self.auth_listen_port
def get_remote_listen_port(self):
    """ Return the best-known listen port of the remote peer.

    After authentication this is the port decoded from the peer id.
    Before that, a locally initiated connection reports the port of the
    dns it was asked to connect to. An unauthenticated, remotely
    initiated connection yields NO_REMOTE_LISTEN_PORT_KNOWN.
    """
    if self.is_auth_done():
        return self.auth_listen_port
    if not self.is_locally_initiated():
        return NO_REMOTE_LISTEN_PORT_KNOWN
    # We dialled out ourselves, so the target port is the one we were given.
    return self.specified_dns[1]
def get_low_proto_ver(self):
    """ Return the lowest protocol version advertised in the peer's peer
    id, as parsed by read_peer_id(); 0 until then. """
    return self.low_proto_ver
def get_cur_proto_ver(self):
    """ Return the current protocol version advertised in the peer's peer
    id, as parsed by read_peer_id(); 0 until then. """
    return self.cur_proto_ver
def get_sel_proto_ver(self):
    """ Return the protocol version chosen for this connection by
    read_peer_id(); 0 until a version has been selected. """
    return self.sel_proto_ver
def queue_callback(self,dns,callback):
    """ Remember "callback" so it can be invoked later, either by
    dequeue_callbacks() or by cleanup_callbacks().

    A callback of None is ignored. The "dns" argument is not used here.
    """
    if callback is None:
        return
    self.cb_queue.append(callback)
766 def dequeue_callbacks(self):
768 permid = self.get_auth_permid()
769 for callback in self.cb_queue:
770 callback(None,self.specified_dns,permid,self.get_sel_proto_ver())
776 def cleanup_callbacks(self,exc):
778 print >> sys.stderr,"olconn: cleanup_callbacks: #callbacks is",len(self.cb_queue)
780 for callback in self.cb_queue:
781 ## Failure connecting
783 print >> sys.stderr,"olconn: cleanup_callbacks: callback is",callback
784 callback(exc,self.specified_dns,self.get_auth_permid(),0)
789 # Interface for ChallengeResponse
def get_unauth_peer_id(self):
    """ Return the peer id exactly as received in the handshake by
    read_peer_id(), i.e. before authentication; None until it has been
    read. """
    return self.unauth_peer_id
794 def got_auth_connection(self,singsock,permid,peer_id):
795 """ authentication of peer via identity protocol succesful """
796 self.auth_permid = str(permid)
797 self.auth_peer_id = peer_id
798 self.auth_listen_port = decode_auth_listen_port(peer_id)
800 self.state = STATE_DATA_WAIT
802 if not self.handler.got_auth_connection(self):
809 def read_header_len(self, s):
810 if ord(s) != len(protocol_name):
812 return len(protocol_name), self.read_header
814 def read_header(self, s):
815 if s != protocol_name:
817 return 8, self.read_reserved
819 def read_reserved(self, s):
821 print >> sys.stderr,"olconn: Reserved bits:", `s`
823 return 20, self.read_download_id
825 def read_download_id(self, s):
826 if s != overlay_infohash:
828 return 20, self.read_peer_id
830 def read_peer_id(self, s):
831 self.unauth_peer_id = s
833 [self.low_proto_ver,self.cur_proto_ver] = get_proto_version_from_peer_id(self.unauth_peer_id)
834 self.sel_proto_ver = select_supported_protoversion(self.low_proto_ver,self.cur_proto_ver)
835 if not self.sel_proto_ver:
837 print >> sys.stderr,"olconn: We don't support peer's version of the protocol"
840 print >> sys.stderr,"olconn: Selected protocol version",self.sel_proto_ver
842 if self.cur_proto_ver <= 2:
843 # Arno, 2010-02-04: Kick TorrentSwapper clones, still around
844 print >>sys.stderr,"olconn: Kicking ancient peer",`self.unauth_peer_id`,self.get_ip()
847 self.state = STATE_AUTH_WAIT
848 self.cr = ChallengeResponse(self.handler.get_my_keypair(),self.handler.get_my_peer_id(),self)
849 if self.locally_initiated:
850 self.cr.start_cr(self)
851 return 4, self.read_len
854 def read_len(self, s):
856 if l > self.handler.get_max_len():
858 return l, self.read_message
860 def read_message(self, s):
862 if self.state == STATE_AUTH_WAIT:
863 if not self.cr.got_message(self,s):
865 elif self.state == STATE_DATA_WAIT:
866 if not self.handler.got_message(self.auth_permid,s,self.sel_proto_ver):
870 print >> sys.stderr,"olconn: Received message while in illegal state, internal error!"
872 return 4, self.read_len
874 def read_dead(self, s):
878 self.singsock.write(s)
880 def set_options(self,options):
881 self.options = options
885 print >> sys.stderr,"olconn: we close()",self.get_ip(),self.get_port()
887 self.state_when_error = self.state
888 if self.state != STATE_CLOSED:
889 self.state = STATE_CLOSED
890 self.handler.local_close(self)
891 self.singsock.close()
894 def _olconn_auto_close(self):
895 if (time() - self.last_use) > EXPIRE_THRESHOLD:
898 self.rawserver.add_task(self._olconn_auto_close, EXPIRE_CHECK_INTERVAL)
904 def create_my_peer_id(my_listen_port):
905 myid = createPeerID()
906 myid = myid[:16] + pack('<H', OLPROTO_VER_LOWEST) + pack('<H', OLPROTO_VER_CURRENT)
907 myid = myid[:14] + pack('<H', my_listen_port) + myid[16:]
def get_proto_version_from_peer_id(peerid):
    """ overlay swarm versioning solution - use last 4 bytes in PeerID

    Bytes 16-17 and 18-19 of the peer id each hold a little-endian
    unsigned short: the lowest and the current protocol version.
    Returns them as the list [low_ver, cur_ver]; struct.error is raised
    when those four bytes are not all present.
    """
    low_ver, cur_ver = unpack('<HH', peerid[16:20])
    return [low_ver, cur_ver]
919 def is_proto_version_supported(low_ver,cur_ver):
920 if cur_ver != OLPROTO_VER_CURRENT:
921 if low_ver > OLPROTO_VER_CURRENT: # the other's version is too high
923 if cur_ver < OLPROTO_VER_LOWEST: # the other's version is too low
925 if cur_ver < OLPROTO_VER_CURRENT and \
926 cur_ver not in SupportedVersions: # the other's version is not supported
930 def select_supported_protoversion(his_low_ver,his_cur_ver):
932 if his_cur_ver != OLPROTO_VER_CURRENT:
933 if his_low_ver > OLPROTO_VER_CURRENT: # the other's low version is too high
935 if his_cur_ver < OLPROTO_VER_LOWEST: # the other's current version is too low
937 if his_cur_ver < OLPROTO_VER_CURRENT and \
938 his_cur_ver not in SupportedVersions: # the other's current version is not supported (peer of this version is abondoned)
941 selected = min(his_cur_ver,OLPROTO_VER_CURRENT)
944 def decode_auth_listen_port(peerid):
946 tup = unpack('<H', bin)
def ip_and_port2str(ip,port):
    """ Return "ip:port" as a single string; used as the key into the
    iplport2oc connection table. The port is converted with str(). """
    return ':'.join((ip, str(port)))