ppf: Add SwarmWriter class.
[cs-p2p-next.git] / ppf / new / storage.py
1 """
2 Storage class for P2P logging information.
3
4 Built on previous work by Adriana Draghici and Razvan Deaconescu.
5
6 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro
7 """
8
9 import os
10 import os.path
11 import re
12 import logging
13 import sqlite3
14 import datetime
15
16 #
17 # Logging code heavily inspired by Logging HOWTO documentation:
18 #     http://docs.python.org/dev/howto/logging.html#configuring-logging
19 #
20
21 # Create logger; default logging level is DEBUG.
22 logger = logging.getLogger(__name__)
23 logger.setLevel(logging.DEBUG)
24
25 # Create console handler and set level to ERROR.
26 ch = logging.StreamHandler()
27 ch.setLevel(logging.DEBUG)
28
29 # Create formatter.
30 formatter = logging.Formatter('%(filename)s:%(lineno)s - %(levelname)s: %(message)s')
31
32 # Add formatter to console handler.
33 ch.setFormatter(formatter)
34
35 # Add console handler to logger.
36 logger.addHandler(ch)
37
38
39 message_types = {
40         'CHOKE': {'id': 1, 'parameters': None},
41         'UNCHOKE': {'id': 2, 'parameters': None},
42         'INTERESTED': {'id': 3, 'parameters': None},
43         'NOT_INTERESTED': {'id': 4, 'parameters': None},
44         'HAVE': {'id': 5, 'parameters': None},
45         'BITFIELD': {'id': 6, 'parameters': None},
46         'REQUEST': {'id': 7, 'parameters': None},
47         'PIECE': {'id': 8, 'parameters': None},
48         'CANCEL': {'id': 9, 'parameters': None},
49         'DHT_PORT': {'id': 10, 'parameters': None}
50 }
51
52 bittorrent_clients = {
53         'Tribler': {
54             'id': 1,
55             'language': 'Python',
56             'url': 'http://www.tribler.org/trac',
57             'dht_support': True,
58             'streaming_support': True,
59             'pxe_support': None,
60             'features': None
61         },
62         'NextShare': {
63             'id': 2,
64             'language': 'Python',
65             'url': 'https://trac.p2p-next.org/',
66             'dht_support': True,
67             'streaming_support': True,
68             'pxe_support': None,
69             'features': None
70         },
71         'libtorrent-rasterbar': {
72             'id': 3,
73             'language': 'C++',
74             'url': 'http://www.rasterbar.com/products/libtorrent/',
75             'dht_support': True,
76             'streaming_support': True,
77             'pxe_support': None,
78             'features': None
79         },
80         'Vuze': {
81             'id': 4,
82             'language': 'Java',
83             'url': 'http://www.vuze.com/',
84             'dht_support': True,
85             'streaming_support': True,
86             'pxe_support': None,
87             'features': None
88         },
89         'Transmission': {
90             'id': 5,
91             'language': 'C',
92             'url': 'http://www.transmissionbt.com/',
93             'dht_support': True,
94             'streaming_support': False,
95             'pxe_support': None,
96             'features': None
97         },
98         'Aria': {
99             'id': 6,
100             'language': 'C',
101             'url': 'http://aria2.sourceforge.net/',
102             'dht_support': True,
103             'streaming_support': False,
104             'pxe_support': None,
105             'features': None
106         },
107         'Mainline': {
108             'id': 7,
109             'language': 'Python',
110             'url': 'http://www.bittorrent.com/',
111             'dht_support': True,
112             'streaming_support': False,
113             'pxe_support': None,
114             'features': None
115         }
116 }
117
118 transfer_directions = {
119         'receive': 1,
120         'send': 2
121 }
122
123 class Swarm(object):
124     """ Class mimics a C structure. """
125     def __init__(self, torrent_filename=None, data_size=None,
126             description=None):
127         self.torrent_filename = torrent_filename
128         self.data_size = data_size
129         self.description = description
130
131 class ClientSession(object):
132     """ Class mimics a C structure. """
133     # TODO: Add timezone.
134     def __init__(self, swarm_id=None, btclient=None, system_os=None,
135             system_os_version=None, system_ram=None, system_cpu=None,
136             public_ip=None, public_port=None, ds_limit=None, us_limit=None,
137             start_time=None, dht_enabled=None, pxe_enabled=None,
138             streaming_enabled=None, features=None, description=None):
139         self.swarm_id = swarm_id
140         self.btclient = btclient
141         self.system_os = system_os
142         self.system_os_version = system_os_version
143         self.system_ram = system_ram
144         self.system_cpu = system_cpu
145         self.public_ip = public_ip
146         self.public_port = public_port
147         self.ds_limit = ds_limit
148         self.us_limit = us_limit
149         self.start_time = start_time
150         self.dht_enabled = dht_enabled
151         self.pxe_enabled = pxe_enabled
152         self.streaming_enabled = streaming_enabled
153         self.features = features
154         self.description = description
155
156 class PeerStatusMessage(object):
157     """ Class mimics a C structure. """
158     def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
159             peer_ip=None, peer_port=None, download_speed=None,
160             upload_speed=None):
161         self.swarm_id = swarm_id
162         self.client_session_id = client_session_id
163         self.timestamp = timestamp
164         self.peer_ip = peer_ip
165         self.peer_port = peer_port
166         self.download_speed = download_speed
167         self.upload_speed = upload_speed
168
169 class StatusMessage(object):
170     """ Class mimics a C structure. """
171     def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
172             time=None, num_peers=None, num_dht_peers=None,
173             download_speed=None, upload_speed=None, download_size=None,
174             upload_size=None, eta=None):
175         self.swarm_id = swarm_id
176         self.client_session_id = client_session_id
177         self.timestamp = timestamp
178         self.num_peers = num_peers
179         self.num_dht_peers = num_dht_peers
180         self.download_speed = download_speed
181         self.upload_speed = upload_speed
182         self.download_size = download_size
183         self.upload_size = upload_size
184         self.eta = eta
185
186 class VerboseMessage(object):
187     """ Class mimics a C structure. """
188     def __init__(self, swarm_id=None, client_session_id=None, timestamp=None,
189             transfer_direction=None, peer_ip=None, peer_port=None,
190             message_type=None, index=None, begin=None, length=None,
191             listen_port=None):
192         self.swarm_id = swarm_id
193         self.client_session_id = client_session_id
194         self.timestamp = timestamp
195         self.transfer_direction = transfer_direction
196         self.peer_ip = peer_ip
197         self.peer_port = peer_port
198         self.message_type = message_type
199         self.index = index
200         self.begin = begin
201         self.length = length
202         self.listen_port = listen_port
203
204 class SwarmDataAccess(object):
205     def __init__(self):
206         pass
207
208     def add_swarm(self, swarm):
209         pass
210
211     def remove_swarm(self):
212         pass
213
214     def get_swarm(self):
215         pass
216
217     def update_swarm(self):
218         pass
219
220     def add_client_session(self, session):
221         pass
222
223     def remove_client_session(self):
224         pass
225
226     def get_client_session(self):
227         pass
228
229     def update_client_session(self):
230         pass
231
232     def add_peer_status_message(self, msg):
233         pass
234
235     def remove_peer_status_message(self):
236         pass
237
238     def get_peer_status_message(self):
239         pass
240
241     def update_peer_status_message(self):
242         pass
243
244     def add_status_message(self, msg):
245         pass
246
247     def remove_status_message(self):
248         pass
249
250     def get_status_message(self):
251         pass
252
253     def update_status_message(self):
254         pass
255
256     def add_verbose_message(self, msg):
257         pass
258
259     def remove_verbose_message(self):
260         pass
261
262     def get_verbose_message(self):
263         pass
264
265     def update_verbose_message(self):
266         pass
267
268 class FileAccess(SwarmDataAccess):
269     def __init__(self, path):
270         self.base_path = path
271
272 def find_last_numeric_subfolder(path):
273     """
274     Find last numeric folder in base_path folder.
275     The last numeric folder is the last swarm_id.
276     """
277     dir_list = []
278     pattern = re.compile("[0-9]+")
279
280     # Browse entries in base_path folder.
281     listing = os.listdir(path)
282     for entry in listing:
283         # If directory name is a number (id) add it to the list.
284         if os.path.isdir(os.path.join(path, entry)):
285             if pattern.match(entry):
286                 dir_list.append(int(entry))
287
288     if not dir_list:
289         return None
290     else:
291         dir_list.sort()
292         return dir_list[len(dir_list)-1]
293
294 class TreeTextFileAccess(FileAccess):
295     def __init__(self, path):
296         super(TreeTextFileAccess, self).__init__(path)
297
298     def add_swarm(self, swarm):
299         """
300         Create a subfolder with an unique id. Add 1 to the last numeric
301         subfolder id. In case none exists, use 1 as id.
302         """
303         id = find_last_numeric_subfolder(self.base_path)
304         if id == None:
305             id = 1
306         else:
307             id = id+1
308
309         swarm_path = os.path.join(self.base_path, str(id))
310         os.mkdir(swarm_path)
311
312         swarm_config = os.path.join(swarm_path, "swarm.conf")
313         f = open(swarm_config, 'w')
314         f.write("""id = %s
315         torrent_filename = %s
316         data_size = %s
317         description = %s
318         """ %(id, swarm.torrent_filename, swarm.data_size, swarm.description))
319         f.close()
320
321     def add_client_session(self, session):
322         """
323         Create session subfolder in swarm subfolder and add config file.
324         TODO: Throw exception in case swarm subfolder doesn't exist.
325         """
326         swarm_path = os.path.join(self.base_path, str(session.swarm_id))
327
328         # Search first available folder in swarm_path.
329         id = find_last_numeric_subfolder(swarm_path)
330         if id == None:
331             id = 1
332         else:
333             id = id+1
334
335         # Create session subfolder.
336         session_path = os.path.join(swarm_path, str(id))
337         os.mkdir(session_path)
338
339         # Create and populate configuration file.
340         session_config = os.path.join(session_path, "client_session.conf")
341         f = open(session_config, 'w')
342         f.write("""id = %s
343         swarm_id = %s
344         btclient = %s
345         system_os = %s
346         system_os_version = %s
347         system_ram = %s
348         system_cpu = %s
349         public_ip = %s
350         public_port = %s
351         ds_limit = %s
352         us_limit = %s
353         start_time = %s
354         dht_enabled = %s
355         pxe_enabled = %s
356         streaming_enabled = %s
357         features = %s
358         description = %s
359         """ %(id, session.swarm_id, session.btclient, session.system_os,
360             session.system_os_version, session.system_ram, session.system_cpu,
361             session.public_ip, session.public_port, session.ds_limit,
362             session.us_limit, session.start_time, session.dht_enabled,
363             session.pxe_enabled, session.streaming_enabled,
364             session.features, session.description))
365         f.close()
366
367     def add_peer_status_message(self, msg):
368         # TODO: id is number of lines in file.
369         swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
370         session_path = os.path.join(swarm_path, str(msg.client_session_id))
371         message_file = os.path.join(session_path, "peer_status.txt")
372
373         f = open(message_file, 'a')
374         f.write("""%s,%s,%s,%s,%s,%s,%s,%s\n"""
375                 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
376                     msg.peer_ip, msg.peer_port, msg.download_speed,
377                     msg.upload_speed))
378         f.close()
379
380     def add_status_message(self, msg):
381         # TODO: id is number of lines in file.
382         swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
383         session_path = os.path.join(swarm_path, str(msg.client_session_id))
384         message_file = os.path.join(session_path, "status.txt")
385
386         f = open(message_file, 'a')
387         f.write("""%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n"""
388                 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
389                     msg.num_peers, msg.num_dht_peers, msg.download_speed,
390                     msg.upload_speed, msg.download_size, msg.upload_size,
391                     msg.eta))
392         f.close()
393
394     def add_verbose_message(self, msg):
395         # TODO: id is number of lines in file.
396         swarm_path = os.path.join(self.base_path, str(msg.swarm_id))
397         session_path = os.path.join(swarm_path, str(msg.client_session_id))
398         message_file = os.path.join(session_path, "verbose.txt")
399
400         f = open(message_file, 'a')
401         f.write("""%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n"""
402                 %(1, msg.swarm_id, msg.client_session_id, msg.timestamp,
403                     msg.transfer_direction, msg.peer_ip, msg.peer_port,
404                     msg.message_type, msg.index, msg.begin, msg.length,
405                     msg.listen_port))
406         f.close()
407
408 class DatabaseAccess(SwarmDataAccess):
409     def __init__(self, database):
410         self.database = database
411
412     def connect(self):
413         self.conn = None
414         self.cursor = None
415
416     def disconnect(self):
417         self.cursor.close()
418         self.conn.close()
419
420 class SQLiteDatabaseAccess(DatabaseAccess):
421     def __init___(self, database):
422         super(SQLiteDatabaseAccess, self).__init__(database)
423
424     def connect(self):
425         self.conn = sqlite3.connect(self.database)
426         self.cursor = self.conn.cursor()
427         # Use foreign key support if available.
428         self.cursor.execute("PRAGMA foreign_keys = ON")
429         self.conn.commit()
430
431     def reset_query(self):
432         self.columns = ""
433         self.values = ""
434
435     def append_to_insert_query(self, data_name, data_value):
436         if data_value is not None:
437             self.columns = self.columns + data_name + ", "
438             self.values = self.values + "'" + str(data_value) + "'" + ", "
439
440     def add_swarm(self, swarm):
441         self.reset_query()
442         self.append_to_insert_query("torrent_filename", swarm.torrent_filename)
443         self.append_to_insert_query("data_size", swarm.data_size)
444         self.append_to_insert_query("description", swarm.description)
445
446         self.columns = re.sub(',\s*$', '', self.columns)
447         self.values = re.sub(',\s*$', '', self.values)
448         insert_query = "INSERT INTO swarms(" + self.columns +")" + \
449                 " VALUES(" + self.values + ")"
450         self.cursor.execute(insert_query)
451         self.conn.commit()
452
453     def add_client_session(self, session):
454         self.reset_query()
455         self.append_to_insert_query("swarm_id", session.swarm_id)
456         # TODO: search database for client ID
457         self.append_to_insert_query("btclient_id",
458                 bittorrent_clients[session.btclient]['id'])
459         self.append_to_insert_query("system_os", session.system_os)
460         self.append_to_insert_query("system_os_version", session.system_os_version)
461         self.append_to_insert_query("system_ram", session.system_ram)
462         self.append_to_insert_query("system_cpu", session.system_cpu)
463         self.append_to_insert_query("public_ip", session.public_ip)
464         self.append_to_insert_query("public_port", session.public_port)
465         self.append_to_insert_query("ds_limit", session.ds_limit)
466         self.append_to_insert_query("us_limit", session.us_limit)
467         self.append_to_insert_query("start_time", session.start_time)
468         self.append_to_insert_query("dht_enabled", session.dht_enabled)
469         self.append_to_insert_query("pxe_enabled", session.pxe_enabled)
470         self.append_to_insert_query("streaming_enabled", session.streaming_enabled)
471         self.append_to_insert_query("features", session.features)
472         self.append_to_insert_query("description", session.description)
473
474         self.columns = re.sub(',\s*$', '', self.columns)
475         self.values = re.sub(',\s*$', '', self.values)
476         insert_query = "INSERT INTO client_sessions(" + self.columns +")" + \
477                 " VALUES(" + self.values + ")"
478         self.cursor.execute(insert_query)
479         self.conn.commit()
480
481     def get_string_timestamp(self, ts):
482         # Timestamp is Python Datatime. Convert it to string format and
483         # pass it to internal SQLITE julianday() function.
484         return "%s-%s-%s %s:%s:%s.%s" \
485                 %(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second,
486                         1000 * ts.microsecond)
487
488     def add_peer_status_message(self, msg):
489         self.reset_query()
490         self.append_to_insert_query("client_session_id", msg.client_session_id)
491
492         # TODO: Check msg.timestamp is not None. Raise exception.
493         timestamp_string = self.get_string_timestamp(msg.timestamp)
494         value = "julianday(" + timestamp_string + ")"
495         self.append_to_insert_query("timestamp", value)
496
497         self.append_to_insert_query("timestamp", msg.timestamp)
498         self.append_to_insert_query("peer_ip", msg.peer_ip)
499         self.append_to_insert_query("peer_port", msg.peer_port)
500         self.append_to_insert_query("download_speed", msg.download_speed)
501         self.append_to_insert_query("upload_speed", msg.upload_speed)
502
503         self.columns = re.sub(',\s*$', '', self.columns)
504         self.values = re.sub(',\s*$', '', self.values)
505         insert_query = "INSERT INTO peer_status_messages(" + \
506                 self.columns +")" + " VALUES(" + self.values + ")"
507         self.cursor.execute(insert_query)
508         self.conn.commit()
509
510     def add_status_message(self, msg):
511         self.reset_query()
512         self.append_to_insert_query("client_session_id", msg.client_session_id)
513
514         # TODO: Check msg.timestamp is not None. Raise exception.
515         timestamp_string = self.get_string_timestamp(msg.timestamp)
516         value = "julianday(" + timestamp_string + ")"
517         self.append_to_insert_query("timestamp", value)
518
519         self.append_to_insert_query("num_peers", msg.num_peers)
520         self.append_to_insert_query("num_dht_peers", msg.num_dht_peers)
521         self.append_to_insert_query("download_speed", msg.download_speed)
522         self.append_to_insert_query("upload_speed", msg.upload_speed)
523         self.append_to_insert_query("download_size", msg.download_size)
524         self.append_to_insert_query("upload_size", msg.upload_size)
525         self.append_to_insert_query("eta", msg.eta)
526
527         self.columns = re.sub(',\s*$', '', self.columns)
528         self.values = re.sub(',\s*$', '', self.values)
529         insert_query = "INSERT INTO status_messages(" + self.columns +")" + \
530                 " VALUES(" + self.values + ")"
531         self.cursor.execute(insert_query)
532         self.conn.commit()
533
534     def add_verbose_message(self, msg):
535         self.reset_query()
536         self.append_to_insert_query("client_session_id", msg.client_session_id)
537
538         # TODO: Check msg.timestamp is not None. Raise exception.
539         timestamp_string = self.get_string_timestamp(msg.timestamp)
540         value = "julianday(" + timestamp_string + ")"
541         self.append_to_insert_query("timestamp", value)
542
543         self.append_to_insert_query("transfer_direction_id",
544                 transfer_directions[msg.transfer_direction])
545         self.append_to_insert_query("peer_ip", msg.peer_ip)
546         self.append_to_insert_query("peer_port", msg.peer_port)
547         self.append_to_insert_query("message_type_id",
548                 message_types[msg.message_type]['id'])
549         self.append_to_insert_query("index", msg.index)
550         self.append_to_insert_query("begin", msg.begin)
551         self.append_to_insert_query("length", msg.length)
552         self.append_to_insert_query("listen_port", msg.listen_port)
553
554         self.columns = re.sub(',\s*$', '', self.columns)
555         self.values = re.sub(',\s*$', '', self.values)
556         insert_query = "INSERT INTO verbose_messages(" + self.columns +")" + \
557                 " VALUES(" + self.values + ")"
558         self.cursor.execute(insert_query)
559         self.conn.commit()
560
561 class MySQLDatabaseAccess(DatabaseAccess):
562     def __init___(self, database):
563         super(SQLiteDatabaseAccess, self).__init__(database)
564
565     def connect(self):
566         pass
567
568     def add_swarm(self, swarm):
569         pass
570
571     def add_client_session(self, session):
572         pass
573
574     def add_peer_status_message(self, msg):
575         pass
576
577     def add_status_message(self, msg):
578         pass
579
580     def add_verbose_message(self, msg):
581         pass
582
583 class SwarmWriter(object):
584     """
585     Wrapper class for swarm storage write actions. Multiple *Access
586     objects may be added to the clas resulting in multiple storage types.
587     For example, adding a swarm could result in
588        * adding a table entry in a MySQL database (MySQLDatabaseAccess)
589        * adding a table entry in an SQLite database (SQLiteDatabaseAccess)
590        * adding a new folder in a tree structure (TreeTextFileAccess)
591     """
592
593     def __init__(self):
594         handlers = []
595
596     def add_access_handle(self, handle):
597         handlers.append(handle)
598
599     def remove_access_handle(self, handle):
600         handlers.remove(handle)
601
602     def add_swarm(self, swarm):
603         for h in handlers:
604             h.add_swarm(swarm)
605
606     def add_client_session(self, session):
607         for h in handlers:
608             h.add_client_session(swarm)
609
610     def add_peer_status_message(self, msg):
611         for h in handlers:
612             h.add_peer_status_message(swarm)
613
614     def add_status_message(self, msg):
615         for h in handlers:
616             h.add_status_message(swarm)
617
618     def add_verbose_message(self, msg):
619         for h in handlers:
620             h.add_verbose_message(swarm)