2 Test suite for storage. Uses unittest module.
4 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro.
5 2011, Mariana Marasoiu, mariana.marasoiu@gmail.com
def create_non_numeric_subfolders(path):
    """Create five subfolders with non-numeric names directly under `path`.

    The folders are created one after another with `os.mkdir`, so `path`
    must already exist and must not yet contain any of these names.
    """
    for folder_name in ("ana", "are", "mere", "si", "pere"):
        os.mkdir(os.path.join(path, folder_name))
def create_numeric_subfolders_sequential(path):
    """Create subfolders named "1" through "5" directly under `path`.

    The folders are created in ascending order with `os.mkdir`, so `path`
    must already exist and must not yet contain any of these names.
    """
    for number in range(1, 6):
        os.mkdir(os.path.join(path, str(number)))
def create_numeric_subfolders_non_sequential(path):
    """Create five numerically named, non-consecutive subfolders under `path`.

    The names are deliberately created out of numeric order
    (32, 5, 423, 1401, 92). `path` must already exist and must not yet
    contain any of these names.
    """
    for folder_name in ("32", "5", "423", "1401", "92"):
        os.mkdir(os.path.join(path, folder_name))
def create_numeric_files(path):
    """Create five empty regular files with numeric names under `path`.

    The files are named 33, 6, 424, 1402 and 93. Each one is opened in
    write mode, which creates it (or truncates an existing file), and is
    closed immediately by the context manager, so no handle stays open
    even if a later `open` call raises.
    """
    for file_name in ("33", "6", "424", "1402", "93"):
        with open(os.path.join(path, file_name), 'w'):
            # Nothing is written; the file only needs to exist.
            pass
53 class StorageTest(unittest.TestCase):
55 Test suite for stand alone functions in storage.py.
58 # Class specific variables. Initialized in setUp, used throughout tests.
59 path = "/tmp/ttfa-test"
66 """Remove base folder."""
67 shutil.rmtree(self.path)
def test_find_last_numeric_subfolder_no_subfolders(self):
    """Test return of None when no subfolders are present."""
    # Named `last_id` rather than `id` so the builtin is not shadowed.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertIsNone(last_id)
def test_find_last_numeric_subfolder_no_numeric_subfolders(self):
    """Test return of None when no numeric subfolders are present."""
    create_non_numeric_subfolders(self.path)

    # Named `last_id` rather than `id` so the builtin is not shadowed.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertIsNone(last_id)
def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(self):
    """
    Test return of correct id when sequential numeric named
    subfolders only are present (1, 2, 3, ...).
    """
    create_numeric_subfolders_sequential(self.path)

    # Named `last_id` rather than `id` so the builtin is not shadowed.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertEqual(last_id, 5)
def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(self):
    """
    Test return of correct id when nonsequential numeric named
    subfolders only are present (32, 5, 423, ...).
    """
    create_numeric_subfolders_non_sequential(self.path)

    # 1401 is the highest numeric name, although it was not created last.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertEqual(last_id, 1401)
def test_find_last_numeric_subfolder_mixed_subfolders_sequential(self):
    """
    Test return of correct id when mixed named subfolders (numeric and
    others) are present. Numeric folders are sequential.
    """
    create_non_numeric_subfolders(self.path)
    create_numeric_subfolders_sequential(self.path)

    # The non-numeric folders must not affect the result.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertEqual(last_id, 5)
def test_find_last_numeric_subfolder_mixed_subfolders_nonsequential(self):
    """
    Test return of correct id when mixed named subfolders (numeric and
    others) are present. Numeric folders are non-sequential.
    """
    create_non_numeric_subfolders(self.path)
    create_numeric_subfolders_non_sequential(self.path)

    # The non-numeric folders must not affect the result.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertEqual(last_id, 1401)
def test_find_last_numeric_subfolder_mixed_subfolders_files(self):
    """
    Test return of correct id when mixed named subfolders (numeric and
    others) together with files are present. Numeric folders are
    non-sequential.
    """
    create_non_numeric_subfolders(self.path)
    create_numeric_subfolders_non_sequential(self.path)
    create_numeric_files(self.path)

    # The numeric file "1402" is larger than any folder name; expecting
    # 1401 checks that plain files are ignored.
    last_id = storage.find_last_numeric_subfolder(self.path)
    self.assertEqual(last_id, 1401)
137 class TreeTextFileAccessTest(unittest.TestCase):
139 Test suite for TreeTextFileAccess class in storage.py.
142 # Class specific variables. Initialized in setUp, used throughout tests.
143 path = "/tmp/ttfa-test"
146 """Create folder. Initialize object. """
150 """Remove base folder."""
151 shutil.rmtree(self.path)
153 def test_add_swarm_folder_and_file_are_created(self):
154 # Create instance of class and add swarm.
155 a = storage.TreeTextFileAccess(self.path)
156 s = storage.Swarm(torrent_filename="fedora.torrent",
160 expected_swarm_subfolder = 1
161 expected_swarm_path = os.path.join(self.path,
162 str(expected_swarm_subfolder))
163 expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
165 self.assertEqual(os.path.isdir(expected_swarm_path), True)
166 self.assertEqual(os.path.isfile(expected_swarm_config), True)
168 def test_add_client_session_folder_and_file_are_created(self):
169 a = storage.TreeTextFileAccess(self.path)
172 s = storage.Swarm(torrent_filename="fedora.torrent",
176 # Add client session.
177 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
178 system_os="Linux", system_os_version="2.6.26",
179 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
180 public_port="50500", ds_limit=300, us_limit=200)
181 a.add_client_session(cs)
183 expected_swarm_subfolder = 1
184 expected_session_subfolder = 1
185 expected_swarm_path = os.path.join(self.path,
186 str(expected_swarm_subfolder))
187 expected_session_path = os.path.join(expected_swarm_path,
188 str(expected_session_subfolder))
189 expected_session_config = os.path.join(expected_session_path,
190 "client_session.conf")
192 self.assertEqual(os.path.isdir(expected_session_path), True)
193 self.assertEqual(os.path.isfile(expected_session_config), True)
195 def test_add_peer_status_message_file_is_created(self):
196 a = storage.TreeTextFileAccess(self.path)
199 s = storage.Swarm(torrent_filename="fedora.torrent",
203 # Add client session.
204 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
205 system_os="Linux", system_os_version="2.6.26",
206 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
207 public_port="50500", ds_limit=300, us_limit=200)
208 a.add_client_session(cs)
210 # Add peer status message.
211 msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
212 peer_ip="141.85.224.202", peer_port="12345",
213 download_speed=13, upload_speed=98)
214 a.add_peer_status_message(msg)
216 expected_swarm_subfolder = 1
217 expected_session_subfolder = 1
218 expected_swarm_path = os.path.join(self.path,
219 str(expected_swarm_subfolder))
220 expected_session_path = os.path.join(expected_swarm_path,
221 str(expected_session_subfolder))
222 expected_message_file = os.path.join(expected_session_path,
225 self.assertEqual(os.path.isfile(expected_message_file), True)
227 def test_add_peer_status_message_number_of_lines(self):
228 self.assertEqual(True, True)
230 def test_add_status_message_file_is_created(self):
231 a = storage.TreeTextFileAccess(self.path)
234 s = storage.Swarm(torrent_filename="fedora.torrent",
238 # Add client session.
239 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
240 system_os="Linux", system_os_version="2.6.26",
241 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
242 public_port="50500", ds_limit=300, us_limit=200)
243 a.add_client_session(cs)
245 # Add status message.
246 msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
247 num_peers=10, num_dht_peers=3, download_speed=102,
248 upload_speed=99, download_size=10213, upload_size=3301)
249 a.add_status_message(msg)
251 expected_swarm_subfolder = 1
252 expected_session_subfolder = 1
253 expected_swarm_path = os.path.join(self.path,
254 str(expected_swarm_subfolder))
255 expected_session_path = os.path.join(expected_swarm_path,
256 str(expected_session_subfolder))
257 expected_message_file = os.path.join(expected_session_path,
260 self.assertEqual(os.path.isfile(expected_message_file), True)
262 def test_add_status_message_number_of_lines(self):
263 self.assertEqual(True, True)
265 def test_add_verbose_message_file_is_created(self):
266 a = storage.TreeTextFileAccess(self.path)
269 s = storage.Swarm(torrent_filename="fedora.torrent",
273 # Add client session.
274 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
275 system_os="Linux", system_os_version="2.6.26",
276 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
277 public_port="50500", ds_limit=300, us_limit=200)
278 a.add_client_session(cs)
280 # Add verbose message.
281 msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
282 transfer_direction="send", peer_ip="141.85.224.202",
283 peer_port="12345", message_type="CHOKE")
284 a.add_verbose_message(msg)
286 expected_swarm_subfolder = 1
287 expected_session_subfolder = 1
288 expected_swarm_path = os.path.join(self.path,
289 str(expected_swarm_subfolder))
290 expected_session_path = os.path.join(expected_swarm_path,
291 str(expected_session_subfolder))
292 expected_message_file = os.path.join(expected_session_path,
295 self.assertEqual(os.path.isfile(expected_message_file), True)
297 def test_add_verbose_message_number_of_lines(self):
298 self.assertEqual(True, True)
300 class SQLiteDatabaseAccessTest(unittest.TestCase):
302 Test suite for SQLiteDatabaseAccess class in storage.py.
305 # Class specific variables. Initialized in setUp, used throughout tests.
307 sql_create_script = "p2p-log-sqlite.sql"
308 sql_init_script = "p2p-init-test.sql"
309 expected_swarm_count = 0
310 expected_session_count = 0
311 expected_statmsg_count = 0
312 expected_pstatmsg_count = 0
313 expected_verbmsg_count = 0
315 def get_swarm_count(self):
316 """ Retrieve number of entries in `swarms` table. """
317 conn = sqlite3.connect(self.database)
318 cursor = conn.cursor()
320 cursor.execute('SELECT COUNT(id) FROM swarms')
321 count = cursor.fetchone()[0]
328 def get_session_count(self):
329 """ Retrieve number of entries in `client_sessions` table. """
330 conn = sqlite3.connect(self.database)
331 cursor = conn.cursor()
333 cursor.execute('SELECT COUNT(id) FROM client_sessions')
334 count = cursor.fetchone()[0]
341 def get_statmsg_count(self):
342 """ Retrieve number of entries in `status_messages` table. """
343 conn = sqlite3.connect(self.database)
344 cursor = conn.cursor()
346 cursor.execute('SELECT COUNT(id) FROM status_messages')
347 count = cursor.fetchone()[0]
354 def get_pstatmsg_count(self):
355 """ Retrieve number of entries in `peer_status_messages` table. """
356 conn = sqlite3.connect(self.database)
357 cursor = conn.cursor()
359 cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
360 count = cursor.fetchone()[0]
367 def get_verbmsg_count(self):
368 """ Retrieve number of entries in `verbose_messages` table. """
369 conn = sqlite3.connect(self.database)
370 cursor = conn.cursor()
372 cursor.execute('SELECT COUNT(id) FROM verbose_messages')
373 count = cursor.fetchone()[0]
381 """ Create database file and instantiate objects. """
382 # Create database file. Create tables and indices.
383 # TODO: Check exceptions.
384 sql_create_script_path = os.path.join(os.path.dirname(__file__),
385 self.sql_create_script)
386 f = open(sql_create_script_path, 'r')
387 p = subprocess.Popen(["sqlite3", self.database], stdin=f)
388 ret = os.waitpid(p.pid, 0)[1]
392 # TODO: Check exceptions.
393 sql_init_script_path = os.path.join(os.path.dirname(__file__),
394 self.sql_init_script)
395 f = open(sql_init_script_path, 'r')
396 p = subprocess.Popen(["sqlite3", self.database], stdin=f)
397 ret = os.waitpid(p.pid, 0)[1]
400 # Initialize to number of table entries inserted in the init script.
401 self.expected_swarm_count = 2
402 self.expected_session_count = 2
403 self.expected_statmsg_count = 2
404 self.expected_pstatmsg_count = 2
405 self.expected_verbmsg_count = 2
408 """ Close connection and remove database file. """
409 os.remove(self.database)
411 def test_add_swarm_new_entry_in_table(self):
413 s = storage.Swarm(torrent_filename="fedora.torrent",
415 a = storage.SQLiteDatabaseAccess(self.database)
419 self.expected_swarm_count = self.expected_swarm_count + 1
421 # Select number of swarms.
422 swarm_count = self.get_swarm_count()
424 self.assertEqual(swarm_count, self.expected_swarm_count)
426 def test_add_client_session_new_entry_in_table(self):
427 # Add new client session.
428 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
429 system_os="Linux", system_os_version="2.6.26",
430 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
431 public_port="50500", ds_limit=300, us_limit=200)
432 a = storage.SQLiteDatabaseAccess(self.database)
434 a.add_client_session(cs)
436 self.expected_session_count = self.expected_session_count + 1
438 # Select number of client sessions.
439 session_count = self.get_session_count()
441 self.assertEqual(session_count, self.expected_session_count)
443 def test_add_status_message_new_entry_in_table(self):
444 # Add new status message.
445 ts = datetime.datetime.strptime("2010-09-12 08:43:15",
447 msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
448 num_peers=10, num_dht_peers=3, download_speed=102,
449 upload_speed=99, download_size=10213, upload_size=3301)
450 a = storage.SQLiteDatabaseAccess(self.database)
452 a.add_status_message(msg)
454 self.expected_statmsg_count = self.expected_statmsg_count + 1
456 # Select number of status messages.
457 statmsg_count = self.get_statmsg_count()
459 self.assertEqual(statmsg_count, self.expected_statmsg_count)
461 def test_add_peer_status_message_new_entry_in_table(self):
462 # Add new peer status message.
463 ts = datetime.datetime.strptime("2010-09-12 13:43:25",
465 msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
466 peer_ip="141.85.224.202", peer_port="12345",
467 download_speed=13, upload_speed=98)
468 a = storage.SQLiteDatabaseAccess(self.database)
470 a.add_peer_status_message(msg)
472 self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1
474 # Select number of peer status messages.
475 pstatmsg_count = self.get_pstatmsg_count()
477 self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
479 def test_add_verbose_message_new_entry_in_databas(self):
480 # Add new verbose message.
481 ts = datetime.datetime.strptime("2010-09-12 13:43:24",
483 msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
484 transfer_direction="send", peer_ip="141.85.224.202",
485 peer_port="12345", message_type="CHOKE")
486 a = storage.SQLiteDatabaseAccess(self.database)
488 a.add_verbose_message(msg)
490 self.expected_verbmsg_count = self.expected_verbmsg_count + 1
492 # Select number of verbose messages.
493 verbmsg_count = self.get_verbmsg_count()
495 self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
498 class MySQLDatabaseAccessTest(unittest.TestCase):
500 Test suite for MySQLDatabaseAccess class in storage.py.
503 # Class specific variables. Initialized in setUp, used throughout tests.
504 database = "p2p_test"
506 password = "p2p4th3m45535"
509 sql_create_script = "p2p-log-mysql.sql"
510 sql_init_script = "p2p-init-test-mysql.sql"
511 expected_swarm_count = 0
512 expected_session_count = 0
513 expected_statmsg_count = 0
514 expected_pstatmsg_count = 0
515 expected_verbmsg_count = 0
517 def get_swarm_count(self):
518 """ Retrieve number of entries in `swarms` table. """
520 self.cursor.execute('SELECT COUNT(id) FROM swarms')
521 count = self.cursor.fetchone()[0]
525 def get_session_count(self):
526 """ Retrieve number of entries in `client_sessions` table. """
527 self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
528 count = self.cursor.fetchone()[0]
532 def get_statmsg_count(self):
533 """ Retrieve number of entries in `status_messages` table. """
534 self.cursor.execute('SELECT COUNT(id) FROM status_messages')
535 count = self.cursor.fetchone()[0]
539 def get_pstatmsg_count(self):
540 """ Retrieve number of entries in `peer_status_messages` table. """
541 self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
542 count = self.cursor.fetchone()[0]
546 def get_verbmsg_count(self):
547 """ Retrieve number of entries in `verbose_messages` table. """
548 self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
549 count = self.cursor.fetchone()[0]
554 """ Create database and instantiate objects. """
555 # Create database. Create tables and indices.
556 # TODO: Check exceptions.
558 sql_create_script_path = os.path.join(os.path.dirname(__file__),
559 self.sql_create_script)
561 self.conn = MySQLdb.Connection(user=self.user, passwd=self.password)
562 self.cursor = self.conn.cursor()
564 self.cursor.execute("CREATE DATABASE %s" %self.database)
565 self.cursor.execute("USE %s" %self.database)
567 f = open(sql_create_script_path, 'r')
568 p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
569 "--password=%s" %self.password], stdin=f)
570 ret = os.waitpid(p.pid, 0)[1]
574 # TODO: Check exceptions.
575 sql_init_script_path = os.path.join(os.path.dirname(__file__),
576 self.sql_init_script)
578 f = open(sql_init_script_path, 'r')
579 p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
580 "--password=%s" %self.password], stdin=f)
581 ret = os.waitpid(p.pid, 0)[1]
584 # Initialize to number of table entries inserted in the init script.
585 self.expected_swarm_count = 2
586 self.expected_session_count = 2
587 self.expected_statmsg_count = 2
588 self.expected_pstatmsg_count = 2
589 self.expected_verbmsg_count = 2
592 """ Delete database and close connection. """
593 self.cursor.execute("DROP DATABASE %s" %self.database)
597 def test_add_swarm_new_entry_in_table(self):
599 s = storage.Swarm(torrent_filename="fedora.torrent",
601 a = storage.MySQLDatabaseAccess(self.database)
605 self.expected_swarm_count = self.expected_swarm_count + 1
607 # Select number of swarms.
608 swarm_count = self.get_swarm_count()
610 self.assertEqual(swarm_count, self.expected_swarm_count)
612 def test_add_client_session_new_entry_in_table(self):
613 # Add new client session.
614 cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
615 system_os="Linux", system_os_version="2.6.26",
616 system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
617 public_port="50500", ds_limit=300, us_limit=200)
618 a = storage.MySQLDatabaseAccess(self.database)
620 a.add_client_session(cs)
622 self.expected_session_count = self.expected_session_count + 1
624 # Select number of client sessions.
625 session_count = self.get_session_count()
627 self.assertEqual(session_count, self.expected_session_count)
629 def test_add_status_message_new_entry_in_table(self):
630 # Add new status message.
631 ts = datetime.datetime.strptime("2010-09-12 08:43:15",
633 msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
634 num_peers=10, num_dht_peers=3, download_speed=102,
635 upload_speed=99, download_size=10213, upload_size=3301)
636 a = storage.MySQLDatabaseAccess(self.database)
638 a.add_status_message(msg)
640 self.expected_statmsg_count = self.expected_statmsg_count + 1
642 # Select number of status messages.
643 statmsg_count = self.get_statmsg_count()
645 self.assertEqual(statmsg_count, self.expected_statmsg_count)
647 def test_add_peer_status_message_new_entry_in_table(self):
648 # Add new peer status message.
649 ts = datetime.datetime.strptime("2010-09-12 13:43:25",
651 msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
652 peer_ip="141.85.224.202", peer_port="12345",
653 download_speed=13, upload_speed=98)
654 a = storage.MySQLDatabaseAccess(self.database)
656 a.add_peer_status_message(msg)
658 self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1
660 # Select number of peer status messages.
661 pstatmsg_count = self.get_pstatmsg_count()
663 self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
665 def test_add_verbose_message_new_entry_in_databas(self):
666 # Add new verbose message.
667 ts = datetime.datetime.strptime("2010-09-12 13:43:24",
669 msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
670 transfer_direction="send", peer_ip="141.85.224.202",
671 peer_port="12345", message_type="CHOKE")
672 a = storage.MySQLDatabaseAccess(self.database)
674 a.add_verbose_message(msg)
676 self.expected_verbmsg_count = self.expected_verbmsg_count + 1
678 # Select number of verbose messages.
679 verbmsg_count = self.get_verbmsg_count()
681 self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
683 if __name__ == "__main__":