instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Overlay / SecureOverlay.py
1 # Written by Arno Bakker, Bram Cohen, Jie Yang, George Milescu
2 # see LICENSE.txt for license information
3 #
4 # Please apply networking code fixes also to DialbackConnHandler.py
5
6 from cStringIO import StringIO
7 from struct import pack,unpack
8 from threading import currentThread
9 from time import time
10 from traceback import print_exc,print_stack
11 import sys
12
13 from BaseLib.Core.BitTornado.BT1.MessageID import protocol_name,option_pattern,getMessageName
14 from BaseLib.Core.BitTornado.BT1.convert import tobinary,toint
15 from BaseLib.Core.BitTornado.__init__ import createPeerID
16 from BaseLib.Core.CacheDB.sqlitecachedb import safe_dict,bin2str
17 from BaseLib.Core.Overlay.permid import ChallengeResponse
18 from BaseLib.Core.Utilities.utilities import show_permid_short,hostname_or_ip2ip
19 from BaseLib.Core.simpledefs import *
20
# Set to True to get tracing output on stderr
DEBUG = False

#
# Public definitions
#

# All-zero, 20-byte infohash that is sent in the overlay handshake
overlay_infohash = '\x00' * 20

# Overlay-protocol version numbers in use in the wild
OLPROTO_VER_FIRST = 1        # Internally used only.
OLPROTO_VER_SECOND = 2       # First public release, >= 3.3.4
OLPROTO_VER_THIRD = 3        # Second public release, >= 3.6.0, Dialback, BuddyCast2
OLPROTO_VER_FOURTH = 4       # Third public release, >= 3.7.0, BuddyCast3
OLPROTO_VER_FIFTH = 5        # Fourth public release, >= 4.0.0, SOCIAL_OVERLAP
OLPROTO_VER_SIXTH = 6        # Fifth public release, >= 4.1.0, extra BC fields, remote query
OLPROTO_VER_SEVENTH = 7      # Sixth public release, >= 4.5.0, supports CRAWLER_REQUEST and CRAWLER_REPLY messages
OLPROTO_VER_EIGHTH = 8       # Seventh public release, >= 5.0, supporting BuddyCast with clicklog info.
OLPROTO_VER_NINETH = 9       # Eighth public release, >= 5.1, additional torrent_size in remote search query reply.
OLPROTO_VER_TENTH = 10       # Ninth public release, M18, simplified the VOD statistics (this code is not likely to be used in public, but still).
OLPROTO_VER_ELEVENTH = 11    # Tenth public release, trial M23, swarm size info part of BC message
OLPROTO_VER_TWELFTH = 12     # 11th public release M24, SIMPLE+METADATA query + ChannelCast BASE64.
OLPROTO_VER_THIRTEENTH = 13  # 12th public release >= 5.2, ChannelCast binary.
OLPROTO_VER_FOURTEENTH = 14  # 13th public release >= M30, ProxyService + Subtitle dissemination through ChannelCast + SUBS and GET_SUBS messages

# Overlay-swarm protocol version numbers: the one we speak ourselves and
# the oldest one we still accept
OLPROTO_VER_CURRENT = OLPROTO_VER_FOURTEENTH
OLPROTO_VER_LOWEST = OLPROTO_VER_SECOND
SupportedVersions = range(OLPROTO_VER_LOWEST, OLPROTO_VER_CURRENT+1)

#
# Private definitions
#

# States for overlay connection (numbered 0..5 in this order)
(STATE_INITIAL,
 STATE_HS_FULL_WAIT,
 STATE_HS_PEERID_WAIT,
 STATE_AUTH_WAIT,
 STATE_DATA_WAIT,
 STATE_CLOSED) = range(6)

# Misc
EXPIRE_THRESHOLD = 300         # seconds::  keep consistent with sockethandler
EXPIRE_CHECK_INTERVAL = 60     # seconds
NO_REMOTE_LISTEN_PORT_KNOWN = -481
66
67
68 class SecureOverlay:
69     __single = None
70
71     def __init__(self):
72         if SecureOverlay.__single:
73             raise RuntimeError, "SecureOverlay is Singleton"
74         SecureOverlay.__single = self 
75         self.olproto_ver_current = OLPROTO_VER_CURRENT
76         self.usermsghandler = None
77         self.userconnhandler = None
78         # ARNOCOMMENT: Remove this, DB should be fast enough. Don't want caches allover
79         self.dns = safe_dict()
80
81        
82     #
83     # Interface for upper layer
84     #
85     def getInstance(*args, **kw):
86         if SecureOverlay.__single is None:
87             SecureOverlay(*args, **kw)
88         return SecureOverlay.__single
89     getInstance = staticmethod(getInstance)
90
91     def register(self,launchmanycore, max_len):
92         self.lm = launchmanycore
93         self.rawserver = self.lm.rawserver
94         self.sock_hand = self.rawserver.sockethandler
95         self.multihandler = self.lm.multihandler
96         self.overlay_rawserver = self.multihandler.newRawServer(overlay_infohash, 
97                                                                 self.rawserver.doneflag,
98                                                                 protocol_name)
99         self.max_len = max_len
100         self.iplport2oc = {}    # (IP,listen port) -> OverlayConnection
101         self.peer_db = self.lm.peer_db
102         self.mykeypair = self.lm.session.keypair
103         self.permid = self.lm.session.get_permid()
104         self.myip = self.lm.get_ext_ip()
105         self.myport = self.lm.session.get_listen_port()
106         self.myid = create_my_peer_id(self.myport)
107
108         # 25/01/10 boudewijn: because there is no 'keep alive' message
109         # the last_activity check is prone to get false positives.
110         # The higher-ups decided that this feature should be removed
111         # entirely.
112         # self.last_activity = time()
113
114     def resetSingleton(self):
115         """ For testing purposes """
116         SecureOverlay.__single = None 
117
118     def start_listening(self):
119         self.overlay_rawserver.start_listening(self)
120         # self.overlay_rawserver.add_task(self.secover_mon_netwact, 2)
121
122     # 25/01/10 boudewijn: because there is no 'keep alive' message the
123     # last_activity check is prone to get false positives.  The
124     # higher-ups decided that this feature should be removed entirely.
125     # def secover_mon_netwact(self):
126     #     """
127     #     periodically notify the network status
128     #     """
129     #     diff = time() - self.last_activity
130     #     if diff > 120 + 1:
131     #         # 120 is set as the check_period for buddycast until a
132     #         # KEEP_ALIVE message is send
133     #         msg = "no network"
134     #     else:
135     #         msg = "network active"
136     #     self.lm.set_activity(NTFY_ACT_ACTIVE, msg, diff)
137     #     self.overlay_rawserver.add_task(self.secover_mon_netwact, 2)
138
139     def connect_dns(self,dns,callback):
140         """ Connects to the indicated endpoint and determines the permid 
141             at that endpoint. Non-blocking. 
142             
143             Pre: "dns" must be an IP address, not a hostname.
144             
145             Network thread calls "callback(exc,dns,permid,selver)" when the connection
146             is established or when an error occurs during connection 
147             establishment. In the former case, exc is None, otherwise
148             it contains an Exception.
149
150             The established connection will auto close after EXPIRE_THRESHOLD
151             seconds of inactivity.
152         """
153         if DEBUG:
154             print >> sys.stderr,"secover: connect_dns",dns
155         # To prevent concurrency problems on sockets the calling thread 
156         # delegates to the network thread.
157         task = Task(self._connect_dns,dns,callback)
158         self.rawserver.add_task(task.start, 0)
159
160
161     def connect(self,permid,callback):
162         """ Connects to the indicated permid. Non-blocking.
163             
164             Network thread calls "callback(exc,dns,permid,selver)" when the connection
165             is established or when an error occurs during connection 
166             establishment. In the former case, exc is None, otherwise
167             it contains an Exception.
168
169             The established connection will auto close after EXPIRE_THRESHOLD
170             seconds of inactivity.
171         """
172         if DEBUG:
173             print >> sys.stderr,"secover: connect",show_permid_short(permid), currentThread().getName()
174         # To prevent concurrency problems on sockets the calling thread 
175         # delegates to the network thread.
176         
177         dns = self.get_dns_from_peerdb(permid)
178         task = Task(self._connect,permid,dns,callback)
179
180         if DEBUG:
181             print >> sys.stderr,"secover: connect",show_permid_short(permid),"currently at",dns
182         
183         self.rawserver.add_task(task.start, 0)
184
185
186     def send(self,permid,msg,callback):
187         """ Sends a message to the indicated permid. Non-blocking.
188             
189             Pre: connection to permid must have been established successfully.
190
191             Network thread calls "callback(exc,permid)" when the message is sent
192             or when an error occurs during sending. In the former case, exc 
193             is None, otherwise it contains an Exception.
194         """
195         # To prevent concurrency problems on sockets the calling thread 
196         # delegates to the network thread.
197         dns = self.get_dns_from_peerdb(permid)
198         task = Task(self._send,permid,dns,msg,callback)
199         self.rawserver.add_task(task.start, 0)
200
201
202
203     def close(self,permid):
204         """ Closes any connection to indicated permid. Non-blocking.
205             
206             Pre: connection to permid must have been established successfully.
207
208             Network thread calls "callback(exc,permid,selver)" when the connection
209             is closed.
210         """
211         # To prevent concurrency problems on sockets the calling thread 
212         # delegates to the network thread.
213         task = Task(self._close,permid)
214         self.rawserver.add_task(task.start, 0)
215
216
217     def register_recv_callback(self,callback):
218         """ Register a callback to be called when receiving a message from 
219             any permid. Non-blocking.
220
221             Network thread calls "callback(exc,permid,selver,msg)" when a message 
222             is received. The callback is not called on errors e.g. remote 
223             connection close.
224             
225             The callback must return True to keep the connection open.
226         """
227         self.usermsghandler = callback
228
229     def register_conns_callback(self,callback):
230         """ Register a callback to be called when receiving a connection from 
231             any permid. Non-blocking.
232
233             Network thread calls "callback(exc,permid,selver,locally_initiated)" 
234             when a connection is established (locally initiated or remote), or
235             when a connection is closed locally or remotely. In the former case, 
236             exc is None, otherwise it contains an Exception.
237
238             Note that this means that if a callback is registered via this method,
239             both this callback and the callback passed to a connect() method 
240             will be called.
241         """
242         self.userconnhandler = callback
243
244
245     #
246     # Internal methods
247     #
248     def _connect_dns(self,dns,callback):
249         try:
250             if DEBUG:
251                 print >> sys.stderr,"secover: actual connect_dns",dns
252             if dns[0] == self.myip and int(dns[1]) == self.myport:
253                 callback(KeyError('IP and port of the target is the same as myself'),dns,None,0)
254             iplport = ip_and_port2str(dns[0],dns[1])
255             oc = None
256             try:
257                 oc = self.iplport2oc[iplport]
258             except KeyError:
259                 pass
260             if oc is None:
261                 oc = self.start_connection(dns)
262                 self.iplport2oc[iplport] = oc
263             if not oc.is_auth_done():
264                 oc.queue_callback(dns,callback)
265             else:
266                 callback(None,dns,oc.get_auth_permid(),oc.get_sel_proto_ver())
267         except Exception,exc:
268             if DEBUG:
269                 print_exc()
270             callback(exc,dns,None,0)
271
272     def _connect(self,expectedpermid,dns,callback):
273         if DEBUG:
274             print >> sys.stderr,"secover: actual connect",show_permid_short(expectedpermid), currentThread().getName()
275         if expectedpermid == self.permid:
276             callback(KeyError('The target permid is the same as my permid'),None,expectedpermid,0)
277         try:
278             oc = self.get_oc_by_permid(expectedpermid)
279             if oc is None:
280                 if dns is None:
281                     callback(KeyError('IP address + port for permid unknown'),dns,expectedpermid,0)
282                 else:
283                     self._connect_dns(dns,lambda exc,dns2,peerpermid,selver:\
284                           self._whoishe_callback(exc,dns2,peerpermid,selver,expectedpermid,callback))
285             else:
286                 # We already have a connection to this permid
287                 self._whoishe_callback(None,(oc.get_ip(),oc.get_auth_listen_port()),expectedpermid,oc.get_sel_proto_ver(),expectedpermid,callback)
288         except Exception,exc:
289             if DEBUG:
290                 print_exc()
291             callback(exc,None,expectedpermid,0)
292
293     def _whoishe_callback(self,exc,dns,peerpermid,selver,expectedpermid,callback):
294         """ Called by network thread after the permid on the other side is known
295             or an error occured
296         """
297         try:
298             if exc is None:
299                 # Connect went OK
300                 if peerpermid == expectedpermid:
301                     callback(None,dns,expectedpermid,selver)
302                 else:
303                     # Someone else answered the phone
304                     callback(KeyError('Recorded IP address + port now of other permid'),
305                                      dns,expectedpermid,0)
306             else:
307                 callback(exc,dns,expectedpermid,0)
308         except Exception,exc:
309             if DEBUG:
310                 print_exc()
311             callback(exc,dns,expectedpermid,0)
312
313     def _send(self,permid,dns,message,callback):
314         if DEBUG:
315             print >> sys.stderr,"secover: actual send",getMessageName(message[0]),\
316                         "to",show_permid_short(permid), currentThread().getName()
317         try:
318             if dns is None:
319                 callback(KeyError('IP address + port for permid unknown'),permid)
320             else:
321                 iplport = ip_and_port2str(dns[0],dns[1])
322                 oc = None
323                 try:
324                     oc = self.iplport2oc[iplport]
325                 except KeyError:
326                     pass
327                 if oc is None:
328                     callback(KeyError('Not connected to permid'),permid)
329                 elif oc.is_auth_done():
330                     if oc.get_auth_permid() == permid:
331                         oc.send_message(message)
332                         callback(None,permid)
333                     else:
334                         callback(KeyError('Recorded IP address + port now of other permid'),permid)
335                 else:
336                     callback(KeyError('Connection not yet established'),permid)
337         except Exception,exc:
338             if DEBUG:
339                 print_exc()
340             callback(exc,permid)
341
342
343     def _close(self,permid):
344         if DEBUG:
345             print >> sys.stderr,"secover: actual close",show_permid_short(permid)
346         try:
347             oc = self.get_oc_by_permid(permid)
348             if not oc:
349                 if DEBUG:
350                     print >> sys.stderr,"secover: error - actual close, but no connection to peer in admin"
351             else:
352                 oc.close()
353         except Exception,e:
354             print_exc()
355
356     #
357     # Interface for SocketHandler
358     #
359     def get_handler(self):
360         return self
361     
362     def external_connection_made(self,singsock):
363         """ incoming connection (never used) """
364         if DEBUG:
365             print >> sys.stderr,"secover: external_connection_made",singsock.get_ip(),singsock.get_port()
366         # self.last_activity = time()
367         oc = OverlayConnection(self,singsock,self.rawserver)
368         singsock.set_handler(oc)
369
370     def connection_flushed(self,singsock):
371         """ sockethandler flushes connection """
372         if DEBUG:
373             print >> sys.stderr,"secover: connection_flushed",singsock.get_ip(),singsock.get_port()
374     
375     #
376     # Interface for ServerPortHandler
377     #
378     def externally_handshaked_connection_made(self, singsock, options, msg_remainder):
379         """ incoming connection, handshake partially read to identity 
380             as an it as overlay connection (used always)
381         """
382         if DEBUG:
383             print >> sys.stderr,"secover: externally_handshaked_connection_made",\
384                 singsock.get_ip(),singsock.get_port()
385         oc = OverlayConnection(self,singsock,self.rawserver,ext_handshake = True, options = options)
386         singsock.set_handler(oc)
387         if msg_remainder:
388             oc.data_came_in(singsock,msg_remainder)
389         return True
390
391
392     #
393     # Interface for OverlayConnection
394     #
395     def got_auth_connection(self,oc):
396         """ authentication of peer via identity protocol succesful """
397         if DEBUG:
398             print >> sys.stderr,"secover: got_auth_connection", \
399                 show_permid_short(oc.get_auth_permid()),oc.get_ip(),oc.get_auth_listen_port(), currentThread().getName()
400
401         if oc.is_locally_initiated() and oc.get_port() != oc.get_auth_listen_port():
402             if DEBUG:
403                 print >> sys.stderr,"secover: got_auth_connection: closing because auth", \
404                     "listen port not as expected",oc.get_port(),oc.get_auth_listen_port()
405             self.cleanup_admin_and_callbacks(oc,Exception('closing because auth listen port not as expected'))
406             return False
407
408         # self.last_activity = time()
409
410         ret = True
411         iplport = ip_and_port2str(oc.get_ip(),oc.get_auth_listen_port())
412         known = iplport in self.iplport2oc
413         if not known:
414             self.iplport2oc[iplport] = oc
415         elif known and not oc.is_locally_initiated():
416             # Locally initiated connections will already be registered,
417             # so if it's not a local connection and we already have one 
418             # we have a duplicate, and we close the new one.
419             if DEBUG:
420                 print >> sys.stderr,"secover: got_auth_connection:", \
421                     "closing because we already have a connection to",iplport
422             self.cleanup_admin_and_callbacks(oc,
423                      Exception('closing because we already have a connection to peer'))
424             ret = False
425             
426         if ret:
427             if oc.is_auth_done():
428                 hisdns = (oc.get_ip(),oc.get_auth_listen_port())
429             else:
430                 hisdns = None
431
432             #if DEBUG:
433             #    print >>sys.stderr,"secover: userconnhandler is",self.userconnhandler
434             
435             if self.userconnhandler is not None:
436                 try:
437                     self.userconnhandler(None,oc.get_auth_permid(),oc.get_sel_proto_ver(),oc.is_locally_initiated(),hisdns)
438                 except:
439                     # Catch all
440                     print_exc()
441             oc.dequeue_callbacks()
442         return ret
443
444     def local_close(self,oc):
445         """ our side is closing the connection """
446         if DEBUG:
447             print >> sys.stderr,"secover: local_close"
448         self.cleanup_admin_and_callbacks(oc,CloseException('local close',oc.is_auth_done()))
449
450     def connection_lost(self,oc):
451         """ overlay connection telling us to clear admin """
452         if DEBUG:
453             print >> sys.stderr,"secover: connection_lost"
454         self.cleanup_admin_and_callbacks(oc,CloseException('connection lost',oc.is_auth_done()))
455
456
457     def got_message(self,permid,message,selversion):
458         """ received message from authenticated peer, pass to upper layer """
459         if DEBUG:
460             print >> sys.stderr,"secover: got_message",getMessageName(message[0]),\
461                             "v"+str(selversion)
462         # self.last_activity = time()
463         if self.usermsghandler is None:
464             if DEBUG:
465                 print >> sys.stderr,"secover: User receive callback not set"
466             return
467         try:
468             
469             #if DEBUG:
470             #    print >>sys.stderr,"secover: usermsghandler is",self.usermsghandler
471             
472             ret = self.usermsghandler(permid,selversion,message)
473             if ret is None:
474                 if DEBUG:
475                     print >> sys.stderr,"secover: INTERNAL ERROR:", \
476                         "User receive callback returned None, not True or False"
477                 ret = False
478             elif DEBUG:
479                 print >> sys.stderr,"secover: message handler returned",ret
480             return ret
481         except:
482             # Catch all
483             print_exc()
484             return False
485
486         
487     def get_max_len(self):
488         return self.max_len
489     
490     def get_my_peer_id(self):
491         return self.myid
492
493     def get_my_keypair(self):
494         return self.mykeypair
495
496     def measurefunc(self,length):
497         pass
498
499     #
500     # Interface for OverlayThreadingBridge
501     #
502     def get_dns_from_peerdb(self,permid,use_cache=True):
503         # Called by any thread, except NetworkThread
504         
505         if currentThread().getName().startswith("NetworkThread"):
506             print >>sys.stderr,"secover: get_dns_from_peerdb: called by NetworkThread!"
507             print_stack()
508         
509         dns = self.dns.get(permid, None)
510
511         if not dns:
512             values = ('ip', 'port')
513             peer = self.peer_db.getOne(values, permid=bin2str(permid))
514             if peer and peer[0] and peer[1]:
515                 ip = hostname_or_ip2ip(peer[0])
516                 dns = (ip, int(peer[1]))
517         return dns
518  
519     def add_peer_to_db(self,permid,dns,selversion):
520         """ add a connected peer to database """
521         # Called by OverlayThread
522         
523         if currentThread().getName().startswith("NetworkThread"):
524             print >>sys.stderr,"secover: add_peer_to_peerdb: called by NetworkThread!"
525             print_stack()
526         if DEBUG:
527             print >>sys.stderr,"secover: add_peer_to_peerdb: called by",currentThread().getName()
528         
529         self.dns[permid] = dns    # cache it to avoid querying db later
530         now = int(time())
531         peer_data = {'permid':permid, 'ip':dns[0], 'port':dns[1], 'oversion':selversion, 'last_seen':now, 'last_connected':now}
532         self.peer_db.addPeer(permid, peer_data, update_dns=True, update_connected=True, commit=True)
533         #self.peer_db.updateTimes(permid, 'connected_times', 1, commit=True)
534         
535
536     def update_peer_status(self,permid,authwasdone):
537         """ update last_seen and last_connected in peer db when close """
538         # Called by OverlayThread
539         
540         if currentThread().getName().startswith("NetworkThread"):
541             print >>sys.stderr,"secover: update_peer_status: called by NetworkThread!"
542             print_stack()
543         
544         now = int(time())
545         if authwasdone:
546             self.peer_db.updatePeer(permid, last_seen=now, last_connected=now)
547             self.lm.session.uch.notify(NTFY_PEERS, NTFY_CONNECTION, permid, False)
548     #
549     # Interface for debugging
550     #
551     def debug_get_live_connections(self):
552         """ return a list of (permid,dns) tuples of the peers with which we 
553             are connected. Like all methods here it must be called by the network thread
554         """
555         live_conn = []
556         for iplport in self.iplport2oc:
557             oc = self.iplport2oc[iplport]
558             if oc:
559                 peer_permid = oc.get_auth_permid()
560                 if peer_permid:
561                     live_conn.append((peer_permid,(oc.get_ip(),oc.get_port())))
562         return live_conn
563
564
565     #
566     # Internal methods
567     #
568     def start_connection(self,dns):
569         if DEBUG:
570             print >> sys.stderr,"secover: Attempt to connect to",dns
571         singsock = self.sock_hand.start_connection(dns)
572         oc = OverlayConnection(self,singsock,self.rawserver,
573                                locally_initiated=True,specified_dns=dns)
574         singsock.set_handler(oc)
575         return oc
576
577     def cleanup_admin_and_callbacks(self,oc,exc):
578         oc.cleanup_callbacks(exc)
579         self.cleanup_admin(oc)
580         if oc.is_auth_done() and self.userconnhandler is not None:
581             self.userconnhandler(exc,oc.get_auth_permid(),oc.get_sel_proto_ver(),
582                                  oc.is_locally_initiated(),None)
583
584     def cleanup_admin(self,oc):
585         iplports = []
586         d = 0
587         for key in self.iplport2oc.keys():
588             #print "***** iplport2oc:", key, self.iplport2oc[key]
589             if self.iplport2oc[key] == oc:
590                 del self.iplport2oc[key]
591                 #print "*****!!! del", key, oc
592                 d += 1
593         
594     def get_oc_by_permid(self, permid):
595         """ return the OverlayConnection instance given a permid """
596
597         for iplport in self.iplport2oc:
598             oc = self.iplport2oc[iplport]
599             if oc.get_auth_permid() == permid:
600                 return oc
601         return None
602
603
604
class Task:
    """ A call that is stored now and carried out later: start() invokes
        method with the positional and keyword arguments given at
        construction time.
    """
    def __init__(self,method,*args, **kwargs):
        self.method, self.args, self.kwargs = method, args, kwargs

    def start(self):
        """ Carry out the stored call; its return value is discarded. """
        if DEBUG:
            print >> sys.stderr,"secover: task: start",self.method
            #print_stack()
        func = self.method
        func(*self.args,**self.kwargs)
616
617     
class CloseException(Exception):
    """ Exception object handed to callbacks when an overlay connection is
        closed. It remembers whether authentication of the connection had
        completed at that point.
    """
    def __init__(self,msg=None,authdone=False):
        Exception.__init__(self,msg)
        self.authdone = authdone

    def was_auth_done(self):
        """ Return the authdone flag given at construction time. """
        return self.authdone

    def __str__(self):
        # Prefix the message with the class so closes stand out in traces
        return '%s: %s' % (self.__class__, Exception.__str__(self))
628     
629
630 class OverlayConnection:
631     def __init__(self,handler,singsock,rawserver,locally_initiated = False,
632                  specified_dns = None, ext_handshake = False,options = None):
633         self.handler = handler        
634         self.singsock = singsock # for writing
635         self.rawserver = rawserver
636         self.buffer = StringIO()
637         self.cb_queue = []
638         self.auth_permid = None
639         self.unauth_peer_id = None
640         self.auth_peer_id = None
641         self.auth_listen_port = None
642         self.low_proto_ver = 0
643         self.cur_proto_ver = 0
644         self.sel_proto_ver = 0
645         self.options = None
646         self.locally_initiated = locally_initiated
647         self.specified_dns = specified_dns
648         self.last_use = time()
649
650         self.state = STATE_INITIAL
651         self.write(chr(len(protocol_name)) + protocol_name + 
652                 option_pattern + overlay_infohash + self.handler.get_my_peer_id())
653         if ext_handshake:
654             self.state = STATE_HS_PEERID_WAIT
655             self.next_len = 20
656             self.next_func = self.read_peer_id
657             self.set_options(options)
658         else:
659             self.state = STATE_HS_FULL_WAIT
660             self.next_len = 1
661             self.next_func = self.read_header_len
662             
663         # Leave autoclose here instead of SecureOverlay, as that doesn't record
664         # remotely-initiated OverlayConnections before authentication is done.
665         self.rawserver.add_task(self._olconn_auto_close, EXPIRE_CHECK_INTERVAL)
666
667     #
668     # Interface for SocketHandler
669     #
670     def data_came_in(self, singsock, data):
671         """ sockethandler received data """
672         # now we got something we can ask for the peer's real port
673         dummy_port = singsock.get_port(True)
674
675         if DEBUG:
676             print >> sys.stderr,"olconn: data_came_in",singsock.get_ip(),singsock.get_port()
677         self.handler.measurefunc(len(data))
678         self.last_use = time()
679         while 1:
680             if self.state == STATE_CLOSED:
681                 return
682             i = self.next_len - self.buffer.tell()
683             if i > len(data):
684                 self.buffer.write(data)
685                 return
686             self.buffer.write(data[:i])
687             data = data[i:]
688             m = self.buffer.getvalue()
689             self.buffer.reset()
690             self.buffer.truncate()
691             try:
692                 if DEBUG:
693                     print >> sys.stderr,"olconn: Trying to read",self.next_len #,"using",self.next_func
694                 x = self.next_func(m)
695             except:
696                 self.next_len, self.next_func = 1, self.read_dead
697                 if DEBUG:
698                     print_exc()
699                 raise
700             if x is None:
701                 if DEBUG:
702                     print >> sys.stderr,"olconn: next_func returned None",self.next_func
703                 self.close()
704                 return
705             self.next_len, self.next_func = x
706
707     def connection_lost(self,singsock):
708         """ kernel or socket handler reports connection lost """
709         if DEBUG:
710             print >> sys.stderr,"olconn: connection_lost",singsock.get_ip(),singsock.get_port(),self.state
711         if self.state != STATE_CLOSED:
712             self.state = STATE_CLOSED
713             self.handler.connection_lost(self)
714
715     def connection_flushed(self,singsock):
716         """ sockethandler flushes connection """
717         pass
718
719     # 
720     # Interface for SecureOverlay
721     #
722     def send_message(self,message):
723         self.last_use = time()
724         s = tobinary(len(message))+message
725         self.write(s)
726
727     def is_locally_initiated(self):
728         return self.locally_initiated
729
730     def get_ip(self):
731         return self.singsock.get_ip()
732
733     def get_port(self):
734         return self.singsock.get_port()
735
736     def is_auth_done(self):
737         return self.auth_permid is not None
738
739     def get_auth_permid(self):
740         return self.auth_permid
741
742     def get_auth_listen_port(self):
743         return self.auth_listen_port
744
745     def get_remote_listen_port(self):
746         if self.is_auth_done():
747             return self.auth_listen_port
748         elif self.is_locally_initiated():
749             return self.specified_dns[1]
750         else:
751             return NO_REMOTE_LISTEN_PORT_KNOWN
752
753     def get_low_proto_ver(self):
754         return self.low_proto_ver
755
    def get_cur_proto_ver(self):
        """ Return the current protocol version the peer advertised,
        as extracted from its peer id by read_peer_id(). """
        return self.cur_proto_ver
758
    def get_sel_proto_ver(self):
        """ Return the protocol version read_peer_id() selected for this
        connection via select_supported_protoversion(). """
        return self.sel_proto_ver
761
762     def queue_callback(self,dns,callback):
763         if callback is not None:
764             self.cb_queue.append(callback)
765
766     def dequeue_callbacks(self):
767         try:
768             permid = self.get_auth_permid()
769             for callback in self.cb_queue:
770                 callback(None,self.specified_dns,permid,self.get_sel_proto_ver())
771             self.cb_queue = []
772         except Exception,e:
773             print_exc()
774
775
776     def cleanup_callbacks(self,exc):
777         if DEBUG:
778             print >> sys.stderr,"olconn: cleanup_callbacks: #callbacks is",len(self.cb_queue)
779         try:
780             for callback in self.cb_queue:
781                 ## Failure connecting
782                 if DEBUG:
783                     print >> sys.stderr,"olconn: cleanup_callbacks: callback is",callback
784                 callback(exc,self.specified_dns,self.get_auth_permid(),0)
785         except Exception,e:
786             print_exc()
787
788     #
789     # Interface for ChallengeResponse
790     #
    def get_unauth_peer_id(self):
        """ Return the peer id exactly as read from the handshake by
        read_peer_id(), i.e. before any authentication took place. """
        return self.unauth_peer_id
793
794     def got_auth_connection(self,singsock,permid,peer_id):
795         """ authentication of peer via identity protocol succesful """
796         self.auth_permid = str(permid)
797         self.auth_peer_id = peer_id
798         self.auth_listen_port = decode_auth_listen_port(peer_id)
799
800         self.state = STATE_DATA_WAIT
801
802         if not self.handler.got_auth_connection(self):
803             self.close()
804             return
805
806     #
807     # Internal methods
808     #
809     def read_header_len(self, s):
810         if ord(s) != len(protocol_name):
811             return None
812         return len(protocol_name), self.read_header
813
814     def read_header(self, s):
815         if s != protocol_name:
816             return None
817         return 8, self.read_reserved
818
819     def read_reserved(self, s):
820         if DEBUG:
821             print >> sys.stderr,"olconn: Reserved bits:", `s`
822         self.set_options(s)
823         return 20, self.read_download_id
824
825     def read_download_id(self, s):
826         if s != overlay_infohash:
827             return None
828         return 20, self.read_peer_id
829
830     def read_peer_id(self, s):
831         self.unauth_peer_id = s
832         
833         [self.low_proto_ver,self.cur_proto_ver] = get_proto_version_from_peer_id(self.unauth_peer_id)
834         self.sel_proto_ver = select_supported_protoversion(self.low_proto_ver,self.cur_proto_ver)
835         if not self.sel_proto_ver:
836             if DEBUG:
837                 print >> sys.stderr,"olconn: We don't support peer's version of the protocol"
838             return None
839         elif DEBUG:
840             print >> sys.stderr,"olconn: Selected protocol version",self.sel_proto_ver
841
842         if self.cur_proto_ver <= 2:
843             # Arno, 2010-02-04: Kick TorrentSwapper clones, still around 
844             print >>sys.stderr,"olconn: Kicking ancient peer",`self.unauth_peer_id`,self.get_ip()
845             return None
846
847         self.state = STATE_AUTH_WAIT
848         self.cr = ChallengeResponse(self.handler.get_my_keypair(),self.handler.get_my_peer_id(),self)
849         if self.locally_initiated:
850             self.cr.start_cr(self)
851         return 4, self.read_len
852     
853
854     def read_len(self, s):
855         l = toint(s)
856         if l > self.handler.get_max_len():
857             return None
858         return l, self.read_message
859
860     def read_message(self, s):
861         if s != '':
862             if self.state == STATE_AUTH_WAIT:
863                 if not self.cr.got_message(self,s):
864                     return None
865             elif self.state == STATE_DATA_WAIT:
866                 if not self.handler.got_message(self.auth_permid,s,self.sel_proto_ver):
867                     return None
868             else:
869                 if DEBUG:
870                     print >> sys.stderr,"olconn: Received message while in illegal state, internal error!"
871                 return None
872         return 4, self.read_len
873
    def read_dead(self, s):
        """ Reader installed after another reader raised an exception.
        Always returns None, which makes the caller close the connection. """
        return None
876
877     def write(self,s):
878         self.singsock.write(s)
879
    def set_options(self,options):
        """ Store options on the connection; read_reserved() passes the
        8 reserved bytes of the handshake here. """
        self.options = options
882
883     def close(self):
884         if DEBUG:
885             print >> sys.stderr,"olconn: we close()",self.get_ip(),self.get_port()
886             #print_stack()
887         self.state_when_error = self.state
888         if self.state != STATE_CLOSED:
889             self.state = STATE_CLOSED
890             self.handler.local_close(self)
891             self.singsock.close()
892         return
893
894     def _olconn_auto_close(self):
895         if (time() - self.last_use) > EXPIRE_THRESHOLD:
896             self.close()
897         else:
898             self.rawserver.add_task(self._olconn_auto_close, EXPIRE_CHECK_INTERVAL)
899
900
901 #
902 # Internal functions
903 #
def create_my_peer_id(my_listen_port):
    """ Create our overlay peer id.

    Starts from createPeerID() and overwrites bytes 14-15 with
    my_listen_port, bytes 16-17 with OLPROTO_VER_LOWEST and bytes 18-19
    with OLPROTO_VER_CURRENT, each as little-endian unsigned short.
    """
    base = createPeerID()
    versions = pack('<H', OLPROTO_VER_LOWEST) + pack('<H', OLPROTO_VER_CURRENT)
    with_versions = base[:16] + versions
    port = pack('<H', my_listen_port)
    return with_versions[:14] + port + with_versions[16:]
909
def get_proto_version_from_peer_id(peerid):
    """ overlay swarm versioning solution- use last 4 bytes in PeerID

    Bytes 16-17 hold the lowest and bytes 18-19 the current protocol
    version, both little-endian unsigned shorts.
    Returns the list [low_ver,cur_ver].
    """
    low_ver, cur_ver = unpack('<HH', peerid[16:20])
    return [low_ver,cur_ver]
918
def is_proto_version_supported(low_ver,cur_ver):
    """ Return True if we can talk to a peer advertising protocol
    versions low_ver..cur_ver.

    A peer at exactly OLPROTO_VER_CURRENT is always accepted. Otherwise
    the peer is rejected when its lowest version is above ours, when its
    current version is below OLPROTO_VER_LOWEST, or when its current
    version is older than ours and not listed in SupportedVersions.
    """
    if cur_ver == OLPROTO_VER_CURRENT:
        return True
    rejected = (low_ver > OLPROTO_VER_CURRENT       # the other's version is too high
                or cur_ver < OLPROTO_VER_LOWEST     # the other's version is too low
                or (cur_ver < OLPROTO_VER_CURRENT   # the other's version is not supported
                    and cur_ver not in SupportedVersions))
    return not rejected
929
def select_supported_protoversion(his_low_ver,his_cur_ver):
    """ Select the protocol version to use with a peer advertising
    versions his_low_ver..his_cur_ver.

    Returns min(his_cur_ver,OLPROTO_VER_CURRENT), or None when the peer
    is not at OLPROTO_VER_CURRENT and either its lowest version is above
    ours, its current version is below OLPROTO_VER_LOWEST, or its
    current version is older than ours and not in SupportedVersions
    (peers of that version are abandoned).
    """
    if his_cur_ver != OLPROTO_VER_CURRENT:
        if (his_low_ver > OLPROTO_VER_CURRENT
                or his_cur_ver < OLPROTO_VER_LOWEST
                or (his_cur_ver < OLPROTO_VER_CURRENT
                    and his_cur_ver not in SupportedVersions)):
            return None
    return min(his_cur_ver,OLPROTO_VER_CURRENT)
943
def decode_auth_listen_port(peerid):
    """ Return the listen port stored in bytes 14-15 of peerid as a
    little-endian unsigned short. """
    (port,) = unpack('<H', peerid[14:16])
    return port
948
def ip_and_port2str(ip,port):
    """ Return the string "ip:port"; ip must already be a string. """
    return ':'.join((ip, str(port)))