2 Storage class for P2P logging information.
4 Built on previous work by Adriana Draghici and Razvan Deaconescu.
6 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro
7 2011, Mariana Marasoiu, mariana.marasoiu@gmail.com
19 # Logging code heavily inspired by Logging HOWTO documentation:
20 # http://docs.python.org/dev/howto/logging.html#configuring-logging
23 # Create logger; default logging level is DEBUG.
24 logger = logging.getLogger(__name__)
25 logger.setLevel(logging.DEBUG)
27 # Create console handler and set level to DEBUG.
28 ch = logging.StreamHandler()
29 ch.setLevel(logging.DEBUG)
32 formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s')
34 # Add formatter to console handler.
35 ch.setFormatter(formatter)
37 # Add console handler to logger.
42 'CHOKE': {'id': 1, 'parameters': None},
43 'UNCHOKE': {'id': 2, 'parameters': None},
44 'INTERESTED': {'id': 3, 'parameters': None},
45 'NOT_INTERESTED': {'id': 4, 'parameters': None},
46 'HAVE': {'id': 5, 'parameters': None},
47 'BITFIELD': {'id': 6, 'parameters': None},
48 'REQUEST': {'id': 7, 'parameters': None},
49 'PIECE': {'id': 8, 'parameters': None},
50 'CANCEL': {'id': 9, 'parameters': None},
51 'DHT_PORT': {'id': 10, 'parameters': None}
54 bittorrent_clients = {
58 'url': 'http://www.tribler.org/trac',
60 'streaming_support': True,
67 'url': 'https://trac.p2p-next.org/',
69 'streaming_support': True,
73 'libtorrent-rasterbar': {
76 'url': 'http://www.rasterbar.com/products/libtorrent/',
78 'streaming_support': True,
85 'url': 'http://www.vuze.com/',
87 'streaming_support': True,
94 'url': 'http://www.transmissionbt.com/',
96 'streaming_support': False,
103 'url': 'http://aria2.sourceforge.net/',
105 'streaming_support': False,
111 'language': 'Python',
112 'url': 'http://www.bittorrent.com/',
114 'streaming_support': False,
120 transfer_directions = {
126 """ Class mimics a C structure. """
127 def __init__(self, torrent_filename=None, data_size=None,
129 self.torrent_filename = torrent_filename
130 self.data_size = data_size
131 self.description = description
class ClientSession(object):
    """C-structure-like container for the attributes of a client session.

    Every constructor argument is optional, defaults to None and is stored
    unchanged on the instance under an attribute of the same name. No
    validation or conversion is performed.
    """
    # TODO: Add timezone.
    def __init__(self, swarm_id=None, btclient=None, system_os=None,
                 system_os_version=None, system_ram=None, system_cpu=None,
                 public_ip=None, public_port=None, ds_limit=None,
                 us_limit=None, start_time=None, dht_enabled=None,
                 pxe_enabled=None, streaming_enabled=None, features=None,
                 description=None):
        # Identification of the session.
        self.swarm_id = swarm_id
        self.btclient = btclient

        # Description of the host system.
        self.system_os = system_os
        self.system_os_version = system_os_version
        self.system_ram = system_ram
        self.system_cpu = system_cpu

        # Network endpoint and configured limits.
        self.public_ip = public_ip
        self.public_port = public_port
        self.ds_limit = ds_limit
        self.us_limit = us_limit

        # Start time and feature switches.
        self.start_time = start_time
        self.dht_enabled = dht_enabled
        self.pxe_enabled = pxe_enabled
        self.streaming_enabled = streaming_enabled

        # Free-form extras.
        self.features = features
        self.description = description
158 class PeerStatusMessage(object):
159 """ Class mimics a C structure. """
160 def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
161 peer_ip=None, peer_port=None, download_speed=None,
163 self.swarm_id = swarm_id
164 self.client_session_id = client_session_id
165 self.timestamp = timestamp
166 self.peer_ip = peer_ip
167 self.peer_port = peer_port
168 self.download_speed = download_speed
169 self.upload_speed = upload_speed
171 class StatusMessage(object):
172 """ Class mimics a C structure. """
173 def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
174 time=None, num_peers=None, num_dht_peers=None,
175 download_speed=None, upload_speed=None, download_size=None,
176 upload_size=None, eta=None):
177 self.swarm_id = swarm_id
178 self.client_session_id = client_session_id
179 self.timestamp = timestamp
180 self.num_peers = num_peers
181 self.num_dht_peers = num_dht_peers
182 self.download_speed = download_speed
183 self.upload_speed = upload_speed
184 self.download_size = download_size
185 self.upload_size = upload_size
188 class VerboseMessage(object):
189 """ Class mimics a C structure. """
190 def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
191 transfer_direction=None, peer_ip=None, peer_port=None,
192 message_type=None, index=None, begin=None, length=None,
194 self.swarm_id = swarm_id
195 self.client_session_id = client_session_id
196 self.timestamp = timestamp
197 self.transfer_direction = transfer_direction
198 self.peer_ip = peer_ip
199 self.peer_port = peer_port
200 self.message_type = message_type
204 self.listen_port = listen_port
206 class SwarmDataAccess(object):
210 def add_swarm(self, swarm):
213 def remove_swarm(self):
219 def update_swarm(self):
222 def add_client_session(self, session):
225 def remove_client_session(self):
228 def get_client_session(self):
231 def update_client_session(self):
234 def add_peer_status_message(self, msg):
237 def remove_peer_status_message(self):
240 def get_peer_status_message(self):
243 def update_peer_status_message(self):
246 def add_status_message(self, msg):
249 def remove_status_message(self):
252 def get_status_message(self):
255 def update_status_message(self):
258 def add_verbose_message(self, msg):
261 def remove_verbose_message(self):
264 def get_verbose_message(self):
267 def update_verbose_message(self):
class FileAccess(SwarmDataAccess):
    """Swarm data access backed by a location in the file system."""

    def __init__(self, path):
        """Remember ``path`` as ``self.base_path``; it is stored as given."""
        self.base_path = path
274 def find_last_numeric_subfolder(path):
276 Find last numeric folder in base_path folder.
277 The last numeric folder is the last swarm_id.
280 pattern = re.compile("[0-9]+")
282 # Browse entries in base_path folder.
283 listing = os.listdir(path)
284 for entry in listing:
285 # If directory name is a number (id) add it to the list.
286 if os.path.isdir(os.path.join(path, entry)):
287 if pattern.match(entry):
288 dir_list.append(int(entry))
294 return dir_list[len(dir_list)-1]
296 class TreeTextFileAccess(FileAccess):
297 def __init__(self, path):
298 super(TreeTextFileAccess, self).__init__(path)
300 def add_swarm(self, swarm):
302 Create a subfolder with a unique id. Add 1 to the last numeric
303 subfolder id. In case none exists, use 1 as id.
305 id = find_last_numeric_subfolder(self.base_path)
311 swarm_path = os.path.join(self.base_path, str(id))
314 swarm_config = os.path.join(swarm_path, "swarm.conf")
315 f = open(swarm_config, 'w')
317 torrent_filename = %s
320 """ %(id, swarm.torrent_filename, swarm.data_size, swarm.description))
323 def add_client_session(self, session):
325 Create session subfolder in swarm subfolder and add config file.
326 TODO: Throw exception in case swarm subfolder doesn't exist.
328 swarm_path = os.path.join(self.base_path, str(session.swarm_id))
330 # Search first available folder in swarm_path.
331 id = find_last_numeric_subfolder(swarm_path)
337 # Create session subfolder.
338 session_path = os.path.join(swarm_path, str(id))
339 os.mkdir(session_path)
341 # Create and populate configuration file.
342 session_config = os.path.join(session_path, "client_session.conf")
343 f = open(session_config, 'w')
348 system_os_version = %s
358 streaming_enabled = %s
361 """ %(id, session.swarm_id, session.btclient, session.system_os,
362 session.system_os_version, session.system_ram, session.system_cpu,
363 session.public_ip, session.public_port, session.ds_limit,
364 session.us_limit, session.start_time, session.dht_enabled,
365 session.pxe_enabled, session.streaming_enabled,
366 session.features, session.description))
369 def add_peer_status_message(self, msg):
370 # TODO: id is number of lines in file.
371 swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
372 session_path = os.path.join(swarm_path, str(msg.client_session_id))
373 message_file = os.path.join(session_path, "peer_status.txt")
375 f = open(message_file, 'a')
376 f.write("""%s,%s,%s,%s,%s,%s,%s,%s\n"""
377 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
378 msg.peer_ip, msg.peer_port, msg.download_speed,
382 def add_status_message(self, msg):
383 # TODO: id is number of lines in file.
384 swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
385 session_path = os.path.join(swarm_path, str(msg.client_session_id))
386 message_file = os.path.join(session_path, "status.txt")
388 f = open(message_file, 'a')
389 f.write("""%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n"""
390 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
391 msg.num_peers, msg.num_dht_peers, msg.download_speed,
392 msg.upload_speed, msg.download_size, msg.upload_size,
396 def add_verbose_message(self, msg):
397 # TODO: id is number of lines in file.
398 swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
399 session_path = os.path.join(swarm_path, str(msg.client_session_id))
400 message_file = os.path.join(session_path, "verbose.txt")
402 f = open(message_file, 'a')
403 f.write("""%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n"""
404 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
405 msg.transfer_direction, msg.peer_ip, msg.peer_port,
406 msg.message_type, msg.index, msg.begin, msg.length,
410 class DatabaseAccess(SwarmDataAccess):
411 def __init__(self, database):
412 self.database = database
418 def disconnect(self):
422 class SQLiteDatabaseAccess(DatabaseAccess):
423 def __init___(self, database):
424 super(SQLiteDatabaseAccess, self).__init__(database)
427 self.conn = sqlite3.connect(self.database)
428 self.cursor = self.conn.cursor()
429 # Use foreign key support if available.
430 self.cursor.execute("PRAGMA foreign_keys = ON")
433 def reset_query(self):
437 def append_to_insert_query(self, data_name, data_value):
438 if data_value is not None:
439 self.columns = self.columns + data_name + ", "
440 self.values = self.values + "'" + str(data_value) + "'" + ", "
442 def add_swarm(self, swarm):
444 self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
445 self.append_to_insert_query("data_size", swarm.data_size)
446 self.append_to_insert_query("description", swarm.description)
448 self.columns = re.sub(',\s*$', '', self.columns)
449 self.values = re.sub(',\s*$', '', self.values)
450 insert_query = "INSERT INTO swarms(" + self.columns +")" + \
451 " VALUES(" + self.values + ")"
452 self.cursor.execute(insert_query)
455 def add_client_session(self, session):
457 self.append_to_insert_query("swarm_id", session.swarm_id)
458 # TODO: search database for client ID
459 self.append_to_insert_query("btclient_id",
460 bittorrent_clients[session.btclient]['id'])
461 self.append_to_insert_query("system_os", session.system_os)
462 self.append_to_insert_query("system_os_version", session.system_os_version)
463 self.append_to_insert_query("system_ram", session.system_ram)
464 self.append_to_insert_query("system_cpu", session.system_cpu)
465 self.append_to_insert_query("public_ip", session.public_ip)
466 self.append_to_insert_query("public_port", session.public_port)
467 self.append_to_insert_query("ds_limit", session.ds_limit)
468 self.append_to_insert_query("us_limit", session.us_limit)
469 self.append_to_insert_query("start_time", session.start_time)
470 self.append_to_insert_query("dht_enabled", session.dht_enabled)
471 self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
472 self.append_to_insert_query("streaming_enabled", session.streaming_enabled)
473 self.append_to_insert_query("features", session.features)
474 self.append_to_insert_query("description", session.description)
476 self.columns = re.sub(',\s*$', '', self.columns)
477 self.values = re.sub(',\s*$', '', self.values)
478 insert_query = "INSERT INTO client_sessions(" + self.columns +")" + \
479 " VALUES(" + self.values + ")"
480 self.cursor.execute(insert_query)
def get_string_timestamp(self, ts):
    """Return ``ts`` formatted as ``YYYY-MM-DD HH:MM:SS.ffffff``.

    ``ts`` is a Python datetime. The result is meant to be handed to
    SQLite's julianday() function, which only understands zero-padded
    date and time fields.

    Every field is now zero-padded and the fraction is written as six
    microsecond digits. Previously the fields were unpadded and the
    fraction was ``1000 * ts.microsecond`` without padding, so 5
    microseconds was rendered as ``.5000`` (half a second).
    """
    return "%04d-%02d-%02d %02d:%02d:%02d.%06d" % (
        ts.year, ts.month, ts.day,
        ts.hour, ts.minute, ts.second,
        ts.microsecond)
490 def add_peer_status_message(self, msg):
492 self.append_to_insert_query("client_session_id", msg.client_session_id)
494 # TODO: Check msg.timestamp is not None. Raise exception.
495 timestamp_string = self.get_string_timestamp(msg.timestamp)
496 value = "julianday(" + timestamp_string + ")"
497 self.append_to_insert_query("timestamp", value)
499 self.append_to_insert_query("timestamp", msg.timestamp)
500 self.append_to_insert_query("peer_ip", msg.peer_ip)
501 self.append_to_insert_query("peer_port", msg.peer_port)
502 self.append_to_insert_query("download_speed", msg.download_speed)
503 self.append_to_insert_query("upload_speed", msg.upload_speed)
505 self.columns = re.sub(',\s*$', '', self.columns)
506 self.values = re.sub(',\s*$', '', self.values)
507 insert_query = "INSERT INTO peer_status_messages(" + \
508 self.columns +")" + " VALUES(" + self.values + ")"
509 self.cursor.execute(insert_query)
512 def add_status_message(self, msg):
514 self.append_to_insert_query("client_session_id", msg.client_session_id)
516 # TODO: Check msg.timestamp is not None. Raise exception.
517 timestamp_string = self.get_string_timestamp(msg.timestamp)
518 value = "julianday(" + timestamp_string + ")"
519 self.append_to_insert_query("timestamp", value)
521 self.append_to_insert_query("num_peers", msg.num_peers)
522 self.append_to_insert_query("num_dht_peers", msg.num_dht_peers)
523 self.append_to_insert_query("download_speed", msg.download_speed)
524 self.append_to_insert_query("upload_speed", msg.upload_speed)
525 self.append_to_insert_query("download_size", msg.download_size)
526 self.append_to_insert_query("upload_size", msg.upload_size)
527 self.append_to_insert_query("eta", msg.eta)
529 self.columns = re.sub(',\s*$', '', self.columns)
530 self.values = re.sub(',\s*$', '', self.values)
531 insert_query = "INSERT INTO status_messages(" + self.columns +")" + \
532 " VALUES(" + self.values + ")"
533 self.cursor.execute(insert_query)
536 def add_verbose_message(self, msg):
538 self.append_to_insert_query("client_session_id", msg.client_session_id)
540 # TODO: Check msg.timestamp is not None. Raise exception.
541 timestamp_string = self.get_string_timestamp(msg.timestamp)
542 value = "julianday(" + timestamp_string + ")"
543 self.append_to_insert_query("timestamp", value)
545 self.append_to_insert_query("transfer_direction_id",
546 transfer_directions[msg.transfer_direction])
547 self.append_to_insert_query("peer_ip", msg.peer_ip)
548 self.append_to_insert_query("peer_port", msg.peer_port)
549 self.append_to_insert_query("message_type_id",
550 message_types[msg.message_type]['id'])
551 self.append_to_insert_query("index", msg.index)
552 self.append_to_insert_query("begin", msg.begin)
553 self.append_to_insert_query("length", msg.length)
554 self.append_to_insert_query("listen_port", msg.listen_port)
556 self.columns = re.sub(',\s*$', '', self.columns)
557 self.values = re.sub(',\s*$', '', self.values)
558 insert_query = "INSERT INTO verbose_messages(" + self.columns +")" + \
559 " VALUES(" + self.values + ")"
560 self.cursor.execute(insert_query)
563 class MySQLDatabaseAccess(DatabaseAccess):
564 def __init___(self, database):
565 super(MySQLDatabaseAccess, self).__init__(database)
568 # TODO Add support for reading connection information from config_file
569 self.conn = MySQLdb.Connection(db=self.database['database'],
570 user=self.database['user'],
571 passwd=self.database['password'],
572 host=self.database['host'],
573 port=self.database['port'])
574 self.cursor = self.conn.cursor()
577 def reset_query(self):
581 def append_to_insert_query(self, data_name, data_value):
582 if data_value is not None:
583 self.columns = self.columns + data_name + ", "
584 self.values = self.values + "'" + str(data_value) + "'" + ", "
586 def add_swarm(self, swarm):
588 self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
589 self.append_to_insert_query("data_size", swarm.data_size)
590 self.append_to_insert_query("description", swarm.description)
592 self.columns = re.sub(',\s*$', '', self.columns)
593 self.values = re.sub(',\s*$', '', self.values)
594 insert_query = "INSERT INTO swarms(" + self.columns +")" + \
595 " VALUES(" + self.values + ")"
596 self.cursor.execute(insert_query)
599 def add_client_session(self, session):
601 self.append_to_insert_query("swarm_id", session.swarm_id)
602 # TODO: search database for client ID
603 self.append_to_insert_query("btclient_id",
604 bittorrent_clients[session.btclient]['id'])
605 self.append_to_insert_query("system_os", session.system_os)
606 self.append_to_insert_query("system_os_version", session.system_os_version)
607 self.append_to_insert_query("system_ram", session.system_ram)
608 self.append_to_insert_query("system_cpu", session.system_cpu)
609 self.append_to_insert_query("public_ip", session.public_ip)
610 self.append_to_insert_query("public_port", session.public_port)
611 self.append_to_insert_query("ds_limit", session.ds_limit)
612 self.append_to_insert_query("us_limit", session.us_limit)
613 self.append_to_insert_query("start_time", session.start_time)
614 self.append_to_insert_query("dht_enabled", session.dht_enabled)
615 self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
616 self.append_to_insert_query("streaming_enabled", session.streaming_enabled)
617 self.append_to_insert_query("features", session.features)
618 self.append_to_insert_query("description", session.description)
620 self.columns = re.sub(',\s*$', '', self.columns)
621 self.values = re.sub(',\s*$', '', self.values)
622 insert_query = "INSERT INTO client_sessions(" + self.columns +")" + \
623 " VALUES(" + self.values + ")"
624 self.cursor.execute(insert_query)
def get_string_timestamp(self, ts):
    """Return ``ts`` (a Python datetime) as ``Y-M-D H:M:S`` text.

    The fields are written without zero padding and without the
    microsecond part.
    """
    date_part = "%s-%s-%s" % (ts.year, ts.month, ts.day)
    time_part = "%s:%s:%s" % (ts.hour, ts.minute, ts.second)
    return date_part + " " + time_part
632 def add_peer_status_message(self, msg):
634 self.append_to_insert_query("client_session_id", msg.client_session_id)
636 # TODO: Check msg.timestamp is not None. Raise exception.
637 timestamp_string = self.get_string_timestamp(msg.timestamp)
639 self.append_to_insert_query("timestamp", timestamp_string)
640 self.append_to_insert_query("peer_ip", msg.peer_ip)
641 self.append_to_insert_query("peer_port", msg.peer_port)
642 self.append_to_insert_query("download_speed", msg.download_speed)
643 self.append_to_insert_query("upload_speed", msg.upload_speed)
645 self.columns = re.sub(',\s*$', '', self.columns)
646 self.values = re.sub(',\s*$', '', self.values)
647 insert_query = "INSERT INTO peer_status_messages(" + \
648 self.columns +")" + " VALUES(" + self.values + ")"
649 self.cursor.execute(insert_query)
652 def add_status_message(self, msg):
654 self.append_to_insert_query("client_session_id", msg.client_session_id)
656 # TODO: Check msg.timestamp is not None. Raise exception.
657 timestamp_string = self.get_string_timestamp(msg.timestamp)
659 self.append_to_insert_query("timestamp", timestamp_string)
660 self.append_to_insert_query("num_peers", msg.num_peers)
661 self.append_to_insert_query("num_dht_peers", msg.num_dht_peers)
662 self.append_to_insert_query("download_speed", msg.download_speed)
663 self.append_to_insert_query("upload_speed", msg.upload_speed)
664 self.append_to_insert_query("download_size", msg.download_size)
665 self.append_to_insert_query("upload_size", msg.upload_size)
666 self.append_to_insert_query("eta", msg.eta)
668 self.columns = re.sub(',\s*$', '', self.columns)
669 self.values = re.sub(',\s*$', '', self.values)
670 insert_query = "INSERT INTO status_messages(" + self.columns +")" + \
671 " VALUES(" + self.values + ")"
672 self.cursor.execute(insert_query)
675 def add_verbose_message(self, msg):
677 self.append_to_insert_query("client_session_id", msg.client_session_id)
679 # TODO: Check msg.timestamp is not None. Raise exception.
680 timestamp_string = self.get_string_timestamp(msg.timestamp)
682 self.append_to_insert_query("timestamp", timestamp_string)
683 self.append_to_insert_query("transfer_direction_id",
684 transfer_directions[msg.transfer_direction])
685 self.append_to_insert_query("peer_ip", msg.peer_ip)
686 self.append_to_insert_query("peer_port", msg.peer_port)
687 self.append_to_insert_query("message_type_id",
688 message_types[msg.message_type]['id'])
689 self.append_to_insert_query("index", msg.index)
690 self.append_to_insert_query("begin", msg.begin)
691 self.append_to_insert_query("length", msg.length)
692 self.append_to_insert_query("listen_port", msg.listen_port)
694 self.columns = re.sub(',\s*$', '', self.columns)
695 self.values = re.sub(',\s*$', '', self.values)
696 insert_query = "INSERT INTO verbose_messages(" + self.columns +")" + \
697 " VALUES(" + self.values + ")"
698 self.cursor.execute(insert_query)
701 class SwarmWriter(object):
703 Wrapper class for swarm storage write actions. Multiple *Access
704 objects may be added to the class resulting in multiple storage types.
705 For example, adding a swarm could result in
706 * adding a table entry in a MySQL database (MySQLDatabaseAccess)
707 * adding a table entry in an SQLite database (SQLiteDatabaseAccess)
708 * adding a new folder in a tree structure (TreeTextFileAccess)
714 def add_access_handle(self, handle):
715 self.handlers.append(handle)
717 def remove_access_handle(self, handle):
718 self.handlers.remove(handle)
720 def add_swarm(self, swarm):
721 for h in self.handlers:
724 def add_client_session(self, session):
725 for h in self.handlers:
726 h.add_client_session(session)
728 def add_peer_status_message(self, msg):
729 for h in self.handlers:
730 h.add_peer_status_message(msg)
732 def add_status_message(self, msg):
733 for h in self.handlers:
734 h.add_status_message(msg)
736 def add_verbose_message(self, msg):
737 for h in self.handlers:
738 h.add_verbose_message(msg)