instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / ProxyService / Helper.py
1 # Written by Pawel Garbacki, George Milescu
2 # see LICENSE.txt for license information
3
4 import sys
5 from traceback import print_exc
6 from time import time
7 from collections import deque
8 from threading import Lock
9
10 from BaseLib.Core.BitTornado.bencode import bencode
11 from BaseLib.Core.BitTornado.BT1.MessageID import ASK_FOR_HELP, STOP_HELPING, REQUEST_PIECES, CANCEL_PIECE, JOIN_HELPERS, RESIGN_AS_HELPER, DROPPED_PIECE, PROXY_HAVE, PROXY_UNHAVE
12
13 from BaseLib.Core.Overlay.OverlayThreadingBridge import OverlayThreadingBridge
14 from BaseLib.Core.CacheDB.CacheDBHandler import PeerDBHandler, TorrentDBHandler 
15 from BaseLib.Core.Utilities.utilities import show_permid_short
16
17 # ???
18 MAX_ROUNDS = 200
19 # ???
20 DEBUG = False
21
22 class SingleDownloadHelperInterface:
23     """ This interface should contain all methods that the PiecePiecker/Helper
24         calls on the SingleDownload class.
25     """
26     def __init__(self):
27         self.frozen_by_helper = False
28
29     def helper_set_freezing(self,val):
30         self.frozen_by_helper = val
31
32     def is_frozen_by_helper(self):
33         return self.frozen_by_helper
34
35     def is_choked(self):
36         pass
37
38     def helper_forces_unchoke(self):
39         pass
40
41     def _request_more(self, new_unchoke = False):
42         pass
43
44
45 class Helper:
46     def __init__(self, torrent_hash, num_pieces, coordinator_permid, coordinator = None):
47         
48         self.torrent_hash = torrent_hash
49         self.coordinator = coordinator
50
51         if coordinator_permid is not None and coordinator_permid == '':
52             self.coordinator_permid = None
53         else:
54             self.coordinator_permid = coordinator_permid
55
56         # Get coordinator ip and address
57         self.coordinator_ip = None  # see is_coordinator()
58         self.coordinator_port = -1
59         if self.coordinator_permid is not None:
60             peerdb = PeerDBHandler.getInstance()
61             peer = peerdb.getPeer(coordinator_permid)
62             if peer is not None:
63                 self.coordinator_ip = peer['ip']
64                 self.coordinator_port = peer['port']
65         
66         self.overlay_bridge = OverlayThreadingBridge.getInstance()
67         
68         self.reserved_pieces = [False] * num_pieces
69         self.ignored_pieces = [False] * num_pieces
70         self.distr_reserved_pieces = [False] * num_pieces
71
72         self.requested_pieces = deque()
73         self.requested_pieces_lock = Lock()
74         
75         self.counter = 0
76         self.completed = False
77         self.marker = [True] * num_pieces
78         self.round = 0
79         self.encoder = None
80         self.continuations = []
81         self.outstanding = None
82         self.last_req_time = 0
83         
84         # The challenge sent by the coordinator
85         self.challenge = None
86         
87
88     def test(self):
89         result = self.reserve_piece(10,None)
90         print >> sys.stderr,"reserve piece returned: " + str(result)
91         print >> sys.stderr,"Test passed"
92
93
94
95
96
97     def notify(self):
98         """ Called by HelperMessageHandler to "wake up" the download that's
99             waiting for its coordinator to reserve it a piece 
100         """
101         if self.outstanding is None:
102             if DEBUG:
103                 print >> sys.stderr,"helper: notify: No continuation waiting?"
104         else:
105             if DEBUG:
106                 print >> sys.stderr,"helper: notify: Waking downloader"
107             sdownload = self.outstanding
108             self.outstanding = None # must be not before calling self.restart!
109             self.restart(sdownload)
110             
111             #self.send_reservation()
112             l = self.continuations[:] # copy just to be sure
113             self.continuations = []
114             for sdownload in l:
115                 self.restart(sdownload)
116
117     def restart(self,sdownload):
118         """ TODO ???
119         """
120         # Chokes can get in while we're waiting for reply from coordinator.
121         # But as we were called from _request_more() we were not choked
122         # just before, so pretend we didn't see the message yet.
123         if sdownload.is_choked():
124             sdownload.helper_forces_unchoke()
125         sdownload.helper_set_freezing(False)
126         sdownload._request_more()
127
128
129
130
131
132     #
133     # Send messages
134     # 
135
136     def send_join_helpers(self, permid):
137         """ Send a confirmation to the coordinator that the current node will provide proxy services
138         
139         Called by self.got_ask_for_help()
140         
141         @param permid: The permid of the node that will become coordinator
142         """
143
144         if DEBUG:
145             print "helper: send_join_helpers: sending a join_helpers message to", show_permid_short(permid)
146
147         olthread_send_join_helpers_lambda = lambda:self.olthread_send_join_helpers()
148         self.overlay_bridge.add_task(olthread_send_join_helpers_lambda,0)
149
150         
151     def olthread_send_join_helpers(self):
152         """ Creates a bridge connection for the join helpers message to be sent
153         
154         Called by the overlay thread.
155         """
156         # TODO: ??? We need to create the message according to protocol version, so need to pass all info.
157         olthread_join_helpers_connect_callback_lambda = lambda e,d,p,s:self.olthread_join_helpers_connect_callback(e,d,p,s)
158         self.overlay_bridge.connect(self.coordinator_permid,olthread_join_helpers_connect_callback_lambda)
159
160
161     def olthread_join_helpers_connect_callback(self,exc,dns,permid,selversion):
162         """ Sends the join helpers message on the connection with the coordinator
163         
164         Called by the overlay thread.
165         
166         @param exc: Peer reachable/unreachable information. None = peer reachable
167         @param dns:
168         @param permid: the permid of the coordinator
169         @param selversion:
170         """
171         if exc is None:
172             # Create message according to protocol version
173             message = JOIN_HELPERS + self.torrent_hash
174
175             if DEBUG:
176                 print >> sys.stderr,"helper: olthread_join_helpers_connect_callback: Sending JOIN_HELPERS to",show_permid_short(permid)
177
178             self.overlay_bridge.send(permid, message, self.olthread_join_helpers_send_callback)
179         elif DEBUG:
180             # The coordinator is unreachable
181             print >> sys.stderr,"helper: olthread_join_helpers_connect_callback: error connecting to",show_permid_short(permid),exc
182
183
184     def olthread_join_helpers_send_callback(self, exc, permid):
185         """ Callback function for error checking in network communication
186         
187         Called by the overlay thread.
188         
189         @param exc: Peer reachable/unreachable information. None = peer reachable
190         @param permid: the permid of the peer that is contacted for helping
191         """
192
193         if exc is not None:
194             if DEBUG:
195                 print >> sys.stderr,"helper: olthread_join_helpers_send_callback: error sending message to",show_permid_short(permid),exc
196         
197         pass
198
199
200
201
202
203     def send_proxy_have(self, aggregated_haves):
204         """ Send a list of aggregated have and bitfield information
205         
206         Called by Downloader.aggregate_and_send_haves
207         
208         @param aggregated_haves: A Bitfield object, containing an aggregated list of stored haves
209         """
210
211         if DEBUG:
212             print "helper: send_proxy_have: sending a proxy_have message to", show_permid_short(self.coordinator_permid)
213
214         aggregated_string = aggregated_haves.tostring()
215         olthread_send_proxy_have_lambda = lambda:self.olthread_send_proxy_have(aggregated_string)
216         self.overlay_bridge.add_task(olthread_send_proxy_have_lambda,0)
217
218         
219     def olthread_send_proxy_have(self, aggregated_string):
220         """ Creates a bridge connection for the proxy_have message to be sent
221         
222         Called by the overlay thread.
223         
224         @param aggregated_string: a bitstring of available piesces
225         """
226         # TODO: ??? We need to create the message according to protocol version, so need to pass all info.
227         olthread_proxy_have_connect_callback_lambda = lambda e,d,p,s:self.olthread_proxy_have_connect_callback(e,d,p,s,aggregated_string)
228         self.overlay_bridge.connect(self.coordinator_permid,olthread_proxy_have_connect_callback_lambda)
229
230
231     def olthread_proxy_have_connect_callback(self,exc,dns,permid,selversion, aggregated_string):
232         """ Sends the proxy_have message on the connection with the coordinator
233         
234         Called by the overlay thread.
235         
236         @param exc: Peer reachable/unreachable information. None = peer reachable
237         @param dns:
238         @param permid: the permid of the coordinator
239         @param selversion: selected (buddycast?) version
240         @param aggregated_string: a bitstring of available pieces
241         """
242         if exc is None:
243             # Create message according to protocol version
244             message = PROXY_HAVE + self.torrent_hash + bencode(aggregated_string)
245
246             if DEBUG:
247                 print >> sys.stderr,"helper: olthread_proxy_have_connect_callback: Sending PROXY_HAVE to",show_permid_short(permid)
248
249             self.overlay_bridge.send(permid, message, self.olthread_proxy_have_send_callback)
250         elif DEBUG:
251             # The coordinator is unreachable
252             print >> sys.stderr,"helper: olthread_proxy_have_connect_callback: error connecting to",show_permid_short(permid),exc
253
254
255     def olthread_proxy_have_send_callback(self, exc, permid):
256         """ Callback function for error checking in network communication
257         
258         Called by the overlay thread.
259         
260         @param exc: Peer reachable/unreachable information. None = peer reachable
261         @param permid: the permid of the peer that is contacted for helping
262         """
263
264         if exc is not None:
265             if DEBUG:
266                 print >> sys.stderr,"helper: olthread_proxy_have_send_callback: error sending message to",show_permid_short(permid),exc
267         
268         pass
269
270
271
272
273
274     def send_resign_as_helper(self, permid):
275         """ Send a message to the coordinator that the current node will stop providing proxy services
276         
277         Called by TODO
278         
279         @param permid: The permid of the coordinator
280         """
281
282         if DEBUG:
283             print "helper: send_resign_as_helper: sending a resign_as_helper message to", permid
284
285         olthread_send_resign_as_helper_lambda = lambda:self.olthread_send_resign_as_helper()
286         self.overlay_bridge.add_task(olthread_send_resign_as_helper_lambda,0)
287
288         
289     def olthread_send_resign_as_helper(self):
290         """ Creates a bridge connection for the resign_as_helper message to be sent
291         
292         Called by the overlay thread.
293         """
294         olthread_resign_as_helper_connect_callback_lambda = lambda e,d,p,s:self.olthread_resign_as_helper_connect_callback(e,d,p,s)
295         self.overlay_bridge.connect(self.coordinator_permid,olthread_resign_as_helper_connect_callback_lambda)
296
297
298     def olthread_resign_as_helper_connect_callback(self,exc,dns,permid,selversion):
299         """ Sends the resign_as_helper message on the connection with the coordinator
300         
301         Called by the overlay thread.
302         
303         @param exc: Peer reachable/unreachable information. None = peer reachable
304         @param dns:
305         @param permid: the permid of the coordinator
306         @param selversion:
307         """
308         if exc is None:
309             # Create message according to protocol version
310             message = RESIGN_AS_HELPER + self.torrent_hash
311
312             if DEBUG:
313                 print >> sys.stderr,"helper: olthread_resign_as_helper_connect_callback: Sending RESIGN_AS_HELPER to",show_permid_short(permid)
314
315             self.overlay_bridge.send(permid, message, self.olthread_resign_as_helper_send_callback)
316         elif DEBUG:
317             # The coordinator is unreachable
318             print >> sys.stderr,"helper: olthread_resign_as_helper_connect_callback: error connecting to",show_permid_short(permid),exc
319
320
321     def olthread_resign_as_helper_send_callback(self,exc,permid):
322         """ Callback function for error checking in network communication
323         
324         Called by the overlay thread.
325         
326         @param exc: Peer reachable/unreachable information. None = peer reachable
327         @param permid: the permid of the peer that is contacted for helping
328         """
329         
330         if exc is not None:
331             if DEBUG:
332                 print >> sys.stderr,"helper: olthread_resign_as_helper_send_callback: error sending message to",show_permid_short(permid),exc
333         
334         pass
335
336     
337     
338     
339
340     #
341     # Got (received) messages
342     # 
343     def got_ask_for_help(self, permid, infohash, challenge):
344         """ Start helping a coordinator or reply with an resign_as_helper message
345         
346         @param permid: The permid of the node sending the help request message
347         @param infohash: the infohash of the torrent for which help is requested 
348         @param challenge: The challenge sent by the coordinator
349         """
350         if DEBUG:
351             print >>sys.stderr,"helper: got_ask_for_help: will answer to the help request from", show_permid_short(permid)
352         if self.can_help(infohash):
353             # Send JOIN_HELPERS
354             if DEBUG:
355                 print >>sys.stderr,"helper: got_ask_for_help: received a help request, going to send join_helpers"
356             self.send_join_helpers(permid)
357             self.challenge = challenge
358         else:
359             # Send RESIGN_AS_HELPER
360             if DEBUG:
361                 print >>sys.stderr,"helper: got_ask_for_help: received a help request, going to send resign_as_helper"
362             self.send_resign_as_helper(permid)
363             return False
364
365         return True
366
367
368     def can_help(self, infohash):
369         """ Decide if the current node can help a coordinator for the current torrent
370         
371         @param infohash: the infohash of the torrent for which help is requested 
372         """        
373         #TODO: test if I can help the cordinator to download this file
374         #Future support: make the decision based on my preference
375         return True
376
377
378
379
380
381     def got_stop_helping(self, permid, infohash):
382         """ Stop helping a coordinator
383         
384         @param permid: The permid of the node sending the message
385         @param infohash: the infohash of the torrent for which help is released 
386         """        
387         #TODO: decide what to do here
388         return True
389
390
391
392
393
394     def got_request_pieces(self, permid, piece):
395         """ Start downloading the pieces that the coordinator requests
396         
397         @param permid: The permid of the node requesting the pieces
398         @param piece: a piece number, that is going to be downloaded 
399         """        
400         if DEBUG:
401             print "helper: got_request_pieces: received request_pieces for piece", piece
402
403         # Mark the piece as requested in the local data structures
404         self.reserved_pieces[piece] = True
405 #        if self.distr_reserved_pieces[piece] == True:
406             # if the piece was previously requested by the same coordinator, don't do anything
407             #self.distr_reserved_pieces[piece] = True
408 #            print "Received duplicate proxy request for", piece
409 #            return
410
411         self.distr_reserved_pieces[piece] = True
412         self.ignored_pieces[piece] = False
413         
414         self.requested_pieces_lock.acquire()
415         self.requested_pieces.append(piece)
416         self.requested_pieces_lock.release()
417
418         # Start data connection
419         self.start_data_connection()
420
421     def start_data_connection(self):
422         """ Start a data connection with the coordinator
423         
424         @param permid: The permid of the coordinator
425         """
426         # Do this always, will return quickly when connection already exists
427         dns = (self.coordinator_ip, self.coordinator_port)
428         if DEBUG:
429             print >> sys.stderr,"helper: start_data_connection: Starting data connection to coordinator at", dns
430         
431         self.encoder.start_connection(dns, id = None, coord_con = True, challenge = self.challenge)
432
433
434
435     #
436     # Util functions
437     #
438     def is_coordinator(self, permid):
439         """ Check if the permid is the current coordinator
440         
441         @param permid: The permid to be checked if it is the coordinator
442         @return: True, if the permid is the current coordinator; False, if the permid is not the current coordinator
443         """
444         # If we could get coordinator_ip, don't help
445         if self.coordinator_ip is None:
446             return False
447
448         if self.coordinator_permid == permid:
449             return True
450         else:
451             return False
452
453
454     def next_request(self):
455         """ Returns the next piece in the list of coordinator-requested pieces
456         
457         Called by the PiecePicker
458         
459         @return: a piece number, if there is a requested piece pending download; None, if there is no pending piece
460         """
461         self.requested_pieces_lock.acquire()
462         if len(self.requested_pieces) == 0:
463             self.requested_pieces_lock.release()
464             if DEBUG:
465                 print >>sys.stderr,"helper: next_request: no requested pieces yet. Returning None"
466             return None
467         else:
468             next_piece = self.requested_pieces.popleft()
469             self.requested_pieces_lock.release()
470             if DEBUG:
471                 print >>sys.stderr,"helper: next_request: Returning", next_piece
472             return next_piece
473         
474         
475     def set_encoder(self, encoder):
476         """ Sets the current encoder.
477         
478         Called from download_bt1.py
479         
480         @param encoder: the new encoder that will be set
481         """
482         self.encoder = encoder
483         self.encoder.set_coordinator_ip(self.coordinator_ip)
484         # To support a helping user stopping and restarting a torrent
485         if self.coordinator_permid is not None:
486             self.start_data_connection()   
487
488
489     def get_coordinator_permid(self):
490         """ Returns the coordinator permid
491         
492         Called from SingleDownload.py
493         
494         @return: Coordinator permid
495         """
496         return self.coordinator_permid
497
498
499     def is_reserved(self, piece):
500         """ Check if a piece is reserved (requested) by a coordinator
501         
502         Called by the network thread (Interface for PiecePicker and Downloader)
503         
504         @param piece: the piece whose status is to be checked
505         @return: True, if the piece is reqested by a coordinator; False, otherwise.
506         """
507         if self.reserved_pieces[piece] or (self.coordinator is not None and self.is_complete()):
508             return True
509         return self.reserved_pieces[piece]
510
511
512     def is_ignored(self, piece):
513         """ Check if a piece is ignored by a coordinator
514         
515         Called by the network thread (Interface for PiecePicker and Downloader)
516         
517         @param piece: the piece whose status is to be checked
518         @return: True, if the piece is ignored by a coordinator; False, otherwise.
519         """
520         if not self.ignored_pieces[piece] or (self.coordinator is not None and self.is_complete()):
521             return False
522         return self.ignored_pieces[piece]
523
524
525     def is_complete(self):
526         """ Check torrent is completely downloaded
527         
528         Called by the network thread (Interface for PiecePicker and Downloader)
529         
530         @return: True, all the pieces are downloaded; False, otherwise.
531         """
532         if self.completed:
533             return True
534         
535         self.round = (self.round + 1) % MAX_ROUNDS
536         
537         if self.round != 0:
538             return False
539         if self.coordinator is not None:
540             self.completed = (self.coordinator.reserved_pieces == self.marker)
541         else:
542             self.completed = (self.distr_reserved_pieces == self.marker)
543         return self.completed