instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / BitTornado / iprangeparse.py
1 # Written by John Hoffman
2 # see LICENSE.txt for license information
3
4 from bisect import bisect, insort
5
6 try:
7     True
8 except:
9     True = 1
10     False = 0
11     bool = lambda x: not not x
12
13
14 def to_long_ipv4(ip):
15     ip = ip.split('.')
16     if len(ip) != 4:
17         raise ValueError, "bad address"
18     b = 0L
19     for n in ip:
20         b *= 256
21         b += int(n)
22     return b
23
24
25 def to_long_ipv6(ip):
26     if not ip:
27         raise ValueError, "bad address"
28     if ip == '::':      # boundary handling
29         ip = ''
30     elif ip[:2] == '::':
31         ip = ip[1:]
32     elif ip[0] == ':':
33         raise ValueError, "bad address"
34     elif ip[-2:] == '::':
35         ip = ip[:-1]
36     elif ip[-1] == ':':
37         raise ValueError, "bad address"
38
39     b = []
40     doublecolon = False
41     for n in ip.split(':'):
42         if n == '':     # double-colon
43             if doublecolon:
44                 raise ValueError, "bad address"
45             doublecolon = True
46             b.append(None)
47             continue
48         if n.find('.') >= 0: # IPv4
49             n = n.split('.')
50             if len(n) != 4:
51                 raise ValueError, "bad address"
52             for i in n:
53                 b.append(int(i))
54             continue
55         n = ('0'*(4-len(n))) + n
56         b.append(int(n[:2], 16))
57         b.append(int(n[2:], 16))
58     bb = 0L
59     for n in b:
60         if n is None:
61             for i in xrange(17-len(b)):
62                 bb *= 256
63             continue
64         bb *= 256
65         bb += n
66     return bb
67
68 ipv4addrmask = 65535L*256*256*256*256
69
70 class IP_List:
71     def __init__(self):
72         self.ipv4list = []  # starts of ranges
73         self.ipv4dict = {}  # start: end of ranges
74         self.ipv6list = []  # "
75         self.ipv6dict = {}  # "
76
77     def __nonzero__(self):
78         return bool(self.ipv4list or self.ipv6list)
79
80
81     def append(self, ip_beg, ip_end = None):
82         if ip_end is None:
83             ip_end = ip_beg
84         else:
85             assert ip_beg <= ip_end
86         if ip_beg.find(':') < 0:        # IPv4
87             ip_beg = to_long_ipv4(ip_beg)
88             ip_end = to_long_ipv4(ip_end)
89             l = self.ipv4list
90             d = self.ipv4dict
91         else:
92             ip_beg = to_long_ipv6(ip_beg)
93             ip_end = to_long_ipv6(ip_end)
94             bb = ip_beg % (256*256*256*256)
95             if bb == ipv4addrmask:
96                 ip_beg -= bb
97                 ip_end -= bb
98                 l = self.ipv4list
99                 d = self.ipv4dict
100             else:
101                 l = self.ipv6list
102                 d = self.ipv6dict
103
104         pos = bisect(l, ip_beg)-1
105         done = pos < 0
106         while not done:
107             p = pos
108             while p < len(l):
109                 range_beg = l[p]
110                 if range_beg > ip_end+1:
111                     done = True
112                     break
113                 range_end = d[range_beg]
114                 if range_end < ip_beg-1:
115                     p += 1
116                     if p == len(l):
117                         done = True
118                         break
119                     continue
120                 # if neither of the above conditions is true, the ranges overlap
121                 ip_beg = min(ip_beg, range_beg)
122                 ip_end = max(ip_end, range_end)
123                 del l[p]
124                 del d[range_beg]
125                 break
126
127         insort(l, ip_beg)
128         d[ip_beg] = ip_end
129
130
131     def includes(self, ip):
132         if not (self.ipv4list or self.ipv6list):
133             return False
134         if ip.find(':') < 0:        # IPv4
135             ip = to_long_ipv4(ip)
136             l = self.ipv4list
137             d = self.ipv4dict
138         else:
139             ip = to_long_ipv6(ip)
140             bb = ip % (256*256*256*256)
141             if bb == ipv4addrmask:
142                 ip -= bb
143                 l = self.ipv4list
144                 d = self.ipv4dict
145             else:
146                 l = self.ipv6list
147                 d = self.ipv6dict
148         for ip_beg in l[bisect(l, ip)-1:]:
149             if ip == ip_beg:
150                 return True
151             ip_end = d[ip_beg]
152             if ip > ip_beg and ip <= ip_end:
153                 return True
154         return False
155
156
157     # reads a list from a file in the format 'whatever:whatever:ip-ip'
158     # (not IPv6 compatible at all)
159     def read_rangelist(self, file):
160         f = open(file, 'r')
161         while 1:
162             line = f.readline()
163             if not line:
164                 break
165             line = line.strip()
166             if not line or line[0] == '#':
167                 continue
168             line = line.split(':')[-1]
169             try:
170                 ip1, ip2 = line.split('-')
171             except:
172                 ip1 = line
173                 ip2 = line
174             try:
175                 self.append(ip1.strip(), ip2.strip())
176             except:
177                 print '*** WARNING *** could not parse IP range: '+line
178         f.close()
179
180 def is_ipv4(ip):
181     return ip.find(':') < 0
182
183 def is_valid_ip(ip):
184     try:
185         if is_ipv4(ip):
186             a = ip.split('.')
187             assert len(a) == 4
188             for i in a:
189                 chr(int(i))
190             return True
191         to_long_ipv6(ip)
192         return True
193     except:
194         return False