66c15dec0507756ce6aaa538823cd0f4ef605037
[cs-p2p-next.git] / ppf / new / tests / test_storage.py
1 """
2 Test suite for storage. Uses unittest module.
3
4 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro.
5 2011, Mariana Marasoiu, mariana.marasoiu@gmail.com
6 """
7
8 import unittest
9 import os
10 import os.path
11 import shutil
12 import sqlite3
13 import MySQLdb
14 import subprocess
15 import sys
16 import datetime
17
18 import storage
19
def create_non_numeric_subfolders(path):
    """Create five subfolders with non-numeric names inside `path`."""
    for folder_name in ("ana", "are", "mere", "si", "pere"):
        target = os.path.join(path, folder_name)
        os.mkdir(target)
26
def create_numeric_subfolders_sequential(path):
    """Create subfolders named "1" through "5" inside `path`."""
    for folder_name in ("1", "2", "3", "4", "5"):
        target = os.path.join(path, folder_name)
        os.mkdir(target)
33
def create_numeric_subfolders_non_sequential(path):
    """Create numerically named, non-consecutive subfolders inside `path`."""
    for folder_name in ("32", "5", "423", "1401", "92"):
        target = os.path.join(path, folder_name)
        os.mkdir(target)
40
def create_numeric_files(path):
    """Create empty regular files with numeric names inside `path`."""
    for file_name in ("33", "6", "424", "1402", "93"):
        # Opening for writing creates (or truncates) the file; nothing is
        # written, so the file stays empty.
        with open(os.path.join(path, file_name), 'w'):
            pass
52
53 class StorageTest(unittest.TestCase):
54
55     """Test suite for stand alone functions in storage.py."""
56
57     # Class specific variables. Initialized in setUp, used throughout tests.
58     path = "/tmp/ttfa-test"
59
60     def setUp(self):
61         """Create folder."""
62         os.mkdir(self.path)
63
64     def tearDown(self):
65         """Remove base folder."""
66         shutil.rmtree(self.path)
67
68     def test_find_last_numeric_subfolder_no_subfolders(self):
69         """Test return of None when no subfolders are present."""
70         id = storage.find_last_numeric_subfolder(self.path)
71         self.assertEqual(id, None)
72
73     def test_find_last_numeric_subfolder_no_numeric_subfolders(self):
74         """Test return of None when no numeric subfolders are present."""
75         create_non_numeric_subfolders(self.path)
76
77         id = storage.find_last_numeric_subfolder(self.path)
78         self.assertEqual(id, None)
79
80     def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(
81             self):
82         """
83         Test return of correct id when sequential numeric named
84         subfolders only are present (1, 2, 3, ...).
85         """
86         create_numeric_subfolders_sequential(self.path)
87
88         id = storage.find_last_numeric_subfolder(self.path)
89         self.assertEqual(id, 5)
90
91     def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(
92             self):
93         """
94         Test return of correct id when nonsequential numeric named
95         subfolders only are present (32, 5, 423, ...).
96         """
97         create_numeric_subfolders_non_sequential(self.path)
98
99         id = storage.find_last_numeric_subfolder(self.path)
100         self.assertEqual(id, 1401)
101
102     def test_find_last_numeric_subfolder_mixed_subfolders_sequential(self):
103         """
104         Test return of None when mixed named subfolders (numeric and
105         others) are present. Numeric folders are sequential.
106         """
107         create_non_numeric_subfolders(self.path)
108         create_numeric_subfolders_sequential(self.path)
109
110         id = storage.find_last_numeric_subfolder(self.path)
111         self.assertEqual(id, 5)
112
113     def test_find_last_numeric_subfolder_mixed_subfolders_nonsequential(self):
114         """
115         Test return of None when mixed named subfolders (numeric and
116         others) are present. Numeric folders are non-sequential.
117         """
118         create_non_numeric_subfolders(self.path)
119         create_numeric_subfolders_non_sequential(self.path)
120
121         id = storage.find_last_numeric_subfolder(self.path)
122         self.assertEqual(id, 1401)
123
124     def test_find_last_numeric_subfolder_mixed_subfolders_files(self):
125         """
126         Test return of None when mixed named subfolders (numeric and
127         others) together with files are present. Numeric folders are
128         non-sequential.
129         """
130         create_non_numeric_subfolders(self.path)
131         create_numeric_subfolders_non_sequential(self.path)
132         create_numeric_files(self.path)
133
134         id = storage.find_last_numeric_subfolder(self.path)
135         self.assertEqual(id, 1401)
136
137
138 class TreeTextFileAccessTest(unittest.TestCase):
139
140     """Test suite for TreeTextFileAccess class in storage.py."""
141
142     # Class specific variables. Initialized in setUp, used throughout tests.
143     path = "/tmp/ttfa-test"
144
145     def setUp(self):
146         """Create folder. Initialize object. """
147         os.mkdir(self.path)
148
149     def tearDown(self):
150         """Remove base folder."""
151         shutil.rmtree(self.path)
152
153     def test_add_swarm_folder_and_file_are_created(self):
154         # Create instance of class and add swarm.
155         a = storage.TreeTextFileAccess(self.path)
156         s = storage.Swarm(torrent_filename="fedora.torrent",
157                           data_size=102400)
158         a.add_swarm(s)
159
160         expected_swarm_subfolder = 1
161         expected_swarm_path = os.path.join(self.path,
162                                            str(expected_swarm_subfolder))
163         expected_swarm_config = os.path.join(expected_swarm_path, "swarm.conf")
164
165         self.assertEqual(os.path.isdir(expected_swarm_path), True)
166         self.assertEqual(os.path.isfile(expected_swarm_config), True)
167
168     def test_add_client_session_folder_and_file_are_created(self):
169         a = storage.TreeTextFileAccess(self.path)
170
171         # Add swarm.
172         s = storage.Swarm(torrent_filename="fedora.torrent",
173                           data_size=102400)
174         a.add_swarm(s)
175
176         # Add client session.
177         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
178                                    system_os="Linux",
179                                    system_os_version="2.6.26",
180                                    system_ram=2048, system_cpu=3000,
181                                    public_ip="141.85.224.201",
182                                    public_port="50500", ds_limit=300,
183                                    us_limit=200)
184         a.add_client_session(cs)
185
186         expected_swarm_subfolder = 1
187         expected_session_subfolder = 1
188         expected_swarm_path = os.path.join(self.path,
189                                            str(expected_swarm_subfolder))
190         expected_session_path = os.path.join(expected_swarm_path,
191                                              str(expected_session_subfolder))
192         expected_session_config = os.path.join(expected_session_path,
193                                                "client_session.conf")
194
195         self.assertEqual(os.path.isdir(expected_session_path), True)
196         self.assertEqual(os.path.isfile(expected_session_config), True)
197
198     def test_add_peer_status_message_file_is_created(self):
199         a = storage.TreeTextFileAccess(self.path)
200
201         # Add swarm.
202         s = storage.Swarm(torrent_filename="fedora.torrent",
203                           data_size=102400)
204         a.add_swarm(s)
205
206         # Add client session.
207         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
208                                    system_os="Linux",
209                                    system_os_version="2.6.26",
210                                    system_ram=2048, system_cpu=3000,
211                                    public_ip="141.85.224.201",
212                                    public_port="50500", ds_limit=300,
213                                    us_limit=200)
214         a.add_client_session(cs)
215
216         # Add peer status message.
217         msg = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
218                                         peer_ip="141.85.224.202",
219                                         peer_port="12345", download_speed=13,
220                                         upload_speed=98)
221         a.add_peer_status_message(msg)
222
223         expected_swarm_subfolder = 1
224         expected_session_subfolder = 1
225         expected_swarm_path = os.path.join(self.path,
226                                            str(expected_swarm_subfolder))
227         expected_session_path = os.path.join(expected_swarm_path,
228                                              str(expected_session_subfolder))
229         expected_message_file = os.path.join(expected_session_path,
230                                              "peer_status.txt")
231
232         self.assertEqual(os.path.isfile(expected_message_file), True)
233
234     def test_add_peer_status_message_number_of_lines(self):
235         self.assertEqual(True, True)
236
237     def test_add_status_message_file_is_created(self):
238         a = storage.TreeTextFileAccess(self.path)
239
240         # Add swarm.
241         s = storage.Swarm(torrent_filename="fedora.torrent",
242                           data_size=102400)
243         a.add_swarm(s)
244
245         # Add client session.
246         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
247                                    system_os="Linux",
248                                    system_os_version="2.6.26",
249                                    system_ram=2048, system_cpu=3000,
250                                    public_ip="141.85.224.201",
251                                    public_port="50500", ds_limit=300,
252                                    us_limit=200)
253         a.add_client_session(cs)
254
255         # Add status message.
256         msg = storage.StatusMessage(swarm_id=1, client_session_id=1,
257                                     num_peers=10, num_dht_peers=3,
258                                     download_speed=102, upload_speed=99,
259                                     download_size=10213, upload_size=3301)
260         a.add_status_message(msg)
261
262         expected_swarm_subfolder = 1
263         expected_session_subfolder = 1
264         expected_swarm_path = os.path.join(self.path,
265                                            str(expected_swarm_subfolder))
266         expected_session_path = os.path.join(expected_swarm_path,
267                                              str(expected_session_subfolder))
268         expected_message_file = os.path.join(expected_session_path,
269                                              "status.txt")
270
271         self.assertEqual(os.path.isfile(expected_message_file), True)
272
273     def test_add_status_message_number_of_lines(self):
274         self.assertEqual(True, True)
275
276     def test_add_verbose_message_file_is_created(self):
277         a = storage.TreeTextFileAccess(self.path)
278
279         # Add swarm.
280         s = storage.Swarm(torrent_filename="fedora.torrent",
281                           data_size=102400)
282         a.add_swarm(s)
283
284         # Add client session.
285         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
286                                    system_os="Linux",
287                                    system_os_version="2.6.26",
288                                    system_ram=2048, system_cpu=3000,
289                                    public_ip="141.85.224.201",
290                                    public_port="50500", ds_limit=300,
291                                    us_limit=200)
292         a.add_client_session(cs)
293
294         # Add verbose message.
295         msg = storage.VerboseMessage(swarm_id=1, client_session_id=1,
296                                      transfer_direction="send",
297                                      peer_ip="141.85.224.202",
298                                      peer_port="12345", message_type="CHOKE")
299         a.add_verbose_message(msg)
300
301         expected_swarm_subfolder = 1
302         expected_session_subfolder = 1
303         expected_swarm_path = os.path.join(self.path,
304                                            str(expected_swarm_subfolder))
305         expected_session_path = os.path.join(expected_swarm_path,
306                                              str(expected_session_subfolder))
307         expected_message_file = os.path.join(expected_session_path,
308                                              "verbose.txt")
309
310         self.assertEqual(os.path.isfile(expected_message_file), True)
311
312     def test_add_verbose_message_number_of_lines(self):
313         self.assertEqual(True, True)
314
315
316 class SQLiteDatabaseAccessTest(unittest.TestCase):
317
318     """Test suite for SQLiteDatabaseAccess class in storage.py."""
319
320     # Class specific variables. Initialized in setUp, used throughout tests.
321     database = "test.db"
322     sql_create_script = "p2p-log-sqlite.sql"
323     sql_init_script = "p2p-init-test.sql"
324     expected_swarm_count = 0
325     expected_session_count = 0
326     expected_statmsg_count = 0
327     expected_pstatmsg_count = 0
328     expected_verbmsg_count = 0
329
330     def get_swarm_count(self):
331         """Retrieve number of entries in `swarms` table."""
332         conn = sqlite3.connect(self.database)
333         cursor = conn.cursor()
334
335         cursor.execute('SELECT COUNT(id) FROM swarms')
336         count = cursor.fetchone()[0]
337
338         cursor.close()
339         conn.close()
340
341         return count
342
343     def get_session_count(self):
344         """Retrieve number of entries in `client_sessions` table."""
345         conn = sqlite3.connect(self.database)
346         cursor = conn.cursor()
347
348         cursor.execute('SELECT COUNT(id) FROM client_sessions')
349         count = cursor.fetchone()[0]
350
351         cursor.close()
352         conn.close()
353
354         return count
355
356     def get_statmsg_count(self):
357         """Retrieve number of entries in `status_messages` table."""
358         conn = sqlite3.connect(self.database)
359         cursor = conn.cursor()
360
361         cursor.execute('SELECT COUNT(id) FROM status_messages')
362         count = cursor.fetchone()[0]
363
364         cursor.close()
365         conn.close()
366
367         return count
368
369     def get_pstatmsg_count(self):
370         """Retrieve number of entries in `peer_status_messages` table."""
371         conn = sqlite3.connect(self.database)
372         cursor = conn.cursor()
373
374         cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
375         count = cursor.fetchone()[0]
376
377         cursor.close()
378         conn.close()
379
380         return count
381
382     def get_verbmsg_count(self):
383         """Retrieve number of entries in `verbose_messages` table."""
384         conn = sqlite3.connect(self.database)
385         cursor = conn.cursor()
386
387         cursor.execute('SELECT COUNT(id) FROM verbose_messages')
388         count = cursor.fetchone()[0]
389
390         cursor.close()
391         conn.close()
392
393         return count
394
395     def setUp(self):
396         """Create database file and instantiate objects."""
397         # Create database file. Create tables and indices.
398         # TODO: Check exceptions.
399         sql_create_script_path = os.path.join(os.path.dirname(__file__),
400                                               self.sql_create_script)
401         f = open(sql_create_script_path, 'r')
402         p = subprocess.Popen(["sqlite3", self.database], stdin=f)
403         ret = os.waitpid(p.pid, 0)[1]
404         f.close()
405
406         # Populate database.
407         # TODO: Check exceptions.
408         sql_init_script_path = os.path.join(os.path.dirname(__file__),
409                                             self.sql_init_script)
410         f = open(sql_init_script_path, 'r')
411         p = subprocess.Popen(["sqlite3", self.database], stdin=f)
412         ret = os.waitpid(p.pid, 0)[1]
413         f.close()
414
415         # Initialize to number of table entries inserted in the init script.
416         self.expected_swarm_count = 2
417         self.expected_session_count = 2
418         self.expected_statmsg_count = 2
419         self.expected_pstatmsg_count = 2
420         self.expected_verbmsg_count = 2
421
422     def tearDown(self):
423         """Close connection and remove database file."""
424         os.remove(self.database)
425
426     def test_add_swarm_new_entry_in_table(self):
427         # Add new swarm.
428         s = storage.Swarm(torrent_filename="fedora.torrent",
429                           data_size=102400)
430         a = storage.SQLiteDatabaseAccess(self.database)
431         a.connect()
432         a.add_swarm(s)
433         a.disconnect()
434         self.expected_swarm_count = self.expected_swarm_count + 1
435
436         # Select number of swarms.
437         swarm_count = self.get_swarm_count()
438
439         self.assertEqual(swarm_count, self.expected_swarm_count)
440
441     def test_add_client_session_new_entry_in_table(self):
442         # Add new client session.
443         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
444                                    system_os="Linux",
445                                    system_os_version="2.6.26",
446                                    system_ram=2048, system_cpu=3000,
447                                    public_ip="141.85.224.201",
448                                    public_port="50500", ds_limit=300,
449                                    us_limit=200)
450         a = storage.SQLiteDatabaseAccess(self.database)
451         a.connect()
452         a.add_client_session(cs)
453         a.disconnect()
454         self.expected_session_count = self.expected_session_count + 1
455
456         # Select number of swarms.
457         session_count = self.get_session_count()
458
459         self.assertEqual(session_count, self.expected_session_count)
460
461     def test_add_status_message_new_entry_in_table(self):
462         # Add new status message.
463         ts = datetime.datetime.strptime("2010-09-12 08:43:15",
464                                         "%Y-%m-%d %H:%M:%S")
465         msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
466                                     num_peers=10, num_dht_peers=3,
467                                     download_speed=102, upload_speed=99,
468                                     download_size=10213, upload_size=3301)
469         a = storage.SQLiteDatabaseAccess(self.database)
470         a.connect()
471         a.add_status_message(msg)
472         a.disconnect()
473         self.expected_statmsg_count = self.expected_statmsg_count + 1
474
475         # Select number of status messages.
476         statmsg_count = self.get_statmsg_count()
477
478         self.assertEqual(statmsg_count, self.expected_statmsg_count)
479
480     def test_add_peer_status_message_new_entry_in_table(self):
481         # Add new peer status message.
482         ts = datetime.datetime.strptime("2010-09-12 13:43:25",
483                                         "%Y-%m-%d %H:%M:%S")
484         msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
485                                         peer_ip="141.85.224.202",
486                                         peer_port="12345", download_speed=13,
487                                         upload_speed=98)
488         a = storage.SQLiteDatabaseAccess(self.database)
489         a.connect()
490         a.add_peer_status_message(msg)
491         a.disconnect()
492         self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1
493
494         # Select number of peer status messages.
495         pstatmsg_count = self.get_pstatmsg_count()
496
497         self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
498
499     def test_add_verbose_message_new_entry_in_database(self):
500         # Add new verbose message.
501         ts = datetime.datetime.strptime("2010-09-12 13:43:24",
502                                         "%Y-%m-%d %H:%M:%S")
503         msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
504                                      transfer_direction="send",
505                                      peer_ip="141.85.224.202",
506                                      peer_port="12345",
507                                      message_type="CHOKE")
508         a = storage.SQLiteDatabaseAccess(self.database)
509         a.connect()
510         a.add_verbose_message(msg)
511         a.disconnect()
512         self.expected_verbmsg_count = self.expected_verbmsg_count + 1
513
514         # Select number of verbose messages.
515         verbmsg_count = self.get_verbmsg_count()
516
517         self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
518
519
520 class MySQLDatabaseAccessTest(unittest.TestCase):
521
522     """Test suite for MySQLDatabaseAccess class in storage.py."""
523
524     # Class specific variables. Initialized in setUp, used throughout tests.
525     database = "p2p_test"
526     user = "root"
527     password = "p2p4th3m45535"
528     conn = None
529     cursor = None
530     sql_create_script = "p2p-log-mysql.sql"
531     sql_init_script = "p2p-init-test-mysql.sql"
532     expected_swarm_count = 0
533     expected_session_count = 0
534     expected_statmsg_count = 0
535     expected_pstatmsg_count = 0
536     expected_verbmsg_count = 0
537
538     def get_swarm_count(self):
539         """Retrieve number of entries in `swarms` table."""
540         self.cursor.execute('SELECT COUNT(id) FROM swarms')
541         count = self.cursor.fetchone()[0]
542
543         return count
544
545     def get_session_count(self):
546         """Retrieve number of entries in `client_sessions` table."""
547         self.cursor.execute('SELECT COUNT(id) FROM client_sessions')
548         count = self.cursor.fetchone()[0]
549
550         return count
551
552     def get_statmsg_count(self):
553         """Retrieve number of entries in `status_messages` table."""
554         self.cursor.execute('SELECT COUNT(id) FROM status_messages')
555         count = self.cursor.fetchone()[0]
556
557         return count
558
559     def get_pstatmsg_count(self):
560         """Retrieve number of entries in `peer_status_messages` table."""
561         self.cursor.execute('SELECT COUNT(id) FROM peer_status_messages')
562         count = self.cursor.fetchone()[0]
563
564         return count
565
566     def get_verbmsg_count(self):
567         """Retrieve number of entries in `verbose_messages` table."""
568         self.cursor.execute('SELECT COUNT(id) FROM verbose_messages')
569         count = self.cursor.fetchone()[0]
570
571         return count
572
573     def setUp(self):
574         """Create database and instantiate objects."""
575         # Create database. Create tables and indices.
576         # TODO: Check exceptions.
577
578         sql_create_script_path = os.path.join(os.path.dirname(__file__),
579                                               self.sql_create_script)
580
581         self.conn = MySQLdb.Connection(user=self.user, passwd=self.password)
582         self.cursor = self.conn.cursor()
583
584         self.cursor.execute("CREATE DATABASE %s" %self.database)
585         self.cursor.execute("USE %s" %self.database)
586
587         f = open(sql_create_script_path, 'r')
588         p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
589                              "--password=%s" %self.password], stdin=f)
590         ret = os.waitpid(p.pid, 0)[1]
591         f.close()
592
593         # Populate database.
594         # TODO: Check exceptions.
595         sql_init_script_path = os.path.join(os.path.dirname(__file__),
596                                             self.sql_init_script)
597
598         f = open(sql_init_script_path, 'r')
599         p = subprocess.Popen(["mysql", self.database, "--user=%s" %self.user,
600                              "--password=%s" %self.password], stdin=f)
601         ret = os.waitpid(p.pid, 0)[1]
602         f.close()
603
604         # Initialize to number of table entries inserted in the init script.
605         self.expected_swarm_count = 2
606         self.expected_session_count = 2
607         self.expected_statmsg_count = 2
608         self.expected_pstatmsg_count = 2
609         self.expected_verbmsg_count = 2
610
611     def tearDown(self):
612         """ Delete database and close connection. """
613         self.cursor.execute("DROP DATABASE %s" %self.database)
614         self.cursor.close()
615         self.conn.close()
616
617     def test_add_swarm_new_entry_in_table(self):
618         # Add new swarm.
619         s = storage.Swarm(torrent_filename="fedora.torrent",
620                           data_size=102400)
621         a = storage.MySQLDatabaseAccess(self.database)
622         a.connect()
623         a.add_swarm(s)
624         a.disconnect()
625         self.expected_swarm_count = self.expected_swarm_count + 1
626
627         # Select number of swarms.
628         swarm_count = self.get_swarm_count()
629
630         self.assertEqual(swarm_count, self.expected_swarm_count)
631
632     def test_add_client_session_new_entry_in_table(self):
633         # Add new client session.
634         cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
635                                    system_os="Linux",
636                                    system_os_version="2.6.26",
637                                    system_ram=2048, system_cpu=3000,
638                                    public_ip="141.85.224.201",
639                                    public_port="50500", ds_limit=300,
640                                    us_limit=200)
641         a = storage.MySQLDatabaseAccess(self.database)
642         a.connect()
643         a.add_client_session(cs)
644         a.disconnect()
645         self.expected_session_count = self.expected_session_count + 1
646
647         # Select number of swarms.
648         session_count = self.get_session_count()
649
650         self.assertEqual(session_count, self.expected_session_count)
651
652     def test_add_status_message_new_entry_in_table(self):
653         # Add new status message.
654         ts = datetime.datetime.strptime("2010-09-12 08:43:15",
655                                         "%Y-%m-%d %H:%M:%S")
656         msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
657                                     num_peers=10, num_dht_peers=3,
658                                     download_speed=102, upload_speed=99,
659                                     download_size=10213, upload_size=3301)
660         a = storage.MySQLDatabaseAccess(self.database)
661         a.connect()
662         a.add_status_message(msg)
663         a.disconnect()
664         self.expected_statmsg_count = self.expected_statmsg_count + 1
665
666         # Select number of status messages.
667         statmsg_count = self.get_statmsg_count()
668
669         self.assertEqual(statmsg_count, self.expected_statmsg_count)
670
671     def test_add_peer_status_message_new_entry_in_table(self):
672         # Add new peer status message.
673         ts = datetime.datetime.strptime("2010-09-12 13:43:25",
674                                         "%Y-%m-%d %H:%M:%S")
675         msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
676                                         peer_ip="141.85.224.202",
677                                         peer_port="12345", download_speed=13,
678                                         upload_speed=98)
679         a = storage.MySQLDatabaseAccess(self.database)
680         a.connect()
681         a.add_peer_status_message(msg)
682         a.disconnect()
683         self.expected_pstatmsg_count = self.expected_pstatmsg_count + 1
684
685         # Select number of peer status messages.
686         pstatmsg_count = self.get_pstatmsg_count()
687
688         self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)
689
690     def test_add_verbose_message_new_entry_in_database(self):
691         # Add new verbose message.
692         ts = datetime.datetime.strptime("2010-09-12 13:43:24",
693                                         "%Y-%m-%d %H:%M:%S")
694         msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
695                                      transfer_direction="send",
696                                      peer_ip="141.85.224.202",
697                                      peer_port="12345",
698                                      message_type="CHOKE")
699         a = storage.MySQLDatabaseAccess(self.database)
700         a.connect()
701         a.add_verbose_message(msg)
702         a.disconnect()
703         self.expected_verbmsg_count = self.expected_verbmsg_count + 1
704
705         # Select number of verbose messages.
706         verbmsg_count = self.get_verbmsg_count()
707
708         self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
709
710
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()