2 Parsers for P2P logging information.
4 Built on previous work by Adriana Draghici, Razvan Deaconescu
7 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro
18 import storage # Use *Message classes.
21 # Logging code heavily inspired by Logging HOWTO documentation:
22 # http://docs.python.org/dev/howto/logging.html#configuring-logging
25 # Create logger; default logging level is DEBUG.
26 logger = logging.getLogger(__name__)
27 logger.setLevel(logging.DEBUG)
29 # Create console handler and set level to WARNING.
30 ch = logging.StreamHandler()
31 ch.setLevel(logging.WARNING)
34 formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s')
36 # Add formatter to console handler.
37 ch.setFormatter(formatter)
39 # Add console handler to logger.
42 # Define ONE_SECOND as a datetime.timedelta object.
43 ONE_SECOND = datetime.timedelta(seconds=1)
46 class SessionLogParser(object):
48 Top-level class for parsing log file(s) for a given BitTorrent session.
51 def __init__(self, path):
53 # parsing: file currently being parsed
56 def get_next_message(self):
58 Find next message in log file/folder. May be status, peer status
60 Return None when all logs have been parsed.
64 def get_current_parsing_filename(self):
65 """Return the name of the log file being currently parsed."""
69 class LibtorrentLogParser(SessionLogParser):
71 libtorrent-rasterbar log folder parser.
74 def __init__(self, path, start_time_string, priority=None):
76 If priority == "verbose" parse verbose log files first. Else, parse
79 super(LibtorrentLogParser, self).__init__(path)
80 self.start_time = datetime.datetime.strptime(start_time_string,
83 # to_parse: list of files to be parsed
84 # have_parsed: list of files that have been parsed
88 self.f = None # handler to file being parsed
90 # IP address and port are parsed from verbose log file name.
91 self.verbose_log_ip = None
92 self.verbose_log_port = None
93 self.statmsg_count = 0
95 for entry in os.listdir(self.path):
96 entry_path = os.path.join(self.path, entry)
97 if os.path.isfile(entry_path):
98 # If entry is file and name is IP_PORT.log add it to list.
99 if self.is_verbose_log_filename(entry):
100 logger.debug("Add entry path %s." %(entry_path))
101 self.to_parse.append(entry_path)
103 status_file_path = os.path.join(self.path, "status.log")
104 # In case status file doesn't exist, skip it.
105 if os.access(status_file_path, os.F_OK) and \
106 os.access(status_file_path, os.R_OK):
107 # List functions as a stack. First files go to the end.
108 if priority == "verbose":
109 self.to_parse.insert(0, status_file_path)
111 self.to_parse.append(status_file_path)
113 self.open_next_file()
115 def get_to_parse_list(self):
def get_have_parsed_list(self):
    """Return the list of log file paths that have already been parsed."""
    already_parsed = self.have_parsed
    return already_parsed
121 def verbose_log_filename_to_ip_port(self, filename):
122 r = re.compile(r'^[0-9]+(?:\.[0-9]+){3}_[0-9]+\.log$')
123 if not r.match(filename):
126 # Check for valid IP address and port.
127 a = re.split('_', filename)
129 port = int(a[1].split('.')[0])
132 if port <= 0 or port > 65535:
135 # Check valid IP address.
138 except socket.error, e:
143 def is_verbose_log_filename(self, filename):
144 if (self.verbose_log_filename_to_ip_port(filename) == (None, None)):
148 def open_next_file(self):
150 Open next log file from to_parse list.
151 Update have_parsed and parsing accordingly.
153 if self.parsing is None: # first call
157 self.have_parsed.append(self.parsing)
159 self.parsing = self.to_parse.pop()
160 self.f = open(self.parsing, 'r')
162 self.message_list = []
164 # Initiate self.verbose_log_ip and self.verbose_log_port if case.
165 (self.verbose_log_ip, self.verbose_log_port) = self.verbose_log_filename_to_ip_port(os.path.basename(self.parsing))
167 # TODO: Log this information somewhere for snapshotting purpose.
168 # In case an error occurs parsing would resume from that point.
def pop_message(self):
    """Remove and return the most recently queued message.

    Messages are appended to self.message_list by the line parsers, so
    this hands them back last-in, first-out. Raises IndexError when no
    message is queued.
    """
    pending = self.message_list
    return pending.pop()
def read_next_line(self):
    """Read one line from the open log file and parse it.

    The line, without its trailing newline, is stored in self.line and
    handed to parse_log_line. At end of file readline() yields "", so
    self.line becomes the empty string.
    """
    raw_line = self.f.readline()
    # Drop the line terminator, if there is one.
    self.line = re.sub(r'\n$', '', raw_line)
    self.parse_log_line(self.line)
def is_line_empty(self):
    """Return True if the last line read is the empty string.

    This happens at end of file. It also happens for a blank line, because
    read_next_line strips the newline before storing the line.
    """
    current = self.line
    return current == ""
182 def get_next_message_from_line(self):
184 msg = self.pop_message()
185 logger.debug("Popped message type %s." %(msg.__class__.__name__))
186 except IndexError, e:
191 def get_next_message_from_file(self):
193 msg = self.get_next_message_from_line()
195 self.read_next_line()
196 if self.is_line_empty():
205 def get_next_message(self):
206 """Go through all files in libtorrent log folder and parse them.
207 Return the next message available or None when all log files have
212 msg = self.get_next_message_from_file()
214 self.open_next_file()
218 except (ValueError, IOError, IndexError), e:
219 # In case of no more files, return None.
def get_log_line_type(self, line):
    """Classify one libtorrent log line.

    Return a (major, minor) pair:
      - ("status", None) for status lines, which start with "ps:";
      - ("peer-status", None) for lines starting with "--Peers:";
      - ("verbose", MESSAGE_TYPE) for peer-protocol lines such as
        "Jan 08 22:20:48 <== HAVE [ piece: 839]", where MESSAGE_TYPE is the
        message name that follows "<==" or "==>";
      - (None, None) for any other line, so callers can always unpack
        two values.
    """
    # Lazy %-formatting: the string is only built if DEBUG is enabled.
    logger.debug("line is: %s", line)
    if re.search(r'^ps:', line):
        return ("status", None)

    if re.search(r'^--Peers:', line):
        return ("peer-status", None)

    # Each pattern requires whitespace directly before the message name,
    # so "CHOKE" cannot match inside "UNCHOKE" and "INTERESTED" cannot
    # match inside "NOT_INTERESTED".
    for message_type in ("CHOKE", "UNCHOKE", "INTERESTED", "NOT_INTERESTED",
                         "HAVE", "BITFIELD", "REQUEST", "PIECE", "CANCEL",
                         "DHT_PORT"):
        if re.search(r'(<==|==>)\s+' + message_type, line):
            # The name returned here must be spelled exactly as
            # get_verbose_log_line_noncommon_fields expects it.
            return ("verbose", message_type)

    # Not a line this parser understands.
    return (None, None)
def parse_log_line(self, line):
    """Classify a log line and hand it to the matching parser.

    Status, peer-status and verbose lines go to parse_status_log_line,
    parse_peer_status_log_line and parse_verbose_log_line respectively.
    Those parsers append any messages they build to self.message_list.
    Lines of any other type are ignored. Nothing is returned.
    """
    kind, subtype = self.get_log_line_type(line)
    if kind == "verbose":
        self.parse_verbose_log_line(line, subtype)
    elif kind == "peer-status":
        self.parse_peer_status_log_line(line)
    elif kind == "status":
        self.parse_status_log_line(line)
271 def parse_status_log_line(self, line):
272 """Parse Libtorrent status line.
275 ps: 1, dht: 8 <> dl: 119.51kb/s, ul: 3.63kb/s <> dld: 1mb, uld: 0mb, size: 698mb <> eta: 1h 39m 37s
277 msg = storage.StatusMessage()
279 # Timestamp is swarm start_time plus a number of seconds equal to
280 # the number of messages. It is presumed that each message is
281 # delivered periodically each second.
282 msg.timestamp = self.start_time + self.statmsg_count * ONE_SECOND
283 self.statmsg_count = self.statmsg_count + 1
285 string_array = re.split("\ *[,<>]+\ *", line)
286 for string in string_array:
287 pair = re.split("\ *:\ *", string)
289 msg.num_peers = self.canon_num_peers(pair[1])
290 elif pair[0] == "dht":
291 msg.num_dht_peers = self.canon_dht(pair[1])
292 elif pair[0] == "dl":
293 msg.download_speed = self.canon_speed(pair[1])
294 elif pair[0] == "ul":
295 msg.upload_speed = self.canon_speed(pair[1])
296 elif pair[0] == "dld":
297 msg.download_size = self.canon_size(pair[1])
298 elif pair[0] == "uld":
299 msg.upload_size = self.canon_size(pair[1])
300 elif pair[0] == "size":
302 elif pair[0] == "eta":
303 eta = self.canon_eta(pair[1])
304 msg.eta_seconds = eta.days * 24 * 3600 + eta.seconds
306 # Add to message_list.
307 logger.debug("Add new status message.")
308 self.message_list.append(msg)
310 def parse_peer_status_log_line(self, line):
311 """Parse Libtorrent peer status line.
314 --Peers: (Aug 14 17:22:40) [ ip: 10.1.7.5:33289, dl: 23.6kb/s, ul: 94.91kb/s ][ ip: 10.1.6.5:47254, dl: 55.61kb/s, ul: 100.72kb/s ][ ip: 10.1.4.5:6881, dl: 93.99kb/s, ul: 100.11kb/s ][ ip: 10.1.10.5:48923, dl: 57.42kb/s, ul: 205.74kb/s ]")
316 parts = re.split(r'[\(\)\[\]]+', line)
318 date_string = parts[1].strip() + " %s" %(self.start_time.year)
319 timestamp = datetime.datetime.strptime(date_string,
322 for i in range(3, len(parts)):
323 msg = self.get_peer_status_message_from_part(re.sub(r'\s', '', parts[i]))
325 msg.timestamp = timestamp
326 logger.debug("Add new peer status message.")
327 self.message_list.append(msg)
329 def get_peer_status_message_from_part(self, part):
330 """Part sample ip:10.1.7.5:33289,dl:23.6kb/s,ul:94.91kb/s."""
331 subparts = re.split(r',', part.strip())
332 if len(subparts) != 3:
335 msg = storage.PeerStatusMessage()
337 pair = re.split(r':', s)
339 msg.peer_ip = pair[1]
340 msg.peer_port = pair[2]
341 elif pair[0] == 'dl':
342 msg.download_speed = self.canon_speed(pair[1])
343 elif pair[0] == 'ul':
344 msg.upload_speed = self.canon_speed(pair[1])
348 def parse_verbose_log_line(self, line, message_type):
349 if self.verbose_log_ip == None or self.verbose_log_port == None:
350 logger.warning("No IP or port information for verbose message.")
352 msg = storage.VerboseMessage()
353 msg.peer_ip = self.verbose_log_ip
354 msg.peer_port = self.verbose_log_port
355 msg.message_type = message_type
357 (msg.timestamp, msg.transfer_direction) = self.get_verbose_log_line_common_fields(line)
358 (msg.index, msg.begin, msg.length, msg.listen_port) = self.get_verbose_log_line_noncommon_fields(line, message_type)
360 # Add to message_list.
361 logger.debug("Add new verbose message.")
362 self.message_list.append(msg)
364 def get_verbose_log_line_common_fields(self, line):
366 transfer_direction = "send"
368 transfer_direction = "receive"
370 transfer_direction = None
372 # Convert string 'Mon DD HH:MM:SS' to datetime.
373 parts = re.split(r'[<=>]+', line)
374 date_string = parts[0].strip() + " %s" %(self.start_time.year)
375 timestamp = datetime.datetime.strptime(date_string,
378 return (timestamp, transfer_direction)
380 def get_verbose_log_line_noncommon_fields(self, line, message_type):
381 if message_type == "CHOKE" or message_type == "UNCHOKE" or \
382 message_type == "INTERESTED" or \
383 message_type == "NOT_INTERESTED" or \
384 message_type == "BITFIELD":
386 elif message_type == "HAVE":
387 return self.parse_have(line)
388 elif message_type == "CANCEL":
389 return self.parse_cancel(line)
390 elif message_type == "REQUEST":
391 return self.parse_request(line)
392 elif message_type == "PIECE":
393 return self.parse_piece(line)
394 elif message_type == "DHT_PORT":
395 return self.parse_dht_port(line)
397 # Shouldn't get here.
def parse_have(self, line):
    """Parse a HAVE line from a libtorrent verbose log file.

    Sample line:
        Jan 08 22:20:48 <== HAVE [ piece: 839]

    The piece index is read as hexadecimal (0x839 == 2105 in the sample).
    Return (index, 0, 0, 0); the begin, length and listen-port slots are
    not used for HAVE.
    """
    fields = re.split(r"[\[\]<=>]+", line)
    piece_field = fields[2].strip()
    hex_index = re.split(":", piece_field)[1].strip()
    return (int("0x" + hex_index, 16), 0, 0, 0)
def parse_request(self, line):
    """Parse a REQUEST line from a libtorrent verbose log file.

    Sample line:
        Jan 08 22:39:50 <== REQUEST [ piece: 6cc | s: 14000 | l: 4000 ]

    The piece index, begin offset (s) and length (l) are all hexadecimal.
    Return (index, begin, length, 0).
    """
    def hex_value(field):
        # "name: value" -> int(value, base 16)
        return int("0x" + re.split(":", field)[1].strip(), 16)

    fields = re.split(r"[\[\]|<=>]+", line)
    return (hex_value(fields[2]), hex_value(fields[3]),
            hex_value(fields[4]), 0)
def parse_piece(self, line):
    """Parse a PIECE line from a libtorrent verbose log file.

    Sample line:
        Jan 08 22:39:50 ==> PIECE [ piece: 5c6 | s: 24000 | l: 4000 ]

    The piece index, begin offset (s) and length (l) are hexadecimal.
    Return (index, begin, length, 0).
    """
    pieces = re.split(r"[\[\]|<=>]+", line)
    values = []
    for position in (2, 3, 4):
        raw = re.split(":", pieces[position])[1].strip()
        values.append(int("0x" + raw, 16))
    return (values[0], values[1], values[2], 0)
434 def parse_port(self, line):
435 """Parse DHT port line in libtorrent log file.
438 Jan 08 22:20:48 ==> DHT_PORT [ 50200 ]
440 parts = re.split("[\[\]<=>]+", line)
444 if transfer_direction_id == log_msg_dir["RECEIVE"]:
445 port = int("0x" + re.split(":", parts[2])[1].strip(), 16)
447 port = int(parts[2].strip())
449 return (0, 0, 0, port)
def canon_num_peers(self, non_canon_value):
    """Convert a peer-count string such as "1" (from "ps: 1") to an int."""
    count = int(non_canon_value)
    return count
def canon_dht(self, non_canon_value):
    """Convert a DHT peer-count string such as "8" (from "dht: 8") to an int."""
    dht_peers = int(non_canon_value)
    return dht_peers
def canon_speed(self, non_canon_value):
    """Convert a speed string such as "119.51kb/s" to whole kb/s (119).

    Any of the characters k, b, / and s are stripped from both ends. The
    remaining number is parsed as a float and truncated toward zero.
    """
    number = float(non_canon_value.strip("kb/s"))
    return int(number)
def canon_size(self, non_canon_value):
    """Convert a size string such as "698mb" to 698 * 1024 (714752).

    Any of the characters m and b are stripped from both ends. What remains
    must be an integer. (Presumably the result is in kilobytes; confirm.)
    """
    megabytes = int(non_canon_value.strip("mb"))
    return megabytes * 1024
465 def canon_eta(self, non_canon_value):
466 """1h 38m 37s -> [0, 1, 38, 37]; 3d 5h 24m 34s -> [3, 5, 24, 34]"""
467 eta_string_array = re.split('\ *[dhms]\ *', non_canon_value)
468 eta_string_array.remove('')
471 for i in range(0, len(eta_string_array)):
472 eta.append(int(eta_string_array[i]))
473 for i in range(len(eta_string_array), 4):
476 eta_td = datetime.timedelta(eta[0], eta[3], 0, 0, eta[2], eta[1], 0)
480 class TriblerLogParser(SessionLogParser):
482 Tribler log file parser.
485 def __init__(self, path, start_time_string):
486 super(TriblerLogParser, self).__init__(path)
488 self.start_time = datetime.datetime.strptime(start_time_string,
490 self.f = open(path, 'rt')
491 self.statmsg_count = 0
492 self.message_list = []
def pop_message(self):
    """Take the newest entry off self.message_list and return it.

    IndexError propagates when the list is empty.
    """
    return self.message_list.pop(-1)
def read_next_line(self):
    """Read the next line of the Tribler log and parse it.

    Surrounding whitespace, including the newline, is removed before the
    line is stored in self.line and passed to parse_log_line. At end of
    file, and for blank lines, self.line ends up as "".
    """
    # str.strip() already removes the trailing newline, so no separate
    # newline-removal step is needed.
    self.line = self.f.readline().strip()
    self.parse_log_line(self.line)
def is_line_empty(self):
    """Return True if self.line, the last stripped line read, is "".

    Because read_next_line strips whitespace, this is the case both at end
    of file and for blank or whitespace-only lines.
    """
    if self.line == "":
        return True
    return False
507 def get_next_message_from_line(self):
509 msg = self.pop_message()
510 logger.debug("Popped message type %s." %(msg.__class__.__name__))
511 except IndexError, e:
516 def get_next_message(self):
517 """Go through Tribler log file. All log messages (verbose, status)
518 are stored in the same file.
519 Return the next message available or None when all messages have
522 self.read_next_line()
524 msg = self.get_next_message_from_line()
526 self.read_next_line()
527 if self.is_line_empty():
def get_log_line_type(self, line):
    """Classify one Tribler log line.

    Return a (major, minor) pair:
      - ("status", None) for lines containing "DLSTATUS_";
      - ("peer-status", None) for lines starting with "--Peers:";
      - ("verbose", MESSAGE_TYPE) for peer-protocol lines such as
        "14-11-2009 23:11:13 connecter: Got HAVE( 14 ) from 141.85.37.41";
      - (None, None) for any other line, so callers can always unpack
        two values.
    """
    # Lazy %-formatting: the string is only built if DEBUG is enabled.
    logger.debug("line is: %s", line)
    if re.search(r'DLSTATUS_', line):
        return ("status", None)

    if re.search(r'^--Peers:', line):
        return ("peer-status", None)

    # Specific patterns must come before the patterns they contain:
    # "CHOKE from" is a substring of "UNCHOKE from", and "INTERESTED" is a
    # substring of "NOT_INTERESTED". Otherwise those lines are misclassified.
    verbose_patterns = (
        (r'UNCHOKE from', "UNCHOKE"),
        (r'CHOKE from', "CHOKE"),
        (r'NOT_INTERESTED', "NOT_INTERESTED"),
        (r'INTERESTED', "INTERESTED"),
        (r'BITFIELD', "BITFIELD"),
        (r'HAVE\(', "HAVE"),
        (r'REQUEST\(|new_request', "REQUEST"),
        (r'PIECE\(', "PIECE"),
        (r'sent cancel', "CANCEL"),
    )
    for pattern, message_type in verbose_patterns:
        if re.search(pattern, line):
            return ("verbose", message_type)

    # Not a line this parser understands.
    return (None, None)
def parse_log_line(self, line):
    """Dispatch a Tribler log line to the parser for its type.

    The type comes from get_log_line_type. Status, peer-status and verbose
    lines are handled by the corresponding parse_*_log_line method, which
    appends its messages to self.message_list. Other lines are skipped.
    Nothing is returned.
    """
    major, minor = self.get_log_line_type(line)
    if major == "status":
        self.parse_status_log_line(line)
        return
    if major == "peer-status":
        self.parse_peer_status_log_line(line)
        return
    if major == "verbose":
        self.parse_verbose_log_line(line, minor)
579 def parse_status_log_line(self, line):
580 """Parse Tribler status line.
583 03-11-2009 12:19:04 aqua.mpeg DLSTATUS_DOWNLOADING 93.89% None up 0.00KB/s down 5440.21KB/s eta 1.67072531777 peers 2
586 msg = storage.StatusMessage()
588 parts = re.split(r'\s+', line)
590 # Get timestamp and transform it in datetime format.
591 date_string = "%s %s" %(parts[0], parts[1])
592 timestamp = datetime.datetime.strptime(date_string,
596 while i < len(parts):
597 if parts[i] == "peers":
598 msg.num_peers = self.canon_num_peers(parts[i+1])
601 if parts[i] == "down":
602 msg.download_speed = self.canon_speed(parts[i+1])
606 msg.upload_speed = self.canon_speed(parts[i+1])
609 if parts[i] == "DLSTATUS_DOWNLOADING" or \
610 parts[i] == "DLSTATUS_SEEDING":
611 # TODO: Tribler download status is percentage.
612 msg.download_size = self.canon_size(parts[i+1])
615 if parts[i] == "eta":
616 msg.eta = self.canon_eta(parts[i+1])
621 # Add to message_list.
622 logger.debug("Add new status message.")
623 self.message_list.append(msg)
625 def parse_peer_status_log_line(self, line):
626 """Parse Tribler peer status line.
629 --Peers: (Apr 14 00:43:25) [ ip: 188.27.206.23, dl: 0.00KB/s, ul: 182.21KB/s ]
631 parts = re.split(r'[\(\)\[\]]+', line)
633 date_string = parts[1].strip() + " %s" %(self.start_time.year)
634 timestamp = datetime.datetime.strptime(date_string,
637 for i in range(3, len(parts)):
638 msg = self.get_peer_status_message_from_part(re.sub(r'\s', '', parts[i]))
640 msg.timestamp = timestamp
641 logger.debug("Add new peer status message.")
642 self.message_list.append(msg)
644 def get_peer_status_message_from_part(self, part):
645 """Part sample ip:10.1.7.5:33289,dl:23.6KB/s,ul:94.91KB/s."""
646 subparts = re.split(r',', part.strip())
647 if len(subparts) != 3:
650 msg = storage.PeerStatusMessage()
652 pair = re.split(r':', s)
654 msg.peer_ip = pair[1]
655 msg.peer_port = pair[2]
656 elif pair[0] == 'dl':
657 msg.download_speed = self.canon_speed(pair[1])
658 elif pair[0] == 'ul':
659 msg.upload_speed = self.canon_speed(pair[1])
def parse_verbose_log_line(self, line, message_type):
    """Build a VerboseMessage from a Tribler peer-protocol log line.

    Sample line:
        20-10-2009 12:56:32 Downloader: new_request 95 212992 16384 to 141.85.37.41 14398

    message_type is the minor type chosen by get_log_line_type. Timestamp,
    transfer direction and peer address come from
    get_verbose_log_line_common_fields. Index, begin, length and listen
    port come from get_verbose_log_line_noncommon_fields. The message is
    appended to self.message_list; nothing is returned.
    """
    message = storage.VerboseMessage()
    message.message_type = message_type

    (timestamp, direction,
     peer_ip, peer_port) = self.get_verbose_log_line_common_fields(line)
    message.timestamp = timestamp
    message.transfer_direction = direction
    message.peer_ip = peer_ip
    message.peer_port = peer_port

    (index, begin,
     length, listen_port) = self.get_verbose_log_line_noncommon_fields(
        line, message_type)
    message.index = index
    message.begin = begin
    message.length = length
    message.listen_port = listen_port

    # Queue the message; get_next_message_from_line() pops it later.
    logger.debug("Add new verbose message.")
    self.message_list.append(message)
679 def get_verbose_log_line_common_fields(self, line):
680 # FIXME: Take into account message line to establish direction.
681 transfer_direction = None
683 # Convert string 'DD-MM-YYYY HH:MM:SS' to datetime.
684 parts = re.split(r'\s+', line)
685 date_string = "%s %s" %(parts[0], parts[1])
686 timestamp = datetime.datetime.strptime(date_string,
692 return (timestamp, transfer_direction, peer_ip, peer_port)
694 def get_verbose_log_line_noncommon_fields(self, line, message_type):
695 if message_type == "CHOKE" or message_type == "UNCHOKE" or \
696 message_type == "INTERESTED" or \
697 message_type == "NOT_INTERESTED" or \
698 message_type == "BITFIELD":
700 elif message_type == "HAVE":
701 return self.parse_have(line)
702 elif message_type == "CANCEL":
703 return self.parse_cancel(line)
704 elif message_type == "REQUEST":
705 return self.parse_request(line)
706 elif message_type == "PIECE":
707 return self.parse_piece(line)
708 elif message_type == "DHT_PORT":
709 return self.parse_dht_port(line)
711 # Shouldn't get here.
def parse_have(self, line):
    """Parse a HAVE line from a Tribler log file.

    Sample line:
        14-11-2009 23:11:13 connecter: Got HAVE( 14 ) from 141.85.37.41

    The piece index is the sixth whitespace-separated field (decimal).
    Return (index, 0, 0, 0).
    """
    tokens = re.split(r'\s+', line)
    return (int(tokens[5]), 0, 0, 0)
724 def parse_request(self, line):
725 """Parse REQUEST line in TRIBLER log file.
728 20-10-2009 12:56:39 Downloader: new_request 52 98304 16384 to 141.85.37.41 14398
729 27-11-2009 18:01:22 connecter: Got REQUEST( 1218 ) from 87.0.15.75
731 parts = re.split(r'\s+', line)
733 if re.search("new_request", line):
734 # Send request message.
735 index = int(parts[4])
736 begin = int(parts[5])
737 length = int(parts[6])
739 #peer_port = int(parts[9])
741 # Receive request message.
742 index = int(parts[5])
745 return (index, begin, length, 0)
def parse_piece(self, line):
    """Parse a PIECE line from a Tribler log file.

    Sample line:
        14-11-2009 23:11:13 connecter: Got PIECE( 141 ) from 141.85.37.41

    Only the piece index (the sixth whitespace-separated field, decimal)
    is logged, so begin and length are reported as 0, the same way
    parse_have reports them. Return (index, 0, 0, 0).
    """
    # `self` was missing from the signature, so the self.parse_piece(line)
    # call raised TypeError. The old return also used the undefined names
    # begin and length.
    parts = re.split(r'\s+', line)
    index = int(parts[5])
    return (index, 0, 0, 0)
def parse_cancel(self, line):
    """Parse a "sent cancel" line from a Tribler log file.

    Sample line:
        14-11-2009 23:11:24 sent cancel: 130: 114688-131072

    The three numbers are the piece index, then the start and end offsets
    of the cancelled block. Return (index, begin, length, 0) with
    length = end - begin, i.e. (130, 114688, 16384, 0) for the sample.

    Raises ValueError if the line does not contain a cancel record.
    """
    # Splitting on whitespace yields only six fields for the sample line,
    # so the old parts[8]/[9]/[10] lookups raised IndexError. Match the
    # numbers explicitly instead.
    match = re.search(r'sent cancel:\s*(\d+):\s*(\d+)\s*-\s*(\d+)', line)
    if match is None:
        raise ValueError("Not a Tribler cancel line: %r" % (line,))
    index = int(match.group(1))
    begin = int(match.group(2))
    length = int(match.group(3)) - begin
    return (index, begin, length, 0)
def canon_num_peers(self, non_canon_value):
    """Turn a Tribler peer-count token such as "2" (from "peers 2") into an int."""
    peers = int(non_canon_value)
    return peers
def canon_dht(self, non_canon_value):
    """Turn a DHT peer-count string into an int."""
    dht_count = int(non_canon_value)
    return dht_count
def canon_speed(self, non_canon_value):
    """Convert a speed string such as "119.51KB/s" to whole KB/s (119).

    Any of the characters K, B, / and s are stripped from both ends. The
    remaining number is truncated toward zero.
    """
    speed = non_canon_value.strip("KB/s")
    return int(float(speed))
def canon_size(self, non_canon_value):
    """Convert a percentage string such as "93.89%" to an int (93).

    Percent signs are stripped from both ends and the number is truncated
    toward zero.
    """
    percent = float(non_canon_value.strip("%"))
    return int(percent)
def canon_eta(self, non_canon_value):
    """Convert an ETA string in seconds, e.g. "26.456787", to whole seconds (26).

    The value is parsed as a float and truncated toward zero.
    """
    seconds = float(non_canon_value)
    return int(seconds)