instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / BT1 / Choker.py
1 # Written by Bram Cohen, Pawel Garbacki, Boxun Zhang
2 # see LICENSE.txt for license information
3
4 from random import randrange, shuffle
5 import sys
6
7 from BaseLib.Core.BitTornado.clock import clock
8
9 try:
10     True
11 except:
12     True = 1
13     False = 0
14
15 DEBUG = False
16
17 class Choker:
18     def __init__(self, config, schedule, picker, seeding_selector, done = lambda: False):
19         self.config = config
20         self.round_robin_period = config['round_robin_period']
21         self.schedule = schedule
22         self.picker = picker
23         self.connections = []
24         self.last_preferred = 0
25         self.last_round_robin = clock()
26         self.done = done
27         self.super_seed = False
28         self.paused = False
29         schedule(self._round_robin, 5)
30         
31         # SelectiveSeeding
32         self.seeding_manager = None
33
34         
35     def set_round_robin_period(self, x):
36         self.round_robin_period = x
37
38     def _round_robin(self):
39         self.schedule(self._round_robin, 5)
40         if self.super_seed:
41             cons = range(len(self.connections))
42             to_close = []
43             count = self.config['min_uploads']-self.last_preferred
44             if count > 0:   # optimization
45                 shuffle(cons)
46             for c in cons:
47                 # SelectiveSeeding
48                 if self.seeding_manager is None or self.seeding_manager.is_conn_eligible(c):
49
50                     i = self.picker.next_have(self.connections[c], count > 0)
51                     if i is None:
52                         continue
53                     if i < 0:
54                         to_close.append(self.connections[c])
55                         continue
56                     self.connections[c].send_have(i)
57                     count -= 1
58                 else:
59                     # Drop non-eligible connections 
60                     to_close.append(self.connections[c])
61             for c in to_close:
62                 c.close()
63         if self.last_round_robin + self.round_robin_period < clock():
64             self.last_round_robin = clock()
65             for i in xrange(1, len(self.connections)):
66                 c = self.connections[i]
67                 
68                 # SelectiveSeeding
69                 if self.seeding_manager is None or self.seeding_manager.is_conn_eligible(c):
70                     u = c.get_upload()
71                     if u.is_choked() and u.is_interested():
72                         self.connections = self.connections[i:] + self.connections[:i]
73                         break
74         self._rechoke()
75
76     def _rechoke(self):
77         # 2fast
78         helper = self.picker.helper
79         if helper is not None and helper.coordinator is None and helper.is_complete():
80             for c in self.connections:
81                 if not c.connection.is_coordinator_con():
82                     u = c.get_upload()
83                     u.choke()
84             return
85
86         if self.paused:
87             for c in self.connections:
88                 c.get_upload().choke()
89             return
90
91         # NETWORK AWARE
92         if 'unchoke_bias_for_internal' in self.config:
93             checkinternalbias = self.config['unchoke_bias_for_internal']
94         else:
95             checkinternalbias = 0
96
97         if DEBUG:
98             print >>sys.stderr,"choker: _rechoke: checkinternalbias",checkinternalbias
99             
100         # 0. Construct candidate list
101         preferred = []
102         maxuploads = self.config['max_uploads']
103         if maxuploads > 1:
104             
105             # 1. Get some regular candidates
106             for c in self.connections:
107
108                 # g2g: unchoke some g2g peers later
109                 if c.use_g2g:
110                     continue
111
112                 # SelectiveSeeding
113                 if self.seeding_manager is None or self.seeding_manager.is_conn_eligible(c):
114                     u = c.get_upload()
115                     if not u.is_interested():
116                         continue
117                     if self.done():
118                         r = u.get_rate()
119                     else:
120                         d = c.get_download()
121                         r = d.get_rate()
122                         if r < 1000 or d.is_snubbed():
123                             continue
124                        
125                     # NETWORK AWARENESS 
126                     if checkinternalbias and c.na_get_address_distance() == 0:
127                         r += checkinternalbias
128                         if DEBUG:
129                             print >>sys.stderr,"choker: _rechoke: BIASING",c.get_ip(),c.get_port()
130
131                     preferred.append((-r, c))
132                     
133             self.last_preferred = len(preferred)
134             preferred.sort()
135             del preferred[maxuploads-1:]
136             if DEBUG:
137                 print >>sys.stderr,"choker: _rechoke: NORMAL UNCHOKE",preferred
138             preferred = [x[1] for x in preferred]
139
140             # 2. Get some g2g candidates 
141             g2g_preferred = []
142             for c in self.connections:
143                 if not c.use_g2g:
144                     continue
145
146                 # SelectiveSeeding
147                 if self.seeding_manager is None or self.seeding_manager.is_conn_eligible(c):
148
149                     u = c.get_upload()
150                     if not u.is_interested():
151                         continue
152     
153                     r = c.g2g_score()
154                     if checkinternalbias and c.na_get_address_distance() == 0:
155                         r[0] += checkinternalbias
156                         r[1] += checkinternalbias
157                         if DEBUG:
158                             print >>sys.stderr,"choker: _rechoke: G2G BIASING",c.get_ip(),c.get_port()
159                    
160                     g2g_preferred.append((-r[0], -r[1], c))
161                     
162             g2g_preferred.sort()
163             del g2g_preferred[maxuploads-1:]
164             if DEBUG:
165                 print  >>sys.stderr,"choker: _rechoke: G2G UNCHOKE",g2g_preferred
166             g2g_preferred = [x[2] for x in g2g_preferred]
167
168             preferred += g2g_preferred
169
170
171         # 
172         count = len(preferred)
173         hit = False
174         to_unchoke = []
175         
176         # 3. The live source must always unchoke its auxiliary seeders
177         # LIVESOURCE
178         if 'live_aux_seeders' in self.config:
179             
180             for hostport in self.config['live_aux_seeders']:
181                 for c in self.connections:
182                     if c.get_ip() == hostport[0]:
183                         u = c.get_upload()
184                         to_unchoke.append(u)
185                         #print >>sys.stderr,"Choker: _rechoke: LIVE: Permanently unchoking aux seed",hostport
186
187         # 4. Select from candidate lists, aux seeders always selected
188         for c in self.connections:
189             u = c.get_upload()
190             if c in preferred:
191                 to_unchoke.append(u)
192             else:
193                 if count < maxuploads or not hit:
194                     if self.seeding_manager is None or self.seeding_manager.is_conn_eligible(c):
195                         to_unchoke.append(u)
196                         if u.is_interested():
197                             count += 1
198                             if DEBUG and not hit: print  >>sys.stderr,"choker: OPTIMISTIC UNCHOKE",c
199                             hit = True
200                         
201                 else:
202                     if not c.connection.is_coordinator_con() and not c.connection.is_helper_con():
203                         u.choke()
204                     elif u.is_choked():
205                         to_unchoke.append(u)
206
207         # 5. Unchoke selected candidates
208         for u in to_unchoke:
209             u.unchoke()
210
211
212     def add_connection(self, connection, p = None):
213         """
214         Just add a connection, do not start doing anything yet
215         Must call "start_connection" later!
216         """
217         print >>sys.stderr, "Added connection",connection
218         if p is None:
219             p = randrange(-2, len(self.connections) + 1)
220         connection.get_upload().choke()
221         self.connections.insert(max(p, 0), connection)
222         self.picker.got_peer(connection)
223         self._rechoke()
224         
225     def start_connection(self, connection):
226         connection.get_upload().unchoke()
227     
228     def connection_made(self, connection, p = None):
229         if p is None:
230             p = randrange(-2, len(self.connections) + 1)
231         self.connections.insert(max(p, 0), connection)
232         self.picker.got_peer(connection)
233         self._rechoke()
234
235     def connection_lost(self, connection): 
236         """ connection is a Connecter.Connection """
237         # Raynor Vliegendhart, RePEX:
238         # The RePEX code can close a connection right after the handshake 
239         # but before the Choker has been informed via connection_made. 
240         # However, Choker.connection_lost is still called when a connection
241         # is closed, so we should check whether Choker knows the connection:
242         if connection in self.connections:
243             self.connections.remove(connection)
244             self.picker.lost_peer(connection)
245             if connection.get_upload().is_interested() and not connection.get_upload().is_choked():
246                 self._rechoke()
247
248     def interested(self, connection):
249         if not connection.get_upload().is_choked():
250             self._rechoke()
251
252     def not_interested(self, connection):
253         if not connection.get_upload().is_choked():
254             self._rechoke()
255
256     def set_super_seed(self):
257         while self.connections:             # close all connections
258             self.connections[0].close()
259         self.picker.set_superseed()
260         self.super_seed = True
261
262     def pause(self, flag):
263         self.paused = flag
264         self._rechoke()
265     
266     # SelectiveSeeding
267     def set_seeding_manager(self, manager):
268         # When seeding starts, a non-trivial seeding manager will be set
269         self.seeding_manager = manager