From: Razvan Deaconescu Date: Sat, 27 Aug 2011 18:02:22 +0000 (+0300) Subject: ppf/new: Add methods for verbose and status line parsing. X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?a=commitdiff_plain;h=5c05dc997ec3801639865a0195ade29a1f62d235;p=cs-p2p-next.git ppf/new: Add methods for verbose and status line parsing. --- diff --git a/ppf/new/parsing.py b/ppf/new/parsing.py index 6b5c57e..18fefb9 100644 --- a/ppf/new/parsing.py +++ b/ppf/new/parsing.py @@ -11,6 +11,7 @@ import os import os.path import re import datetime +import time import logging import socket @@ -27,7 +28,7 @@ logger.setLevel(logging.DEBUG) # Create console handler and set level to ERROR. ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) +ch.setLevel(logging.DEBUG) # Create formatter. formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s') @@ -79,6 +80,9 @@ class LibtorrentLogParser(SessionLogParser): self.have_parsed = [] self.f = None # handler to file being parsed + self.verbose_log_ip = None + self.verbose_log_port = None + self.log_year = "2009" for entry in os.listdir(self.path): entry_path = os.path.join(self.path, entry) @@ -152,7 +156,9 @@ class LibtorrentLogParser(SessionLogParser): return self.message_list.pop() def read_next_line(self): - self.f.readline() + self.line = self.f.readline() + # Remove new line if existent. + self.line = re.sub(r'\n$', '', self.line) self.parse_log_line(self.line) def is_line_empty(self): @@ -161,9 +167,12 @@ class LibtorrentLogParser(SessionLogParser): def get_next_message_from_line(self): try: msg = self.pop_message() + logger.debug("Popped message type %s." %(msg.__class__.__name__)) except IndexError, e: msg = None + return msg + def get_next_message_from_file(self): while True: msg = self.get_next_message_from_line() @@ -198,16 +207,33 @@ class LibtorrentLogParser(SessionLogParser): return msg def get_log_line_type(self, line): - return (None, None) - - def parse_status_log_line(self, line): - pass - - def parse_peer_status_log_line(self, line): - pass + logger.debug("line is: %s" %(line)) + if re.search(r'^ps:', line): + return ("status", None) + + if re.search(r'^--Peers:', line): + return ("peer-status", None) + + if re.search(r'(<==|==>)\s+CHOKE', line): + return ("verbose", "CHOKE") + if re.search(r'(<==|==>)\s+UNCHOKE', line): + return ("verbose", "UNCHOKE") + if re.search(r'(<==|==>)\s+INTERESTED', line): + return ("verbose", "INTERESTED") + if re.search(r'(<==|==>)\s+NOT_INTERESTED', line): + return ("verbose", "NOT_INERESTED") + if re.search(r'(<==|==>)\s+HAVE', line): + return ("verbose", "BITFIELD") + if re.search(r'(<==|==>)\s+REQUEST', line): + return ("verbose", "REQUEST") + if re.search(r'(<==|==>)\s+PIECE', line): + return ("verbose", "PIECE") + if re.search(r'(<==|==>)\s+CANCEL', line): + return ("verbose", "CANCEL") + if re.search(r'(<==|==>)\s+DHT_PORT', line): + return ("verbose", "DHT_PORT") - def parse_verbose_log_line(self, line, verbose_line_type): - pass + return (None, None) def parse_log_line(self, line): """Parse a log line and establish its type. @@ -225,6 +251,192 @@ class LibtorrentLogParser(SessionLogParser): elif major_line_type == "verbose": self.parse_verbose_log_line(line, minor_line_type) + def parse_status_log_line(self, line): + """Parse Libtorrent status line. + + Sample: + ps: 1, dht: 8 <> dl: 119.51kb/s, ul: 3.63kb/s <> dld: 1mb, uld: 0mb, size: 698mb <> eta: 1h 39m 37s + """ + msg = storage.StatusMessage() + # TODO: Fix timestamp. + msg.timestamp = None + + string_array = re.split("\ *[,<>]+\ *", line) + for string in string_array: + pair = re.split("\ *:\ *", string) + if pair[0] == "ps": + msg.num_peers = self.canon_num_peers(pair[1]) + elif pair[0] == "dht": + msg.num_dht_peers = self.canon_dht(pair[1]) + elif pair[0] == "dl": + msg.download_speed = self.canon_download_speed(pair[1]) + elif pair[0] == "ul": + msg.upload_speed = self.canon_upload_speed(pair[1]) + elif pair[0] == "dld": + msg.download_size = self.canon_download_size(pair[1]) + elif pair[0] == "uld": + msg.upload_size = self.canon_upload_size(pair[1]) + elif pair[0] == "size": + pass + elif pair[0] == "eta": + eta = self.canon_eta(pair[1]) + msg.eta_seconds = eta.days * 24 * 3600 + eta.seconds + + # Add to message_list. + self.message_list.append(msg) + + def canon_num_peers(self, non_canon_value): + return int(non_canon_value) + + def canon_dht(self, non_canon_value): + return int(non_canon_value) + + def canon_download_speed(self, non_canon_value): + """119.51kb/s -> 119""" + return int(float(non_canon_value.strip("kb/s"))) + + def canon_upload_speed(self, non_canon_value): + """12119.51kb/s -> 12119""" + return int(float(non_canon_value.strip("kb/s"))) + + def canon_download_size(self, non_canon_value): + """698mb -> 698*1024""" + return int(non_canon_value.strip("mb")) * 1024 + + def canon_upload_size(self, non_canon_value): + """492mb -> 492*1024""" + return int(non_canon_value.strip("mb")) * 1024 + + def canon_eta(self, non_canon_value): + """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]""" + eta_string_array = re.split('\ *[dhms]\ *', non_canon_value) + eta_string_array.remove('') + eta = [] + + for i in range(0, len(eta_string_array)): + eta.append(int(eta_string_array[i])) + for i in range(len(eta_string_array), 4): + eta.insert(0, 0) + + eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0) + return eta_td + + def parse_peer_status_log_line(self, line): + pass + + def string_to_timestamp(self, date_string): + """Convert string 'Mon DD HH:MM:SS' to datetime.""" + try: + my_time = time.strptime(date_string + " %s" + % (self.log_year), + "%b %d %H:%M:%S %Y") + my_date = datetime.datetime(my_time[0], my_time[1], + my_time[2], my_time[3], my_time[4], + my_time[5], my_time[6]) + except ValueError: + return None + + return my_date + + def parse_verbose_log_line(self, line, message_type): + if self.verbose_log_ip == None or self.verbose_log_port == None: + # TODO: Initiate verbose_log_ip and verbose_log_port. + pass + msg = storage.VerboseMessage() + msg.peer_ip = self.verbose_log_ip + msg.peer_port = self.verbose_log_port + msg.message_type = message_type + + (msg.timestamp, msg.transfer_direction) = self.get_verbose_log_line_common_fields(line) + (msg.index, msg.begin, msg.length, msg.listen_port) = self.get_verbose_log_line_noncommon_fields(line, message_type) + + # Add to message_list. + self.message_list.append(msg) + + def get_verbose_log_line_common_fields(self, line): + if "==>" in line: + transfer_direction = "send" + elif "<==" in line: + transfer_direction = "receive" + else: + transfer_direction = None + + parts = re.split(r'[<=>]+', line) + timestamp = self.string_to_timestamp(parts[0].strip()) + + return (timestamp, transfer_direction) + + def get_verbose_log_line_noncommon_fields(self, line, message_type): + if message_type == "CHOKE" or message_type == "UNCHOKE" or \ + message_type == "INTERESTED" or \ + message_type == "NOT_INTERESTED" or \ + message_type == "BITFIELD": + return (0, 0, 0, 0) + elif message_type == "HAVE": + return self.parse_have(line) + elif message_type == "CANCEL": + return self.parse_cancel(line) + elif message_type == "REQUEST": + return self.parse_request(line) + elif message_type == "PIECE": + return self.parse_piece(line) + elif message_type == "DHT_PORT": + return self.parse_dht_port(line) + + # Shouldn't get here. + return (0, 0, 0, 0) + + def parse_have(self, line): + """Parse have line in libtorrent log file. + + Sample line: + Jan 08 22:20:48 <== HAVE [ piece: 839] + """ + parts = re.split("[\[\]<=>]+", line) + index = int("0x" + re.split(":", parts[2].strip())[1].strip(), 16) + return (index, 0, 0, 0) + + def parse_request(self, line): + """Parse request line in libtorrent log file. + + Sample line: + Jan 08 22:39:50 <== REQUEST [ piece: 6cc | s: 14000 | l: 4000 ] + """ + parts = re.split("[\[\]|<=>]+", line) + index = int("0x" + re.split(":", parts[2])[1].strip(), 16) + begin = int("0x" + re.split(":", parts[3])[1].strip(), 16) + length = int("0x" + re.split(":", parts[4])[1].strip(), 16) + return (index, begin, length, 0) + + def parse_piece(self, line): + """Parse piece line in libtorrent log file. + + Sample line: + Jan 08 22:39:50 ==> PIECE [ piece: 5c6 | s: 24000 | l: 4000 ] + """ + parts = re.split("[\[\]|<=>]+", line) + index = int("0x" + re.split(":", parts[2])[1].strip(), 16) + begin = int("0x" + re.split(":", parts[3])[1].strip(), 16) + length = int("0x" + re.split(":", parts[4])[1].strip(), 16) + return (index, begin, length, 0) + + def parse_port(self, line): + """Parse DHT port line in libtorrent log file. + + Sample line: + Jan 08 22:20:48 ==> DHT_PORT [ 50200 ] + """ + parts = re.split("[\[\]<=>]+", line) + index = 0 + begin = 0 + length = 0 + if transfer_direction_id == log_msg_dir["RECEIVE"]: + port = int("0x" + re.split(":", parts[2])[1].strip(), 16) + else: + port = int(parts[2].strip()) + + return (0, 0, 0, port) + class TriblerLogParser(SessionLogParser): """