1 # Written by Andrea Reale
2 # see LICENSE.txt for license information
4 from __future__ import with_statement
5 from BaseLib.Core.CacheDB.MetadataDBHandler import MetadataDBHandler
6 from BaseLib.Core.Subtitles.MetadataDomainObjects.MetadataDTO import MetadataDTO
7 from BaseLib.Core.Subtitles.MetadataDomainObjects.SubtitleInfo import SubtitleInfo
8 from BaseLib.Core.CacheDB.sqlitecachedb import bin2str
9 from BaseLib.Test.Core.CacheDB.SimpleMetadataDB import SimpleMetadataDB
10 from BaseLib.Core.Overlay.permid import generate_keypair
17 from BaseLib.Core.Subtitles.MetadataDomainObjects.MetadataExceptions import MetadataDBException
21 RES_DIR = os.path.join('..','..','subtitles_test_res')
23 CREATE_SQL_FILE = os.path.join(RES_DIR,'schema_sdb_v5.sql')
24 SQL_DB = ":memory:"#"res/test.sdb"
26 logging.basicConfig(level=logging.DEBUG)
29 class TestMetadataDBHandler(unittest.TestCase):
30 _keypair1 = generate_keypair()
31 aPermId = str(_keypair1.pub().get_der())
32 _keypair2 = generate_keypair()
33 anotherPermId = str(_keypair2.pub().get_der())
36 #createDB = not os.path.isfile(SQL_DB)
37 self.db = SimpleMetadataDB(CREATE_SQL_FILE, SQL_DB)
38 self.underTest = MetadataDBHandler(self.db)
42 #if os.path.isfile(SQL_DB) :
47 def testInitHandler(self):
48 self.assertTrue(self.underTest is not None)
50 def testSingleton(self):
52 instance1 = MetadataDBHandler.getInstance()
53 instance2 = MetadataDBHandler.getInstance()
54 self.assertTrue(instance1 is instance2)
56 def testInsertNewMetadataSubs(self):
57 metadataDTO = MockMetadataDTO(["nld","ita"])
58 metadataDTO.sign(metadataDTO._keypair)
59 self.underTest.insertMetadata(metadataDTO)
61 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
63 results = self.db.fetchall(testquery, (bin2str(metadataDTO.channel),bin2str(metadataDTO.infohash)))
65 self.assertTrue(len(results) == 1)
67 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
68 self.assertEquals(bin2str(metadataDTO.channel),tupl[1])
69 self.assertEquals(bin2str(metadataDTO.infohash),tupl[2])
70 self.assertEquals(metadataDTO.description, tupl[3])
71 self.assertEquals(metadataDTO.timestamp, tupl[4])
72 self.assertEquals(bin2str(metadataDTO.signature), tupl[5])
74 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
76 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
77 self.assertEquals(2,len(subtitles))
79 for lang in ("ita", "nld"):
82 for subtuple in subtitles:
83 if subtuple[1] == lang:
88 self.assertTrue(found)
89 self.assertEquals(bin2str(metadataDTO.getSubtitle(lang).checksum), foundSub[3])
92 def testGetMetadataInstance(self):
93 metadataDTO = MockMetadataDTO(["nld","ita"])
94 metadataDTO.sign(metadataDTO._keypair)
95 self.underTest.insertMetadata(metadataDTO)
97 retrievedMetadata = self.underTest.getMetadata(metadataDTO.channel,
100 self.assertFalse(retrievedMetadata is None)
101 self.assertFalse(retrievedMetadata is metadataDTO)
102 self.assertEquals(metadataDTO,retrievedMetadata)
107 def testInsertNewMetadataNoSubs(self):
108 metadataDTO = MockMetadataDTO([])
109 metadataDTO.sign(metadataDTO._keypair)
110 self.underTest.insertMetadata(metadataDTO)
112 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
115 channel = bin2str(metadataDTO.channel)
116 infohash = bin2str(metadataDTO.infohash)
117 results = self.db.fetchall(testquery, (channel,infohash))
119 self.assertTrue(len(results) == 1)
121 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
122 self.assertEquals(channel,tupl[1])
123 self.assertEquals(infohash,tupl[2])
124 self.assertEquals(metadataDTO.description, tupl[3])
125 self.assertEquals(metadataDTO.timestamp, tupl[4])
126 self.assertEquals(bin2str(metadataDTO.signature), tupl[5])
128 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
130 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
131 self.assertEquals(0,len(subtitles))
133 def testUpdateExistingWithOlder(self):
136 metadataDTO = MockMetadataDTO(["nld", "ita"])
137 metadataDTO.sign(metadataDTO._keypair)
138 self.underTest.insertMetadata(metadataDTO)
140 olderMetadataDTO = copy.copy(metadataDTO)
141 olderMetadataDTO.timestamp = 1 #*really* old
142 olderMetadataDTO.sign(olderMetadataDTO._keypair)
144 self.underTest.insertMetadata(olderMetadataDTO)
146 #assert the the older did not replace the newer
147 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
149 channel = bin2str(metadataDTO.channel)
150 infohash = bin2str(metadataDTO.infohash)
151 results = self.db.fetchall(testquery, (channel,infohash))
153 self.assertTrue(len(results) == 1)
155 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
156 self.assertEquals(channel,tupl[1])
157 self.assertEquals(infohash,tupl[2])
158 self.assertEquals(metadataDTO.description, tupl[3])
159 self.assertEquals(metadataDTO.timestamp, tupl[4])
160 self.assertEquals(bin2str(metadataDTO.signature), tupl[5])
162 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
164 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
165 self.assertEquals(2,len(subtitles))
167 for lang in ("ita", "nld"):
170 for subtuple in subtitles:
171 if subtuple[1] == lang:
176 self.assertTrue(found)
177 self.assertEquals(bin2str(metadataDTO.getSubtitle(lang).checksum), foundSub[3])
180 def testUpdateExistingWithNewerSameSub(self):
181 metadataDTO = MockMetadataDTO(["nld", "ita"])
182 metadataDTO.sign(metadataDTO._keypair)
183 self.underTest.insertMetadata(metadataDTO)
185 newerMetadataDTO = copy.copy(metadataDTO)
186 newerMetadataDTO.description = u"I'm newer!"
187 newerMetadataDTO.timestamp = newerMetadataDTO.timestamp +1 #newer
188 newerMetadataDTO.sign(newerMetadataDTO._keypair)
191 self.underTest.insertMetadata(newerMetadataDTO)
193 #assert the the older has been replaced
194 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
197 channel = bin2str(metadataDTO.channel)
198 infohash = bin2str(metadataDTO.infohash)
199 results = self.db.fetchall(testquery, (channel,infohash))
201 self.assertTrue(len(results) == 1)
203 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
204 self.assertEquals(channel,tupl[1])
205 self.assertEquals(infohash,tupl[2])
206 self.assertEquals(newerMetadataDTO.description, tupl[3])
207 self.assertEquals(newerMetadataDTO.timestamp, tupl[4])
208 self.assertEquals(bin2str(newerMetadataDTO.signature), tupl[5])
210 #testing subtitles with the old once since they are not changed
211 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
213 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
214 self.assertEquals(2,len(subtitles))
216 for lang in ("ita", "nld"):
219 for subtuple in subtitles:
220 if subtuple[1] == lang:
225 self.assertTrue(found)
226 self.assertEquals(bin2str(metadataDTO.getSubtitle(lang).checksum), foundSub[3])
230 def testUpdateExistingWithNewerNewSubs(self):
231 metadataDTO = MockMetadataDTO(["nld", "ita"])
232 metadataDTO.sign(metadataDTO._keypair)
233 self.underTest.insertMetadata(metadataDTO)
235 newerMetadataDTO = MockMetadataDTO(["nld","ita","eng"])
236 newerMetadataDTO.channel = metadataDTO.channel
237 newerMetadataDTO.infohash = metadataDTO.infohash
238 newerMetadataDTO._keypair = metadataDTO._keypair
239 newerMetadataDTO.timestamp = metadataDTO.timestamp +1 #newer
240 newerMetadataDTO.sign(newerMetadataDTO._keypair)
243 self.underTest.insertMetadata(newerMetadataDTO)
245 #assert the the older has been replaced
246 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
249 channel = bin2str(metadataDTO.channel)
250 infohash = bin2str(metadataDTO.infohash)
251 results = self.db.fetchall(testquery, (channel,infohash))
253 self.assertTrue(len(results) == 1)
255 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
256 self.assertEquals(channel,tupl[1])
257 self.assertEquals(infohash,tupl[2])
258 self.assertEquals(newerMetadataDTO.description, tupl[3])
259 self.assertEquals(newerMetadataDTO.timestamp, tupl[4])
260 self.assertEquals(bin2str(newerMetadataDTO.signature), tupl[5])
262 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
264 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
265 self.assertEquals(3,len(subtitles))
267 for lang in ("ita", "nld","eng"):
270 for subtuple in subtitles:
271 if subtuple[1] == lang:
276 self.assertTrue(found)
277 self.assertEquals(bin2str(newerMetadataDTO.getSubtitle(lang).checksum), foundSub[3])
279 def testUpdateExistingWithNewerSubsDeleted(self):
280 metadataDTO = MockMetadataDTO(["nld", "ita"])
281 metadataDTO.sign(metadataDTO._keypair)
282 self.underTest.insertMetadata(metadataDTO)
284 newerMetadataDTO = MockMetadataDTO(["nld","eng"])
285 newerMetadataDTO.channel = metadataDTO.channel
286 newerMetadataDTO.infohash = metadataDTO.infohash
287 newerMetadataDTO._keypair = metadataDTO._keypair
288 newerMetadataDTO.timestamp = metadataDTO.timestamp +1 #newer
289 newerMetadataDTO.sign(newerMetadataDTO._keypair)
292 self.underTest.insertMetadata(newerMetadataDTO)
294 #assert the the older has been replaced
295 testquery = "SELECT * FROM Metadata WHERE publisher_id=?" \
297 channel = bin2str(metadataDTO.channel)
298 infohash = bin2str(metadataDTO.infohash)
299 results = self.db.fetchall(testquery, (channel,infohash))
301 self.assertTrue(len(results) == 1)
303 self.assertTrue(tupl[0] is not None and isinstance(tupl[0], int))
304 self.assertEquals(channel,tupl[1])
305 self.assertEquals(infohash,tupl[2])
306 self.assertEquals(newerMetadataDTO.description, tupl[3])
307 self.assertEquals(newerMetadataDTO.timestamp, tupl[4])
308 self.assertEquals(bin2str(newerMetadataDTO.signature), tupl[5])
310 subtitlesQuery = "SELECT * FROM Subtitles WHERE metadata_id_fk=?;"
312 subtitles = self.db.fetchall(subtitlesQuery, (tupl[0],))
313 self.assertEquals(2,len(subtitles))
315 for lang in ("nld","eng"):
318 for subtuple in subtitles:
319 if subtuple[1] == lang:
324 self.assertTrue(found)
325 self.assertEquals(bin2str(newerMetadataDTO.getSubtitle(lang).checksum), foundSub[3])
327 def testGetAllMetadataForInfohashEmtpy(self):
328 metadataDTO = MockMetadataDTO(["nld", "ita"])
329 metadataDTO.sign(metadataDTO._keypair)
330 self.underTest.insertMetadata(metadataDTO)
333 otherinfohash = _generateFakeInfohash()
335 results = self.underTest.getAllMetadataForInfohash(otherinfohash)
336 self.assertTrue(len(results)==0)
338 def testGetAllMetadataForInfohashNotEmpty(self):
339 infohash = _generateFakeInfohash()
340 metadataDTO1 = MockMetadataDTO(["nld", "ita"],infohash)
341 metadataDTO1.sign(metadataDTO1._keypair)
342 self.underTest.insertMetadata(metadataDTO1)
344 #different channels since the channel is automatically
345 #generated by MockMetadata DTO
346 metadataDTO2 = MockMetadataDTO(["rus", "eng"],infohash)
347 metadataDTO2.sign(metadataDTO2._keypair)
348 self.underTest.insertMetadata(metadataDTO2)
350 #a 3rd instance with different channel and infohash
351 metadataDTO3 = MockMetadataDTO(["rus", "spa", "jpn"])
352 metadataDTO3.sign(metadataDTO3._keypair)
353 self.underTest.insertMetadata(metadataDTO3)
355 results = self.underTest.getAllMetadataForInfohash(infohash)
356 self.assertTrue(len(results)==2)
358 #in checks for equality, not reference equality
359 self.assertTrue(metadataDTO1 in results)
360 self.assertTrue(metadataDTO2 in results)
361 self.assertFalse(metadataDTO3 in results)
365 def testDeleteSubtitle(self):
366 infohash = _generateFakeInfohash()
367 metadataDTO = MockMetadataDTO(["eng","kor"], infohash)
369 metadataDTO.sign(metadataDTO._keypair)
370 self.underTest.insertMetadata(metadataDTO)
372 res = self.underTest.getAllSubtitles(metadataDTO.channel, infohash)
373 self.assertTrue("eng" in res and "kor" in res)
375 #delete a subtitle that does not exist
376 self.underTest._deleteSubtitleByChannel(metadataDTO.channel, infohash, "ita")
377 res = self.underTest.getAllSubtitles(metadataDTO.channel, infohash)
378 self.assertTrue("eng" in res and "kor" in res)
380 self.underTest._deleteSubtitleByChannel(metadataDTO.channel, infohash, "eng")
381 res = self.underTest.getAllSubtitles(metadataDTO.channel, infohash)
382 self.assertTrue("kor" in res and not "eng" in res)
385 def testSelectLocalSubtitles(self):
387 infohash1 = _generateFakeInfohash()
388 metadataDTO1 = MockMetadataDTO(["eng","kor"], infohash1)
390 metadataDTO1.sign(metadataDTO1._keypair)
391 self.underTest.insertMetadata(metadataDTO1)
393 res = self.underTest.getAllLocalSubtitles()
395 self.assertTrue(len(res) == 0)
397 infohash2 = _generateFakeInfohash()
398 metadataDTO2 = MockMetadataDTO(["nld","spa"], infohash2)
400 metadataDTO2.getSubtitle("nld").path = "/bla/bla"
402 metadataDTO2.sign(metadataDTO2._keypair)
403 self.underTest.insertMetadata(metadataDTO2)
405 res = self.underTest.getAllLocalSubtitles()
407 self.assertTrue(len(res) == 1)
409 self.assertTrue(metadataDTO2.channel in res)
411 self.assertTrue(infohash2 in res[metadataDTO2.channel])
412 self.assertEquals(1, len(res[metadataDTO2.channel][infohash2]))
414 self.assertEquals(metadataDTO2.getSubtitle("nld"), res[metadataDTO2.channel][infohash2][0])
416 def testSelectLocalSubtitles2(self):
417 infohash1 = _generateFakeInfohash()
418 metadataDTO1 = MockMetadataDTO(["eng","kor", "nld"], infohash1)
420 metadataDTO1.getSubtitle("nld").path = "/bla/bla"
421 metadataDTO1.getSubtitle("eng").path = "/bla/bla"
422 metadataDTO1.sign(metadataDTO1._keypair)
423 self.underTest.insertMetadata(metadataDTO1)
425 infohash2 = _generateFakeInfohash()
426 metadataDTO2 = MockMetadataDTO(["ita","spa"], infohash2)
427 metadataDTO2.getSubtitle("ita").path = "/a/b"
428 metadataDTO2.getSubtitle("spa").path = "/c/d"
429 metadataDTO2.sign(metadataDTO2._keypair)
430 self.underTest.insertMetadata(metadataDTO2)
433 res = self.underTest.getLocalSubtitles(metadataDTO1.channel, infohash1)
434 self.assertEquals(2, len(res))
436 self.assertTrue("eng" in res)
437 self.assertEquals(metadataDTO1.getSubtitle("eng"), res["eng"])
439 self.assertTrue("nld" in res)
440 self.assertEquals(metadataDTO1.getSubtitle("nld"), res["nld"])
442 self.assertFalse("kor" in res)
444 def testUpdateSubtitlesWithNonePathValue(self):
447 infohash1 = _generateFakeInfohash()
448 metadataDTO1 = MockMetadataDTO(["eng","kor"], infohash1)
450 metadataDTO1.getSubtitle("eng").path = os.path.abspath(os.path.join("bla","bla"))
451 metadataDTO1.sign(metadataDTO1._keypair)
452 self.underTest.insertMetadata(metadataDTO1)
454 sub = self.underTest.getSubtitle(metadataDTO1.channel, infohash1, "eng")
455 self.assertEquals(os.path.abspath(os.path.join("bla","bla")), sub.path)
457 self.underTest.updateSubtitlePath(metadataDTO1.channel, infohash1,
460 sub = self.underTest.getSubtitle(metadataDTO1.channel, infohash1, "eng")
461 self.assertEquals(None, sub.path)
464 def testUpdateSubtitles(self):
465 sub1path= os.path.join(RES_DIR,"fake0.srt")
466 sub2path=os.path.join(RES_DIR,"fake1.srt")
467 infohash = _generateFakeInfohash()
468 metadataDTO = MockMetadataDTO([], infohash)
469 sub1 = SubtitleInfo("ita", None, _computeSHA1(sub1path))
470 sub2 = SubtitleInfo("eng",None,_computeSHA1(sub2path))
472 metadataDTO.addSubtitle(sub1)
473 metadataDTO.addSubtitle(sub2)
474 metadataDTO.sign(metadataDTO._keypair)
475 self.underTest.insertMetadata(metadataDTO)
477 res1 = self.underTest.getSubtitle(metadataDTO.channel, infohash,"ita")
478 self.assertEquals(sub1,res1)
480 res2 = self.underTest.getSubtitle(metadataDTO.channel, infohash, "eng")
481 self.assertEquals(sub2,res2)
483 sub1bis = copy.copy(sub1)
484 sub1bis.path = sub1path
485 sub2bis = copy.copy(sub2)
486 sub2bis.path = sub2path
488 self.underTest.updateSubtitlePath(metadataDTO.channel, infohash,
489 sub1bis.lang, sub1bis.path, False)
490 self.underTest.updateSubtitlePath(metadataDTO.channel, infohash,
491 sub2bis.lang, sub2bis.path , False)
494 self.underTest.commit()
496 #still unchanged since I did not commit
497 res1 = self.underTest.getSubtitle(metadataDTO.channel, infohash,"ita")
498 self.assertTrue(sub1== res1 and sub1.path != res1.path)
499 self.assertTrue(sub1bis == res1 and sub1bis.path == res1.path)
501 res2 = self.underTest.getSubtitle(metadataDTO.channel, infohash, "eng")
502 self.assertTrue(sub2 == res2 and sub2.path != res2.path)
503 self.assertTrue(sub2bis == res2 and sub2bis.path == res2.path)
506 # 30-05-2010 Testing of the new added table (SubtitlesHave) manipulation
509 def testInsertAndGetHaveMask(self):
512 infohash = _generateFakeInfohash()
513 metadataDTO1 = MockMetadataDTO(["nld","spa"], infohash)
514 channel = metadataDTO1.channel
516 metadataDTO1.sign(metadataDTO1._keypair)
517 self.underTest.insertMetadata(metadataDTO1)
519 peer_id = TestMetadataDBHandler.anotherPermId
521 #inserting a negative mask has to be refused
524 lambda : self.underTest.insertHaveMask(channel, infohash,
527 self.assertRaises(MetadataDBException, funcToTest)
529 #also a bitmask must be smaller then 2**32
533 lambda : self.underTest.insertHaveMask(channel, infohash,
536 self.assertRaises(MetadataDBException, funcToTest)
539 #now it's time for a correct value
541 self.underTest.insertHaveMask(channel, infohash, peer_id, havemask1)
543 mask = self.underTest.getHaveMask(channel, infohash,peer_id)
544 self.assertEqual(mask,havemask1)
546 #duplicate insertions should raise an error
549 lambda : self.underTest.insertHaveMask(channel, infohash,
552 self.assertRaises(MetadataDBException, funcToTest)
554 #insertion for another peer should go fine
555 self.underTest.insertHaveMask(channel, infohash, channel, havemask2)
557 mask1 = self.underTest.getHaveMask(channel, infohash,peer_id)
558 self.assertEqual(mask1,havemask1)
559 mask2 = self.underTest.getHaveMask(channel, infohash,channel)
560 self.assertEqual(mask2,havemask2)
562 #getting an have mask for an unexistent channel, infohash shall
565 self.underTest.getHaveMask(channel, _generateFakeInfohash(),peer_id)
566 self.assertTrue(mask1 is None)
568 #as it should happen for asking for an unexisting peer_id
569 mask1 = self.underTest.getHaveMask(channel, infohash,
570 TestMetadataDBHandler.aPermId)
571 self.assertTrue(mask1 is None)
573 def testUpdateHaveMask(self):
574 infohash = _generateFakeInfohash()
575 metadataDTO1 = MockMetadataDTO(["nld","spa"], infohash)
576 channel = metadataDTO1.channel
578 metadataDTO1.sign(metadataDTO1._keypair)
579 self.underTest.insertMetadata(metadataDTO1)
581 peer_id = TestMetadataDBHandler.anotherPermId
584 #adding an have mask to the db
586 self.underTest.insertHaveMask(channel, infohash, peer_id, havemask1)
588 mask = self.underTest.getHaveMask(channel, infohash,peer_id)
589 self.assertEqual(mask,havemask1)
591 #updating it to a different value
592 new_havemask = 0x1111ffff
593 self.underTest.updateHaveMask(channel, infohash, peer_id,
595 mask = self.underTest.getHaveMask(channel, infohash,peer_id)
596 self.assertEqual(mask,new_havemask)
598 #trying to update a non existing row should cause an error
599 # -- currently this doesn't happen
600 # implementing this beahaviour would slow down the db
602 # lambda: self.underTest.updateHaveMask(channel, infohash,
603 # channel, new_havemask)
604 # self.assertRaises(MetadataDBException, funcToTest)
607 def testDeleteHaveEntry(self):
608 infohash = _generateFakeInfohash()
609 metadataDTO1 = MockMetadataDTO(["nld","spa"], infohash)
610 channel = metadataDTO1.channel
612 metadataDTO1.sign(metadataDTO1._keypair)
613 self.underTest.insertMetadata(metadataDTO1)
615 peer_id = TestMetadataDBHandler.anotherPermId
618 #adding an have mask to the db
620 self.underTest.insertHaveMask(channel, infohash, peer_id, havemask1)
623 self.underTest.insertHaveMask(channel, infohash, channel, havemask2)
625 self.underTest.deleteHaveEntry(channel, infohash, peer_id)
627 mask = self.underTest.getHaveMask(channel, infohash, peer_id)
628 self.assertTrue(mask is None)
630 mask = self.underTest.getHaveMask(channel, infohash, channel)
631 self.assertEquals(havemask2,mask)
633 # deleting an entry that does not exist should leave
635 self.underTest.deleteHaveEntry(channel, infohash, peer_id)
637 mask = self.underTest.getHaveMask(channel, infohash, channel)
638 self.assertEquals(havemask2,mask)
641 def testGetAllHaveEntries(self):
643 infohash = _generateFakeInfohash()
644 metadataDTO1 = MockMetadataDTO(["nld","spa"], infohash)
645 channel = metadataDTO1.channel
647 metadataDTO1.sign(metadataDTO1._keypair)
648 self.underTest.insertMetadata(metadataDTO1)
650 peer_id = TestMetadataDBHandler.anotherPermId
653 #adding an have mask to the db
655 self.underTest.insertHaveMask(channel, infohash, peer_id, havemask1)
657 time.sleep(1) # otherwise they would have the same timestamp
659 self.underTest.insertHaveMask(channel, infohash, channel, havemask2)
661 d = self.underTest.getHaveEntries(channel, infohash)
663 #the second inserted havemask has to be returned first
666 self.assertEquals(channel, firstTuple[0])
667 self.assertEquals(havemask2,firstTuple[1])
668 self.assertTrue(firstTuple[2] is not None)
670 self.assertEquals(peer_id, d[1][0])
671 self.assertEquals(havemask1,d[1][1])
672 self.assertTrue(d[1][2] is not None)
676 def testCleanUpAllHave(self):
677 infohash1 = _generateFakeInfohash()
678 metadataDTO1 = MockMetadataDTO(["nld","spa"], infohash1)
679 channel1 = metadataDTO1.channel
681 metadataDTO1.sign(metadataDTO1._keypair)
682 self.underTest.insertMetadata(metadataDTO1)
684 infohash2 = _generateFakeInfohash()
685 metadataDTO2 = MockMetadataDTO(["nld","spa"], infohash2)
686 channel2 = metadataDTO2.channel
688 metadataDTO2.sign(metadataDTO2._keypair)
689 self.underTest.insertMetadata(metadataDTO2)
692 peer_id1 = TestMetadataDBHandler.anotherPermId
693 peer_id2 = TestMetadataDBHandler.aPermId
695 #inserting some data: 4 have maskes for each of the two channels with
697 # older then 1275295300
698 self.underTest.insertHaveMask(channel1, infohash1, channel1, 0x42, 1275295290)
699 self.underTest.insertHaveMask(channel1, infohash1, peer_id1, 0x42, 1275295291)
700 # newer then 1275295300
701 self.underTest.insertHaveMask(channel1, infohash1, peer_id2, 0x42, 1275295300)
702 self.underTest.insertHaveMask(channel1, infohash1, channel2, 0x42, 1275295301)
705 # older then 1275295300
706 self.underTest.insertHaveMask(channel2, infohash2, channel1, 0x42, 1275295290)
707 self.underTest.insertHaveMask(channel2, infohash2, peer_id1, 0x42, 1275295291)
709 # newer then 1275295300
710 self.underTest.insertHaveMask(channel2, infohash2, peer_id2, 0x42, 1275295300)
711 self.underTest.insertHaveMask(channel2, infohash2, channel2, 0x42, 1275295301)
713 self.underTest.cleanupOldHave(1275295300)
714 haveForEntry1 = self.underTest.getHaveEntries(channel1, infohash1)
715 expectedList1 = [(channel2,0x42,1275295301), (peer_id2, 0x42, 1275295300),
716 (channel1, 0x42, 1275295290)]
717 self.assertEquals(expectedList1, haveForEntry1)
719 haveForEntry2 = self.underTest.getHaveEntries(channel2, infohash2)
720 expectedList2 = [(channel2, 0x42, 1275295301),(peer_id2, 0x42, 1275295300)]
721 self.assertEquals(expectedList2,haveForEntry2)
728 def _generateFakeInfohash():
729 hasher = hashlib.sha1()
730 hasher.update(str(random.randint(0,65535)))
731 return hasher.digest()
733 def _computeSHA1(path):
734 hasher = hashlib.sha1()
735 with codecs.open(path, "rb", "utf-8") as file:
736 contents = file.read()
738 hasher.update(contents)
739 return hasher.digest()
744 class MockMetadataDTO(MetadataDTO):
747 def __init__(self, availableLangs, infohash = None):
749 self._keypair = generate_keypair()
751 self._permId = str(self._keypair.pub().get_der())
753 if infohash == None :
754 hasher = hashlib.sha1()
755 hasher.update(self._permId + "a")
756 infohash = hasher.digest()
758 self.channel = self._permId
759 self.infohash = infohash
760 self.description = u""
761 self.resetTimestamp()
764 hasher = hashlib.sha1() #fake checksums for subs
766 for lang in availableLangs:
767 hasher.update(lang + "123")
768 checksum = hasher.digest()
769 self.addSubtitle(SubtitleInfo(lang, None, checksum))
772 return unittest.TestLoader().loadTestsFromTestCase(TestMetadataDBHandler)
776 if __name__ == "__main__":
777 #import sys;sys.argv = ['', 'Test.testInitHandler']