ppf/new: Use subprocess module instead of sqlparse. Connection to database
[cs-p2p-next.git] / ppf / new / tests / test_storage.py
1 """
2 Test suite for storage. Uses unittest module.
3
4 2011, Razvan Deaconescu, razvan.deaconescu@cs.pub.ro.
5 2011, Mariana Marasoiu, mariana.marasoiu@gmail.com
6 """
7
8 import unittest
9 import os
10 import os.path
11 import shutil
12 import sqlite3
13 import MySQLdb
14 import subprocess
15 import sys
16 import datetime
17
18 import storage
19
def create_non_numeric_subfolders(path):
    """Create five subfolders with non-numeric names directly under path."""
    for folder_name in ("ana", "are", "mere", "si", "pere"):
        os.mkdir(os.path.join(path, folder_name))
26
def create_numeric_subfolders_sequential(path):
    """Create the subfolders "1" through "5", in that order, under path."""
    for number in range(1, 6):
        os.mkdir(os.path.join(path, str(number)))
33
def create_numeric_subfolders_non_sequential(path):
    """Create numeric subfolders with gaps and no ordering under path."""
    for folder_name in ("32", "5", "423", "1401", "92"):
        os.mkdir(os.path.join(path, folder_name))
40
def create_numeric_files(path):
    """
    Create five empty regular files with numeric names directly under path.

    The names are numeric so that tests can check that plain files are
    not mistaken for numeric subfolders.
    """
    for file_name in ("33", "6", "424", "1402", "93"):
        # Opening for writing creates (or truncates) the file; the with
        # block guarantees the handle is closed even if an error occurs.
        with open(os.path.join(path, file_name), 'w'):
            pass
52
class StorageTest(unittest.TestCase):
    """
    Test suite for stand alone functions in storage.py.

    Every test runs against a scratch folder that setUp creates and
    tearDown removes.
    """

    # Scratch folder used by every test; created in setUp, removed in tearDown.
    path = "/tmp/ttfa-test"

    def setUp(self):
        """Create the scratch folder."""
        os.mkdir(self.path)

    def tearDown(self):
        """Remove the scratch folder and everything below it."""
        shutil.rmtree(self.path)

    def test_find_last_numeric_subfolder_no_subfolders(self):
        """Test return of None when no subfolders are present."""
        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, None)

    def test_find_last_numeric_subfolder_no_numeric_subfolders(self):
        """Test return of None when no numeric subfolders are present."""
        create_non_numeric_subfolders(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, None)

    def test_find_last_numeric_subfolder_only_numeric_subfolders_sequential(self):
        """
        Test return of correct id when sequential numeric named
        subfolders only are present (1, 2, 3, ...).
        """
        create_numeric_subfolders_sequential(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, 5)

    def test_find_last_numeric_subfolder_only_numeric_subfolders_nonsequential(self):
        """
        Test return of correct id when nonsequential numeric named
        subfolders only are present (32, 5, 423, ...).
        """
        create_numeric_subfolders_non_sequential(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, 1401)

    def test_find_last_numeric_subfolder_mixed_subfolders_sequential(self):
        """
        Test return of the highest numeric id when mixed named
        subfolders (numeric and others) are present. Numeric folders
        are sequential.
        """
        create_non_numeric_subfolders(self.path)
        create_numeric_subfolders_sequential(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, 5)

    def test_find_last_numeric_subfolder_mixed_subfolders_nonsequential(self):
        """
        Test return of the highest numeric id when mixed named
        subfolders (numeric and others) are present. Numeric folders
        are non-sequential.
        """
        create_non_numeric_subfolders(self.path)
        create_numeric_subfolders_non_sequential(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, 1401)

    def test_find_last_numeric_subfolder_mixed_subfolders_files(self):
        """
        Test return of the highest numeric subfolder id when mixed
        named subfolders (numeric and others) together with numerically
        named files are present. The files carry higher numbers than
        the folders (e.g. 1402 vs. 1401) and must not be counted.
        Numeric folders are non-sequential.
        """
        create_non_numeric_subfolders(self.path)
        create_numeric_subfolders_non_sequential(self.path)
        create_numeric_files(self.path)

        last_id = storage.find_last_numeric_subfolder(self.path)
        self.assertEqual(last_id, 1401)
135
136
class TreeTextFileAccessTest(unittest.TestCase):
    """
    Test suite for TreeTextFileAccess class in storage.py.

    Every test runs against a scratch folder that setUp creates and
    tearDown removes. The tests expect the first swarm and the first
    client session to each land in a subfolder named "1".
    """

    # Scratch folder used by every test; created in setUp, removed in tearDown.
    path = "/tmp/ttfa-test"

    def setUp(self):
        """Create the scratch folder."""
        os.mkdir(self.path)

    def tearDown(self):
        """Remove the scratch folder and everything below it."""
        shutil.rmtree(self.path)

    def _access_with_swarm(self):
        """Return a TreeTextFileAccess on self.path with one swarm added."""
        access = storage.TreeTextFileAccess(self.path)
        swarm = storage.Swarm(torrent_filename="fedora.torrent",
                data_size=102400)
        access.add_swarm(swarm)
        return access

    def _access_with_session(self):
        """Return an access object with one swarm and one client session."""
        access = self._access_with_swarm()
        session = storage.ClientSession(swarm_id=1, btclient="Tribler",
                system_os="Linux", system_os_version="2.6.26",
                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
                public_port="50500", ds_limit=300, us_limit=200)
        access.add_client_session(session)
        return access

    def _expected_swarm_path(self):
        """Return the folder expected to hold the first swarm."""
        return os.path.join(self.path, str(1))

    def _expected_session_path(self):
        """Return the folder expected to hold the first client session."""
        return os.path.join(self._expected_swarm_path(), str(1))

    def test_add_swarm_folder_and_file_are_created(self):
        self._access_with_swarm()

        swarm_path = self._expected_swarm_path()
        swarm_config = os.path.join(swarm_path, "swarm.conf")

        self.assertTrue(os.path.isdir(swarm_path))
        self.assertTrue(os.path.isfile(swarm_config))

    def test_add_client_session_folder_and_file_are_created(self):
        self._access_with_session()

        session_path = self._expected_session_path()
        session_config = os.path.join(session_path, "client_session.conf")

        self.assertTrue(os.path.isdir(session_path))
        self.assertTrue(os.path.isfile(session_config))

    def test_add_peer_status_message_file_is_created(self):
        access = self._access_with_session()

        # Add peer status message.
        message = storage.PeerStatusMessage(swarm_id=1, client_session_id=1,
                peer_ip="141.85.224.202", peer_port="12345",
                download_speed=13, upload_speed=98)
        access.add_peer_status_message(message)

        message_file = os.path.join(self._expected_session_path(),
                "peer_status.txt")
        self.assertTrue(os.path.isfile(message_file))

    def test_add_peer_status_message_number_of_lines(self):
        # Placeholder: no real check is performed yet.
        self.assertEqual(True, True)

    def test_add_status_message_file_is_created(self):
        access = self._access_with_session()

        # Add status message.
        message = storage.StatusMessage(swarm_id=1, client_session_id=1,
                num_peers=10, num_dht_peers=3, download_speed=102,
                upload_speed=99, download_size=10213, upload_size=3301)
        access.add_status_message(message)

        message_file = os.path.join(self._expected_session_path(),
                "status.txt")
        self.assertTrue(os.path.isfile(message_file))

    def test_add_status_message_number_of_lines(self):
        # Placeholder: no real check is performed yet.
        self.assertEqual(True, True)

    def test_add_verbose_message_file_is_created(self):
        access = self._access_with_session()

        # Add verbose message.
        message = storage.VerboseMessage(swarm_id=1, client_session_id=1,
            transfer_direction="send", peer_ip="141.85.224.202",
            peer_port="12345", message_type="CHOKE")
        access.add_verbose_message(message)

        message_file = os.path.join(self._expected_session_path(),
                "verbose.txt")
        self.assertTrue(os.path.isfile(message_file))

    def test_add_verbose_message_number_of_lines(self):
        # Placeholder: no real check is performed yet.
        self.assertEqual(True, True)
299
class SQLiteDatabaseAccessTest(unittest.TestCase):
    """
    Test suite for SQLiteDatabaseAccess class in storage.py.

    setUp builds a fresh database file by feeding the schema script and
    the fixture script to the `sqlite3` command line tool; tearDown
    removes the file again. Each test adds one row through the storage
    layer and checks that the row count of the matching table grew by one.
    """

    # Database file and SQL scripts. The scripts are looked up in the
    # folder that holds this test module.
    database = "test.db"
    sql_create_script = "p2p-log-sqlite.sql"
    sql_init_script = "p2p-init-test.sql"

    # Expected row counts; reset in setUp to the fixture size.
    expected_swarm_count = 0
    expected_session_count = 0
    expected_statmsg_count = 0
    expected_pstatmsg_count = 0
    expected_verbmsg_count = 0

    def _count_rows(self, table):
        """
        Return the number of entries in `table`.

        Opens its own short-lived connection so that rows written by the
        storage layer are read back from the database file. The cursor
        and the connection are closed even if the query fails.
        """
        conn = sqlite3.connect(self.database)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute('SELECT COUNT(id) FROM %s' % table)
                return cursor.fetchone()[0]
            finally:
                cursor.close()
        finally:
            conn.close()

    def _run_sql_script(self, script_name):
        """
        Feed the SQL script `script_name` to the sqlite3 command line tool.

        Raises subprocess.CalledProcessError if sqlite3 exits with a
        non-zero status, IOError if the script cannot be opened and
        OSError if the sqlite3 executable cannot be started.
        """
        script_path = os.path.join(os.path.dirname(__file__), script_name)
        with open(script_path, 'r') as script:
            subprocess.check_call(["sqlite3", self.database], stdin=script)

    def get_swarm_count(self):
        """ Retrieve number of entries in `swarms` table. """
        return self._count_rows('swarms')

    def get_session_count(self):
        """ Retrieve number of entries in `client_sessions` table. """
        return self._count_rows('client_sessions')

    def get_statmsg_count(self):
        """ Retrieve number of entries in `status_messages` table. """
        return self._count_rows('status_messages')

    def get_pstatmsg_count(self):
        """ Retrieve number of entries in `peer_status_messages` table. """
        return self._count_rows('peer_status_messages')

    def get_verbmsg_count(self):
        """ Retrieve number of entries in `verbose_messages` table. """
        return self._count_rows('verbose_messages')

    def setUp(self):
        """ Create database file, create tables and load the fixtures. """
        try:
            # Create tables and indices, then populate the database.
            self._run_sql_script(self.sql_create_script)
            self._run_sql_script(self.sql_init_script)
        except Exception:
            # unittest does not call tearDown when setUp raises, so drop
            # a half-built database file here to keep later runs clean.
            if os.path.exists(self.database):
                os.remove(self.database)
            raise

        # Initialize to number of table entries inserted in the init script.
        self.expected_swarm_count = 2
        self.expected_session_count = 2
        self.expected_statmsg_count = 2
        self.expected_pstatmsg_count = 2
        self.expected_verbmsg_count = 2

    def tearDown(self):
        """ Remove database file. """
        os.remove(self.database)

    def test_add_swarm_new_entry_in_table(self):
        # Add new swarm.
        s = storage.Swarm(torrent_filename="fedora.torrent",
                data_size=102400)
        a = storage.SQLiteDatabaseAccess(self.database)
        a.connect()
        a.add_swarm(s)
        a.disconnect()
        self.expected_swarm_count += 1

        # Select number of swarms.
        swarm_count = self.get_swarm_count()

        self.assertEqual(swarm_count, self.expected_swarm_count)

    def test_add_client_session_new_entry_in_table(self):
        # Add new client session.
        cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
                system_os="Linux", system_os_version="2.6.26",
                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
                public_port="50500", ds_limit=300, us_limit=200)
        a = storage.SQLiteDatabaseAccess(self.database)
        a.connect()
        a.add_client_session(cs)
        a.disconnect()
        self.expected_session_count += 1

        # Select number of client sessions.
        session_count = self.get_session_count()

        self.assertEqual(session_count, self.expected_session_count)

    def test_add_status_message_new_entry_in_table(self):
        # Add new status message.
        ts = datetime.datetime.strptime("2010-09-12 08:43:15",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
                num_peers=10, num_dht_peers=3, download_speed=102,
                upload_speed=99, download_size=10213, upload_size=3301)
        a = storage.SQLiteDatabaseAccess(self.database)
        a.connect()
        a.add_status_message(msg)
        a.disconnect()
        self.expected_statmsg_count += 1

        # Select number of status messages.
        statmsg_count = self.get_statmsg_count()

        self.assertEqual(statmsg_count, self.expected_statmsg_count)

    def test_add_peer_status_message_new_entry_in_table(self):
        # Add new peer status message.
        ts = datetime.datetime.strptime("2010-09-12 13:43:25",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
                peer_ip="141.85.224.202", peer_port="12345",
                download_speed=13, upload_speed=98)
        a = storage.SQLiteDatabaseAccess(self.database)
        a.connect()
        a.add_peer_status_message(msg)
        a.disconnect()
        self.expected_pstatmsg_count += 1

        # Select number of peer status messages.
        pstatmsg_count = self.get_pstatmsg_count()

        self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)

    def test_add_verbose_message_new_entry_in_databas(self):
        # NOTE: the method name keeps its historical spelling ("databas")
        # so that existing test ids stay valid.
        # Add new verbose message.
        ts = datetime.datetime.strptime("2010-09-12 13:43:24",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
            transfer_direction="send", peer_ip="141.85.224.202",
            peer_port="12345", message_type="CHOKE")
        a = storage.SQLiteDatabaseAccess(self.database)
        a.connect()
        a.add_verbose_message(msg)
        a.disconnect()
        self.expected_verbmsg_count += 1

        # Select number of verbose messages.
        verbmsg_count = self.get_verbmsg_count()

        self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
496
497
class MySQLDatabaseAccessTest(unittest.TestCase):
    """
    Test suite for MySQLDatabaseAccess class in storage.py.

    setUp connects to the MySQL server, creates the test database and
    feeds the schema script and the fixture script to the `mysql`
    command line client; tearDown drops the database and closes the
    connection. Each test adds one row through the storage layer and
    checks that the row count of the matching table grew by one.
    """

    # Connection settings for the test database.
    # NOTE(review): the credentials are hard-coded in the source; consider
    # moving them out of version control.
    database = "p2p_test"
    user = "root"
    password = "p2p4th3m45535"

    # Verification connection and cursor; opened in setUp, closed in tearDown.
    conn = None
    cursor = None

    # SQL scripts, looked up in the folder that holds this test module.
    sql_create_script = "p2p-log-mysql.sql"
    sql_init_script = "p2p-init-test-mysql.sql"

    # Expected row counts; reset in setUp to the fixture size.
    expected_swarm_count = 0
    expected_session_count = 0
    expected_statmsg_count = 0
    expected_pstatmsg_count = 0
    expected_verbmsg_count = 0

    def _count_rows(self, table):
        """ Return the number of entries in `table` using self.cursor. """
        self.cursor.execute('SELECT COUNT(id) FROM %s' % table)
        return self.cursor.fetchone()[0]

    def _run_sql_script(self, script_name):
        """
        Feed the SQL script `script_name` to the mysql command line client.

        Raises subprocess.CalledProcessError if mysql exits with a
        non-zero status, IOError if the script cannot be opened and
        OSError if the mysql executable cannot be started.
        """
        script_path = os.path.join(os.path.dirname(__file__), script_name)
        with open(script_path, 'r') as script:
            # NOTE(review): a password given on the command line may be
            # visible to other local users in the process list.
            subprocess.check_call(
                    ["mysql", self.database, "--user=%s" % self.user,
                     "--password=%s" % self.password], stdin=script)

    def get_swarm_count(self):
        """ Retrieve number of entries in `swarms` table. """
        return self._count_rows('swarms')

    def get_session_count(self):
        """ Retrieve number of entries in `client_sessions` table. """
        return self._count_rows('client_sessions')

    def get_statmsg_count(self):
        """ Retrieve number of entries in `status_messages` table. """
        return self._count_rows('status_messages')

    def get_pstatmsg_count(self):
        """ Retrieve number of entries in `peer_status_messages` table. """
        return self._count_rows('peer_status_messages')

    def get_verbmsg_count(self):
        """ Retrieve number of entries in `verbose_messages` table. """
        return self._count_rows('verbose_messages')

    def setUp(self):
        """ Create database, create tables and load the fixtures. """
        self.conn = MySQLdb.Connection(user=self.user, passwd=self.password)
        self.cursor = self.conn.cursor()

        try:
            self.cursor.execute("CREATE DATABASE %s" % self.database)
        except Exception:
            # The database was not created by this run, so only release
            # the connection; do not drop anything.
            self.cursor.close()
            self.conn.close()
            raise

        try:
            self.cursor.execute("USE %s" % self.database)

            # Create tables and indices, then populate the database.
            self._run_sql_script(self.sql_create_script)
            self._run_sql_script(self.sql_init_script)
        except Exception:
            # unittest does not call tearDown when setUp raises, so drop
            # the half-built database and close the connection here.
            self.tearDown()
            raise

        # Initialize to number of table entries inserted in the init script.
        self.expected_swarm_count = 2
        self.expected_session_count = 2
        self.expected_statmsg_count = 2
        self.expected_pstatmsg_count = 2
        self.expected_verbmsg_count = 2

    def tearDown(self):
        """ Delete database and close connection. """
        self.cursor.execute("DROP DATABASE %s" % self.database)
        self.cursor.close()
        self.conn.close()

    def test_add_swarm_new_entry_in_table(self):
        # Add new swarm.
        s = storage.Swarm(torrent_filename="fedora.torrent",
                data_size=102400)
        a = storage.MySQLDatabaseAccess(self.database)
        a.connect()
        a.add_swarm(s)
        a.disconnect()
        self.expected_swarm_count += 1

        # Select number of swarms.
        swarm_count = self.get_swarm_count()

        self.assertEqual(swarm_count, self.expected_swarm_count)

    def test_add_client_session_new_entry_in_table(self):
        # Add new client session.
        cs = storage.ClientSession(swarm_id=1, btclient="Tribler",
                system_os="Linux", system_os_version="2.6.26",
                system_ram=2048, system_cpu=3000, public_ip="141.85.224.201",
                public_port="50500", ds_limit=300, us_limit=200)
        a = storage.MySQLDatabaseAccess(self.database)
        a.connect()
        a.add_client_session(cs)
        a.disconnect()
        self.expected_session_count += 1

        # Select number of client sessions.
        session_count = self.get_session_count()

        self.assertEqual(session_count, self.expected_session_count)

    def test_add_status_message_new_entry_in_table(self):
        # Add new status message.
        ts = datetime.datetime.strptime("2010-09-12 08:43:15",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.StatusMessage(client_session_id=1, timestamp=ts,
                num_peers=10, num_dht_peers=3, download_speed=102,
                upload_speed=99, download_size=10213, upload_size=3301)
        a = storage.MySQLDatabaseAccess(self.database)
        a.connect()
        a.add_status_message(msg)
        a.disconnect()
        self.expected_statmsg_count += 1

        # Select number of status messages.
        statmsg_count = self.get_statmsg_count()

        self.assertEqual(statmsg_count, self.expected_statmsg_count)

    def test_add_peer_status_message_new_entry_in_table(self):
        # Add new peer status message.
        ts = datetime.datetime.strptime("2010-09-12 13:43:25",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.PeerStatusMessage(client_session_id=1, timestamp=ts,
                peer_ip="141.85.224.202", peer_port="12345",
                download_speed=13, upload_speed=98)
        a = storage.MySQLDatabaseAccess(self.database)
        a.connect()
        a.add_peer_status_message(msg)
        a.disconnect()
        self.expected_pstatmsg_count += 1

        # Select number of peer status messages.
        pstatmsg_count = self.get_pstatmsg_count()

        self.assertEqual(pstatmsg_count, self.expected_pstatmsg_count)

    def test_add_verbose_message_new_entry_in_databas(self):
        # NOTE: the method name keeps its historical spelling ("databas")
        # so that existing test ids stay valid.
        # Add new verbose message.
        ts = datetime.datetime.strptime("2010-09-12 13:43:24",
                "%Y-%m-%d %H:%M:%S")
        msg = storage.VerboseMessage(client_session_id=1, timestamp=ts,
            transfer_direction="send", peer_ip="141.85.224.202",
            peer_port="12345", message_type="CHOKE")
        a = storage.MySQLDatabaseAccess(self.database)
        a.connect()
        a.add_verbose_message(msg)
        a.disconnect()
        self.expected_verbmsg_count += 1

        # Select number of verbose messages.
        verbmsg_count = self.get_verbmsg_count()

        self.assertEqual(verbmsg_count, self.expected_verbmsg_count)
682
# Run every test case defined in this module when executed as a script.
if __name__ == "__main__":
    unittest.main()