ppf/new: Complete implementation of TriblerLogParser.
author Razvan Deaconescu <razvan.deaconescu@cs.pub.ro>
Thu, 1 Sep 2011 18:00:04 +0000 (21:00 +0300)
committer Razvan Deaconescu <razvan.deaconescu@cs.pub.ro>
Thu, 1 Sep 2011 18:00:52 +0000 (21:00 +0300)
There are still issues in the current implementation. It is somewhat
sloppy; the goal was just to get things working in the first phase.

ppf/new/parsing.py

index 65eff8c..799c56d 100644 (file)
@@ -238,6 +238,8 @@ class LibtorrentLogParser(SessionLogParser):
         if re.search(r'(<==|==>)\s+NOT_INTERESTED', line):
             return ("verbose", "NOT_INERESTED")
         if re.search(r'(<==|==>)\s+HAVE', line):
+            return ("verbose", "HAVE")
+        if re.search(r'(<==|==>)\s+BITFIELD', line):
             return ("verbose", "BITFIELD")
         if re.search(r'(<==|==>)\s+REQUEST', line):
             return ("verbose", "REQUEST")
@@ -305,36 +307,8 @@ class LibtorrentLogParser(SessionLogParser):
         logger.debug("Add new status message.")
         self.message_list.append(msg)
 
-    def canon_num_peers(self, non_canon_value):
-        return int(non_canon_value)
-
-    def canon_dht(self, non_canon_value):
-        return int(non_canon_value)
-
-    def canon_speed(self, non_canon_value):
-        """119.51kb/s -> 119"""
-        return int(float(non_canon_value.strip("kb/s")))
-
-    def canon_size(self, non_canon_value):
-        """698mb -> 698*1024"""
-        return int(non_canon_value.strip("mb")) * 1024
-
-    def canon_eta(self, non_canon_value):
-        """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]"""
-        eta_string_array = re.split('\ *[dhms]\ *', non_canon_value)
-        eta_string_array.remove('')
-        eta = []
-
-        for i in range(0, len(eta_string_array)):
-            eta.append(int(eta_string_array[i]))
-        for i in range(len(eta_string_array), 4):
-            eta.insert(0, 0)
-
-        eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0)
-        return eta_td
-
     def parse_peer_status_log_line(self, line):
-        """Parse Libtorrent status line.
+        """Parse Libtorrent peer status line.
 
         Sample:
             --Peers: (Aug 14 17:22:40) [ ip: 10.1.7.5:33289, dl: 23.6kb/s, ul: 94.91kb/s ][ ip: 10.1.6.5:47254, dl: 55.61kb/s, ul: 100.72kb/s ][ ip: 10.1.4.5:6881, dl: 93.99kb/s, ul: 100.11kb/s ][ ip: 10.1.10.5:48923, dl: 57.42kb/s, ul: 205.74kb/s ]")
@@ -474,6 +448,34 @@ class LibtorrentLogParser(SessionLogParser):
 
         return (0, 0, 0, port)
 
+    def canon_num_peers(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_dht(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_speed(self, non_canon_value):
+        """119.51kb/s -> 119"""
+        return int(float(non_canon_value.strip("kb/s")))
+
+    def canon_size(self, non_canon_value):
+        """698mb -> 698*1024"""
+        return int(non_canon_value.strip("mb")) * 1024
+
+    def canon_eta(self, non_canon_value):
+        """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]"""
+        eta_string_array = re.split('\ *[dhms]\ *', non_canon_value)
+        eta_string_array.remove('')
+        eta = []
+
+        for i in range(0, len(eta_string_array)):
+            eta.append(int(eta_string_array[i]))
+        for i in range(len(eta_string_array), 4):
+            eta.insert(0, 0)
+
+        eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0)
+        return eta_td
+
 
 class TriblerLogParser(SessionLogParser):
     """
@@ -496,6 +498,7 @@ class TriblerLogParser(SessionLogParser):
         self.line = self.f.readline()
         # Remove newline character if existent.
         self.line = re.sub(r'\n$', '', self.line)
+        self.line = self.line.strip()
         self.parse_log_line(self.line)
 
     def is_line_empty(self):
@@ -532,6 +535,29 @@ class TriblerLogParser(SessionLogParser):
 
     def get_log_line_type(self, line):
         logger.debug("line is: %s" %(line))
+        if re.search(r'DLSTATUS_', line):
+            return ("status", None)
+
+        if re.search(r'^--Peers:', line):
+            return ("peer-status", None)
+
+        if re.search(r'CHOKE from', line):
+            return ("verbose", "CHOKE")
+        if re.search(r'UNCHOKE from', line):
+            return ("verbose", "UNCHOKE")
+        if re.search(r'INTERESTED', line):
+            return ("verbose", "INTERESTED")
+        if re.search(r'BITFIELD', line):
+            return ("verbose", "BITFIELD")
+        if re.search(r'HAVE\(', line):
+            return ("verbose", "HAVE")
+        if re.search(r'REQUEST\(', line) or re.search(r'new_request', line):
+            return ("verbose", "REQUEST")
+        if re.search(r'PIECE\(', line):
+            return ("verbose", "PIECE")
+        if re.search(r'sent cancel', line):
+            return ("verbose", "CANCEL")
+
         return (None, None)
 
     def parse_log_line(self, line):
@@ -551,13 +577,210 @@ class TriblerLogParser(SessionLogParser):
             self.parse_verbose_log_line(line, minor_line_type)
 
     def parse_status_log_line(self, line):
-        """Parse Tribler status line."""
-        pass
+        """Parse Tribler status line.
+
+        Sample line:
+            03-11-2009 12:19:04   aqua.mpeg DLSTATUS_DOWNLOADING 93.89% None up     0.00KB/s down  5440.21KB/s eta 1.67072531777 peers 2
+        """
+
+        msg = storage.StatusMessage()
+
+        parts = re.split(r'\s+', line)
+
+        # Get timestamp and transform it in datetime format.
+        date_string = "%s %s" %(parts[0], parts[1])
+        timestamp = datetime.datetime.strptime(date_string,
+                "%d-%m-%Y %H:%M:%S")
+
+        i = 3
+        while i < len(parts):
+            if parts[i] == "peers":
+                msg.num_peers = self.canon_num_peers(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "down":
+                msg.download_speed = self.canon_speed(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "up":
+                msg.upload_speed = self.canon_speed(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "DLSTATUS_DOWNLOADING" or \
+                    parts[i] == "DLSTATUS_SEEDING":
+                # TODO: Tribler download status is percentage.
+                msg.download_size = self.canon_size(parts[i+1])
+                i = i + 2
+                continue
+            if parts[i] == "eta":
+                msg.eta = self.canon_eta(parts[i+1])
+                i = i + 2
+                continue
+            i = i + 1
+
+        # Add to message_list.
+        logger.debug("Add new status message.")
+        self.message_list.append(msg)
 
     def parse_peer_status_log_line(self, line):
-        """Parse Tribler peer status line."""
-        pass
+        """Parse Tribler peer status line.
+
+        Sample line:
+        --Peers: (Apr 14 00:43:25)  [ ip: 188.27.206.23, dl: 0.00KB/s, ul: 182.21KB/s ]
+        """
+        parts = re.split(r'[\(\)\[\]]+', line)
+
+        date_string = parts[1].strip() + " %s" %(self.start_time.year)
+        timestamp = datetime.datetime.strptime(date_string,
+                "%b %d %H:%M:%S %Y")
+
+        for i in range(3, len(parts)):
+            msg = self.get_peer_status_message_from_part(re.sub(r'\s', '', parts[i]))
+            if msg:
+                msg.timestamp = timestamp
+                logger.debug("Add new peer status message.")
+                self.message_list.append(msg)
+
+    def get_peer_status_message_from_part(self, part):
+        """Part sample ip:10.1.7.5:33289,dl:23.6KB/s,ul:94.91KB/s."""
+        subparts = re.split(r',', part.strip())
+        if len(subparts) != 3:
+            return None
+
+        msg = storage.PeerStatusMessage()
+        for s in subparts:
+            pair = re.split(r':', s)
+            if pair[0] == 'ip':
+                msg.peer_ip = pair[1]
+                msg.peer_port = pair[2]
+            elif pair[0] == 'dl':
+                msg.download_speed = self.canon_speed(pair[1])
+            elif pair[0] == 'ul':
+                msg.upload_speed = self.canon_speed(pair[1])
+
+        return msg
 
     def parse_verbose_log_line(self, line, message_type):
-        """Parse Tribler verbose line."""
-        pass
+        """Parse Tribler verbose line.
+
+        Sample line:
+        20-10-2009 12:56:32   Downloader: new_request 95 212992 16384 to 141.85.37.41 14398
+        """
+        msg = storage.VerboseMessage()
+        msg.message_type = message_type
+
+        (msg.timestamp, msg.transfer_direction, msg.peer_ip, msg.peer_port) = self.get_verbose_log_line_common_fields(line)
+        (msg.index, msg.begin, msg.length, msg.listen_port) = self.get_verbose_log_line_noncommon_fields(line, message_type)
+
+        # Add to message_list.
+        logger.debug("Add new verbose message.")
+        self.message_list.append(msg)
+
+    def get_verbose_log_line_common_fields(self, line):
+        # FIXME: Take into account message line to establish direction.
+        transfer_direction = None
+
+        # Convert string 'DD-MM-YYYY HH:MM:SS' to datetime.
+        parts = re.split(r'\s+', line)
+        date_string = "%s %s" %(parts[0], parts[1])
+        timestamp = datetime.datetime.strptime(date_string,
+                "%d-%m-%Y %H:%M:%S")
+        l = len(parts)
+        peer_ip = parts[l-1]
+        peer_port = None
+
+        return (timestamp, transfer_direction, peer_ip, peer_port)
+
+    def get_verbose_log_line_noncommon_fields(self, line, message_type):
+        if message_type == "CHOKE" or message_type == "UNCHOKE" or \
+                message_type == "INTERESTED" or \
+                message_type == "NOT_INTERESTED" or \
+                message_type == "BITFIELD":
+                    return (0, 0, 0, 0)
+        elif message_type == "HAVE":
+            return self.parse_have(line)
+        elif message_type == "CANCEL":
+            return self.parse_cancel(line)
+        elif message_type == "REQUEST":
+            return self.parse_request(line)
+        elif message_type == "PIECE":
+            return self.parse_piece(line)
+        elif message_type == "DHT_PORT":
+            return self.parse_dht_port(line)
+
+        # Shouldn't get here.
+        return (0, 0, 0, 0)
+
+    def parse_have(self, line):
+        """Parse HAVE line in Tribler log file.
+
+        Sample line:
+            14-11-2009 23:11:13   connecter: Got HAVE( 14 ) from 141.85.37.41
+        """
+        parts = re.split(r'\s+', line)
+        index = int(parts[5])
+        return (index, 0, 0, 0)
+
+    def parse_request(self, line):
+        """Parse REQUEST line in TRIBLER log file.
+
+        Sample lines:
+            20-10-2009 12:56:39   Downloader: new_request 52 98304 16384 to 141.85.37.41 14398
+            27-11-2009 18:01:22   connecter: Got REQUEST( 1218 ) from 87.0.15.75
+        """
+        parts = re.split(r'\s+', line)
+
+        if re.search("new_request", line):
+            # Send request message.
+            index = int(parts[4])
+            begin = int(parts[5])
+            length = int(parts[6])
+            # TODO: Parse port.
+            #peer_port = int(parts[9])
+        else:
+            # Receive request message.
+            index = int(parts[5])
+            peer_ip = parts[8]
+
+        return (index, begin, length, 0)
+
+    def parse_piece(line):
+        """Parse PIECE line in TRIBLER log file.
+
+        Sample line:
+            14-11-2009 23:11:13   connecter: Got PIECE( 141 ) from 141.85.37.41
+        """
+        parts = re.split(r'\s+', line)
+        index = int(parts[5])
+        return (index, begin, length, 0)
+
+    def parse_cancel(line):
+        """Parse CANCEL line in Tribler log file.
+
+        Sample line:
+            14-11-2009 23:11:24   sent cancel: 130: 114688-131072
+        """
+        # FIXME: proper handling.
+        parts = re.split(r'\s+', line)
+        index = int(parts[8])
+        begin = int(parts[9])
+        length = int(parts[10]) - begin
+        return (index, begin, length, 0)
+
+    def canon_num_peers(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_dht(self, non_canon_value):
+        return int(non_canon_value)
+
+    def canon_speed(self, non_canon_value):
+        """119.51KB/s -> 119"""
+        return int(float(non_canon_value.strip("KB/s")))
+
+    def canon_size(self, non_canon_value):
+        """98% -> 98"""
+        return int(float(non_canon_value.strip("%")))
+
+    def canon_eta(self, non_canon_value):
+        """26.456787 -> 26 (seconds)"""
+        return int(float(non_canon_value))