From cac8a3fa847ad96afef8c4a75ed23f146afc3f14 Mon Sep 17 00:00:00 2001
From: Razvan Deaconescu
Date: Thu, 1 Sep 2011 21:00:04 +0300
Subject: [PATCH] ppf/new: Complete implementation of TriblerLogParser.

There are still issues in the current implementation. It's kind of
sloppy just to get things working in the first phase.
---
 ppf/new/parsing.py | 316 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 281 insertions(+), 35 deletions(-)

diff --git a/ppf/new/parsing.py b/ppf/new/parsing.py
index 65eff8c..799c56d 100644
--- a/ppf/new/parsing.py
+++ b/ppf/new/parsing.py
@@ -238,6 +238,8 @@ class LibtorrentLogParser(SessionLogParser):
         if re.search(r'(<==|==>)\s+NOT_INTERESTED', line):
             return ("verbose", "NOT_INERESTED")
         if re.search(r'(<==|==>)\s+HAVE', line):
+            return ("verbose", "HAVE")
+        if re.search(r'(<==|==>)\s+BITFIELD', line):
             return ("verbose", "BITFIELD")
         if re.search(r'(<==|==>)\s+REQUEST', line):
             return ("verbose", "REQUEST")
@@ -305,36 +307,8 @@ class LibtorrentLogParser(SessionLogParser):
         logger.debug("Add new status message.")
         self.message_list.append(msg)
 
-    def canon_num_peers(self, non_canon_value):
-        return int(non_canon_value)
-
-    def canon_dht(self, non_canon_value):
-        return int(non_canon_value)
-
-    def canon_speed(self, non_canon_value):
-        """119.51kb/s -> 119"""
-        return int(float(non_canon_value.strip("kb/s")))
-
-    def canon_size(self, non_canon_value):
-        """698mb -> 698*1024"""
-        return int(non_canon_value.strip("mb")) * 1024
-
-    def canon_eta(self, non_canon_value):
-        """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]"""
-        eta_string_array = re.split('\ *[dhms]\ *', non_canon_value)
-        eta_string_array.remove('')
-        eta = []
-
-        for i in range(0, len(eta_string_array)):
-            eta.append(int(eta_string_array[i]))
-        for i in range(len(eta_string_array), 4):
-            eta.insert(0, 0)
-
-        eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0)
-        return eta_td
-
     def parse_peer_status_log_line(self, line):
-        """Parse Libtorrent status line.
+        """Parse Libtorrent peer status line.
 
         Sample:
             --Peers: (Aug 14 17:22:40) [ ip: 10.1.7.5:33289, dl: 23.6kb/s, ul: 94.91kb/s ][ ip: 10.1.6.5:47254, dl: 55.61kb/s, ul: 100.72kb/s ][ ip: 10.1.4.5:6881, dl: 93.99kb/s, ul: 100.11kb/s ][ ip: 10.1.10.5:48923, dl: 57.42kb/s, ul: 205.74kb/s ]")
@@ -474,6 +448,34 @@ class LibtorrentLogParser(SessionLogParser):
 
         return (0, 0, 0, port)
 
+    def canon_num_peers(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_dht(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_speed(self, non_canon_value):
+        """119.51kb/s -> 119"""
+        return int(float(non_canon_value.strip("kb/s")))
+
+    def canon_size(self, non_canon_value):
+        """698mb -> 698*1024"""
+        return int(non_canon_value.strip("mb")) * 1024
+
+    def canon_eta(self, non_canon_value):
+        """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]"""
+        eta_string_array = re.split('\ *[dhms]\ *', non_canon_value)
+        eta_string_array.remove('')
+        eta = []
+
+        for i in range(0, len(eta_string_array)):
+            eta.append(int(eta_string_array[i]))
+        for i in range(len(eta_string_array), 4):
+            eta.insert(0, 0)
+
+        eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0)
+        return eta_td
+
 
 class TriblerLogParser(SessionLogParser):
     """
@@ -496,6 +498,7 @@ class TriblerLogParser(SessionLogParser):
         self.line = self.f.readline()
         # Remove newline character if existent.
         self.line = re.sub(r'\n$', '', self.line)
+        self.line = self.line.strip()
         self.parse_log_line(self.line)
 
     def is_line_empty(self):
@@ -532,6 +535,33 @@ class TriblerLogParser(SessionLogParser):
 
     def get_log_line_type(self, line):
         logger.debug("line is: %s" %(line))
+        if re.search(r'DLSTATUS_', line):
+            return ("status", None)
+
+        if re.search(r'^--Peers:', line):
+            return ("peer-status", None)
+
+        # Test the longer names first: "UNCHOKE from" also matches
+        # r'CHOKE from' and "NOT_INTERESTED" also matches r'INTERESTED'.
+        if re.search(r'UNCHOKE from', line):
+            return ("verbose", "UNCHOKE")
+        if re.search(r'CHOKE from', line):
+            return ("verbose", "CHOKE")
+        if re.search(r'NOT_INTERESTED', line):
+            return ("verbose", "NOT_INTERESTED")
+        if re.search(r'INTERESTED', line):
+            return ("verbose", "INTERESTED")
+        if re.search(r'BITFIELD', line):
+            return ("verbose", "BITFIELD")
+        if re.search(r'HAVE\(', line):
+            return ("verbose", "HAVE")
+        if re.search(r'REQUEST\(', line) or re.search(r'new_request', line):
+            return ("verbose", "REQUEST")
+        if re.search(r'PIECE\(', line):
+            return ("verbose", "PIECE")
+        if re.search(r'sent cancel', line):
+            return ("verbose", "CANCEL")
+
         return (None, None)
 
     def parse_log_line(self, line):
@@ -551,13 +581,229 @@ class TriblerLogParser(SessionLogParser):
             self.parse_verbose_log_line(line, minor_line_type)
 
     def parse_status_log_line(self, line):
-        """Parse Tribler status line."""
-        pass
+        """Parse Tribler status line.
+
+        Sample line:
+        03-11-2009 12:19:04 aqua.mpeg DLSTATUS_DOWNLOADING 93.89% None up 0.00KB/s down 5440.21KB/s eta 1.67072531777 peers 2
+        """
+
+        msg = storage.StatusMessage()
+
+        parts = re.split(r'\s+', line)
+
+        # Get timestamp and transform it in datetime format.
+        date_string = "%s %s" %(parts[0], parts[1])
+        timestamp = datetime.datetime.strptime(date_string,
+                "%d-%m-%Y %H:%M:%S")
+        # Keep the timestamp on the message (it was computed but dropped).
+        msg.timestamp = timestamp
+
+        # Walk the "keyword value" pairs; stopping one token early keeps
+        # parts[i+1] in range when a keyword is the last token.
+        i = 3
+        while i < len(parts) - 1:
+            if parts[i] == "peers":
+                msg.num_peers = self.canon_num_peers(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "down":
+                msg.download_speed = self.canon_speed(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "up":
+                msg.upload_speed = self.canon_speed(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "DLSTATUS_DOWNLOADING" or \
+                    parts[i] == "DLSTATUS_SEEDING":
+                # TODO: Tribler download status is percentage.
+                msg.download_size = self.canon_size(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "eta":
+                msg.eta = self.canon_eta(parts[i+1])
+                i = i + 2
+                continue
+            i = i + 1
+
+        # Add to message_list.
+        logger.debug("Add new status message.")
+        self.message_list.append(msg)
 
     def parse_peer_status_log_line(self, line):
-        """Parse Tribler peer status line."""
-        pass
+        """Parse Tribler peer status line.
+
+        Sample line:
+        --Peers: (Apr 14 00:43:25) [ ip: 188.27.206.23, dl: 0.00KB/s, ul: 182.21KB/s ]
+        """
+        parts = re.split(r'[\(\)\[\]]+', line)
+
+        date_string = parts[1].strip() + " %s" %(self.start_time.year)
+        timestamp = datetime.datetime.strptime(date_string,
+                "%b %d %H:%M:%S %Y")
+
+        for i in range(3, len(parts)):
+            msg = self.get_peer_status_message_from_part(re.sub(r'\s', '', parts[i]))
+            if msg:
+                msg.timestamp = timestamp
+                logger.debug("Add new peer status message.")
+                self.message_list.append(msg)
+
+    def get_peer_status_message_from_part(self, part):
+        """Part sample ip:10.1.7.5:33289,dl:23.6KB/s,ul:94.91KB/s.
+
+        The port is optional: the Tribler sample line has "ip: 188.27.206.23".
+        """
+        subparts = re.split(r',', part.strip())
+        if len(subparts) != 3:
+            return None
+
+        msg = storage.PeerStatusMessage()
+        for s in subparts:
+            pair = re.split(r':', s)
+            if pair[0] == 'ip':
+                msg.peer_ip = pair[1]
+                # No port in the sample Tribler line; avoid IndexError.
+                msg.peer_port = pair[2] if len(pair) > 2 else None
+            elif pair[0] == 'dl':
+                msg.download_speed = self.canon_speed(pair[1])
+            elif pair[0] == 'ul':
+                msg.upload_speed = self.canon_speed(pair[1])
+
+        return msg
 
     def parse_verbose_log_line(self, line, message_type):
-        """Parse Tribler verbose line."""
-        pass
+        """Parse Tribler verbose line.
+
+        Sample line:
+        20-10-2009 12:56:32 Downloader: new_request 95 212992 16384 to 141.85.37.41 14398
+        """
+        msg = storage.VerboseMessage()
+        msg.message_type = message_type
+
+        (msg.timestamp, msg.transfer_direction, msg.peer_ip, msg.peer_port) = self.get_verbose_log_line_common_fields(line)
+        (msg.index, msg.begin, msg.length, msg.listen_port) = self.get_verbose_log_line_noncommon_fields(line, message_type)
+
+        # Add to message_list.
+        logger.debug("Add new verbose message.")
+        self.message_list.append(msg)
+
+    def get_verbose_log_line_common_fields(self, line):
+        """Return (timestamp, transfer_direction, peer_ip, peer_port)."""
+        # FIXME: Take into account message line to establish direction.
+        transfer_direction = None
+
+        # Convert string 'DD-MM-YYYY HH:MM:SS' to datetime.
+        parts = re.split(r'\s+', line)
+        date_string = "%s %s" %(parts[0], parts[1])
+        timestamp = datetime.datetime.strptime(date_string,
+                "%d-%m-%Y %H:%M:%S")
+
+        # The peer address is normally the last token ("... from IP"); a
+        # sent request ends in "IP PORT" instead (see parse_request samples).
+        peer_ip = parts[-1]
+        peer_port = None
+        if len(parts) >= 2 and parts[-1].isdigit() and \
+                re.match(r'\d+(\.\d+){3}$', parts[-2]):
+            peer_ip = parts[-2]
+            peer_port = int(parts[-1])
+
+        return (timestamp, transfer_direction, peer_ip, peer_port)
+
+    def get_verbose_log_line_noncommon_fields(self, line, message_type):
+        """Return (index, begin, length, listen_port) for message_type."""
+        if message_type == "CHOKE" or message_type == "UNCHOKE" or \
+                message_type == "INTERESTED" or \
+                message_type == "NOT_INTERESTED" or \
+                message_type == "BITFIELD":
+            return (0, 0, 0, 0)
+        elif message_type == "HAVE":
+            return self.parse_have(line)
+        elif message_type == "CANCEL":
+            return self.parse_cancel(line)
+        elif message_type == "REQUEST":
+            return self.parse_request(line)
+        elif message_type == "PIECE":
+            return self.parse_piece(line)
+        elif message_type == "DHT_PORT":
+            return self.parse_dht_port(line)
+
+        # Shouldn't get here.
+        return (0, 0, 0, 0)
+
+    def parse_have(self, line):
+        """Parse HAVE line in Tribler log file.
+
+        Sample line:
+        14-11-2009 23:11:13 connecter: Got HAVE( 14 ) from 141.85.37.41
+        """
+        parts = re.split(r'\s+', line)
+        index = int(parts[5])
+        return (index, 0, 0, 0)
+
+    def parse_request(self, line):
+        """Parse REQUEST line in Tribler log file.
+
+        Sample lines:
+        20-10-2009 12:56:39 Downloader: new_request 52 98304 16384 to 141.85.37.41 14398
+        27-11-2009 18:01:22 connecter: Got REQUEST( 1218 ) from 87.0.15.75
+        """
+        parts = re.split(r'\s+', line)
+
+        if re.search("new_request", line):
+            # Send request message.
+            index = int(parts[4])
+            begin = int(parts[5])
+            length = int(parts[6])
+        else:
+            # Receive request message: only the piece index is logged, so
+            # begin and length default to 0 (they used to be left unbound).
+            index = int(parts[5])
+            begin = 0
+            length = 0
+
+        return (index, begin, length, 0)
+
+    def parse_piece(self, line):
+        """Parse PIECE line in Tribler log file.
+
+        Sample line:
+        14-11-2009 23:11:13 connecter: Got PIECE( 141 ) from 141.85.37.41
+        """
+        parts = re.split(r'\s+', line)
+        index = int(parts[5])
+        # Only the piece index is logged; begin and length are reported as 0.
+        return (index, 0, 0, 0)
+
+    def parse_cancel(self, line):
+        """Parse CANCEL line in Tribler log file.
+
+        Sample line:
+        14-11-2009 23:11:24 sent cancel: 130: 114688-131072
+        """
+        parts = re.split(r'\s+', line)
+        # parts[4] is "130:" (piece index) and parts[5] is "114688-131072";
+        # the second number is treated as the end offset, as before.
+        index = int(parts[4].rstrip(':'))
+        (begin_string, end_string) = parts[5].split('-')
+        begin = int(begin_string)
+        length = int(end_string) - begin
+        return (index, begin, length, 0)
+
+    def canon_num_peers(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_dht(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_speed(self, non_canon_value):
+        """119.51KB/s -> 119"""
+        return int(float(non_canon_value.strip("KB/s")))
+
+    def canon_size(self, non_canon_value):
+        """98% -> 98"""
+        return int(float(non_canon_value.strip("%")))
+
+    def canon_eta(self, non_canon_value):
+        """26.456787 -> 26 (seconds)"""
+        return int(float(non_canon_value))
-- 
2.20.1