instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / subnetparse.py
1 # Written by John Hoffman
2 # see LICENSE.txt for license information
3
4 from bisect import bisect, insort
5
6 try:
7     True
8 except:
9     True = 1
10     False = 0
11     bool = lambda x: not not x
12
13 hexbinmap = {
14     '0': '0000',
15     '1': '0001',
16     '2': '0010',
17     '3': '0011',
18     '4': '0100',
19     '5': '0101',
20     '6': '0110',
21     '7': '0111',
22     '8': '1000',
23     '9': '1001',
24     'a': '1010',
25     'b': '1011',
26     'c': '1100',
27     'd': '1101',
28     'e': '1110',
29     'f': '1111',
30     'x': '0000',
31 }
32
33 chrbinmap = {}
34 for n in xrange(256):
35     b = []
36     nn = n
37     for i in xrange(8):
38         if nn & 0x80:
39             b.append('1')
40         else:
41             b.append('0')
42         nn <<= 1
43     chrbinmap[n] = ''.join(b)
44
45
46 def to_bitfield_ipv4(ip):
47     ip = ip.split('.')
48     if len(ip) != 4:
49         raise ValueError, "bad address"
50     b = []
51     for i in ip:
52         b.append(chrbinmap[int(i)])
53     return ''.join(b)
54
55 def to_bitfield_ipv6(ip):
56     b = ''
57     doublecolon = False
58
59     if not ip:
60         raise ValueError, "bad address"
61     if ip == '::':      # boundary handling
62         ip = ''
63     elif ip[:2] == '::':
64         ip = ip[1:]
65     elif ip[0] == ':':
66         raise ValueError, "bad address"
67     elif ip[-2:] == '::':
68         ip = ip[:-1]
69     elif ip[-1] == ':':
70         raise ValueError, "bad address"
71     for n in ip.split(':'):
72         if n == '':     # double-colon
73             if doublecolon:
74                 raise ValueError, "bad address"
75             doublecolon = True
76             b += ':'
77             continue
78         if n.find('.') >= 0: # IPv4
79             n = to_bitfield_ipv4(n)
80             b += n + '0'*(32-len(n))
81             continue
82         n = ('x'*(4-len(n))) + n
83         for i in n:
84             b += hexbinmap[i]
85     if doublecolon:
86         pos = b.find(':')
87         b = b[:pos]+('0'*(129-len(b)))+b[pos+1:]
88     if len(b) != 128:   # always check size
89         raise ValueError, "bad address"
90     return b
91
92 ipv4addrmask = to_bitfield_ipv6('::ffff:0:0')[:96]
93
94 class IP_List:
95     def __init__(self):
96         self.ipv4list = []
97         self.ipv6list = []
98
99     def __nonzero__(self):
100         return bool(self.ipv4list or self.ipv6list)
101
102
103     def append(self, ip, depth = 256):
104         if ip.find(':') < 0:        # IPv4
105             insort(self.ipv4list, to_bitfield_ipv4(ip)[:depth])
106         else:
107             b = to_bitfield_ipv6(ip)
108             if b.startswith(ipv4addrmask):
109                 insort(self.ipv4list, b[96:][:depth-96])
110             else:
111                 insort(self.ipv6list, b[:depth])
112
113
114     def includes(self, ip):
115         if not (self.ipv4list or self.ipv6list):
116             return False
117         if ip.find(':') < 0:        # IPv4
118             b = to_bitfield_ipv4(ip)
119         else:
120             b = to_bitfield_ipv6(ip)
121             if b.startswith(ipv4addrmask):
122                 b = b[96:]
123         if len(b) > 32:
124             l = self.ipv6list
125         else:
126             l = self.ipv4list
127         for map in l[bisect(l, b)-1:]:
128             if b.startswith(map):
129                 return True
130             if map > b:
131                 return False
132         return False
133
134
135     def read_fieldlist(self, file):   # reads a list from a file in the format 'ip/len <whatever>'
136         f = open(file, 'r')
137         while 1:
138             line = f.readline()
139             if not line:
140                 break
141             line = line.strip().expandtabs()
142             if not line or line[0] == '#':
143                 continue
144             try:
145                 line, garbage = line.split(' ', 1)
146             except:
147                 pass
148             try:
149                 line, garbage = line.split('#', 1)
150             except:
151                 pass
152             try:
153                 ip, depth = line.split('/')
154             except:
155                 ip = line
156                 depth = None
157             try:
158                 if depth is not None:                
159                     depth = int(depth)
160                 self.append(ip, depth)
161             except:
162                 print '*** WARNING *** could not parse IP range: '+line
163         f.close()
164
165
166     def set_intranet_addresses(self):
167         self.append('127.0.0.1', 8)
168         self.append('10.0.0.0', 8)
169         self.append('172.16.0.0', 12)
170         self.append('192.168.0.0', 16)
171         self.append('169.254.0.0', 16)
172         self.append('::1')
173         self.append('fe80::', 16)
174         self.append('fec0::', 16)
175
176     def set_ipv4_addresses(self):
177         self.append('::ffff:0:0', 96)
178
179 def ipv6_to_ipv4(ip):
180     ip = to_bitfield_ipv6(ip)
181     if not ip.startswith(ipv4addrmask):
182         raise ValueError, "not convertible to IPv4"
183     ip = ip[-32:]
184     x = ''
185     for i in range(4):
186         x += str(int(ip[:8], 2))
187         if i < 3:
188             x += '.'
189         ip = ip[8:]
190     return x
191
192 def to_ipv4(ip):
193     if is_ipv4(ip):
194         _valid_ipv4(ip)
195         return ip
196     return ipv6_to_ipv4(ip)
197
198 def is_ipv4(ip):
199     return ip.find(':') < 0
200
201 def _valid_ipv4(ip):
202     ip = ip.split('.')
203     if len(ip) != 4:
204         raise ValueError
205     for i in ip:
206         chr(int(i))
207
208 def is_valid_ip(ip):
209     try:
210         if not ip:
211             return False
212         if is_ipv4(ip):
213             _valid_ipv4(ip)
214             return True
215         to_bitfield_ipv6(ip)
216         return True
217     except:
218         return False